From 3547ef7418e2f2391ba21b15ed4ca7b472d69478 Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 2 Apr 2026 11:25:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E5=A4=8Ddocker=E4=B8=8B?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=E5=A4=B1=E8=B4=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../python-executor/datamate/common/utils/__init__.py | 4 ++++ .../datamate/wrappers/data_juicer_wrapper.py | 3 +++ .../datamate/wrappers/datamate_wrapper.py | 10 ++++++++++ 3 files changed, 17 insertions(+) diff --git a/runtime/python-executor/datamate/common/utils/__init__.py b/runtime/python-executor/datamate/common/utils/__init__.py index 50bba4ef4..50e3b73ee 100644 --- a/runtime/python-executor/datamate/common/utils/__init__.py +++ b/runtime/python-executor/datamate/common/utils/__init__.py @@ -57,3 +57,7 @@ def decrypt(enc_pass): dec_pass = K.API().decrypt(0) os.environ['KMC_PYTHON_ENCRYPT_DATA'] = "" return dec_pass + + +def is_k8s(): + return "KUBERNETES_SERVICE_HOST" in os.environ diff --git a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py index 09059a35b..06b3b40a3 100644 --- a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py +++ b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py @@ -9,3 +9,6 @@ async def submit(task_id, config_path): await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'data_juicer_executor.py')} " f"--config_path={config_path}") + +def cancel(task_id): + return cmd_scheduler.cancel_task(task_id) diff --git a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py index 446208c4b..ff5a3804b 100644 --- a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py +++ b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py @@ -1,11 +1,19 @@ # -*- coding: utf-8 -*- import os +from datamate.common.utils import is_k8s +from datamate.scheduler import cmd_scheduler from datamate.scheduler import ray_job_scheduler async def submit(task_id, config_path, retry_count: int = 0): current_dir = os.path.dirname(__file__) + + if not is_k8s(): + await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'datamate_executor.py')} " + f"--config_path={config_path}") + return + script_path = os.path.join(current_dir, "datamate_executor.py") # 根据 retry_count 设置日志路径 @@ -20,4 +28,6 @@ async def submit(task_id, config_path, retry_count: int = 0): def cancel(task_id): + if not is_k8s(): + return cmd_scheduler.cancel_task(task_id) return ray_job_scheduler.cancel_task(task_id)