diff --git a/runtime/python-executor/datamate/common/utils/__init__.py b/runtime/python-executor/datamate/common/utils/__init__.py index 50bba4ef4..50e3b73ee 100644 --- a/runtime/python-executor/datamate/common/utils/__init__.py +++ b/runtime/python-executor/datamate/common/utils/__init__.py @@ -57,3 +57,7 @@ def decrypt(enc_pass): dec_pass = K.API().decrypt(0) os.environ['KMC_PYTHON_ENCRYPT_DATA'] = "" return dec_pass + + +def is_k8s(): + return "KUBERNETES_SERVICE_HOST" in os.environ diff --git a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py index 09059a35b..06b3b40a3 100644 --- a/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py +++ b/runtime/python-executor/datamate/wrappers/data_juicer_wrapper.py @@ -9,3 +9,6 @@ async def submit(task_id, config_path): await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'data_juicer_executor.py')} " f"--config_path={config_path}") + +def cancel(task_id): + return cmd_scheduler.cancel_task(task_id) diff --git a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py index 446208c4b..ff5a3804b 100644 --- a/runtime/python-executor/datamate/wrappers/datamate_wrapper.py +++ b/runtime/python-executor/datamate/wrappers/datamate_wrapper.py @@ -1,11 +1,19 @@ # -*- coding: utf-8 -*- import os +from datamate.common.utils import is_k8s +from datamate.scheduler import cmd_scheduler from datamate.scheduler import ray_job_scheduler async def submit(task_id, config_path, retry_count: int = 0): current_dir = os.path.dirname(__file__) + + if not is_k8s(): + await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'datamate_executor.py')} " + f"--config_path={config_path}") + return + script_path = os.path.join(current_dir, "datamate_executor.py") # 根据 retry_count 设置日志路径 @@ -20,4 +28,6 @@ async def submit(task_id, config_path, retry_count: int = 0): def cancel(task_id): + if not is_k8s(): + return cmd_scheduler.cancel_task(task_id) return ray_job_scheduler.cancel_task(task_id)