diff --git a/conf/experimental/megatron_bridge/test/b200/megatron_bridge_qwen_30b.toml b/conf/experimental/megatron_bridge/test/b200/megatron_bridge_qwen_30b.toml index c93eac45a..c67f918c4 100644 --- a/conf/experimental/megatron_bridge/test/b200/megatron_bridge_qwen_30b.toml +++ b/conf/experimental/megatron_bridge/test/b200/megatron_bridge_qwen_30b.toml @@ -28,9 +28,9 @@ mount_as = "/opt/Megatron-Bridge" [cmd_args] gpu_type = "b200" container_image = "nvcr.io#nvidia/nemo:25.11.01" -model_name = "qwen3" -model_size = "30b_a3b" -gpus_per_node = 8 +model_family_name = "qwen3" +model_recipe_name = "30b_a3b" +gpus_per_node = 4 num_gpus = 8 domain = "llm" task = "pretrain" diff --git a/conf/experimental/megatron_bridge/test/gb200/megatron_bridge_qwen_30b.toml b/conf/experimental/megatron_bridge/test/gb200/megatron_bridge_qwen_30b.toml index 8ed62ead3..8802bb4b7 100644 --- a/conf/experimental/megatron_bridge/test/gb200/megatron_bridge_qwen_30b.toml +++ b/conf/experimental/megatron_bridge/test/gb200/megatron_bridge_qwen_30b.toml @@ -28,8 +28,8 @@ mount_as = "/opt/Megatron-Bridge" [cmd_args] gpu_type = "gb200" container_image = "nvcr.io#nvidia/nemo:25.11.01" -model_name = "qwen3" -model_size = "30b_a3b" +model_family_name = "qwen3" +model_recipe_name = "30b_a3b" gpus_per_node = 4 num_gpus = 8 domain = "llm" diff --git a/conf/experimental/megatron_bridge/test/gb300/megatron_bridge_qwen_30b.toml b/conf/experimental/megatron_bridge/test/gb300/megatron_bridge_qwen_30b.toml index 85f30ca16..9fc2db746 100644 --- a/conf/experimental/megatron_bridge/test/gb300/megatron_bridge_qwen_30b.toml +++ b/conf/experimental/megatron_bridge/test/gb300/megatron_bridge_qwen_30b.toml @@ -28,8 +28,8 @@ mount_as = "/opt/Megatron-Bridge" [cmd_args] gpu_type = "gb300" container_image = "nvcr.io#nvidia/nemo:25.11.01" -model_name = "qwen3" -model_size = "30b_a3b" +model_family_name = "qwen3" +model_recipe_name = "30b_a3b" gpus_per_node = 4 num_gpus = 8 domain = "llm" diff --git a/conf/experimental/megatron_bridge/test/h100/megatron_bridge_qwen_30b.toml b/conf/experimental/megatron_bridge/test/h100/megatron_bridge_qwen_30b.toml index f8a397973..4a556fc84 100644 --- a/conf/experimental/megatron_bridge/test/h100/megatron_bridge_qwen_30b.toml +++ b/conf/experimental/megatron_bridge/test/h100/megatron_bridge_qwen_30b.toml @@ -28,8 +28,8 @@ mount_as = "/opt/Megatron-Bridge" [cmd_args] gpu_type = "h100" container_image = "nvcr.io#nvidia/nemo:25.11.01" -model_name = "qwen3" -model_size = "30b_a3b" +model_family_name = "qwen3" +model_recipe_name = "30b_a3b" gpus_per_node = 8 num_gpus = 16 domain = "llm" diff --git a/doc/workloads/megatron_bridge.rst b/doc/workloads/megatron_bridge.rst index 8cbb18a78..7be785858 100644 --- a/doc/workloads/megatron_bridge.rst +++ b/doc/workloads/megatron_bridge.rst @@ -18,9 +18,9 @@ Test TOML example: [cmd_args] # Container can be an NGC/enroot URL (nvcr.io#...) or a local .sqsh path. container_image = "nvcr.io#nvidia/nemo:25.11.01" - - model_name = "qwen3" - model_size = "30b_a3b" + + model_family_name = "qwen3" + model_recipe_name = "30b_a3b" task = "pretrain" domain = "llm" compute_dtype = "fp8_mx" @@ -55,8 +55,8 @@ Test-in-Scenario example: [Tests.cmd_args] container_image = "nvcr.io#nvidia/nemo:25.11.01" - model_name = "qwen3" - model_size = "30b_a3b" + model_family_name = "qwen3" + model_recipe_name = "30b_a3b" task = "pretrain" domain = "llm" compute_dtype = "fp8_mx" diff --git a/src/cloudai/cli/handlers.py b/src/cloudai/cli/handlers.py index d474ff421..b295059b6 100644 --- a/src/cloudai/cli/handlers.py +++ b/src/cloudai/cli/handlers.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,11 +21,12 @@ import signal from contextlib import contextmanager from pathlib import Path -from typing import Callable, List, Optional +from typing import Any, Callable, List, Optional from unittest.mock import Mock import toml import yaml +from pydantic import ValidationError from cloudai.core import ( BaseInstaller, @@ -41,10 +42,10 @@ TestScenario, ) from cloudai.models.scenario import ReportConfig -from cloudai.models.workload import TestDefinition +from cloudai.models.workload import TestDefinition, TestRun from cloudai.parser import HOOK_ROOT from cloudai.systems.slurm import SingleSbatchRunner, SlurmSystem -from cloudai.util import prepare_output_dir +from cloudai.util import flatten_dict, prepare_output_dir def _log_installation_dirs(prefix: str, system: System) -> None: @@ -145,7 +146,23 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int: continue env = CloudAIGymEnv(test_run=test_run, runner=runner.runner) - agent = agent_class(env) + + try: + agent_config = test_run.test.agent_config + agent_overrides = ( + validate_agent_overrides(test_run, agent_type, agent_config) if agent_config is not None else None + ) + except ValidationError as e: + logging.error(f"Invalid agent_config for agent '{agent_type}': ") + for error in e.errors(): + logging.error(f" - {'.'.join(str(var_name) for var_name in error['loc'])}: {error['msg']}") + logging.error("Valid overrides: ") + for item, desc in validate_agent_overrides(test_run, agent_type).items(): + logging.error(f" - {item}: {desc}") + err = 1 + continue + agent = agent_class(env, **agent_overrides) if agent_overrides is not None else agent_class(env) + for step in range(agent.max_steps): result = agent.select_action() if result is None: @@ -166,6 +183,66 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int: return err +def validate_agent_overrides( + test_run: TestRun, agent_type: str, agent_config: Optional[dict[str, Any]] = None +) -> dict[str, Any]: + """ + Validate and process agent configuration overrides. + + If agent_config is empty, returns the available configuration fields for the agent type. + """ + registry = Registry() + config_class_map = {} + for agent_name, agent_class in registry.agents_map.items(): + if agent_class.config: + config_class_map[agent_name] = agent_class.config + + config_class = config_class_map.get(agent_type) + if not config_class: + valid_types = ", ".join(f"'{agent_name}'" for agent_name in config_class_map) + raise ValueError( + f"Agent type '{agent_type}' does not support configuration overrides. " + f"Valid agent types are: {valid_types}. " + ) + + if agent_config: + seed_parameters = agent_config.get("seed_parameters", None) + if seed_parameters: + validate_seed_parameters(test_run, seed_parameters) + + validated_config = config_class.model_validate(agent_config) + agent_kwargs = validated_config.model_dump(exclude_none=True) + logging.debug(f"Applying agent config overrides for '{agent_type}': {agent_kwargs}") + else: + agent_kwargs = {} + for field_name, field_info in config_class.model_fields.items(): + agent_kwargs[field_name] = field_info.description + return agent_kwargs + + +def validate_seed_parameters(test_run: TestRun, seed_parameters: dict[str, Any]) -> None: + """Validate seed parameters against DSE-able command-line arguments.""" + flat_cmd_args = flatten_dict(test_run.test.cmd_args.model_dump(exclude_none=True)) + dse_cmd_args = {k: v for k, v in flat_cmd_args.items() if isinstance(v, list)} + + logging.debug("Validating seed parameters against DSE-able command-line arguments:") + logging.debug(f"\t{dse_cmd_args}") + + for key, value in seed_parameters.items(): + if key not in dse_cmd_args: + raise KeyError( + f"Seed parameter '{key}' not found in DSE-able command-line arguments. " + f"Ensure that the key is one of the following available keys: {list(dse_cmd_args.keys())}" + ) + if value not in dse_cmd_args[key]: + raise ValueError( + f"Seed parameter '{key}' value '{value}' not found in DSE-able command-line arguments. " + f"Ensure that the value is one of the following available values: {dse_cmd_args[key]}" + ) + + logging.debug("Seed parameters validated successfully.") + + def generate_reports(system: System, test_scenario: TestScenario, result_dir: Path) -> None: registry = Registry() diff --git a/src/cloudai/configurator/base_agent.py b/src/cloudai/configurator/base_agent.py index dbd397099..a82502217 100644 --- a/src/cloudai/configurator/base_agent.py +++ b/src/cloudai/configurator/base_agent.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,7 +15,9 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import Any, Dict, Tuple +from typing import Any, Dict, Optional, Tuple + +from cloudai.models.agent_config import AgentConfig from .base_gym import BaseGym @@ -28,6 +30,8 @@ class BaseAgent(ABC): Automatically infers parameter types from TestRun's cmd_args. """ + config: Optional[AgentConfig] = None + def __init__(self, env: BaseGym): """ Initialize the agent with the environment. diff --git a/src/cloudai/models/agent_config.py b/src/cloudai/models/agent_config.py new file mode 100644 index 000000000..072bb928e --- /dev/null +++ b/src/cloudai/models/agent_config.py @@ -0,0 +1,28 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC +from typing import Any, Optional + +from pydantic import BaseModel, ConfigDict, Field + + +class AgentConfig(BaseModel, ABC): + """Base configuration for agent overrides.""" + + model_config = ConfigDict(extra="forbid") + random_seed: Optional[int] = Field(default=None, description="Random seed for reproducibility") + seed_parameters: Optional[dict[str, Any]] = Field(default=None, description="Seed parameters for reproducibility") diff --git a/src/cloudai/models/workload.py b/src/cloudai/models/workload.py index 1745ae734..0a962cf59 100644 --- a/src/cloudai/models/workload.py +++ b/src/cloudai/models/workload.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -107,6 +107,7 @@ class TestDefinition(BaseModel, ABC): agent_steps: int = 1 agent_metrics: list[str] = Field(default=["default"]) agent_reward_function: str = "inverse" + agent_config: Optional[dict[str, Any]] = None @property def cmd_args_dict(self) -> Dict[str, Union[str, List[str]]]: diff --git a/src/cloudai/workloads/megatron_bridge/megatron_bridge.py b/src/cloudai/workloads/megatron_bridge/megatron_bridge.py index ef07b8b29..dc2c5e1c8 100644 --- a/src/cloudai/workloads/megatron_bridge/megatron_bridge.py +++ b/src/cloudai/workloads/megatron_bridge/megatron_bridge.py @@ -17,7 +17,7 @@ import logging from typing import List, Optional, Union, cast -from pydantic import Field, ValidationInfo, field_validator +from pydantic import Field, field_validator, model_validator from cloudai.core import DockerImage, GitRepo, Installable, PythonExecutable from cloudai.models.workload import CmdArgs, TestDefinition @@ -33,29 +33,50 @@ class MegatronBridgeCmdArgs(CmdArgs): container_image: str = Field(default="") num_gpus: int = Field(default=8) gpus_per_node: int = Field(default=8) - custom_mounts: Optional[str] = Field(default=None) + custom_mounts: Optional[Union[str, List[str]]] = Field( + default=None, + description="Comma-separated or list of host_path:container_path mounts; merged with test-level extra_container_mounts for -cm.", + ) enable_vboost: Optional[bool] = Field(default=False) dryrun: Optional[bool] = Field(default=False) enable_nsys: Optional[bool] = Field(default=False) detach: Optional[bool] = Field(default=None) + # Domain / model overrides (argument_parser main + model) + domain: Optional[str] = Field( + default=None, + description="Domain: llm, vlm, or qwen3vl (default llm).", + ) + hidden_size: Optional[int] = Field(default=None, description="Override hidden size for experiment.") + num_layers: Optional[int] = Field(default=None, description="Override number of layers.") + pipeline_model_parallel_layout: Optional[str] = Field(default=None) + first_k_dense_replace: Optional[int] = Field( + default=None, + description="Number of MoE layers to convert to dense.", + ) + # Model/task - model_name: str = Field(min_length=1) - model_size: str = Field(min_length=1) - domain: str = Field(default="llm") + model_family_name: str = Field(default="") + model_recipe_name: str = Field(default="") + use_recipes: Optional[bool] = Field(default=None) task: str = Field(default="pretrain") compute_dtype: str = Field(default="bf16") fp8_recipe: Optional[str] = Field(default=None) hf_token: Optional[str] = Field(default=None) nemo_home: Optional[str] = Field(default=None) wandb_key: Optional[str] = Field(default=None) - wandb_prj_name: Optional[str] = Field(default=None) - wandb_exp_name: Optional[str] = Field(default=None) + wandb_project_name: Optional[str] = Field(default=None) + wandb_entity_name: Optional[str] = Field(default=None) + wandb_experiment_name: Optional[str] = Field(default=None) + wandb_save_dir: Optional[str] = Field(default=None) + + # Retries + max_retries: Optional[int] = Field(default=None) # Feature flags (allow sweeps) use_tokendrop: Optional[Union[bool, List[bool]]] = Field(default=None) use_megatron_fsdp: Optional[Union[bool, List[bool]]] = Field(default=None) - cuda_graph_impl: Optional[str] = Field(default=None) + cuda_graph_impl: Optional[Union[str, List[str]]] = Field(default=None) cuda_graph_scope: Optional[Union[str, List[str]]] = Field(default=None) # Parallelism @@ -69,6 +90,51 @@ class MegatronBridgeCmdArgs(CmdArgs): # Batch sizes mb: Optional[Union[int, List[int]]] = Field(default=None) gb: Optional[Union[int, List[int]]] = Field(default=None) + seq_length: Optional[Union[int, List[int]]] = Field(default=None) + + # Optimizer + lr: Optional[Union[float, List[float]]] = Field(default=None) + min_lr: Optional[Union[float, List[float]]] = Field(default=None) + warmup_iters: Optional[Union[int, List[int]]] = Field(default=None) + + # Checkpointing + pretrained_checkpoint: Optional[str] = Field(default=None) + save_dir: Optional[str] = Field(default=None) + load_dir: Optional[str] = Field(default=None) + save_interval: Optional[int] = Field(default=None) + most_recent_k: Optional[int] = Field(default=None) + save_config_filepath: Optional[str] = Field(default=None) + + # Data / Tokenizer + data: Optional[str] = Field(default=None) + dataset_paths: Optional[Union[str, List[str]]] = Field(default=None) + dataset_root: Optional[str] = Field(default=None) + index_mapping_dir: Optional[str] = Field(default=None) + dataset_name: Optional[str] = Field(default=None) + packed_sequence: Optional[bool] = Field(default=None) + head_only: Optional[bool] = Field(default=None) + tokenizer_type: Optional[str] = Field(default=None) + tokenizer_model: Optional[str] = Field(default=None) + vocab_size: Optional[int] = Field(default=None) + + # Profiling (performance group in argument_parser.py) + pytorch_profiler: Optional[bool] = Field(default=None) + profiling_start_step: Optional[int] = Field(default=None) + profiling_stop_step: Optional[int] = Field(default=None) + record_memory_history: Optional[bool] = Field(default=None) + profiling_gpu_metrics: Optional[bool] = Field(default=None) + profiling_ranks: Optional[Union[int, List[int]]] = Field(default=None) + nsys_trace: Optional[Union[str, List[str]]] = Field( + default=None, + description="Comma-separated nsys trace events (e.g. cuda,nvtx).", + ) + nsys_extra_args: Optional[Union[str, List[str]]] = Field( + default=None, + description="Comma-separated extra arguments for nsys.", + ) + + # Performance + nccl_ub: Optional[Union[bool, List[bool]]] = Field(default=None) # Perf/tuning moe_a2a_overlap: Optional[Union[bool, List[bool]]] = Field(default=None) @@ -80,7 +146,17 @@ class MegatronBridgeCmdArgs(CmdArgs): # Optional distributed optimizer instances (for constraints/divisor) num_distributed_optimizer_instances: Optional[int] = Field(default=None) - @field_validator("hf_token", mode="after") + # Config variant (argument_parser config_variant group) + config_variant: Optional[str] = Field( + default=None, + description="Config variant (e.g. v1, v2). Launcher default is v2.", + ) + list_config_variants: Optional[bool] = Field( + default=None, + description="If true, list config variants and interactively select (--list_config_variants).", + ) + + @field_validator("hf_token", mode="after", check_fields=False) @classmethod def validate_hf_token(cls, v: Optional[str]) -> Optional[str]: token = (v or "").strip() @@ -88,13 +164,16 @@ def validate_hf_token(cls, v: Optional[str]) -> Optional[str]: raise ValueError("cmd_args.hf_token is required. Please set it to your literal HF token string.") return token - @field_validator("model_name", "model_size", mode="after") - @classmethod - def validate_model_fields(cls, v: str, info: ValidationInfo) -> str: - s = v.strip() - if not s: - raise ValueError(f"cmd_args.{info.field_name} cannot be empty.") - return s + @model_validator(mode="after") + def validate_model_fields_non_empty(self) -> "MegatronBridgeCmdArgs": + """Ensure model_family_name and model_recipe_name are non-empty and stripped.""" + for name in ("model_family_name", "model_recipe_name"): + val = getattr(self, name, "") or "" + s = str(val).strip() + if not s: + raise ValueError(f"cmd_args.{name} cannot be empty.") + setattr(self, name, s) + return self class MegatronBridgeTestDefinition(TestDefinition): @@ -323,14 +402,16 @@ def _normalize_str_list(val: Optional[Union[str, List[str]]]) -> list[str]: else: constraint10 = True - # Constraint 11: CUDA graphs require a2a overlap disabled + # Constraint 11: When cuda_graph_impl is set (not none), a2a overlap must be disabled + # moe_a2a_overlap can only be true when cuda_graph_impl is 'none' or unset a2a_overlap = _as_bool(self.cmd_args.moe_a2a_overlap) - constraint11 = not (cuda_graphs and a2a_overlap) + cuda_impl_enabled = cgi not in {"", "none", "null"} + constraint11 = not (cuda_impl_enabled and a2a_overlap) if not constraint11: logging.error( - "Constraint 11 failed: cuda_graphs=true requires moe_a2a_overlap=false. " - "cuda_graphs=%s moe_a2a_overlap=%s", - cuda_graphs, + "Constraint 11 failed: moe_a2a_overlap must be false when cuda_graph_impl is not 'none'. " + "cuda_graph_impl=%s moe_a2a_overlap=%s", + cgi, a2a_overlap, ) @@ -432,6 +513,27 @@ def _normalize_str_list(val: Optional[Union[str, List[str]]]) -> list[str]: else: constraint17 = True + # Constraint 18: Valid (PP, VP) combinations for DeepSeek v3 pipeline layout + # Only specific (pp, vp) pairs are supported by DeepSeek v3's pipeline layout mapping + model_recipe = (self.cmd_args.model_recipe_name or "").lower() + is_deepseek_v3 = "deepseek_v3" in model_recipe or "deepseekv3" in model_recipe + + if is_deepseek_v3: + valid_pp_vp_combinations = {(1, 1), (4, 1), (8, 1), (4, 2), (16, 1), (8, 2), (4, 4)} + current_vp = vp if vp is not None else 1 + pp_vp_pair = (pp, current_vp) + constraint18 = pp_vp_pair in valid_pp_vp_combinations + if not constraint18: + logging.error( + "Constraint 18 failed: Invalid (PP, VP) combination for DeepSeek v3. pp=%s vp=%s. " + "Valid combinations: %s", + pp, + current_vp, + sorted(valid_pp_vp_combinations), + ) + else: + constraint18 = True # Skip this constraint for non-DeepSeek v3 models + return bool( constraint1 and constraint2 @@ -450,4 +552,5 @@ def _normalize_str_list(val: Optional[Union[str, List[str]]]) -> list[str]: and constraint15 and constraint16 and constraint17 + and constraint18 ) diff --git a/src/cloudai/workloads/megatron_bridge/slurm_command_gen_strategy.py b/src/cloudai/workloads/megatron_bridge/slurm_command_gen_strategy.py index c1f4ff287..18df61685 100644 --- a/src/cloudai/workloads/megatron_bridge/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/megatron_bridge/slurm_command_gen_strategy.py @@ -19,7 +19,7 @@ import logging import stat from pathlib import Path -from typing import Any, cast +from typing import Any, Optional, cast import toml @@ -130,13 +130,13 @@ def _wrap_launcher_for_job_id_and_quiet_output(self, launcher_cmd: str) -> str: "", ': >"$LOG"', "LAUNCH_RC=0", - f'{launcher_cmd} >>"$LOG" 2>&1 || LAUNCH_RC=$?', + f"{launcher_cmd} >>\"$LOG\" 2>&1 || LAUNCH_RC=$?", "", # Parse job id from Megatron-Bridge output (multiple possible formats) + # Patterns: "Job id: 694112", "- Job id: 694112", "Job ID: 694112" "", 'JOB_ID=""', - 'JOB_ID=$(grep -Eio "Job[[:space:]]+id[: ]+[0-9]+" "$LOG" | ' - 'tail -n1 | grep -Eo "[0-9]+" | tail -n1 || true)', + 'JOB_ID=$(grep -Eio "(Job id[: ]+[0-9]+|-[ ]*Job id[: ]+[0-9]+)" "$LOG" | tail -n1 | grep -Eo "[0-9]+" | tail -n1 || true)', "", # Emit a canonical line for CloudAI to parse "", @@ -162,13 +162,23 @@ def _wrap_launcher_for_job_id_and_quiet_output(self, launcher_cmd: str) -> str: return f"bash {wrapper_path}" + def _list_or_comma_str(self, val: Any) -> Optional[str]: + """Normalize list or comma-separated string; return None if empty or val is None.""" + if val is None: + return None + if isinstance(val, str): + s = val.strip() + else: + s = ",".join(str(x) for x in val).strip() + return s if s else None + def _build_launcher_parts( # noqa: C901 self, args: MegatronBridgeCmdArgs, tdef: MegatronBridgeTestDefinition, repo_path: Path, launcher_py: Path ) -> list[str]: fields_set = args.model_fields_set force_fields = { - "model_name", - "model_size", + "model_family_name", + "model_recipe_name", "num_gpus", "gpus_per_node", "hf_token", @@ -194,8 +204,13 @@ def _installed_container_path() -> str: else: container_path = _installed_container_path() + # Combine cmd_args custom_mounts with test-level extra_container_mounts; only pass -cm when non-empty mounts: list[str] = [] - mounts.append(f"{repo_path.absolute()}:/opt/Megatron-Bridge") + if args.custom_mounts is not None: + if isinstance(args.custom_mounts, str): + mounts.extend(m.strip() for m in args.custom_mounts.split(",") if m.strip()) + else: + mounts.extend(str(m).strip() for m in args.custom_mounts if str(m).strip()) mounts.extend(tdef.extra_container_mounts or []) venv_path = tdef.python_executable.venv_path or (self.system.install_path / tdef.python_executable.venv_name) @@ -211,6 +226,15 @@ def add(flag: str, value: Any) -> None: return if isinstance(value, bool): parts.extend([flag, "true" if value else "false"]) + elif isinstance(value, (list, tuple)): + if not value: + return + if flag == "--dataset_paths": + parts.extend([flag, *[str(x) for x in value]]) + elif flag == "--profiling_ranks": + parts.extend([flag, ",".join(str(x) for x in value)]) + else: + parts.extend([flag, str(value[0]) if len(value) == 1 else ",".join(str(x) for x in value)]) else: sv = str(value) if sv != "": @@ -235,8 +259,11 @@ def add_field(field: str, flag: str, value: Any) -> None: add_field("hf_token", "-hf", args.hf_token) add_field("nemo_home", "-nh", args.nemo_home) add_field("wandb_key", "-wdk", args.wandb_key) - add_field("wandb_prj_name", "-wdp", args.wandb_prj_name) - add_field("wandb_exp_name", "-wdj", args.wandb_exp_name) + add_field("wandb_project_name", "-wdp", args.wandb_project_name) + add_field("wandb_entity_name", "-wde", args.wandb_entity_name) + add_field("wandb_experiment_name", "-wdj", args.wandb_experiment_name) + add_field("wandb_save_dir", "-wds", args.wandb_save_dir) + add_field("max_retries", "--max_retries", args.max_retries) if args.dryrun and "dryrun" in fields_set: parts.append("-d") add_field("num_gpus", "-ng", args.num_gpus) @@ -244,22 +271,30 @@ def add_field(field: str, flag: str, value: Any) -> None: if mounts: add("-cm", ",".join(mounts)) - # Model flags (Megatron-Bridge r0.2.0 API) + # Model flags (Megatron-Bridge main-branch API) + add_field("domain", "--domain", args.domain) + if args.use_recipes and "use_recipes" in fields_set: + parts.append("--use_recipes") if "enable_vboost" in fields_set: add_field("enable_vboost", "-vb", bool(args.enable_vboost)) - if not args.model_name: - raise RuntimeError("Missing required cmd_args.model_name (maps to -m/--model_name).") - if not args.model_size: - raise RuntimeError("Missing required cmd_args.model_size (maps to -s/--model_size).") - add_field("model_name", "-m", args.model_name) - add_field("model_size", "-s", args.model_size) + if not args.model_family_name: + raise RuntimeError("Missing required cmd_args.model_family_name (maps to -m/--model_family_name).") + if not args.model_recipe_name: + raise RuntimeError("Missing required cmd_args.model_recipe_name (maps to -mr/--model_recipe_name).") + add_field("model_family_name", "-m", args.model_family_name) + add_field("model_recipe_name", "-mr", args.model_recipe_name) + add_field("hidden_size", "--hidden_size", args.hidden_size) + add_field("num_layers", "--num_layers", args.num_layers) + add_field("pipeline_model_parallel_layout", "--pipeline_model_parallel_layout", args.pipeline_model_parallel_layout) + add_field("first_k_dense_replace", "--first_k_dense_replace", args.first_k_dense_replace) if args.enable_nsys and "enable_nsys" in fields_set: parts.append("-en") - add_field("domain", "--domain", args.domain) if "use_tokendrop" in fields_set and args.use_tokendrop is not None: add_field("use_tokendrop", "--use_tokendrop", bool(args.use_tokendrop)) if "use_megatron_fsdp" in fields_set and args.use_megatron_fsdp is not None: add_field("use_megatron_fsdp", "--use_megatron_fsdp", bool(args.use_megatron_fsdp)) + if "nccl_ub" in fields_set and args.nccl_ub is not None: + add_field("nccl_ub", "--nccl_ub", bool(args.nccl_ub)) add_field("cuda_graph_impl", "--cuda_graph_impl", args.cuda_graph_impl) if args.cuda_graph_scope and "cuda_graph_scope" in fields_set: add_field( @@ -277,6 +312,7 @@ def add_field(field: str, flag: str, value: Any) -> None: # Batch add_field("mb", "-mb", args.mb) add_field("gb", "-gb", args.gb) + add_field("seq_length", "-sl", args.seq_length) # Misc if "moe_a2a_overlap" in fields_set: @@ -286,11 +322,51 @@ def add_field(field: str, flag: str, value: Any) -> None: add_field("activation_offload_layers", "-ol", args.activation_offload_layers) if args.recompute_modules and "recompute_modules" in fields_set: parts.extend(["--recompute_modules", self._normalize_recompute_modules(args.recompute_modules)]) - # r0.2.0 supports `--detach` / `--no-detach` flags (no boolean value) - if args.detach is True and "detach" in fields_set: - parts.append("--detach") - elif args.detach is False and "detach" in fields_set: - parts.append("--no-detach") + if "detach" in fields_set and args.detach is not None: + parts.extend(["--detach", "true" if args.detach else "false"]) + + # Optimizer + add_field("lr", "--lr", args.lr) + add_field("min_lr", "--min_lr", args.min_lr) + add_field("warmup_iters", "--warmup_iters", args.warmup_iters) + + # Checkpointing + add_field("pretrained_checkpoint", "--pretrained_checkpoint", args.pretrained_checkpoint) + add_field("save_dir", "--save_dir", args.save_dir) + add_field("load_dir", "--load_dir", args.load_dir) + add_field("save_interval", "--save_interval", args.save_interval) + add_field("most_recent_k", "--most_recent_k", args.most_recent_k) + add_field("save_config_filepath", "--save_config_filepath", args.save_config_filepath) + + # Data / Tokenizer + add_field("data", "--data", args.data) + add_field("dataset_paths", "--dataset_paths", args.dataset_paths) + add_field("dataset_root", "--dataset_root", args.dataset_root) + add_field("index_mapping_dir", "--index_mapping_dir", args.index_mapping_dir) + add_field("dataset_name", "--dataset_name", args.dataset_name) + if args.packed_sequence and "packed_sequence" in fields_set: + parts.append("--packed_sequence") + if args.head_only and "head_only" in fields_set: + parts.append("--head_only") + add_field("tokenizer_type", "--tokenizer_type", args.tokenizer_type) + add_field("tokenizer_model", "--tokenizer_model", args.tokenizer_model) + add_field("vocab_size", "--vocab_size", args.vocab_size) + + # Profiling (performance group) + add_field("pytorch_profiler", "-pyp", args.pytorch_profiler) + add_field("profiling_start_step", "--profiling_start_step", args.profiling_start_step) + add_field("profiling_stop_step", "--profiling_stop_step", args.profiling_stop_step) + add_field("record_memory_history", "-mh", args.record_memory_history) + if args.profiling_gpu_metrics and "profiling_gpu_metrics" in fields_set: + parts.append("--profiling_gpu_metrics") + add_field("profiling_ranks", "--profiling_ranks", args.profiling_ranks) + add_field("nsys_trace", "--nsys_trace", self._list_or_comma_str(args.nsys_trace)) + add_field("nsys_extra_args", "--nsys_extra_args", self._list_or_comma_str(args.nsys_extra_args)) + + # Config variant + add_field("config_variant", "-cv", args.config_variant) + if args.list_config_variants and "list_config_variants" in fields_set: + parts.append("--list_config_variants") # Extra user args (dict -> string) if tdef.extra_cmd_args: diff --git a/tests/slurm_command_gen_strategy/test_megatron_bridge_slurm_command_gen_strategy.py b/tests/slurm_command_gen_strategy/test_megatron_bridge_slurm_command_gen_strategy.py index 3062116c8..8ce5ec502 100644 --- a/tests/slurm_command_gen_strategy/test_megatron_bridge_slurm_command_gen_strategy.py +++ b/tests/slurm_command_gen_strategy/test_megatron_bridge_slurm_command_gen_strategy.py @@ -38,8 +38,8 @@ def test_run(self, tmp_path: Path) -> TestRun: args = MegatronBridgeCmdArgs( container_image=str(sqsh), hf_token="dummy_token", - model_name="qwen3", - model_size="30b_a3b", + model_family_name="qwen3", + model_recipe_name="30b_a3b", cuda_graph_scope="[moe_router,moe_preprocess]", compute_dtype="fp8_mx", num_gpus=8, @@ -102,7 +102,7 @@ def test_model_fields_whitespace_only_rejected(self, field_name: str) -> None: MegatronBridgeCmdArgs.model_validate(data) def test_git_repos_can_pin_megatron_bridge_commit(self) -> None: - args = MegatronBridgeCmdArgs(hf_token="dummy_token", model_name="qwen3", model_size="30b_a3b") + args = MegatronBridgeCmdArgs(hf_token="dummy_token", model_family_name="qwen3", model_recipe_name="30b_a3b") tdef = MegatronBridgeTestDefinition( name="mb", description="desc", @@ -126,8 +126,8 @@ def test_defaults_not_emitted_when_not_set_in_toml(self, slurm_system: SlurmSyst args = MegatronBridgeCmdArgs( container_image=str(sqsh), hf_token="dummy_token", - model_name="qwen3", - model_size="30b_a3b", + model_family_name="qwen3", + model_recipe_name="30b_a3b", num_gpus=8, gpus_per_node=4, ) @@ -186,11 +186,11 @@ def test_cuda_graph_scope_normalization(self, cmd_gen: MegatronBridgeSlurmComman assert "--cuda_graph_scope moe_router,moe_preprocess" in wrapper_content @pytest.mark.parametrize( - "detach, expected, not_expected", + "detach, expected", [ - (True, "--detach", "--no-detach"), - (False, "--no-detach", "--detach"), - (None, None, "--detach"), + (True, "--detach true"), + (False, "--detach false"), + (None, None), ], ) def test_detach_flags( @@ -199,7 +199,6 @@ def test_detach_flags( test_run: TestRun, detach: bool | None, expected: str | None, - not_expected: str, ) -> None: tdef = cast(MegatronBridgeTestDefinition, test_run.test) @@ -218,11 +217,9 @@ def test_detach_flags( wrapper_content = wrapper.read_text() if detach is None: assert "--detach" not in wrapper_content - assert "--no-detach" not in wrapper_content else: assert expected is not None assert expected in wrapper_content - assert not_expected not in wrapper_content def test_generated_command_file_written( self, cmd_gen: MegatronBridgeSlurmCommandGenStrategy, test_run: TestRun @@ -235,3 +232,78 @@ def test_generated_command_file_written( assert cmd in content assert content.startswith("bash ") assert "cloudai_megatron_bridge_submit_and_parse_jobid.sh" in content + + def test_use_recipes_emitted_only_when_true(self, slurm_system: SlurmSystem, tmp_path: Path) -> None: + sqsh = tmp_path / "img.sqsh" + sqsh.write_text("x") + + tdef_true = MegatronBridgeTestDefinition( + name="mb", + description="desc", + test_template_name="MegatronBridge", + cmd_args=MegatronBridgeCmdArgs( + container_image=str(sqsh), + hf_token="dummy_token", + model_family_name="qwen3", + model_recipe_name="30b_a3b", + num_gpus=8, + gpus_per_node=4, + use_recipes=True, + ), + extra_container_mounts=[], + git_repos=[ + { + "url": "https://github.com/NVIDIA-NeMo/Megatron-Bridge.git", + "commit": "r0.2.0", + "mount_as": "/opt/Megatron-Bridge", + } + ], # type: ignore[arg-type] + ) + + (tmp_path / "run_repo").mkdir() + (tmp_path / "run_venv").mkdir() + (tmp_path / "mbridge_repo").mkdir() + tdef_true.python_executable.git_repo.installed_path = tmp_path / "run_repo" + tdef_true.python_executable.venv_path = tmp_path / "run_venv" + tdef_true.megatron_bridge_repo.installed_path = tmp_path / "mbridge_repo" + tdef_true.docker_image.installed_path = tmp_path / "cached.sqsh" + + tr_true = TestRun(test=tdef_true, name="tr", num_nodes=1, nodes=[], output_path=tmp_path / "out_true") + slurm_system.account = "acct" + slurm_system.default_partition = "gb300" + cmd_gen_true = MegatronBridgeSlurmCommandGenStrategy(slurm_system, tr_true) + cmd_gen_true.gen_exec_command() + wrapper_true = tr_true.output_path / "cloudai_megatron_bridge_submit_and_parse_jobid.sh" + assert "--use_recipes" in wrapper_true.read_text() + + tdef_none = MegatronBridgeTestDefinition( + name="mb", + description="desc", + test_template_name="MegatronBridge", + cmd_args=MegatronBridgeCmdArgs( + container_image=str(sqsh), + hf_token="dummy_token", + model_family_name="qwen3", + model_recipe_name="30b_a3b", + num_gpus=8, + gpus_per_node=4, + ), + extra_container_mounts=[], + git_repos=[ + { + "url": "https://github.com/NVIDIA-NeMo/Megatron-Bridge.git", + "commit": "r0.2.0", + "mount_as": "/opt/Megatron-Bridge", + } + ], # type: ignore[arg-type] + ) + tdef_none.python_executable.git_repo.installed_path = tmp_path / "run_repo" + tdef_none.python_executable.venv_path = tmp_path / "run_venv" + tdef_none.megatron_bridge_repo.installed_path = tmp_path / "mbridge_repo" + tdef_none.docker_image.installed_path = tmp_path / "cached.sqsh" + + tr_none = TestRun(test=tdef_none, name="tr", num_nodes=1, nodes=[], output_path=tmp_path / "out_none") + cmd_gen_none = MegatronBridgeSlurmCommandGenStrategy(slurm_system, tr_none) + cmd_gen_none.gen_exec_command() + wrapper_none = tr_none.output_path / "cloudai_megatron_bridge_submit_and_parse_jobid.sh" + assert "--use_recipes" not in wrapper_none.read_text()