Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions runtime/python-executor/datamate/common/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@ def decrypt(enc_pass):
dec_pass = K.API().decrypt(0)
os.environ['KMC_PYTHON_ENCRYPT_DATA'] = ""
return dec_pass


def is_k8s():
return "KUBERNETES_SERVICE_HOST" in os.environ
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ async def submit(task_id, config_path):

await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'data_juicer_executor.py')} "
f"--config_path={config_path}")

def cancel(task_id):
return cmd_scheduler.cancel_task(task_id)
10 changes: 10 additions & 0 deletions runtime/python-executor/datamate/wrappers/datamate_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
# -*- coding: utf-8 -*-
import os

from datamate.common.utils import is_k8s
from datamate.scheduler import cmd_scheduler
from datamate.scheduler import ray_job_scheduler


async def submit(task_id, config_path, retry_count: int = 0):
current_dir = os.path.dirname(__file__)

if not is_k8s():
await cmd_scheduler.submit(task_id, f"python {os.path.join(current_dir, 'datamate_executor.py')} "
f"--config_path={config_path}")
return

script_path = os.path.join(current_dir, "datamate_executor.py")

# 根据 retry_count 设置日志路径
Expand All @@ -20,4 +28,6 @@ async def submit(task_id, config_path, retry_count: int = 0):


def cancel(task_id):
if not is_k8s():
return cmd_scheduler.cancel_task(task_id)
return ray_job_scheduler.cancel_task(task_id)
Loading