From d86134b9fe0b7ce6279cb12cfbacf3f33e3725d1 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 19 Mar 2026 19:28:38 +0800 Subject: [PATCH 01/10] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E5=90=AF=E5=81=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datamate/scheduler/__init__.py | 4 +- .../datamate/scheduler/job_task_scheduler.py | 302 ++++++++++++++++++ .../datamate/wrappers/data_juicer_wrapper.py | 10 +- .../datamate/wrappers/datamate_executor.py | 3 +- .../datamate/wrappers/datamate_wrapper.py | 8 +- 5 files changed, 317 insertions(+), 10 deletions(-) create mode 100644 runtime/python-executor/datamate/scheduler/job_task_scheduler.py diff --git a/runtime/python-executor/datamate/scheduler/__init__.py b/runtime/python-executor/datamate/scheduler/__init__.py index dca899c64..0606116df 100644 --- a/runtime/python-executor/datamate/scheduler/__init__.py +++ b/runtime/python-executor/datamate/scheduler/__init__.py @@ -1,6 +1,8 @@ from .cmd_task_scheduler import CommandScheduler from .func_task_scheduler import CallableScheduler +from .job_task_scheduler import RayJobScheduler cmd_scheduler = CommandScheduler(max_concurrent=5) -func_scheduler = CallableScheduler(max_concurrent=5) \ No newline at end of file +func_scheduler = CallableScheduler(max_concurrent=5) +ray_job_scheduler = RayJobScheduler(max_concurrent=5) diff --git a/runtime/python-executor/datamate/scheduler/job_task_scheduler.py b/runtime/python-executor/datamate/scheduler/job_task_scheduler.py new file mode 100644 index 000000000..cf6a77d64 --- /dev/null +++ b/runtime/python-executor/datamate/scheduler/job_task_scheduler.py @@ -0,0 +1,302 @@ +import asyncio +import os +from datetime import datetime +from typing import Optional, List, Dict, Any + +from loguru import logger + +from .scheduler import Task, TaskStatus, TaskResult, TaskScheduler + +# Default Ray dashboard address +RAY_DASHBOARD_ADDRESS = os.getenv("RAY_DASHBOARD_ADDRESS", "http://datamate-raycluster-head-svc:8265") + + +class RayJobTask(Task): + """Ray Job 任务包装类""" + + def __init__( + self, + task_id: str, + entrypoint: str, + runtime_env: Optional[Dict[str, Any]] = None, + log_path: Optional[str] = None, + timeout: Optional[int] = None, + *args, + **kwargs, + ): + super().__init__(task_id, *args, **kwargs) + self.entrypoint = entrypoint + self.runtime_env = runtime_env or {} + self.log_path = log_path or f"/flow/{task_id}/output.log" + self.timeout = timeout + self.job_id: Optional[str] = None + self._client = None + + def _get_client(self): + """延迟初始化 JobSubmissionClient""" + if self._client is None: + from ray.job_submission import JobSubmissionClient + + self._client = JobSubmissionClient(RAY_DASHBOARD_ADDRESS) + return self._client + + def start(self) -> "RayJobTask": + """启动任务""" + if self.status == TaskStatus.PENDING: + self.status = TaskStatus.RUNNING + self.started_at = datetime.now() + self._task = asyncio.create_task(self._execute()) + return self + + async def _execute(self): + """执行 Ray Job""" + try: + self.status = TaskStatus.RUNNING + self.started_at = datetime.now() + + client = self._get_client() + + # 确保 log 目录存在 + log_dir = os.path.dirname(self.log_path) + if log_dir: + os.makedirs(log_dir, exist_ok=True) + + # 提交 Ray Job + self.job_id = client.submit_job( + entrypoint=self.entrypoint, + runtime_env=self.runtime_env, + metadata={"task_id": self.task_id}, + ) + logger.info(f"Submitted Ray Job: {self.job_id} for task {self.task_id}") + + # 轮询 Job 状态 + poll_interval = 2 # 2 秒轮询一次 + elapsed_time = 0 + + while True: + if self._cancelled: + logger.info( + f"Task {self.task_id} cancelled, stopping Ray Job {self.job_id}" + ) + self._stop_job(client) + self.status = TaskStatus.CANCELLED + break + + try: + info = client.get_job_info(self.job_id) + job_status = info.status + + if job_status == "SUCCEEDED": + self.status = TaskStatus.COMPLETED + logger.info(f"Ray Job {self.job_id} completed successfully") + break + elif job_status == "FAILED": + self.status = TaskStatus.FAILED + self.error = info.message or "Job failed" + logger.error(f"Ray Job {self.job_id} failed: {self.error}") + break + elif job_status == "STOPPED": + if self._cancelled: + self.status = TaskStatus.CANCELLED + else: + self.status = TaskStatus.FAILED + self.error = "Job was stopped unexpectedly" + logger.info(f"Ray Job {self.job_id} stopped") + break + + # 检查超时 + if self.timeout and elapsed_time >= self.timeout: + logger.warning( + f"Ray Job {self.job_id} timed out after {self.timeout} seconds" + ) + self._stop_job(client) + self.status = TaskStatus.FAILED + self.error = f"Job timed out after {self.timeout} seconds" + break + + except Exception as e: + logger.warning(f"Error checking job status: {e}") + + await asyncio.sleep(poll_interval) + elapsed_time += poll_interval + + except asyncio.CancelledError: + logger.info(f"Task {self.task_id} received CancelledError") + if self.job_id: + client = self._get_client() + self._stop_job(client) + self.status = TaskStatus.CANCELLED + self._cancelled = True + + except Exception as e: + self.status = TaskStatus.FAILED + self.error = str(e) + logger.error(f"RayJobTask(id: {self.task_id}) run failed. Cause: {e}") + + finally: + self.completed_at = datetime.now() + + def _stop_job(self, client): + """停止 Ray Job""" + if self.job_id: + try: + client.stop_job(self.job_id) + logger.info(f"Stopped Ray Job {self.job_id}") + except Exception as e: + logger.warning(f"Failed to stop Ray Job {self.job_id}: {e}") + + def cancel(self) -> bool: + """取消任务""" + if self.status == TaskStatus.RUNNING: + self._cancelled = True + logger.info(f"Marked Ray Job task {self.task_id} for cancellation") + return True + return False + + def to_result(self) -> TaskResult: + """转换为结果对象""" + self.result = { + "entrypoint": self.entrypoint, + "job_id": self.job_id, + } + return super().to_result() + + +class RayJobScheduler(TaskScheduler): + """Ray Job 调度器""" + + def __init__(self, max_concurrent: int = 5, ray_address: Optional[str] = None): + super().__init__(max_concurrent) + self.ray_address = ray_address or RAY_DASHBOARD_ADDRESS + self._client = None + + def _get_client(self): + """延迟初始化 JobSubmissionClient""" + if self._client is None: + from ray.job_submission import JobSubmissionClient + + self._client = JobSubmissionClient(self.ray_address) + return self._client + + async def submit( + self, + task_id: str, + script_path: str, + script_args: str = "", + runtime_env: Optional[Dict[str, Any]] = None, + log_path: Optional[str] = None, + timeout: Optional[int] = None, + **kwargs, + ) -> str: + """提交 Ray Job 任务""" + # 构建 entrypoint + entrypoint = f"python {script_path}" + if script_args: + entrypoint = f"{entrypoint} {script_args}" + + # 默认 runtime_env + if runtime_env is None: + runtime_env = {"working_dir": "/opt/runtime"} + + # 默认 log_path + if log_path is None: + log_path = f"/flow/{task_id}/output.log" + + task = RayJobTask( + task_id=task_id, + entrypoint=entrypoint, + runtime_env=runtime_env, + log_path=log_path, + timeout=timeout, + **kwargs, + ) + self.tasks[task_id] = task + + # 使用信号量限制并发 + async with self.semaphore: + task.start() + + logger.info(f"Ray Job 任务 {task_id} 已提交并开始执行") + return task_id + + def get_task_status(self, task_id: str) -> Optional[TaskResult]: + """获取任务状态""" + task = self.tasks.get(task_id) + if task: + return task.to_result() + return None + + def get_all_tasks(self) -> List[TaskResult]: + """获取所有任务状态""" + return [task.to_result() for task in self.tasks.values()] + + def cancel_task(self, task_id: str) -> bool: + """取消任务""" + task = self.tasks.get(task_id) + if not task: + logger.warning(f"Task {task_id} not found, considering already cancelled") + return True + + if task.status == TaskStatus.RUNNING: + cancelled = task.cancel() + if cancelled: + logger.info(f"Ray Job 任务 {task_id} 已标记为取消") + return cancelled + + # 任务未运行,直接返回成功 + if task.status in [TaskStatus.PENDING, TaskStatus.COMPLETED, TaskStatus.FAILED]: + return True + + return False + + def get_tasks_by_status(self, status: TaskStatus) -> List[TaskResult]: + """根据状态获取任务""" + return [ + task.to_result() for task in self.tasks.values() if task.status == status + ] + + async def wait_for_task( + self, task_id: str, timeout: Optional[float] = None + ) -> TaskResult: + """等待任务完成""" + task = self.tasks.get(task_id) + if not task: + raise ValueError(f"任务 {task_id} 不存在") + + if task.status in [ + TaskStatus.COMPLETED, + TaskStatus.FAILED, + TaskStatus.CANCELLED, + ]: + return task.to_result() + + # 等待任务完成 + if task.get(): + try: + await asyncio.wait_for(task.get(), timeout=timeout) + except asyncio.TimeoutError: + raise TimeoutError(f"任务 {task_id} 等待超时") + + return task.to_result() + + async def shutdown(self): + """关闭调度器,取消所有运行中的任务""" + logger.info("正在关闭 Ray Job 调度器...") + + running_tasks = [ + task for task in self.tasks.values() if task.status == TaskStatus.RUNNING + ] + + for task in running_tasks: + logger.info(f"取消运行中的 Ray Job 任务: {task.task_id}") + task.cancel() + + # 等待所有任务完成 + for task in running_tasks: + if task.get() and not task.get().done(): + try: + await asyncio.wait_for(task.get(), timeout=5.0) + except asyncio.TimeoutError: + logger.warning(f"任务 {task.task_id} 无法正常停止") + + logger.info("Ray Job 调度器已关闭") diff --git a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py index c40fdf518..fa7cc946f 100644 --- a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py +++ b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py @@ -1,11 +1,15 @@ # -*- coding: utf-8 -*- import os -from datamate.scheduler import cmd_scheduler +from datamate.scheduler import ray_job_scheduler async def submit(task_id, config_path): current_dir = os.path.dirname(__file__) + script_path = os.path.join(current_dir, "data_juicer_executor.py") - await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'data_juicer_executor.py')} " - f"--config_path={config_path}") \ No newline at end of file + await ray_job_scheduler.submit(task_id, script_path, f"--config_path={config_path}") + + +def cancel(task_id): + return ray_job_scheduler.cancel_task(task_id) diff --git a/runtime/python-executor/datamate/wrappers/datamate_executor.py b/runtime/python-executor/datamate/wrappers/datamate_executor.py index ac247166e..2613864fa 100644 --- a/runtime/python-executor/datamate/wrappers/datamate_executor.py +++ b/runtime/python-executor/datamate/wrappers/datamate_executor.py @@ -44,8 +44,7 @@ def run(self): tend = time.time() logger.info(f'All Ops are done in {tend - tstart:.3f}s.') - for _ in dataset.data.iter_batches(): - pass + dataset.data.materialize() self.scan_files() diff --git a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py index c8d508f54..3fea6c5a7 100644 --- a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py +++ b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py @@ -1,15 +1,15 @@ # -*- coding: utf-8 -*- import os -from datamate.scheduler import cmd_scheduler +from datamate.scheduler import ray_job_scheduler async def submit(task_id, config_path): current_dir = os.path.dirname(__file__) + script_path = os.path.join(current_dir, "datamate_executor.py") - await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'datamate_executor.py')} " - f"--config_path={config_path}") + await ray_job_scheduler.submit(task_id, script_path, f"--config_path={config_path}") def cancel(task_id): - return cmd_scheduler.cancel_task(task_id) + return ray_job_scheduler.cancel_task(task_id) From fde0ebec6fd44797bf53044f65c483fe9ef9af0c Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Mon, 23 Mar 2026 09:26:20 +0800 Subject: [PATCH 02/10] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E4=B8=8D=E6=8A=9B=E5=87=BA=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../python-executor/datamate/core/base_op.py | 249 ++++++++++++------ 1 file changed, 162 insertions(+), 87 deletions(-) diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index 7dd2ff473..06050f7bb 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -21,7 +21,7 @@ from datamate.core.constant import Fields from datamate.sql_manager.persistence_atction import TaskInfoPersistence -OPERATORS = Registry('Operators') +OPERATORS = Registry("Operators") FAILED_STATUS = "FAILED" SUCCESS_STATUS = "COMPLETED" @@ -57,24 +57,24 @@ class BaseOp: custom_ops = False def __init__(self, *args, **kwargs): - self.accelerator = kwargs.get('accelerator', "cpu") - self.is_last_op = kwargs.get('is_last_op', False) - self.is_first_op = kwargs.get('is_first_op', False) - self._name = kwargs.get('op_name', None) + self.accelerator = kwargs.get("accelerator", "cpu") + self.is_last_op = kwargs.get("is_last_op", False) + self.is_first_op = kwargs.get("is_first_op", False) + self._name = kwargs.get("op_name", None) self.infer_model = None - self.text_key = kwargs.get('text_key', "text") - self.data_key = kwargs.get('data_key', "data") - self.image_key = kwargs.get('image_key', "image") - self.video_key = kwargs.get('video_key', "video") - self.audio_key = kwargs.get('audio_key', "audio") - self.filename_key = kwargs.get('fileName_key', "fileName") - self.filetype_key = kwargs.get('fileType_key', "fileType") - self.fileid_key = kwargs.get('fileId_key', "fileId") - self.filepath_key = kwargs.get('filePath_key', "filePath") - self.filesize_key = kwargs.get('fileSize_key', "fileSize") - self.export_path_key = kwargs.get('export_path_key', "export_path") - self.ext_params_key = kwargs.get('ext_params_key', "ext_params") - self.target_type_key = kwargs.get('target_type_key', "target_type") + self.text_key = kwargs.get("text_key", "text") + self.data_key = kwargs.get("data_key", "data") + self.image_key = kwargs.get("image_key", "image") + self.video_key = kwargs.get("video_key", "video") + self.audio_key = kwargs.get("audio_key", "audio") + self.filename_key = kwargs.get("fileName_key", "fileName") + self.filetype_key = kwargs.get("fileType_key", "fileType") + self.fileid_key = kwargs.get("fileId_key", "fileId") + self.filepath_key = kwargs.get("filePath_key", "filePath") + self.filesize_key = kwargs.get("fileSize_key", "fileSize") + self.export_path_key = kwargs.get("export_path_key", "export_path") + self.ext_params_key = kwargs.get("ext_params_key", "ext_params") + self.target_type_key = kwargs.get("target_type_key", "target_type") @property def name(self): @@ -87,13 +87,16 @@ def name(self): def is_npu_available(): try: import torch_npu + return torch_npu.npu.is_available() except ImportError as e: logger.warning("Import torch_npu failed.") return False @staticmethod - def update_kwargs(sample: Dict[str, Any], not_update_keys=("text", "data", "meta")) -> Dict: + def update_kwargs( + sample: Dict[str, Any], not_update_keys=("text", "data", "meta") + ) -> Dict: """获取sample_data中文件相关的信息""" res = {} for k, v in sample.items(): @@ -116,19 +119,23 @@ def _get_error_info(e: BaseException) -> Tuple[str, str]: def use_npu(self): """确认算子是否可以使用npu""" - return self.accelerator == 'npu' and self.is_npu_available() + return self.accelerator == "npu" and self.is_npu_available() def get_model(self, *args, **kwargs): if self.infer_model is None and self.use_model: return self.init_model(*args, **kwargs) else: - logger.info(f"Op named {self.name} get infer model Failed. please " - f" check Attribute self.use_model: {self.use_model} or model has been initialized!") + logger.info( + f"Op named {self.name} get infer model Failed. please " + f" check Attribute self.use_model: {self.use_model} or model has been initialized!" + ) return self.infer_model def init_model(self, *args, **kwargs): """执行函数(子类实现)""" - raise NotImplementedError("This is in BaseOp, plese re-define this method in Sub-classes") + raise NotImplementedError( + "This is in BaseOp, plese re-define this method in Sub-classes" + ) def fill_sample_params(self, sample: Dict[str, Any], **kwargs): if not sample.get(self.text_key, None): @@ -140,10 +147,16 @@ def fill_sample_params(self, sample: Dict[str, Any], **kwargs): if not sample[self.data_key] and not sample[self.text_key]: sample.update(kwargs) - def create_failure_sample(self, sample: Dict[str, Any], op_name, excp: BaseException): + def create_failure_sample( + self, sample: Dict[str, Any], op_name, excp: BaseException + ): sample["execute_result"] = False error_code, exc_info = self._get_error_info(excp) - failed_reason = {"op_name": op_name, "error_code": error_code, "reason": exc_info} + failed_reason = { + "op_name": op_name, + "error_code": error_code, + "reason": exc_info, + } sample["failed_reason"] = failed_reason def read_file(self, sample): @@ -154,11 +167,13 @@ def read_file(self, sample): sample[self.text_key] = "\n\n".join([str(el) for el in elements]) sample[self.data_key] = b"" elif filetype in ["txt", "md", "markdown", "xml", "html", "json", "jsonl"]: - with open(filepath, 'rb') as f: + with open(filepath, "rb") as f: content = f.read() - sample[self.text_key] = content.decode("utf-8-sig").replace("\r\n", "\n") + sample[self.text_key] = content.decode("utf-8-sig").replace( + "\r\n", "\n" + ) sample[self.data_key] = b"" - elif filetype in ['jpg', 'jpeg', 'png', 'bmp']: + elif filetype in ["jpg", "jpeg", "png", "bmp"]: image_np = cv2.imdecode(np.fromfile(filepath, dtype=np.uint8), -1) if image_np.size: data = cv2.imencode(f".{filetype}", image_np)[1] @@ -176,7 +191,7 @@ def convert_to_dj(self, sample): mime_type, _ = mimetypes.guess_type(filepath) file_type = None if mime_type: - file_type = mime_type.split('/')[0] + file_type = mime_type.split("/")[0] if file_type == "text": return self.read_file(sample) elif file_type == "image": @@ -217,13 +232,17 @@ def __call__(self, sample: Dict[str, Any], **kwargs): except Exception as e: # 算子执行失败,记录文件执行信息到数据库,并更该文件执行结果状态 self.create_failure_sample(sample, self.name, e) - logger.error(f"Ops named {self.name} map failed, Error Info: \n" - f"{str(get_exception_info(e))}") + logger.error( + f"Ops named {self.name} map failed, Error Info: \n" + f"{str(get_exception_info(e))}" + ) sample["execute_status"] = execute_status sample[self.filesize_key] = "0" sample[self.filetype_key] = "" + sample["execute_result"] = False TaskInfoPersistence().update_task_result(sample) - raise e + # 不抛出异常,跳过当前文件继续处理下一个文件 + return sample sample["execute_status"] = execute_status # 加载文件成功执行信息到数据库 @@ -241,7 +260,9 @@ def __call__(self, sample: Dict[str, Any], **kwargs): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: """执行函数(子类实现)""" - raise NotImplementedError("This is in Mapper Class, plese re-define this method in Sub-classes") + raise NotImplementedError( + "This is in Mapper Class, plese re-define this method in Sub-classes" + ) class Slicer(BaseOp): @@ -264,8 +285,10 @@ def __call__(self, sample: Dict[str, Any], **kwargs): # 算子执行失败,记录文件执行信息到数据库,并更该文件执行结果状态 self.create_failure_sample(sample, self.name, e) self.load_sample_to_sample(sample, sample_list) - logger.error(f"Ops named {self.name} map failed, Error Info: \n" - f"{str(get_exception_info(e))}") + logger.error( + f"Ops named {self.name} map failed, Error Info: \n" + f"{str(get_exception_info(e))}" + ) sample["execute_status"] = execute_status sample[self.filesize_key] = "0" sample[self.filetype_key] = "" @@ -292,13 +315,15 @@ def load_sample_to_sample(sample: Dict, sample_list: List[Dict]): def execute(self, sample: Dict[str, Any]) -> List[Dict]: """执行函数(子类实现)""" - raise NotImplementedError("This is in Mapper Class, plese re-define this method in Sub-classes") + raise NotImplementedError( + "This is in Mapper Class, plese re-define this method in Sub-classes" + ) def save_patch_sample(self, sample: Dict[str, Any], patch_no, save_format="text"): if save_format == "text": - target_file_type = 'txt' + target_file_type = "txt" elif save_format == "image": - target_file_type = 'png' + target_file_type = "png" else: target_file_type = None raise RuntimeError(f"target file type is {target_file_type}!") @@ -313,8 +338,10 @@ def get_save_path(self, sample: Dict[str, Any], patch_no, target_type) -> str: logger.info(f"export path: {export_path}.") base_file_name, _ = os.path.splitext(sample[self.filename_key]) file_id = str(sample[self.fileid_key]) - new_file_name = file_id + '_' + str(patch_no) + '.' + target_type - logger.info(f"base_file_name: {base_file_name}, new file name: {new_file_name}.") + new_file_name = file_id + "_" + str(patch_no) + "." + target_type + logger.info( + f"base_file_name: {base_file_name}, new file name: {new_file_name}." + ) if not check_valid_path(export_path): os.makedirs(export_path, exist_ok=True) res = os.path.join(export_path, new_file_name) @@ -322,8 +349,12 @@ def get_save_path(self, sample: Dict[str, Any], patch_no, target_type) -> str: def save_file(self, sample, save_path): # 以二进制格式保存文件 - file_sample = sample[self.text_key].encode('utf-8') if sample[self.text_key] else sample[self.data_key] - with open(save_path, 'wb') as f: + file_sample = ( + sample[self.text_key].encode("utf-8") + if sample[self.text_key] + else sample[self.data_key] + ) + with open(save_path, "wb") as f: f.write(file_sample) os.chmod(save_path, 0o640) @@ -353,15 +384,9 @@ def __call__(self, sample: Dict[str, Any], **kwargs): except Exception as e: # 如果filter算子过滤失败, 不保留文件, 并记录文件执行信息到数据库 self.create_failure_sample(sample, self.name, e) - sample["execute_status"] = execute_status - logger.error(f"Ops named {self.name} map failed, Error Info: \n" - f"{str(get_exception_info(e))}") - sample[self.filesize_key] = "0" - sample[self.filetype_key] = "" - TaskInfoPersistence().update_task_result(sample) - raise e - + return False sample["execute_status"] = execute_status + # 文件无内容会被过滤 if sample[self.text_key] == "" and sample[self.data_key] == b"": task_info = TaskInfoPersistence() @@ -377,7 +402,9 @@ def __call__(self, sample: Dict[str, Any], **kwargs): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: """执行函数(子类实现)""" - raise NotImplementedError("This is in Filter Class, plese re-define this method in Sub-classes") + raise NotImplementedError( + "This is in Filter Class, plese re-define this method in Sub-classes" + ) class LLM(Mapper): @@ -390,23 +417,34 @@ def __init__(self, *args, **kwargs): def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: """执行函数(子类实现)""" - raise NotImplementedError("This is in LLM Class, plese re-define this method in Sub-classes") + raise NotImplementedError( + "This is in LLM Class, plese re-define this method in Sub-classes" + ) @staticmethod def get_llm(*args, **kwargs): - url = kwargs.get("LLMUrl", '') + url = kwargs.get("LLMUrl", "") header = kwargs.get("LLMHeaders", {"Content-type": "application/json"}) body = kwargs.get("LLMBody", {}) access_type = kwargs.get("accessType", False) is_https = kwargs.get("isHttps", False) is_certificate = kwargs.get("isCertificate", False) certificate_path = kwargs.get("certificatePath", None) - return LlmReq(url=url, header=header, body=body, access_type=access_type, is_https=is_https, - is_certificate=is_certificate, certificate_path=certificate_path) + return LlmReq( + url=url, + header=header, + body=body, + access_type=access_type, + is_https=is_https, + is_certificate=is_certificate, + certificate_path=certificate_path, + ) def build_llm_prompt(self, *args, **kwargs): """执行函数(子类实现)""" - raise NotImplementedError("This is in LLM Class, plese re-define this method in Sub-classes") + raise NotImplementedError( + "This is in LLM Class, plese re-define this method in Sub-classes" + ) def save_sample(self, object_list: List, sample: Dict[str, Any]): if self.target_file_type: @@ -421,8 +459,10 @@ def get_save_path(self, sample: Dict[str, Any], target_type) -> str: logger.info(f"export path: {export_path}.") base_file_name, _ = os.path.splitext(sample[self.filename_key]) file_id = str(sample[self.fileid_key]) - new_file_name = file_id + '.' + target_type - logger.info(f"base_file_name: {base_file_name}, new file name: {new_file_name}.") + new_file_name = file_id + "." + target_type + logger.info( + f"base_file_name: {base_file_name}, new file name: {new_file_name}." + ) if not check_valid_path(export_path): os.makedirs(export_path, exist_ok=True) res = os.path.join(export_path, new_file_name) @@ -431,13 +471,15 @@ def get_save_path(self, sample: Dict[str, Any], target_type) -> str: @staticmethod def save_json_file(object_list: List, save_path): if len(object_list) == 0: - logger.warning("Please check the param: object_list, which has length equal to 0.") + logger.warning( + "Please check the param: object_list, which has length equal to 0." + ) return try: - with open(save_path, 'w', encoding='utf-8') as f: + with open(save_path, "w", encoding="utf-8") as f: for item in object_list: json_str = json.dumps(item, ensure_ascii=False) - f.write(json_str + '\n') + f.write(json_str + "\n") os.chmod(save_path, 0o640) try: @@ -447,7 +489,9 @@ def save_json_file(object_list: List, save_path): logger.warning("Failed to modify the permission on the parent_dir.") except Exception as e: - raise RuntimeError(f"Save jsonl file Failed!, save_path: {save_path}.") from e + raise RuntimeError( + f"Save jsonl file Failed!, save_path: {save_path}." + ) from e logger.info(f"LLM output has been save to {save_path}.") @@ -458,11 +502,31 @@ class FileExporter(BaseOp): def __init__(self, *args, **kwargs): super(FileExporter, self).__init__(*args, **kwargs) self.last_ops = True - self.text_support_ext = kwargs.get("text_support_ext", ['txt', 'html', 'md', 'markdown', - 'xlsx', 'xls', 'csv', 'pptx', 'ppt', - 'xml', 'json', 'doc', 'docx', 'pdf']) - self.data_support_ext = kwargs.get("data_support_ext", ['jpg', 'jpeg', 'png', 'bmp']) - self.medical_support_ext = kwargs.get("medical_support_ext", ['svs', 'tif', 'tiff']) + self.text_support_ext = kwargs.get( + "text_support_ext", + [ + "txt", + "html", + "md", + "markdown", + "xlsx", + "xls", + "csv", + "pptx", + "ppt", + "xml", + "json", + "doc", + "docx", + "pdf", + ], + ) + self.data_support_ext = kwargs.get( + "data_support_ext", ["jpg", "jpeg", "png", "bmp"] + ) + self.medical_support_ext = kwargs.get( + "medical_support_ext", ["svs", "tif", "tiff"] + ) def execute(self, sample: Dict[str, Any]): file_name = sample[self.filename_key] @@ -478,7 +542,7 @@ def execute(self, sample: Dict[str, Any]): elif file_type in self.medical_support_ext: sample, save_path = self.get_medicalfile_handler(sample) - if sample[self.text_key] == '' and sample[self.data_key] == b'': + if sample[self.text_key] == "" and sample[self.data_key] == b"": if sample.get("executor") == "datajuicer": return True sample[self.filesize_key] = "0" @@ -486,16 +550,16 @@ def execute(self, sample: Dict[str, Any]): if save_path: save_path = self.save_file(sample, save_path) - sample[self.text_key] = '' - sample[self.data_key] = b'' + sample[self.text_key] = "" + sample[self.data_key] = b"" sample[Fields.result] = True - file_type = save_path.split('.')[-1] + file_type = save_path.split(".")[-1] sample[self.filetype_key] = file_type file_name = os.path.basename(save_path) base_name, _ = os.path.splitext(file_name) - new_file_name = base_name + '.' + file_type + new_file_name = base_name + "." + file_type sample[self.filename_key] = new_file_name sample[self.filepath_key] = save_path @@ -503,18 +567,22 @@ def execute(self, sample: Dict[str, Any]): sample[self.filesize_key] = f"{file_size}" logger.info(f"origin file named {file_name} has been save to {save_path}") - logger.info(f"fileName: {sample[self.filename_key]}, " - f"method: FileExporter costs {time.time() - start:.6f} s") + logger.info( + f"fileName: {sample[self.filename_key]}, " + f"method: FileExporter costs {time.time() - start:.6f} s" + ) except UnicodeDecodeError as err: - logger.error(f"fileName: {sample[self.filename_key]}, " - f"method: FileExporter causes decode error: {err}") + logger.error( + f"fileName: {sample[self.filename_key]}, " + f"method: FileExporter causes decode error: {err}" + ) raise return True def get_save_path(self, sample: Dict[str, Any], target_type): export_path = os.path.abspath(sample[self.export_path_key]) file_name = sample[self.filename_key] - new_file_name = os.path.splitext(file_name)[0] + '.' + target_type + new_file_name = os.path.splitext(file_name)[0] + "." + target_type if not check_valid_path(export_path): os.makedirs(export_path, exist_ok=True) @@ -530,7 +598,7 @@ def get_textfile_handler(self, sample: Dict[str, Any]): # 不存在则保存为txt文件,正常文本清洗 else: sample = self._get_from_text(sample) - save_path = self.get_save_path(sample, 'txt') + save_path = self.get_save_path(sample, "txt") return sample, save_path def get_datafile_handler(self, sample: Dict[str, Any]): @@ -547,7 +615,7 @@ def get_datafile_handler(self, sample: Dict[str, Any]): return sample, save_path def get_medicalfile_handler(self, sample: Dict[str, Any]): - target_type = 'png' + target_type = "png" sample = self._get_from_data(sample) save_path = self.get_save_path(sample, target_type) @@ -556,11 +624,15 @@ def get_medicalfile_handler(self, sample: Dict[str, Any]): def save_file(self, sample, save_path): # 以二进制格式保存文件 - file_sample = sample[self.text_key].encode('utf-8') if sample[self.text_key] else sample[self.data_key] + file_sample = ( + sample[self.text_key].encode("utf-8") + if sample[self.text_key] + else sample[self.data_key] + ) path_obj = Path(save_path).resolve() parent_dir = path_obj.parent - stem = path_obj.stem # 文件名不含后缀 - suffix = path_obj.suffix # 后缀 (.txt) + stem = path_obj.stem # 文件名不含后缀 + suffix = path_obj.suffix # 后缀 (.txt) counter = 0 current_path = path_obj @@ -568,7 +640,7 @@ def save_file(self, sample, save_path): try: # x 模式保证:如果文件存在则报错,如果不存在则创建。 # 这个检查+创建的过程是操作系统级的原子操作,没有竞态条件。 - with open(current_path, 'xb') as f: + with open(current_path, "xb") as f: f.write(file_sample) break except FileExistsError: @@ -582,21 +654,24 @@ def save_file(self, sample, save_path): def _get_from_data(self, sample: Dict[str, Any]) -> Dict[str, Any]: sample[self.data_key] = bytes(sample[self.data_key]) - sample[self.text_key] = '' + sample[self.text_key] = "" return sample def _get_from_text(self, sample: Dict[str, Any]) -> Dict[str, Any]: - sample[self.data_key] = b'' + sample[self.data_key] = b"" sample[self.text_key] = str(sample[self.text_key]) return sample def _get_from_text_or_data(self, sample: Dict[str, Any]) -> Dict[str, Any]: - if sample.get(self.data_key) is not None and sample[self.data_key] != b'' and sample[self.data_key] != "": + if ( + sample.get(self.data_key) is not None + and sample[self.data_key] != b"" + and sample[self.data_key] != "" + ): return self._get_from_data(sample) else: return self._get_from_text(sample) - @staticmethod def _get_uuid(): return str(uuid.uuid4()) From 06b1e7e151110c975f7639b406b2940e6555a89d Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Mon, 23 Mar 2026 14:35:07 +0800 Subject: [PATCH 03/10] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E8=90=BD=E7=9B=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../app/module/cleaning/runtime_client.py | 6 +++--- .../service/cleaning_task_scheduler.py | 2 +- .../datamate/operator_runtime.py | 11 +++++++--- .../datamate/scheduler/job_task_scheduler.py | 21 +++++++++++++++++++ .../datamate/wrappers/data_juicer_wrapper.py | 10 +++------ .../datamate/wrappers/datamate_wrapper.py | 12 +++++++++-- 6 files changed, 46 insertions(+), 16 deletions(-) diff --git a/runtime/datamate-python/app/module/cleaning/runtime_client.py b/runtime/datamate-python/app/module/cleaning/runtime_client.py index 0983256fc..97c3523c3 100644 --- a/runtime/datamate-python/app/module/cleaning/runtime_client.py +++ b/runtime/datamate-python/app/module/cleaning/runtime_client.py @@ -12,13 +12,13 @@ def __init__(self, base_url: str = "http://datamate-runtime:8081"): self.base_url = base_url self.client = httpx.AsyncClient(timeout=60.0) - async def submit_task(self, task_id: str) -> bool: + async def submit_task(self, task_id: str, retry_count: int = 0) -> bool: """Submit cleaning task to runtime executor""" try: url = f"{self.base_url}/api/task/{task_id}/submit" - response = await self.client.post(url) + response = await self.client.post(url, json={"retry_count": retry_count}) response.raise_for_status() - logger.info(f"Task {task_id} submitted successfully") + logger.info(f"Task {task_id} submitted successfully with retry_count={retry_count}") return True except httpx.HTTPError as e: logger.error(f"Failed to submit task {task_id}: {e}") diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py index cd1d1321f..3efa7ae64 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_scheduler.py @@ -25,7 +25,7 @@ async def execute_task(self, db: AsyncSession, task_id: str, retry_count: int) - task.retry_count = retry_count await self.task_repo.update_task(db, task) - return await self.runtime_client.submit_task(task_id) + return await self.runtime_client.submit_task(task_id, retry_count) async def stop_task(self, db: AsyncSession, task_id: str) -> bool: """Stop cleaning task""" diff --git a/runtime/python-executor/datamate/operator_runtime.py b/runtime/python-executor/datamate/operator_runtime.py index 37d49ca80..c7f67aed8 100644 --- a/runtime/python-executor/datamate/operator_runtime.py +++ b/runtime/python-executor/datamate/operator_runtime.py @@ -84,10 +84,15 @@ async def query_task_info(request: QueryTaskRequest): raise APIException(ErrorCode.UNKNOWN_ERROR) +class SubmitTaskRequest(BaseModel): + retry_count: int = 0 + + @app.post("/api/task/{task_id}/submit") -async def submit_task(task_id): +async def submit_task(task_id, request: SubmitTaskRequest = None): + retry_count = request.retry_count if request else 0 config_path = f"/flow/{task_id}/process.yaml" - logger.info("Start submitting job...") + logger.info(f"Start submitting job with retry_count={retry_count}...") dataset_path = get_from_cfg(task_id, "dataset_path") if not check_valid_path(dataset_path): @@ -96,7 +101,7 @@ async def submit_task(task_id): try: executor_type = get_from_cfg(task_id, "executor_type") - await WRAPPERS.get(executor_type).submit(task_id, config_path) + await WRAPPERS.get(executor_type).submit(task_id, config_path, retry_count) except Exception as e: logger.error(f"Error happens during submitting task. Error Info following: {e}") diff --git a/runtime/python-executor/datamate/scheduler/job_task_scheduler.py b/runtime/python-executor/datamate/scheduler/job_task_scheduler.py index cf6a77d64..c73080c0e 100644 --- a/runtime/python-executor/datamate/scheduler/job_task_scheduler.py +++ b/runtime/python-executor/datamate/scheduler/job_task_scheduler.py @@ -72,6 +72,7 @@ async def _execute(self): # 轮询 Job 状态 poll_interval = 2 # 2 秒轮询一次 elapsed_time = 0 + last_log_position = 0 # 记录已写入的日志位置 while True: if self._cancelled: @@ -86,6 +87,10 @@ async def _execute(self): info = client.get_job_info(self.job_id) job_status = info.status + # 获取并写入日志 + await self._fetch_and_write_logs(client, last_log_position) + last_log_position = os.path.getsize(self.log_path) if os.path.exists(self.log_path) else 0 + if job_status == "SUCCEEDED": self.status = TaskStatus.COMPLETED logger.info(f"Ray Job {self.job_id} completed successfully") @@ -136,6 +141,22 @@ async def _execute(self): finally: self.completed_at = datetime.now() + async def _fetch_and_write_logs(self, client, last_position: int = 0): + """获取 Ray Job 日志并追加写入日志文件""" + try: + logs = client.get_job_logs(self.job_id) + if logs: + log_content = logs if isinstance(logs, str) else str(logs) + # 只追加新日志 + if len(log_content) > last_position: + new_logs = log_content[last_position:] + with open(self.log_path, "a", encoding="utf-8") as f: + f.write(new_logs) + if not new_logs.endswith("\n"): + f.write("\n") + except Exception as e: + logger.warning(f"Failed to fetch logs for job {self.job_id}: {e}") + def _stop_job(self, client): """停止 Ray Job""" if self.job_id: diff --git a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py index fa7cc946f..09059a35b 100644 --- a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py +++ b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py @@ -1,15 +1,11 @@ # -*- coding: utf-8 -*- import os -from datamate.scheduler import ray_job_scheduler +from datamate.scheduler import cmd_scheduler async def submit(task_id, config_path): current_dir = os.path.dirname(__file__) - script_path = os.path.join(current_dir, "data_juicer_executor.py") - await ray_job_scheduler.submit(task_id, script_path, f"--config_path={config_path}") - - -def cancel(task_id): - return ray_job_scheduler.cancel_task(task_id) + await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'data_juicer_executor.py')} " + f"--config_path={config_path}") diff --git a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py index 3fea6c5a7..446208c4b 100644 --- a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py +++ b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py @@ -4,11 +4,19 @@ from datamate.scheduler import ray_job_scheduler -async def submit(task_id, config_path): +async def submit(task_id, config_path, retry_count: int = 0): current_dir = os.path.dirname(__file__) script_path = os.path.join(current_dir, "datamate_executor.py") - await ray_job_scheduler.submit(task_id, script_path, f"--config_path={config_path}") + # 根据 retry_count 设置日志路径 + if retry_count > 0: + log_path = f"/flow/{task_id}/output.log.{retry_count}" + else: + log_path = f"/flow/{task_id}/output.log" + + await ray_job_scheduler.submit( + task_id, script_path, f"--config_path={config_path}", log_path=log_path + ) def cancel(task_id): From 655d3c0ca5ee3090fd2ed8459dad7378f9c0684b Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Tue, 24 Mar 2026 11:24:06 +0800 Subject: [PATCH 04/10] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E9=83=A8?= =?UTF-8?q?=E5=88=86=E6=88=90=E5=8A=9F=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/components/ActionDropdown.tsx | 2 +- frontend/src/components/DetailHeader.tsx | 7 +- frontend/src/i18n/locales/en/common.json | 2 + frontend/src/i18n/locales/zh/common.json | 1 + .../pages/DataCleansing/Detail/TaskDetail.tsx | 14 +- .../Detail/components/BasicInfo.tsx | 26 ++-- .../Detail/components/FileTable.tsx | 147 +++++++++++++++--- .../Detail/components/LogsTable.tsx | 90 +++++++---- .../Detail/components/OperatorTable.tsx | 30 ++-- .../pages/DataCleansing/cleansing.const.tsx | 7 + .../pages/DataCleansing/cleansing.model.ts | 1 + frontend/src/pages/Layout/Sidebar.tsx | 2 +- .../repository/cleaning_result_repository.py | 14 +- .../app/module/cleaning/schema/cleaning.py | 26 +++- .../cleaning/service/cleaning_task_service.py | 7 +- .../sql_manager/persistence_atction.py | 47 +++++- runtime/python-executor/pyproject.toml | 1 + 17 files changed, 318 insertions(+), 106 deletions(-) diff --git a/frontend/src/components/ActionDropdown.tsx b/frontend/src/components/ActionDropdown.tsx index 6d7d6ba4d..87cbc6482 100644 --- a/frontend/src/components/ActionDropdown.tsx +++ b/frontend/src/components/ActionDropdown.tsx @@ -112,7 +112,7 @@ const ActionDropdown = ({ <> dropdownContent} + popupRender={() => dropdownContent} trigger={["click"]} placement={placement} open={open} diff --git a/frontend/src/components/DetailHeader.tsx b/frontend/src/components/DetailHeader.tsx index cbd332619..c918944a8 100644 --- a/frontend/src/components/DetailHeader.tsx +++ b/frontend/src/components/DetailHeader.tsx @@ -271,8 +271,8 @@ function DetailHeader({

{(data as any)?.description}

- {statistics.map((stat: any) => ( -
+ {statistics.map((stat: StatisticItem, index: number) => ( +
{stat.icon} {stat.value}
@@ -281,10 +281,11 @@ function DetailHeader({
- {operations.map((op: any) => { + {operations.map((op: OperationItem) => { if (op.isDropdown) { return ( diff --git a/frontend/src/i18n/locales/en/common.json b/frontend/src/i18n/locales/en/common.json index 3fe17885d..34c895c83 100644 --- a/frontend/src/i18n/locales/en/common.json +++ b/frontend/src/i18n/locales/en/common.json @@ -1594,6 +1594,7 @@ }, "status": { "completed": "Completed", + "partialSuccess": "Partial Success", "failed": "Failed", "running": "Running", "pending": "Pending", @@ -2600,6 +2601,7 @@ "unknown": "Unknown", "processing": "Processing", "completed": "Completed", + "partialSuccess": "Partial Success", "failed": "Failed" }, "operations": { diff --git a/frontend/src/i18n/locales/zh/common.json b/frontend/src/i18n/locales/zh/common.json index d16f37ccd..0e3b71471 100644 --- a/frontend/src/i18n/locales/zh/common.json +++ b/frontend/src/i18n/locales/zh/common.json @@ -1594,6 +1594,7 @@ }, "status": { "completed": "已完成", + "partialSuccess": "部分成功", "failed": "失败", "running": "运行中", "pending": "等待中", diff --git a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx index 0d5203efc..d6e772299 100644 --- a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx +++ b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx @@ -119,19 +119,17 @@ export default function CleansingTaskDetail() { { icon: , label: t("dataCleansing.detail.statistics.successFiles"), - value: task?.progress?.succeedFileNum || "0", + value: task?.progress?.succeedFileNum || 0, }, { icon: , label: t("dataCleansing.detail.statistics.failedFiles"), - value: (task?.status.value === TaskStatus.RUNNING || task?.status.value === TaskStatus.PENDING) ? - task?.progress.failedFileNum : - task?.progress?.totalFileNum - task?.progress.succeedFileNum, + value: task?.progress?.failedFileNum || 0, }, { icon: , label: t("dataCleansing.detail.statistics.successRate"), - value: task?.progress?.successRate ? task?.progress?.successRate + "%" : "--", + value: task?.progress?.process ? task?.progress?.process + "%" : "--", }, ]; @@ -146,11 +144,11 @@ export default function CleansingTaskDetail() { }, ] : []), - ...([TaskStatus.PENDING, TaskStatus.STOPPED, TaskStatus.FAILED].includes(task?.status?.value) + ...([TaskStatus.PENDING, TaskStatus.STOPPED, TaskStatus.FAILED, TaskStatus.PARTIAL_SUCCESS].includes(task?.status?.value) ? [ { key: "start", - label: t("dataCleansing.actions.updateTask"), + label: t("dataCleansing.actions.retryTask"), icon: , onClick: startTask, }, @@ -217,7 +215,7 @@ export default function CleansingTaskDetail() { )} {activeTab === "operators" && } {activeTab === "files" && } - {activeTab === "logs" && } + {activeTab === "logs" && }
diff --git a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx index 65d5f08a1..0ea0e643b 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx @@ -83,7 +83,7 @@ export default function BasicInfo({ task }: { task: CleansingTask }) {
- {task?.progress?.succeedFileNum || "0"} + {task?.progress?.succeedFileNum ?? 0}
{t("dataCleansing.detail.statistics.successFiles")}
@@ -91,8 +91,8 @@ export default function BasicInfo({ task }: { task: CleansingTask }) {
{(task?.status.value === TaskStatus.RUNNING || task?.status.value === TaskStatus.PENDING) ? - task?.progress.failedFileNum : - task?.progress?.totalFileNum - task?.progress.succeedFileNum} + (task?.progress?.failedFileNum ?? 0) : + Math.max(0, (task?.progress?.totalFileNum ?? 0) - (task?.progress?.succeedFileNum ?? 0))}
{t("dataCleansing.detail.statistics.failedFiles")}
@@ -113,33 +113,35 @@ export default function BasicInfo({ task }: { task: CleansingTask }) { column={2} bordered={false} size="middle" - labelStyle={{ fontWeight: 500, color: "#555" }} - contentStyle={{ fontSize: 14 }} + styles={{ label: { fontWeight: 500, color: "#555" }, content: { fontSize: 14 } }} items={descriptionItems} > {/* 处理进度 */}

{t("dataCleansing.detail.basicInfo.processingProgress")}

- { task?.status?.value === TaskStatus.FAILED ? + { task?.status?.value === TaskStatus.FAILED ? ( - : - } + ) : task?.status?.value === TaskStatus.PARTIAL_SUCCESS ? ( + + ) : ( + + )}
- {t("dataCleansing.detail.basicInfo.completed", { count: task?.progress?.succeedFileNum || "0" })} + {t("dataCleansing.detail.basicInfo.completed", { count: task?.progress?.succeedFileNum ?? 0 })}
{t("dataCleansing.detail.basicInfo.processing", { count: (task?.status.value === TaskStatus.RUNNING || task?.status.value === TaskStatus.PENDING) ? - task?.progress?.totalFileNum - task?.progress.succeedFileNum : 0 })} + Math.max(0, (task?.progress?.totalFileNum ?? 0) - (task?.progress?.succeedFileNum ?? 0)) : 0 })}
{t("dataCleansing.detail.basicInfo.failed", { count: (task?.status.value === TaskStatus.RUNNING || task?.status.value === TaskStatus.PENDING) ? - task?.progress.failedFileNum : - task?.progress?.totalFileNum - task?.progress.succeedFileNum })} + (task?.progress?.failedFileNum ?? 0) : + Math.max(0, (task?.progress?.totalFileNum ?? 0) - (task?.progress?.succeedFileNum ?? 0)) })}
diff --git a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx index cb48c0948..a577a61d4 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx @@ -6,6 +6,72 @@ import {TaskStatus} from "@/pages/DataCleansing/cleansing.model.ts"; import {getTaskStatusMap} from "@/pages/DataCleansing/cleansing.const.tsx"; import { useTranslation } from "react-i18next"; +// 渲染 JSON 值的辅助组件 +function JsonValue({ value, depth = 0 }: { value: any; depth?: number }) { + if (value === null) { + return null; + } + if (value === undefined) { + return undefined; + } + if (typeof value === 'boolean') { + return {value.toString()}; + } + if (typeof value === 'number') { + return {value}; + } + if (typeof value === 'string') { + // 处理换行符,保留空白格式 + const lines = value.split('\n'); + if (lines.length > 1) { + return ( + + {lines.map((line, i) => ( + + {line} + {i < lines.length - 1 &&
} +
+ ))} +
+ ); + } + return {value}; + } + if (Array.isArray(value)) { + if (value.length === 0) { + return []; + } + return ( +
+ {value.map((item, i) => ( +
+ [{i}] + +
+ ))} +
+ ); + } + if (typeof value === 'object') { + const keys = Object.keys(value); + if (keys.length === 0) { + return {"{}"}; + } + return ( +
+ {keys.map((key) => ( +
+ {key} + : + +
+ ))} +
+ ); + } + return {String(value)}; +} + // 模拟文件列表数据 export default function FileTable({result, fetchTaskResult}) { const { id = "" } = useParams(); @@ -20,7 +86,7 @@ export default function FileTable({result, fetchTaskResult}) { const handleSelectAllFiles = (checked: boolean) => { if (checked) { - setSelectedFileIds(result.map((file) => file.instanceId)); + setSelectedFileIds(result.map((file) => file.srcFileId)); } else { setSelectedFileIds([]); } @@ -87,8 +153,8 @@ export default function FileTable({result, fetchTaskResult}) { render: (_text: string, record: any) => ( handleSelectFile(record.id, e.target.checked)} + checked={selectedFileIds.includes(record.srcFileId)} + onChange={(e) => handleSelectFile(record.srcFileId, e.target.checked)} className="w-4 h-4" /> ), @@ -283,48 +349,85 @@ export default function FileTable({result, fetchTaskResult}) { title: t("dataCleansing.detail.fileTable.result"), dataIndex: "result", key: "result", - width: 200, - render: (text: string) => { - if (!text) return -; + width: 120, + render: (text: string, record: any) => { + // 如果结果为空或特殊值,则不展示 + if (!text || text === '' || text === '{}' || text === '[]' || + text === 'null' || text === 'undefined' || text.trim() === '') { + return -; + } + // 尝试解析JSON try { const parsed = JSON.parse(text); - const jsonString = JSON.stringify(parsed, null, 2); - const displayText = typeof parsed === 'object' - ? (Array.isArray(parsed) ? `[${parsed.length} items]` : '{...}') - : String(parsed); + // 如果是空对象或空数组则不展示 + if (Array.isArray(parsed) && parsed.length === 0) { + return -; + } + if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed) && Object.keys(parsed).length === 0) { + return -; + } + + // 检查 reason 字段:如果只有 reason 且为 null/空,则不展示 + if (typeof parsed === 'object' && parsed !== null && !Array.isArray(parsed)) { + + const keys = Object.keys(parsed); + // 如果只有 reason 字段且为空 + if (keys.length > 0 && keys.includes('reason')) { + const reason = parsed['reason']; + if (reason === null || reason === undefined || reason === '' || + (typeof reason === 'string' && reason.trim() === '')) { + return -; + } + } + } + + // 有内容需要展示,显示 {...} 点击查看详情 return ( - {jsonString} - +
+ +
} title={t("dataCleansing.detail.fileTable.resultDetail")} trigger="click" > - - {displayText} + + {"{...}"}
); } catch { - const displayText = text.length > 30 ? text.substring(0, 30) + '...' : text; + // 普通字符串:如果为空则不展示 + if (text.trim() === '') { + return -; + } + // 有内容的普通字符串,点击查看详情(支持换行) + const lines = text.split('\n'); return ( {text}} + content={ +
+ {lines.map((line, i) => ( + + {line} + {i < lines.length - 1 &&
} +
+ ))} +
+ } title={t("dataCleansing.detail.fileTable.resultDetail")} trigger="click" - disabled={text.length <= 30} > - - {displayText} + + {"{...}"}
); } - }, + } }, { title: t("dataCleansing.detail.fileTable.status"), @@ -403,7 +506,7 @@ export default function FileTable({result, fetchTaskResult}) { dataSource={result} pagination={{ pageSize: 10, showSizeChanger: true }} size="middle" - rowKey="id" + rowKey={(record) => record.srcFileId || record.instanceId} /> {/* 文件对比弹窗 */} diff --git a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx index b474f412d..7080803bb 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx @@ -1,40 +1,54 @@ -import { useEffect, useState, useRef } from "react"; +import { useEffect, useState, useRef, useMemo } from "react"; import { useParams } from "react-router"; import { FileClock, Download } from "lucide-react"; import { useTranslation } from "react-i18next"; import { App } from "antd"; +import VirtualList from 'rc-virtual-list'; import { streamCleaningTaskLog, downloadCleaningTaskLog } from "../../cleansing.api"; +import { TaskStatus } from "../../cleansing.model"; interface LogEntry { level: string; message: string; } -export default function LogsTable({ taskLog: initialLogs, fetchTaskLog, retryCount, taskName }: { taskLog: LogEntry[], fetchTaskLog: () => Promise, retryCount: number, taskName: string }) { +interface LogEntryWithIndex extends LogEntry { + index: number; +} + +export default function LogsTable({ + taskLog: initialLogs, + fetchTaskLog, + retryCount, + taskName, + taskStatus +}: { + taskLog: LogEntry[], + fetchTaskLog: () => Promise, + retryCount: number, + taskName: string, + taskStatus?: TaskStatus +}) { const { id = "" } = useParams(); const { t } = useTranslation(); const [selectedLog, setSelectedLog] = useState(retryCount + 1); const [streamingLogs, setStreamingLogs] = useState([]); const [isStreaming, setIsStreaming] = useState(false); - const logContainerRef = useRef(null); const eventSourceRef = useRef(null); const { message } = App.useApp(); + // Only stream when task is RUNNING and viewing the latest run + const shouldStream = taskStatus === TaskStatus.RUNNING && selectedLog - 1 === retryCount; + useEffect(() => { - if (selectedLog - 1 === retryCount) { + if (shouldStream) { startStreaming(); } else { stopStreaming(); fetchTaskLog(selectedLog - 1); } return () => stopStreaming(); - }, [id, selectedLog, retryCount]); - - useEffect(() => { - if (logContainerRef.current && isStreaming) { - logContainerRef.current.scrollTop = logContainerRef.current.scrollHeight; - } - }, [streamingLogs, isStreaming]); + }, [id, selectedLog, retryCount, shouldStream]); const startStreaming = () => { stopStreaming(); @@ -73,7 +87,20 @@ export default function LogsTable({ taskLog: initialLogs, fetchTaskLog, retryCou setIsStreaming(false); }; - const displayLogs = selectedLog - 1 === retryCount ? streamingLogs : initialLogs; + // Use streaming logs only when actively streaming, otherwise use initial logs + const displayLogs = isStreaming ? streamingLogs : initialLogs; + + // Add index to logs for virtual list key + const logsWithIndex: LogEntryWithIndex[] = useMemo(() => { + return (displayLogs || []).map((log, index) => ({ ...log, index })); + }, [displayLogs]); + + // Get log level color class + const getLevelClass = (level: string) => { + if (level === "ERROR" || level === "FATAL") return "text-red-500"; + if (level === "WARNING" || level === "WARN") return "text-yellow-500"; + return "text-green-500"; + }; const handleSelectChange = (value: number) => { setSelectedLog(value); @@ -130,29 +157,24 @@ export default function LogsTable({ taskLog: initialLogs, fetchTaskLog, retryCou
-
- {displayLogs?.map?.((log, index) => ( -
- - [{log.level}] - - {log.message} -
- ))} +
+ + {(log: LogEntryWithIndex) => ( +
+ + [{log.level}] + + {log.message} +
+ )} +
{isStreaming && ( -
- [INFO] +
...
)} diff --git a/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx index d03887c9a..dfe4bf62f 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx @@ -114,18 +114,24 @@ export default function OperatorTable({ task }: { task: any }) { {t("dataCleansing.detail.operatorTable.description")} - ({ - id: item?.id, - name: item?.name, - startTime: new Date(task?.startedAt).toLocaleTimeString(), - endTime: task?.finishedAt - ? new Date(task.finishedAt).toLocaleTimeString() - : '-', - duration: task.duration, - status: task.status.label, - processedFiles: task.progress.finishedFileNum, - successRate: task?.progress.successRate, - }))} pagination={false} size="middle" /> +
({ + id: item?.id, + name: item?.name, + startTime: new Date(task?.startedAt).toLocaleTimeString(), + endTime: task?.finishedAt + ? new Date(task.finishedAt).toLocaleTimeString() + : '-', + duration: task.duration, + status: task.status.label, + processedFiles: task.progress.finishedFileNum, + successRate: task?.progress.successRate, + }))} + rowKey="id" + pagination={false} + size="middle" + /> diff --git a/frontend/src/pages/DataCleansing/cleansing.const.tsx b/frontend/src/pages/DataCleansing/cleansing.const.tsx index 5a27a3523..2442fa9ba 100644 --- a/frontend/src/pages/DataCleansing/cleansing.const.tsx +++ b/frontend/src/pages/DataCleansing/cleansing.const.tsx @@ -14,6 +14,7 @@ import { CheckCircleOutlined, AlertOutlined, PauseCircleOutlined, + WarningOutlined, } from "@ant-design/icons"; import { BrushCleaning, Layout } from "lucide-react"; @@ -37,6 +38,12 @@ export function getTaskStatusMap(t: (key: string) => string) { color: "green", icon: , }, + [TaskStatus.PARTIAL_SUCCESS]: { + label: t("dataCleansing.status.partialSuccess"), + value: TaskStatus.PARTIAL_SUCCESS, + color: "yellow", + icon: , + }, [TaskStatus.FAILED]: { label: t("dataCleansing.status.failed"), value: TaskStatus.FAILED, diff --git a/frontend/src/pages/DataCleansing/cleansing.model.ts b/frontend/src/pages/DataCleansing/cleansing.model.ts index 5a109b6ca..7b5bd1176 100644 --- a/frontend/src/pages/DataCleansing/cleansing.model.ts +++ b/frontend/src/pages/DataCleansing/cleansing.model.ts @@ -56,6 +56,7 @@ export enum TaskStatus { PENDING = "PENDING", RUNNING = "RUNNING", COMPLETED = "COMPLETED", + PARTIAL_SUCCESS = "PARTIAL_SUCCESS", FAILED = "FAILED", STOPPED = "STOPPED", } diff --git a/frontend/src/pages/Layout/Sidebar.tsx b/frontend/src/pages/Layout/Sidebar.tsx index 8926451fd..b7c2d8fd9 100644 --- a/frontend/src/pages/Layout/Sidebar.tsx +++ b/frontend/src/pages/Layout/Sidebar.tsx @@ -168,7 +168,7 @@ const AsiderAndHeaderLayout = () => { height="100%" open={settingVisible} onClose={() => dispatch(hideSettings())} - bodyStyle={{ padding: 0 }} + styles={{ body: { padding: 0 } }} destroyOnHidden={true} > diff --git a/runtime/datamate-python/app/module/cleaning/repository/cleaning_result_repository.py b/runtime/datamate-python/app/module/cleaning/repository/cleaning_result_repository.py index a6aa62e30..d203246d4 100644 --- a/runtime/datamate-python/app/module/cleaning/repository/cleaning_result_repository.py +++ b/runtime/datamate-python/app/module/cleaning/repository/cleaning_result_repository.py @@ -1,6 +1,6 @@ from typing import List, Optional from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, delete +from sqlalchemy import select, delete, func from app.db.models.cleaning import CleaningResult from app.module.cleaning.schema import CleaningResultDto @@ -59,6 +59,18 @@ async def count_by_instance_id( return (completed, failed) + async def count_total_by_instance_id( + self, + db: AsyncSession, + instance_id: str + ) -> int: + """Count total results by instance ID using efficient SQL COUNT""" + query = select(func.count()).select_from(self.model).where( + self.model.instance_id == instance_id + ) + result = await db.scalar(query) + return result or 0 + async def delete_by_instance_id( self, db: AsyncSession, diff --git a/runtime/datamate-python/app/module/cleaning/schema/cleaning.py b/runtime/datamate-python/app/module/cleaning/schema/cleaning.py index 0571b29cd..48c344d16 100644 --- a/runtime/datamate-python/app/module/cleaning/schema/cleaning.py +++ b/runtime/datamate-python/app/module/cleaning/schema/cleaning.py @@ -8,6 +8,7 @@ class CleaningTaskStatus: PENDING = "PENDING" RUNNING = "RUNNING" COMPLETED = "COMPLETED" + PARTIAL_SUCCESS = "PARTIAL_SUCCESS" STOPPED = "STOPPED" FAILED = "FAILED" @@ -25,28 +26,37 @@ class OperatorInstanceDto(BaseResponseModel): class CleaningProcess(BaseResponseModel): - """Task progress information (matches Java version)""" - process: float = Field(..., description="Progress percentage") + """Task progress information""" + process: float = Field(..., description="Progress percentage (based on successful files)") successRate: float = Field(..., description="Success rate percentage") totalFileNum: int = Field(..., description="Total file count") succeedFileNum: int = Field(..., description="Succeeded file count") failedFileNum: int = Field(..., description="Failed file count") - finishedFileNum: int = Field(..., description="Finished file count") + finishedFileNum: int = Field(..., description="Successfully processed file count (excludes failed)") @classmethod def of(cls, total: int, succeed: int, failed: int) -> 'CleaningProcess': - """Create progress info (matches Java version logic)""" - finished_file_num = succeed + failed + """Create progress info + + - finishedFileNum: only counts successfully processed files (excludes failed) + - process: progress percentage based on successful files only + - successRate: percentage of finished files that succeeded + """ + # finished_file_num only counts successfully processed files + finished_file_num = succeed if total == 0: process = 0.0 else: - process = round(finished_file_num * 100.0 / total, 2) + # Progress only counts successful files + process = round(succeed * 100.0 / total, 2) - if finished_file_num == 0: + # Calculate total processed (succeed + failed) for success rate + total_processed = succeed + failed + if total_processed == 0: success_rate = 0.0 else: - success_rate = round(succeed * 100.0 / finished_file_num, 2) + success_rate = round(succeed * 100.0 / total_processed, 2) return cls( process=process, diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 96ea61660..59b8fcf72 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -76,9 +76,12 @@ async def get_tasks( return tasks async def _set_process(self, db: AsyncSession, task: CleaningTaskDto) -> None: - """Set task progress""" + """Set task progress using actual results from database""" completed, failed = await self.result_repo.count_by_instance_id(db, task.id) - task.progress = CleaningProcess.of(task.file_count or 0, completed, failed) + # Use actual total from database (t_clean_result table), fallback to task.file_count + actual_total = await self.result_repo.count_total_by_instance_id(db, task.id) + total = max(actual_total, task.file_count or 0) + task.progress = CleaningProcess.of(total, completed, failed) async def count_tasks( self, diff --git a/runtime/python-executor/datamate/sql_manager/persistence_atction.py b/runtime/python-executor/datamate/sql_manager/persistence_atction.py index dfceb78e7..8ee96fd2a 100644 --- a/runtime/python-executor/datamate/sql_manager/persistence_atction.py +++ b/runtime/python-executor/datamate/sql_manager/persistence_atction.py @@ -39,7 +39,7 @@ def update_task_result(self, sample, file_id = None): file_name = str(sample.get("fileName")) status = str(sample.get("execute_status")) - failed_reason = str(sample.get("failed_reason")) + failed_reason = json.dumps(sample.get("failed_reason"), ensure_ascii=False) result_data = { "instance_id": instance_id, "src_file_id": src_file_id, @@ -154,15 +154,58 @@ def update_result(self, dataset_id, instance_id, status): update_dataset_sql = str(self.sql_dict.get("update_dataset_sql")) self.insert_result(dataset_data, update_dataset_sql) + # Determine final status based on file results + final_status = self._determine_task_status(instance_id, status) + task_data = { "task_id": instance_id, - "status": status, + "status": final_status, "total_size": total_size, "finished_time": datetime.now() } update_task_sql = str(self.sql_dict.get("update_task_sql")) self.insert_result(task_data, update_task_sql) + def _determine_task_status(self, instance_id: str, executor_status: str) -> str: + """Determine final task status based on file processing results""" + if executor_status == "FAILED": + return "FAILED" + + # Query file results to determine status + try: + with SQLManager.create_connect() as conn: + # Count completed and failed files from t_clean_result (same table as progress calculation) + query = text(""" + SELECT + COUNT(CASE WHEN status = 'COMPLETED' THEN 1 END) as completed, + COUNT(CASE WHEN status = 'FAILED' THEN 1 END) as failed + FROM t_clean_result + WHERE instance_id = :instance_id + """) + result = conn.execute(query, {"instance_id": instance_id}) + row = result.fetchone() + + if row: + completed = row[0] or 0 + failed = row[1] or 0 + + logger.info(f"Task {instance_id} file results: completed={completed}, failed={failed}") + + # Determine status based on file results + if completed > 0 and failed > 0: + final_status = "PARTIAL_SUCCESS" + elif completed > 0: + final_status = "COMPLETED" + else: + final_status = "FAILED" + + logger.info(f"Task {instance_id} final status: {final_status}") + return final_status + except Exception as e: + logger.warning(f"Failed to determine task status for {instance_id}: {e}") + + return executor_status + def query_task_info(self, instance_ids: list[str]): result = {} current_result = None diff --git a/runtime/python-executor/pyproject.toml b/runtime/python-executor/pyproject.toml index 1572f7c20..95a3035b1 100644 --- a/runtime/python-executor/pyproject.toml +++ b/runtime/python-executor/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "uvicorn[standard]>=0.38.0", "sqlalchemy>=2.0.44", "psycopg2-binary>=2.9.11", + "requests>=2.32.0", ] [build-system] From 32426657e6900092a9b6458b3ce86a69e9efaaa1 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Wed, 25 Mar 2026 11:20:43 +0800 Subject: [PATCH 05/10] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E9=83=A8?= =?UTF-8?q?=E5=88=86=E6=88=90=E5=8A=9F=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pages/DataCleansing/Detail/TaskDetail.tsx | 4 +- .../Detail/components/OperatorTable.tsx | 42 +++++++++++-------- .../Home/components/TaskList.tsx | 21 ++++------ .../pages/DataCleansing/cleansing.const.tsx | 27 ++++-------- 4 files changed, 41 insertions(+), 53 deletions(-) diff --git a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx index d6e772299..bae3c10f9 100644 --- a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx +++ b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx @@ -134,7 +134,7 @@ export default function CleansingTaskDetail() { ]; const operations = [ - ...(task?.status === TaskStatus.RUNNING + ...(task?.status?.value === TaskStatus.RUNNING ? [ { key: "pause", @@ -148,7 +148,7 @@ export default function CleansingTaskDetail() { ? [ { key: "start", - label: t("dataCleansing.actions.retryTask"), + label: t("dataCleansing.actions.start"), icon: , onClick: startTask, }, diff --git a/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx index dfe4bf62f..c7d127bbc 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx @@ -3,6 +3,7 @@ import {useNavigate} from "react-router"; import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/Card" import { GitBranch } from "lucide-react"; import { useTranslation } from "react-i18next"; +import { TaskStatus } from "@/pages/DataCleansing/cleansing.model"; export default function OperatorTable({ task }: { task: any }) { const navigate = useNavigate(); @@ -83,23 +84,27 @@ export default function OperatorTable({ task }: { task: any }) { dataIndex: "status", key: "status", filters: [ - { text: t("dataCleansing.detail.operatorTable.completed"), value: t("dataCleansing.detail.operatorTable.completed") }, - { text: t("dataCleansing.detail.operatorTable.failed"), value: t("dataCleansing.detail.operatorTable.failed") }, - { text: t("dataCleansing.detail.operatorTable.running"), value: t("dataCleansing.detail.operatorTable.running") }, + { text: t("dataCleansing.detail.operatorTable.completed"), value: "completed" }, + { text: t("dataCleansing.detail.operatorTable.failed"), value: "failed" }, + { text: t("dataCleansing.detail.operatorTable.running"), value: "running" }, + { text: t("dataCleansing.detail.operatorTable.partialSuccess"), value: "partialSuccess" }, ], - onFilter: (value: string, record: any) => record.status === value, - render: (status: string) => ( - - ), + onFilter: (value: string, record: any) => record.statusValue === value, + render: (statusObj: { label: string; value: TaskStatus }) => { + let badgeStatus: "default" | "processing" | "success" | "error" | "warning" = "default"; + + if (statusObj?.value === TaskStatus.COMPLETED) { + badgeStatus = "success"; + } else if (statusObj?.value === TaskStatus.RUNNING) { + badgeStatus = "processing"; + } else if (statusObj?.value === TaskStatus.PARTIAL_SUCCESS) { + badgeStatus = "warning"; + } else if (statusObj?.value === TaskStatus.FAILED) { + badgeStatus = "error"; + } + + return ; + }, }, ] @@ -114,7 +119,7 @@ export default function OperatorTable({ task }: { task: any }) { {t("dataCleansing.detail.operatorTable.description")} -
({ id: item?.id, @@ -124,7 +129,8 @@ export default function OperatorTable({ task }: { task: any }) { ? new Date(task.finishedAt).toLocaleTimeString() : '-', duration: task.duration, - status: task.status.label, + status: task.status, + statusValue: task.status.value, processedFiles: task.progress.finishedFileNum, successRate: task?.progress.successRate, }))} diff --git a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx index b7895b2ee..60c39f45f 100644 --- a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx +++ b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx @@ -101,37 +101,32 @@ export default function TaskList() { const isRunning = record.status?.value === TaskStatus.RUNNING; const showStart = [ TaskStatus.PENDING, + TaskStatus.PARTIAL_SUCCESS, TaskStatus.FAILED, TaskStatus.STOPPED, ].includes(record.status?.value); - const isComplete = record.status?.value === TaskStatus.COMPLETED; const pauseBtn = { key: "pause", label: t("dataCleansing.actions.pause"), - icon: isRunning ? : , - onClick: pauseTask, // implement pause/play logic + icon: , + onClick: pauseTask, }; const startBtn = { key: "start", label: t("dataCleansing.actions.start"), - icon: isRunning ? : , - disabled: isComplete, - onClick: startTask, // implement pause/play logic + icon: , + onClick: startTask, }; return [ - ...(isRunning - ? [ pauseBtn ] - : []), - ...(showStart || isComplete - ? [ startBtn ] - : []), + ...(isRunning ? [pauseBtn] : []), + ...(showStart ? [startBtn] : []), { key: "delete", label: t("dataCleansing.actions.delete"), danger: true, icon: , - onClick: deleteTask, // implement delete logic + onClick: deleteTask, }, ]; }; diff --git a/frontend/src/pages/DataCleansing/cleansing.const.tsx b/frontend/src/pages/DataCleansing/cleansing.const.tsx index 2442fa9ba..111503cdb 100644 --- a/frontend/src/pages/DataCleansing/cleansing.const.tsx +++ b/frontend/src/pages/DataCleansing/cleansing.const.tsx @@ -8,14 +8,7 @@ import { formatDateTime, formatExecutionDuration, } from "@/utils/unit"; -import { - ClockCircleOutlined, - PlayCircleOutlined, - CheckCircleOutlined, - AlertOutlined, - PauseCircleOutlined, - WarningOutlined, -} from "@ant-design/icons"; + import { BrushCleaning, Layout } from "lucide-react"; export function getTaskStatusMap(t: (key: string) => string) { @@ -23,38 +16,32 @@ export function getTaskStatusMap(t: (key: string) => string) { [TaskStatus.PENDING]: { label: t("dataCleansing.status.pending"), value: TaskStatus.PENDING, - color: "gray", - icon: , + color: "default", }, [TaskStatus.RUNNING]: { label: t("dataCleansing.status.running"), value: TaskStatus.RUNNING, - color: "blue", - icon: , + color: "processing", }, [TaskStatus.COMPLETED]: { label: t("dataCleansing.status.completed"), value: TaskStatus.COMPLETED, - color: "green", - icon: , + color: "success", }, [TaskStatus.PARTIAL_SUCCESS]: { label: t("dataCleansing.status.partialSuccess"), value: TaskStatus.PARTIAL_SUCCESS, - color: "yellow", - icon: , + color: "warning", }, [TaskStatus.FAILED]: { label: t("dataCleansing.status.failed"), value: TaskStatus.FAILED, - color: "red", - icon: , + color: "error", }, [TaskStatus.STOPPED]: { label: t("dataCleansing.status.stopped"), value: TaskStatus.STOPPED, - color: "orange", - icon: , + color: "default", }, }; } From 9b31328c2e64b87592bdac2125b67d43fbb8069d Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 26 Mar 2026 20:29:07 +0800 Subject: [PATCH 06/10] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E9=83=A8?= =?UTF-8?q?=E5=88=86=E6=88=90=E5=8A=9F=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/public/config/error-code.json | 2 ++ .../DataCleansing/Home/components/TaskList.tsx | 18 +++++++++++------- frontend/vite.config.ts | 6 +++--- .../app/core/exception/codes.py | 3 +++ .../cleaning/service/cleaning_task_service.py | 12 ++++++++++++ 5 files changed, 31 insertions(+), 10 deletions(-) diff --git a/frontend/public/config/error-code.json b/frontend/public/config/error-code.json index 6eb1c30a4..43bde7e5a 100644 --- a/frontend/public/config/error-code.json +++ b/frontend/public/config/error-code.json @@ -14,6 +14,8 @@ "cleaning.0009": "设置解析错误", "cleaning.0010": "任务ID不能为空", "cleaning.0011": "无法删除预制模板", + "cleaning.0012": "清洗任务日志文件不存在", + "cleaning.0013": "任务状态无效,无法执行此操作", "operator.0001": "算子不存在", "operator.0002": "算子被编排于模版中或处在正在进行的任务中,无法删除", "operator.0003": "无法删除预置算子", diff --git a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx index 60c39f45f..e12563bee 100644 --- a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx +++ b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx @@ -118,16 +118,20 @@ export default function TaskList() { icon: , onClick: startTask, }; + + const deleteBtn = { + key: "delete", + label: t("dataCleansing.actions.delete"), + icon: , + danger: true, + disabled: isRunning, // 运行中的任务禁用删除按钮 + onClick: deleteTask, + }; + return [ ...(isRunning ? [pauseBtn] : []), ...(showStart ? [startBtn] : []), - { - key: "delete", - label: t("dataCleansing.actions.delete"), - danger: true, - icon: , - onClick: deleteTask, - }, + deleteBtn, ]; }; diff --git a/frontend/vite.config.ts b/frontend/vite.config.ts index 1c69395cb..e8a58c53e 100644 --- a/frontend/vite.config.ts +++ b/frontend/vite.config.ts @@ -15,7 +15,7 @@ export default defineConfig({ host: "0.0.0.0", proxy: (() => { const pythonProxyConfig = { - target: "http://localhost:18000", + target: "http://localhost:32033", changeOrigin: true, secure: false, configure: (proxy: { on: (event: string, handler: (arg: unknown) => void) => void }) => { @@ -32,7 +32,7 @@ export default defineConfig({ }; const javaProxyConfig = { - target: "http://localhost:8080", + target: "http://localhost:32033", changeOrigin: true, secure: false, configure: (proxy: { on: (event: string, handler: (arg: unknown) => void) => void }) => { @@ -56,7 +56,7 @@ export default defineConfig({ const proxy: Record = {}; // SSE 端点需要禁用缓冲 proxy["/api/cleaning"] = { - target: "http://localhost:8080", + target: "http://localhost:32033", changeOrigin: true, secure: false, configure: (proxy: { on: (event: string, handler: (arg: unknown) => void) => void }) => { diff --git a/runtime/datamate-python/app/core/exception/codes.py b/runtime/datamate-python/app/core/exception/codes.py index 9eaef93a9..5ef9d56eb 100644 --- a/runtime/datamate-python/app/core/exception/codes.py +++ b/runtime/datamate-python/app/core/exception/codes.py @@ -183,6 +183,9 @@ def __init__(self): CLEANING_TASK_LOG_NOT_FOUND: Final = ErrorCode( "cleaning.0012", "Cleaning task log file not found", 404 ) + CLEANING_TASK_STATUS_INVALID: Final = ErrorCode( + "cleaning.0013", "Cleaning task status is invalid for this operation", 400 + ) # ========== 算子市场模块 ========== OPERATOR_NOT_FOUND: Final = ErrorCode("operator.0001", "Operator not found", 404) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 59b8fcf72..9a71fcc53 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -423,10 +423,22 @@ async def delete_task(self, db: AsyncSession, task_id: str) -> None: """Delete task""" self.validator.check_task_id(task_id) + task = await self.task_repo.find_task_by_id(db, task_id) + if not task: + raise BusinessError(ErrorCodes.CLEANING_TASK_NOT_FOUND, task_id) + + # 运行中的任务无法删除 + if task.status == CleaningTaskStatus.RUNNING: + raise BusinessError( + ErrorCodes.CLEANING_TASK_STATUS_INVALID, + "Task is running, cannot be deleted. Please stop the task first." + ) + await self.task_repo.delete_task_by_id(db, task_id) await self.operator_instance_repo.delete_by_instance_id(db, task_id) await self.result_repo.delete_by_instance_id(db, task_id) + # 删除任务相关文件 task_path = Path(f"{FLOW_PATH}/{task_id}") if task_path.exists(): try: From 0ce9c8019f0001badf5cf78bd5c929b6cc7269a1 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 27 Mar 2026 10:00:22 +0800 Subject: [PATCH 07/10] =?UTF-8?q?worker=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E5=AE=9E=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deployment/helm/datamate/values.yaml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/deployment/helm/datamate/values.yaml b/deployment/helm/datamate/values.yaml index 78570f4fb..c14306ad9 100644 --- a/deployment/helm/datamate/values.yaml +++ b/deployment/helm/datamate/values.yaml @@ -337,6 +337,10 @@ ray-cluster: key: DB_PASSWORD - name: PG_DATABASE value: "datamate" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name resources: limits: cpu: "8" @@ -352,7 +356,7 @@ ray-cluster: volumeMounts: - mountPath: /tmp/ray name: log-volume - subPath: ray/worker + subPath: ray/$(POD_NAME) - mountPath: /dataset name: dataset-volume - mountPath: /flow From a6dadc61ec051437a571769821e88e2d0e03cea7 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 27 Mar 2026 10:04:38 +0800 Subject: [PATCH 08/10] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E9=83=A8?= =?UTF-8?q?=E5=88=86=E6=88=90=E5=8A=9F=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/i18n/locales/zh/common.json | 2 +- .../DataCleansing/Home/components/TaskList.tsx | 4 ---- frontend/src/utils/request.ts | 13 +++++++++---- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/frontend/src/i18n/locales/zh/common.json b/frontend/src/i18n/locales/zh/common.json index 0e3b71471..8e5a6bc75 100644 --- a/frontend/src/i18n/locales/zh/common.json +++ b/frontend/src/i18n/locales/zh/common.json @@ -1526,7 +1526,7 @@ "currentDisplay": "当前展示: 第 {{num}} 次", "nthRun": "第 {{num}} 次", "noLogs": "当前任务无可用日志", - "streaming": "实时流式输出中...", + "streaming": "实时输出中...", "download": "下载日志", "downloadFailed": "下载日志文件失败" }, diff --git a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx index e12563bee..a573abf46 100644 --- a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx +++ b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx @@ -93,10 +93,6 @@ export default function TaskList() { setDeleteModal({ visible: false, taskId: "", taskName: "" }); }; - useEffect(() => { - fetchData(); - }, [t]); - const taskOperations = (record: CleansingTask) => { const isRunning = record.status?.value === TaskStatus.RUNNING; const showStart = [ diff --git a/frontend/src/utils/request.ts b/frontend/src/utils/request.ts index 36f4edefa..5ae883b42 100644 --- a/frontend/src/utils/request.ts +++ b/frontend/src/utils/request.ts @@ -2,9 +2,6 @@ import {message} from "antd"; import Loading from "./loading"; import {errorConfigStore} from "@/utils/errorConfigStore.ts"; import i18n from "@/i18n"; -import i18n from "@/i18n"; -import i18n from "@/i18n"; -import i18n from "@/i18n"; /** * 通用请求工具类 @@ -218,7 +215,7 @@ class Request { * 处理XHR响应 */ async handleXHRResponse(xhrResponse, config) { - // 模拟fetch响应格式用于拦截器 + // 模拟fetch响应格式用于拦截器(添加 clone/json 方法) const mockResponse = { ok: xhrResponse.status >= 200 && xhrResponse.status < 300, status: xhrResponse.status, @@ -226,6 +223,14 @@ class Request { headers: { get: (key) => xhrResponse.xhr.getResponseHeader(key), }, + data: xhrResponse.data, + clone: () => mockResponse, + json: async () => { + if (typeof xhrResponse.data === "string") { + return JSON.parse(xhrResponse.data); + } + return xhrResponse.data; + }, }; // 执行响应拦截器 From 4641fdd1ef0b2846976c5a3b3ee7a8b4b2f6195d Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 27 Mar 2026 10:40:49 +0800 Subject: [PATCH 09/10] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96sql=E6=80=A7?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Home/components/TaskList.tsx | 5 ++- .../datamate-python/app/db/models/cleaning.py | 12 ++++- .../repository/cleaning_result_repository.py | 45 ++++++++++++++----- .../repository/cleaning_task_repository.py | 21 +++++++++ .../cleaning/service/cleaning_task_service.py | 16 +++++-- 5 files changed, 82 insertions(+), 17 deletions(-) diff --git a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx index a573abf46..d1d26cce3 100644 --- a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx +++ b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx @@ -212,9 +212,12 @@ export default function TaskList() { key: "process", width: 150, render: (_, record: CleansingTask) => { - if (record?.status?.value == TaskStatus.FAILED) { + if (record?.status?.value === TaskStatus.FAILED) { return ; } + if (record?.status?.value === TaskStatus.PARTIAL_SUCCESS) { + return ; + } return ; }, }, diff --git a/runtime/datamate-python/app/db/models/cleaning.py b/runtime/datamate-python/app/db/models/cleaning.py index c2965be96..e2288f705 100644 --- a/runtime/datamate-python/app/db/models/cleaning.py +++ b/runtime/datamate-python/app/db/models/cleaning.py @@ -1,4 +1,4 @@ -from sqlalchemy import Column, String, BigInteger, Integer, TIMESTAMP +from sqlalchemy import Column, String, BigInteger, Integer, TIMESTAMP, Index from app.db.models.base_entity import BaseEntity, Base @@ -21,6 +21,11 @@ class CleaningTask(BaseEntity): started_at = Column(TIMESTAMP, nullable=True, comment="Task start time") finished_at = Column(TIMESTAMP, nullable=True, comment="Task finish time") + __table_args__ = ( + Index("ix_clean_task_status", "status"), + Index("ix_clean_task_created_at", "created_at"), + ) + class CleaningTemplate(BaseEntity): """Data cleaning template entity""" @@ -47,6 +52,11 @@ class CleaningResult(Base): status = Column(String(50), nullable=True, comment="Cleaning status: COMPLETED, FAILED, etc.") result = Column(String(1024), nullable=True, comment="Cleaning result message") + __table_args__ = ( + Index("ix_clean_result_instance_id", "instance_id"), + Index("ix_clean_result_instance_status", "instance_id", "status"), + ) + class OperatorInstance(Base): """Operator instance in task or template""" diff --git a/runtime/datamate-python/app/module/cleaning/repository/cleaning_result_repository.py b/runtime/datamate-python/app/module/cleaning/repository/cleaning_result_repository.py index d203246d4..e03a70c29 100644 --- a/runtime/datamate-python/app/module/cleaning/repository/cleaning_result_repository.py +++ b/runtime/datamate-python/app/module/cleaning/repository/cleaning_result_repository.py @@ -1,6 +1,6 @@ from typing import List, Optional from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, delete, func +from sqlalchemy import select, delete, func, Integer from app.db.models.cleaning import CleaningResult from app.module.cleaning.schema import CleaningResultDto @@ -48,16 +48,14 @@ async def count_by_instance_id( db: AsyncSession, instance_id: str ) -> tuple[int, int]: - """Count results by instance ID (completed, failed)""" - total_query = select(self.model).where(self.model.instance_id == instance_id) - completed_query = total_query.where(self.model.status == "COMPLETED") - failed_query = total_query.where(self.model.status == "FAILED") - - total = len((await db.execute(total_query)).scalars().all()) - completed = len((await db.execute(completed_query)).scalars().all()) - failed = len((await db.execute(failed_query)).scalars().all()) - - return (completed, failed) + """Count results by instance ID (completed, failed) using single SQL""" + query = select( + func.sum(func.cast(self.model.status == "COMPLETED", Integer)).label("completed"), + func.sum(func.cast(self.model.status == "FAILED", Integer)).label("failed"), + ).where(self.model.instance_id == instance_id) + result = await db.execute(query) + row = result.one() + return (row.completed or 0, row.failed or 0) async def count_total_by_instance_id( self, @@ -71,6 +69,31 @@ async def count_total_by_instance_id( result = await db.scalar(query) return result or 0 + async def batch_count_by_instance_ids( + self, + db: AsyncSession, + instance_ids: list[str] + ) -> dict[str, tuple[int, int, int]]: + """Batch count completed/failed/total for multiple instance IDs using single SQL. + + Returns dict: {instance_id: (completed, failed, total)} + """ + if not instance_ids: + return {} + + query = select( + self.model.instance_id, + func.sum(func.cast(self.model.status == "COMPLETED", Integer)).label("completed"), + func.sum(func.cast(self.model.status == "FAILED", Integer)).label("failed"), + func.count().label("total"), + ).where(self.model.instance_id.in_(instance_ids)).group_by(self.model.instance_id) + + result = await db.execute(query) + return { + row.instance_id: (row.completed or 0, row.failed or 0, row.total or 0) + for row in result.all() + } + async def delete_by_instance_id( self, db: AsyncSession, diff --git a/runtime/datamate-python/app/module/cleaning/repository/cleaning_task_repository.py b/runtime/datamate-python/app/module/cleaning/repository/cleaning_task_repository.py index f136ffa53..f4c6b50ed 100644 --- a/runtime/datamate-python/app/module/cleaning/repository/cleaning_task_repository.py +++ b/runtime/datamate-python/app/module/cleaning/repository/cleaning_task_repository.py @@ -133,6 +133,27 @@ async def delete_task_by_id(self, db: AsyncSession, task_id: str) -> None: await db.execute(query) await db.flush() + async def count_tasks( + self, + db: AsyncSession, + status: Optional[str] = None, + keyword: Optional[str] = None + ) -> int: + """Count cleaning tasks using SQL COUNT""" + query = select(func.count()).select_from(self.model) + + if status: + query = query.where(self.model.status == status) + + if keyword: + keyword_pattern = f"%{keyword}%" + query = query.where( + self.model.name.ilike(keyword_pattern) | self.model.description.ilike(keyword_pattern) + ) + + result = await db.scalar(query) + return result or 0 + async def is_name_exist(self, db: AsyncSession, name: str) -> bool: """Check if task name exists""" query = select(func.count()).select_from(self.model).where(self.model.name == name) diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py index 9a71fcc53..93ddd3a3f 100644 --- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py +++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py @@ -70,8 +70,17 @@ async def get_tasks( """Get cleaning tasks""" tasks = await self.task_repo.find_tasks(db, status, keyword, page, size) + if not tasks: + return tasks + + # Batch query progress for all tasks in a single SQL (avoids N+1) + task_ids = [task.id for task in tasks] + progress_map = await self.result_repo.batch_count_by_instance_ids(db, task_ids) + for task in tasks: - await self._set_process(db, task) + completed, failed, actual_total = progress_map.get(task.id, (0, 0, 0)) + total = max(actual_total, task.file_count or 0) + task.progress = CleaningProcess.of(total, completed, failed) return tasks @@ -89,9 +98,8 @@ async def count_tasks( status: str | None = None, keyword: str | None = None, ) -> int: - """Count cleaning tasks""" - tasks = await self.task_repo.find_tasks(db, status, keyword, None, None) - return len(tasks) + """Count cleaning tasks using SQL COUNT""" + return await self.task_repo.count_tasks(db, status, keyword) async def get_task(self, db: AsyncSession, task_id: str) -> CleaningTaskDto: """Get task by ID""" From 535fae677165ca92e59a492a698aa5426c5189c8 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 27 Mar 2026 11:35:04 +0800 Subject: [PATCH 10/10] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96sql=E6=80=A7?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/public/config/error-code.json | 4 +-- frontend/src/i18n/locales/en/common.json | 3 ++ frontend/src/i18n/locales/zh/common.json | 3 ++ .../Create/hooks/useOperatorOperations.ts | 28 ++++++++++++++++++- .../Create/components/ConfigureStep.tsx | 23 +++++++++++++-- .../service/cleaning_task_validator.py | 23 +++++++++++---- 6 files changed, 72 insertions(+), 12 deletions(-) diff --git a/frontend/public/config/error-code.json b/frontend/public/config/error-code.json index 43bde7e5a..d5dd78969 100644 --- a/frontend/public/config/error-code.json +++ b/frontend/public/config/error-code.json @@ -17,7 +17,7 @@ "cleaning.0012": "清洗任务日志文件不存在", "cleaning.0013": "任务状态无效,无法执行此操作", "operator.0001": "算子不存在", - "operator.0002": "算子被编排于模版中或处在正在进行的任务中,无法删除", + "operator.0002": "算子被编排于模版中或处在尚未结束的任务中,无法删除", "operator.0003": "无法删除预置算子", "operator.0004": "不支持的文件类型,当前仅支持tar和zip", "operator.0005": "解析算子包失败", @@ -51,4 +51,4 @@ "404": "请求的资源不存在", "500": "服务器内部错误,请稍后重试", "502": "网关错误" -} \ No newline at end of file +} diff --git a/frontend/src/i18n/locales/en/common.json b/frontend/src/i18n/locales/en/common.json index 34c895c83..bbbc00ad0 100644 --- a/frontend/src/i18n/locales/en/common.json +++ b/frontend/src/i18n/locales/en/common.json @@ -1458,6 +1458,9 @@ "deleteDesc": "Are you sure you want to delete \"{{itemName}}\"? This action cannot be undone." } }, + "create": { + "typeMismatch": "Type mismatch: previous operator outputs \"{{from}}\", but current operator requires \"{{to}}\"" + }, "template": { "columns": { "templateName": "Template Name", diff --git a/frontend/src/i18n/locales/zh/common.json b/frontend/src/i18n/locales/zh/common.json index 8e5a6bc75..ce3283139 100644 --- a/frontend/src/i18n/locales/zh/common.json +++ b/frontend/src/i18n/locales/zh/common.json @@ -1458,6 +1458,9 @@ "deleteDesc": "删除「{{itemName}}」后将无法恢复,确定要执行此操作吗?" } }, + "create": { + "typeMismatch": "类型不兼容:上一个算子输出类型为「{{from}}」,当前算子输入类型为「{{to}}」" + }, "template": { "columns": { "templateName": "模板名称", diff --git a/frontend/src/pages/DataCleansing/Create/hooks/useOperatorOperations.ts b/frontend/src/pages/DataCleansing/Create/hooks/useOperatorOperations.ts index 25e67db22..fb9ccd58f 100644 --- a/frontend/src/pages/DataCleansing/Create/hooks/useOperatorOperations.ts +++ b/frontend/src/pages/DataCleansing/Create/hooks/useOperatorOperations.ts @@ -1,4 +1,4 @@ -import { useEffect, useState } from "react"; +import { useEffect, useState, useMemo } from "react"; import { OperatorI } from "@/pages/OperatorMarket/operator.model"; import { CleansingTemplate } from "../../cleansing.model"; import {queryCleaningTemplateByIdUsingGet, queryCleaningTemplatesUsingGet} from "../../cleansing.api"; @@ -8,9 +8,22 @@ import { } from "@/pages/OperatorMarket/operator.api"; import {useParams} from "react-router"; import i18n from "@/i18n"; +import { App } from "antd"; + +function checkTypeCompatible( + prevOutputs: string | undefined, + nextInputs: string | undefined +): boolean { + if (!prevOutputs || !nextInputs) return true; + const outputs = prevOutputs.toLowerCase().trim(); + const inputs = nextInputs.toLowerCase().trim(); + if (outputs === "multimodal" || inputs === "multimodal") return true; + return outputs === inputs; +} export function useOperatorOperations() { const { id = "" } = useParams(); + const { message } = App.useApp(); const [currentStep, setCurrentStep] = useState(1); const [operators, setOperators] = useState([]); @@ -115,6 +128,19 @@ export function useOperatorOperations() { selectedOperators.filter((op) => op.id !== operator.id) ); } else { + // Validate type compatibility with the last selected operator + if (selectedOperators.length > 0) { + const lastOp = selectedOperators[selectedOperators.length - 1]; + if (!checkTypeCompatible(lastOp.outputs, operator.inputs)) { + message.warning( + i18n.t("dataCleansing.create.typeMismatch", { + from: lastOp.outputs, + to: operator.inputs, + }) + ); + return; + } + } setSelectedOperators([...selectedOperators, { ...operator }]); } }; diff --git a/frontend/src/pages/OperatorMarket/Create/components/ConfigureStep.tsx b/frontend/src/pages/OperatorMarket/Create/components/ConfigureStep.tsx index e79df9ab2..aa957821b 100644 --- a/frontend/src/pages/OperatorMarket/Create/components/ConfigureStep.tsx +++ b/frontend/src/pages/OperatorMarket/Create/components/ConfigureStep.tsx @@ -1,10 +1,19 @@ -import { Alert, Input, Form } from "antd"; +import { Alert, Input, Form, Select } from "antd"; import TextArea from "antd/es/input/TextArea"; import React, { useEffect } from "react"; import { useTranslation } from "react-i18next"; import ParamConfig from "@/pages/DataCleansing/Create/components/ParamConfig.tsx"; import { ChevronRight, Plus, Trash2 } from "lucide-react"; import { MetricI } from "@/pages/OperatorMarket/operator.model.ts"; +import type { MediaType } from "@/pages/OperatorMarket/operator.const.tsx"; + +const MEDIA_TYPE_OPTIONS: { label: string; value: MediaType }[] = [ + { label: "Text", value: "text" }, + { label: "Image", value: "image" }, + { label: "Audio", value: "audio" }, + { label: "Video", value: "video" }, + { label: "Multimodal", value: "multimodal" }, +]; export default function ConfigureStep({ parsedInfo, @@ -127,14 +136,22 @@ export default function ConfigureStep({ name="inputs" rules={[{ required: true }]} > - + +