-
Notifications
You must be signed in to change notification settings - Fork 24
Limit concurrent workers in batch #873
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
| import stat | ||
| import sys | ||
| import time | ||
| from numbers import Integral | ||
| from typing import Dict, List, Optional | ||
|
|
||
| import arc.parser.parser as parser | ||
|
|
@@ -51,16 +52,31 @@ class PipeRun: | |
| run_id (str): Unique identifier for this pipe run. | ||
| tasks (List[TaskSpec]): Task specifications to execute. | ||
| cluster_software (str): Cluster scheduler type. | ||
| max_workers (int): Maximum number of concurrent array workers. | ||
| max_workers (int): Maximum total array worker slots (array size). | ||
| max_concurrent (Optional[int]): Max workers running simultaneously | ||
| (like PBS ``%N``). Must be ``None`` (unthrottled) or a positive | ||
| integer; ``0``, negatives, and non-integers raise ``ValueError``. | ||
| max_attempts (int): Maximum retry attempts per task. | ||
| """ | ||
|
|
||
| @staticmethod | ||
| def _validate_max_concurrent(max_concurrent: Optional[int]) -> None: | ||
| """Accept ``None`` or a positive integer for throttle settings.""" | ||
| if max_concurrent is None: | ||
| return | ||
| if isinstance(max_concurrent, bool) or not isinstance(max_concurrent, Integral): | ||
| raise ValueError('PipeRun max_concurrent must be None or a positive integer.') | ||
| if max_concurrent > 0: | ||
| return | ||
| raise ValueError('PipeRun max_concurrent must be None or a positive integer.') | ||
|
|
||
| def __init__(self, | ||
| project_directory: str, | ||
| run_id: str, | ||
| tasks: List[TaskSpec], | ||
| cluster_software: str, | ||
| max_workers: int = 100, | ||
| max_concurrent: Optional[int] = None, | ||
| max_attempts: int = 3, | ||
| pipe_root: Optional[str] = None, | ||
| ): | ||
|
|
@@ -69,6 +85,8 @@ def __init__(self, | |
| self.tasks = tasks | ||
| self.cluster_software = cluster_software | ||
| self.max_workers = max_workers | ||
| self._validate_max_concurrent(max_concurrent) | ||
| self.max_concurrent = None if max_concurrent is None else int(max_concurrent) | ||
| self.max_attempts = max_attempts | ||
| self.pipe_root = pipe_root if pipe_root is not None \ | ||
| else os.path.join(project_directory, 'calcs', 'pipe_' + run_id) | ||
|
|
@@ -103,6 +121,7 @@ def _save_run_metadata(self) -> None: | |
| 'status': self.status.value, | ||
| 'cluster_software': self.cluster_software, | ||
| 'max_workers': self.max_workers, | ||
| 'max_concurrent': self.max_concurrent, | ||
| 'max_attempts': self.max_attempts, | ||
| 'task_family': task_family, | ||
| 'engine': engine, | ||
|
|
@@ -146,6 +165,7 @@ def from_dir(cls, pipe_root: str) -> 'PipeRun': | |
| tasks=tasks, | ||
| cluster_software=data['cluster_software'], | ||
| max_workers=data.get('max_workers', 100), | ||
| max_concurrent=data.get('max_concurrent'), | ||
| max_attempts=data.get('max_attempts', 3), | ||
| pipe_root=pipe_root, | ||
| ) | ||
|
|
@@ -192,12 +212,41 @@ def _submission_resources(self): | |
| Derive resource settings from the homogeneous task list. | ||
|
|
||
| Returns: | ||
| Tuple[int, int, int]: ``(cpus, memory_mb, array_size)`` | ||
| Tuple[int, int, int, Optional[int]]: | ||
| ``(cpus, memory_mb, array_size, throttle)`` where ``throttle`` | ||
| caps workers running simultaneously (clamped to ``array_size``), | ||
| or ``None`` if unthrottled. | ||
| """ | ||
| cpus = self.tasks[0].required_cores if self.tasks else 1 | ||
| memory_mb = self.tasks[0].required_memory_mb if self.tasks else 4096 | ||
| array_size = min(self.max_workers, len(self.tasks)) if self.tasks else self.max_workers | ||
| return cpus, memory_mb, array_size | ||
| throttle = None | ||
| if self.max_concurrent is not None: | ||
| throttle = min(self.max_concurrent, array_size) | ||
| return cpus, memory_mb, array_size, throttle | ||
|
Comment on lines
+223
to
+226
|
||
|
|
||
| def _render_throttle(self, array_size: int, throttle: Optional[int]) -> Dict[str, str]: | ||
| """ | ||
| Render scheduler-specific array-range and extra-directives strings. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add the Google-style docstring, explaining |
||
|
|
||
| SLURM/PBS encode the throttle as an inline ``%K`` suffix on the range. | ||
| SGE uses a separate ``-tc`` directive. HTCondor uses ``max_materialize`` | ||
| and takes a bare count (not a range) for ``queue``. | ||
| """ | ||
| cs = 'sge' if self.cluster_software == 'oge' else self.cluster_software | ||
| if cs == 'htcondor': | ||
| array_range = str(array_size) | ||
| extra = f'max_materialize = {throttle}' if throttle else '' | ||
| elif cs == 'sge': | ||
| array_range = f'1-{array_size}' | ||
| extra = f'#$ -tc {throttle}' if throttle else '' | ||
| elif cs in ('slurm', 'pbs'): | ||
| suffix = f'%{throttle}' if throttle else '' | ||
| array_range = f'1-{array_size}{suffix}' | ||
| extra = '' | ||
| else: | ||
| raise NotImplementedError(f'No throttle rendering for {self.cluster_software}') | ||
| return {'array_range': array_range, 'extra_directives': extra} | ||
|
|
||
| def write_submit_script(self) -> str: | ||
| """ | ||
|
|
@@ -215,7 +264,8 @@ def write_submit_script(self) -> str: | |
| raise NotImplementedError( | ||
| f'No pipe submit template for cluster software: {self.cluster_software}. ' | ||
| f'Available templates: {list(pipe_submit.keys())}') | ||
| cpus, memory_mb, array_size = self._submission_resources() | ||
| cpus, memory_mb, array_size, throttle = self._submission_resources() | ||
| throttle_fields = self._render_throttle(array_size, throttle) | ||
| server = servers_dict.get('local', {}) | ||
| queue, _ = next(iter(server.get('queues', {}).items()), ('', None)) | ||
| engine = self.tasks[0].engine if self.tasks else '' | ||
|
|
@@ -226,7 +276,8 @@ def write_submit_script(self) -> str: | |
| env_setup = f'{env_setup}\n{scratch_export}' if env_setup else scratch_export | ||
| content = pipe_submit[template_key].format( | ||
| name=f'pipe_{self.run_id}', | ||
| max_task_num=array_size, | ||
| array_range=throttle_fields['array_range'], | ||
| extra_directives=throttle_fields['extra_directives'], | ||
| pipe_root=self.pipe_root, | ||
| python_exe=sys.executable, | ||
| cpus=cpus, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when rebasing, need to up the typehints to the Py14 style