From 9afc1c9f2719f72a46ea22807472e590b3c621b4 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 12 Mar 2026 18:01:15 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E9=80=82=E9=85=8Ddatajuicer=E5=A4=9A?= =?UTF-8?q?=E6=A8=A1=E6=80=81=E7=AE=97=E5=AD=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../python-executor/datamate/core/base_op.py | 17 +++++++++++++++++ .../datamate/wrappers/data_juicer_executor.py | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index c0256c17a..b4c5244b0 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import json +import mimetypes import os import time import traceback @@ -170,6 +171,22 @@ def read_file_first(self, sample): if self.is_first_op: self.read_file(sample) + def convert_to_dj(self, sample): + filepath = sample[self.filepath_key] + mime_type, _ = mimetypes.guess_type(filepath) + file_type = None + if mime_type: + file_type = mime_type.split('/')[0] + if file_type == "text": + return self.read_file(sample) + elif file_type == "image": + sample["images"] = [filepath] + elif file_type == "audio": + sample["audios"] = [filepath] + elif file_type == "video": + sample["videos"] = [filepath] + return sample + @staticmethod def save_file_and_db(sample): if FileExporter().execute(sample): diff --git a/runtime/python-executor/datamate/wrappers/data_juicer_executor.py b/runtime/python-executor/datamate/wrappers/data_juicer_executor.py index 6d345f4bb..ec327c9e9 100644 --- a/runtime/python-executor/datamate/wrappers/data_juicer_executor.py +++ b/runtime/python-executor/datamate/wrappers/data_juicer_executor.py @@ -102,7 +102,7 @@ def run(self): dataset = self.load_dataset() logger.info('Read data...') - dataset = dataset.map(FileExporter().read_file, num_cpus=0.05) + dataset = dataset.map(FileExporter().convert_to_dj, num_cpus=0.05) # 保存原始数据文件ID集合,用于后续过滤数据检测 original_file_ids = set(dataset.unique("fileId")) From e342b114d9a6ba6190c02f060c3b31c3b689500d Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 12 Mar 2026 19:13:06 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E9=80=82=E9=85=8Ddatajuicer=E5=A4=9A?= =?UTF-8?q?=E6=A8=A1=E6=80=81=E7=AE=97=E5=AD=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- runtime/python-executor/datamate/core/base_op.py | 6 ++++++ runtime/python-executor/datamate/wrappers/executor.py | 3 +++ 2 files changed, 9 insertions(+) diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index b4c5244b0..03106d617 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -180,10 +180,16 @@ def convert_to_dj(self, sample): if file_type == "text": return self.read_file(sample) elif file_type == "image": + sample["text"] = "" + sample["data"] = b"" sample["images"] = [filepath] elif file_type == "audio": + sample["text"] = "" + sample["data"] = b"" sample["audios"] = [filepath] elif file_type == "video": + sample["text"] = "" + sample["data"] = b"" sample["videos"] = [filepath] return sample diff --git a/runtime/python-executor/datamate/wrappers/executor.py b/runtime/python-executor/datamate/wrappers/executor.py index 4b40948f7..de0eaccbf 100644 --- a/runtime/python-executor/datamate/wrappers/executor.py +++ b/runtime/python-executor/datamate/wrappers/executor.py @@ -52,6 +52,9 @@ def load_meta(self, line): if not meta.get("extraFileType"): meta["extraFileType"] = None meta["dataset_id"] = self.cfg.dataset_id + for key in ["images", "audios", "videos"]: + # 尝试删除,如果找不到该键,就安静地返回 None,不会报错 + meta.pop(key, None) return meta def run(self): From 21f7652da249e8b2a45620ac225a50453e38391b Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 13 Mar 2026 14:54:46 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E9=80=82=E9=85=8Ddatajuicer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../python-executor/datamate/core/base_op.py | 4 +- .../datamate/wrappers/data_juicer_executor.py | 2 +- .../datamate/wrappers/executor.py | 55 ++++++++++++++++++- 3 files changed, 56 insertions(+), 5 deletions(-) diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index 03106d617..2393723d9 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -480,6 +480,8 @@ def execute(self, sample: Dict[str, Any]): return False if sample[self.text_key] == '' and sample[self.data_key] == b'': + if sample.get("executor") == "datajuicer": + return True sample[self.filesize_key] = "0" return False @@ -590,7 +592,7 @@ def _get_from_text(self, sample: Dict[str, Any]) -> Dict[str, Any]: return sample def _get_from_text_or_data(self, sample: Dict[str, Any]) -> Dict[str, Any]: - if sample[self.data_key] is not None and sample[self.data_key] != b'' and sample[self.data_key] != "": + if sample.get(self.data_key) is not None and sample[self.data_key] != b'' and sample[self.data_key] != "": return self._get_from_data(sample) else: return self._get_from_text(sample) diff --git a/runtime/python-executor/datamate/wrappers/data_juicer_executor.py b/runtime/python-executor/datamate/wrappers/data_juicer_executor.py index ec327c9e9..abf0f10f1 100644 --- a/runtime/python-executor/datamate/wrappers/data_juicer_executor.py +++ b/runtime/python-executor/datamate/wrappers/data_juicer_executor.py @@ -118,7 +118,7 @@ def run(self): dj_config = self.client.init_config(self.dataset_path, self.export_path, self.cfg.process) result_path = self.client.execute_config(dj_config) - processed_dataset = self.load_dataset(result_path) + processed_dataset = self.load_dj_dataset(result_path) processed_dataset = processed_dataset.map_batches(self.add_column, num_cpus=0.05) processed_dataset = processed_dataset.map(FileExporter().save_file_and_db, num_cpus=0.05) for _ in processed_dataset.iter_batches(): diff --git a/runtime/python-executor/datamate/wrappers/executor.py b/runtime/python-executor/datamate/wrappers/executor.py index de0eaccbf..e908afbc7 100644 --- a/runtime/python-executor/datamate/wrappers/executor.py +++ b/runtime/python-executor/datamate/wrappers/executor.py @@ -1,5 +1,7 @@ import json +import shutil import time +from pathlib import Path from typing import Dict from datamate.common.utils.file_scanner import FileScanner @@ -52,9 +54,35 @@ def load_meta(self, line): if not meta.get("extraFileType"): meta["extraFileType"] = None meta["dataset_id"] = self.cfg.dataset_id - for key in ["images", "audios", "videos"]: - # 尝试删除,如果找不到该键,就安静地返回 None,不会报错 - meta.pop(key, None) + return meta + + def load_dj_meta(self, line): + meta = json.loads(line) + meta["executor"] = "datajuicer" + filepath = "" + file = "" + if meta.get("images"): + if isinstance(meta["images"], list): + filepath = meta["images"][0] + file = Path(filepath) + del meta["images"] + elif meta.get("audios"): + if isinstance(meta["audios"], list): + filepath = meta["audios"][0] + file = Path(filepath) + del meta["audios"] + elif meta.get("videos"): + if isinstance(meta["videos"], list): + filepath = meta["videos"][0] + file = Path(filepath) + del meta["videos"] + if filepath and file: + filename = f"{Path(meta['fileName']).name}.{file.suffix}" + shutil.move(filepath, f"/dataset/{self.cfg.dataset_id}/{filename}") + meta["fileName"] = filename + meta["filePath"] = filepath + meta["fileType"] = file.suffix + meta["fileSize"] = file.stat().st_size return meta def run(self): @@ -81,6 +109,27 @@ def load_dataset(self, jsonl_file_path = None): return dataset + def load_dj_dataset(self, jsonl_file_path = None): + retry = 0 + dataset = None + if jsonl_file_path is None: + jsonl_file_path = self.cfg.dataset_path + while True: + if check_valid_path(jsonl_file_path): + with open(jsonl_file_path, "r", encoding='utf-8') as meta: + lines = meta.readlines() + dataset = ray.data.from_items([self.load_dj_meta(line) for line in lines]) + break + if retry < 5: + retry += 1 + time.sleep(retry) + continue + else: + logger.error(f"can not load dataset from dataset_path") + raise RuntimeError(f"Load dataset Failed!, dataset_path: {self.cfg.dataset_path}.") + + return dataset + def update_db(self, status): task_info = TaskInfoPersistence() task_info.update_result(self.cfg.dataset_id, self.cfg.instance_id, status) From b1e0096db8c14e465590d5d2ccd4b3c1c1a28cc4 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 13 Mar 2026 14:54:46 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E9=80=82=E9=85=8Ddatajuicer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../python-executor/datamate/core/base_op.py | 7 ++- .../datamate/wrappers/data_juicer_executor.py | 7 ++- .../datamate/wrappers/executor.py | 56 ++++++++++++++++++- 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index 03106d617..7dd2ff473 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -470,16 +470,17 @@ def execute(self, sample: Dict[str, Any]): try: start = time.time() + save_path = "" if file_type in self.text_support_ext: sample, save_path = self.get_textfile_handler(sample) elif file_type in self.data_support_ext: sample, save_path = self.get_datafile_handler(sample) elif file_type in self.medical_support_ext: sample, save_path = self.get_medicalfile_handler(sample) - else: - return False if sample[self.text_key] == '' and sample[self.data_key] == b'': + if sample.get("executor") == "datajuicer": + return True sample[self.filesize_key] = "0" return False @@ -590,7 +591,7 @@ def _get_from_text(self, sample: Dict[str, Any]) -> Dict[str, Any]: return sample def _get_from_text_or_data(self, sample: Dict[str, Any]) -> Dict[str, Any]: - if sample[self.data_key] is not None and sample[self.data_key] != b'' and sample[self.data_key] != "": + if sample.get(self.data_key) is not None and sample[self.data_key] != b'' and sample[self.data_key] != "": return self._get_from_data(sample) else: return self._get_from_text(sample) diff --git a/runtime/python-executor/datamate/wrappers/data_juicer_executor.py b/runtime/python-executor/datamate/wrappers/data_juicer_executor.py index ec327c9e9..4a97152d1 100644 --- a/runtime/python-executor/datamate/wrappers/data_juicer_executor.py +++ b/runtime/python-executor/datamate/wrappers/data_juicer_executor.py @@ -86,6 +86,7 @@ def __init__(self, cfg = None, meta = None): def add_column(self, batch): batch_size = len(batch["filePath"]) batch["execute_status"] = [SUCCESS_STATUS] * batch_size + batch["executor"] = ["datajuicer"] * batch_size batch[Fields.instance_id] = [self.cfg.instance_id] * batch_size batch[Fields.export_path] = [self.cfg.export_path] * batch_size return batch @@ -118,11 +119,11 @@ def run(self): dj_config = self.client.init_config(self.dataset_path, self.export_path, self.cfg.process) result_path = self.client.execute_config(dj_config) - processed_dataset = self.load_dataset(result_path) + processed_dataset = self.load_dj_dataset(result_path) processed_dataset = processed_dataset.map_batches(self.add_column, num_cpus=0.05) processed_dataset = processed_dataset.map(FileExporter().save_file_and_db, num_cpus=0.05) - for _ in processed_dataset.iter_batches(): - pass + + processed_dataset = processed_dataset.materialize() # 特殊处理:识别被过滤的数据 if processed_dataset.count() == 0: diff --git a/runtime/python-executor/datamate/wrappers/executor.py b/runtime/python-executor/datamate/wrappers/executor.py index de0eaccbf..6184dc894 100644 --- a/runtime/python-executor/datamate/wrappers/executor.py +++ b/runtime/python-executor/datamate/wrappers/executor.py @@ -1,5 +1,8 @@ import json +import os +import shutil import time +from pathlib import Path from typing import Dict from datamate.common.utils.file_scanner import FileScanner @@ -52,11 +55,37 @@ def load_meta(self, line): if not meta.get("extraFileType"): meta["extraFileType"] = None meta["dataset_id"] = self.cfg.dataset_id - for key in ["images", "audios", "videos"]: - # 尝试删除,如果找不到该键,就安静地返回 None,不会报错 - meta.pop(key, None) return meta + def load_dj_meta(self, line): + meta = json.loads(line) + filepath = "" + file = "" + if meta.get("images"): + if isinstance(meta["images"], list): + filepath = meta["images"][0] + file = Path(filepath) + del meta["images"] + elif meta.get("audios"): + if isinstance(meta["audios"], list): + filepath = meta["audios"][0] + file = Path(filepath) + del meta["audios"] + elif meta.get("videos"): + if isinstance(meta["videos"], list): + filepath = meta["videos"][0] + file = Path(filepath) + del meta["videos"] + if filepath and file: + filename = f"{Path(meta['fileName']).stem}.{file.suffix}" + meta["fileName"] = filename + meta["filePath"] = filepath + meta["fileType"] = file.suffix + meta["fileSize"] = file.stat().st_size + os.makedirs(f"/dataset/{self.cfg.dataset_id}", exist_ok=True) + shutil.move(filepath, f"/dataset/{self.cfg.dataset_id}/{filename}") + return {k: v for k, v in meta.items() if not (isinstance(k, str) and k.startswith('_'))} + def run(self): pass @@ -81,6 +110,27 @@ def load_dataset(self, jsonl_file_path = None): return dataset + def load_dj_dataset(self, jsonl_file_path = None): + retry = 0 + dataset = None + if jsonl_file_path is None: + jsonl_file_path = self.cfg.dataset_path + while True: + if check_valid_path(jsonl_file_path): + with open(jsonl_file_path, "r", encoding='utf-8') as meta: + lines = meta.readlines() + dataset = ray.data.from_items([self.load_dj_meta(line) for line in lines]) + break + if retry < 5: + retry += 1 + time.sleep(retry) + continue + else: + logger.error(f"can not load dataset from dataset_path") + raise RuntimeError(f"Load dataset Failed!, dataset_path: {self.cfg.dataset_path}.") + + return dataset + def update_db(self, status): task_info = TaskInfoPersistence() task_info.update_result(self.cfg.dataset_id, self.cfg.instance_id, status) From 460965e136321aee8ad54b2995f3b00933ae10d6 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 13 Mar 2026 17:16:54 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E9=80=82=E9=85=8Ddatajuicer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- runtime/python-executor/datamate/wrappers/executor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/python-executor/datamate/wrappers/executor.py b/runtime/python-executor/datamate/wrappers/executor.py index 6184dc894..478f47fa9 100644 --- a/runtime/python-executor/datamate/wrappers/executor.py +++ b/runtime/python-executor/datamate/wrappers/executor.py @@ -77,10 +77,10 @@ def load_dj_meta(self, line): file = Path(filepath) del meta["videos"] if filepath and file: - filename = f"{Path(meta['fileName']).stem}.{file.suffix}" + filename = f"{Path(meta['fileName']).stem}{file.suffix}" meta["fileName"] = filename - meta["filePath"] = filepath - meta["fileType"] = file.suffix + meta["filePath"] = f"/dataset/{self.cfg.dataset_id}/{filename}" + meta["fileType"] = file.suffix[1:] meta["fileSize"] = file.stat().st_size os.makedirs(f"/dataset/{self.cfg.dataset_id}", exist_ok=True) shutil.move(filepath, f"/dataset/{self.cfg.dataset_id}/{filename}")