diff --git a/ax/analysis/healthcheck/early_stopping_healthcheck.py b/ax/analysis/healthcheck/early_stopping_healthcheck.py index 49bfca2e30c..aa154053f60 100644 --- a/ax/analysis/healthcheck/early_stopping_healthcheck.py +++ b/ax/analysis/healthcheck/early_stopping_healthcheck.py @@ -95,7 +95,7 @@ def __init__( single-objective unconstrained experiments. min_savings_threshold: Minimum savings threshold to suggest early stopping. Default is 0.1 (10% savings). - max_pending_trials: Maximum number of pending trials for replay + max_pending_trials: Maximum number of concurrent trials for replay orchestrator. Default is 5. auto_early_stopping_config: A string for configuring automated early stopping strategy. diff --git a/ax/benchmark/benchmark_method.py b/ax/benchmark/benchmark_method.py index bea5463bd2f..19ec4631ce8 100644 --- a/ax/benchmark/benchmark_method.py +++ b/ax/benchmark/benchmark_method.py @@ -16,7 +16,7 @@ class BenchmarkMethod(Base): """Benchmark method, represented in terms of Ax generation strategy (which tells us which models to use when) and Orchestrator options (which tell us extra execution - information like maximum parallelism, early stopping configuration, etc.). + information like maximum pending trials, early stopping configuration, etc.). Args: name: String description. Defaults to the name of the generation strategy. diff --git a/ax/core/runner.py b/ax/core/runner.py index 39d71ed99f8..474869c1f1f 100644 --- a/ax/core/runner.py +++ b/ax/core/runner.py @@ -81,7 +81,7 @@ def poll_available_capacity(self) -> int: artificially force this method to limit capacity; ``Orchestrator`` has other limitations in place to limit number of trials running at once, like the ``OrchestratorOptions.max_pending_trials`` setting, or - more granular control in the form of the `max_parallelism` + more granular control in the form of the `max_concurrency` setting in each of the `GenerationStep`s of a `GenerationStrategy`). 
Returns: diff --git a/ax/early_stopping/experiment_replay.py b/ax/early_stopping/experiment_replay.py index d4a1f5a5fe2..2db6a04fb01 100644 --- a/ax/early_stopping/experiment_replay.py +++ b/ax/early_stopping/experiment_replay.py @@ -130,7 +130,7 @@ def estimate_hypothetical_early_stopping_savings( Args: experiment: The experiment to analyze. metric: The metric to use for early stopping replay. - max_pending_trials: Maximum number of pending trials for the replay + max_pending_trials: Maximum number of concurrent trials for the replay orchestrator. Defaults to 5. Returns: diff --git a/ax/generation_strategy/dispatch_utils.py b/ax/generation_strategy/dispatch_utils.py index f2721d47fc6..d036720a030 100644 --- a/ax/generation_strategy/dispatch_utils.py +++ b/ax/generation_strategy/dispatch_utils.py @@ -31,7 +31,7 @@ logger: logging.Logger = get_logger(__name__) -DEFAULT_BAYESIAN_PARALLELISM = 3 +DEFAULT_BAYESIAN_CONCURRENCY = 3 # `BO_MIXED` optimizes all range parameters once for each combination of choice # parameters, then takes the optimum of those optima. 
The cost associated with this # method grows with the number of combinations, and so it is only used when the @@ -49,7 +49,7 @@ def _make_sobol_step( num_trials: int = -1, min_trials_observed: int | None = None, enforce_num_trials: bool = True, - max_parallelism: int | None = None, + max_concurrency: int | None = None, seed: int | None = None, should_deduplicate: bool = False, ) -> GenerationStep: @@ -62,9 +62,9 @@ ceil(num_trials / 2) if min_trials_observed is None else min_trials_observed ), enforce_num_trials=enforce_num_trials, - max_parallelism=max_parallelism, + max_parallelism=max_concurrency, generator_kwargs={"deduplicate": True, "seed": seed}, should_deduplicate=should_deduplicate, use_all_trials_in_exp=True, ) @@ -73,7 +72,7 @@ def _make_botorch_step( num_trials: int = -1, min_trials_observed: int | None = None, enforce_num_trials: bool = True, - max_parallelism: int | None = None, + max_concurrency: int | None = None, generator: GeneratorRegistryBase = Generators.BOTORCH_MODULAR, generator_kwargs: dict[str, Any] | None = None, winsorization_config: None @@ -130,7 +129,7 @@ def _make_botorch_step( ceil(num_trials / 2) if min_trials_observed is None else min_trials_observed ), enforce_num_trials=enforce_num_trials, - max_parallelism=max_parallelism, + max_parallelism=max_concurrency, generator_kwargs=generator_kwargs, should_deduplicate=should_deduplicate, ) @@ -325,11 +324,11 @@ def choose_generation_strategy_legacy( enforce_sequential_optimization: Whether to enforce that 1) the generation strategy needs to be updated with ``min_trials_observed`` observations for a given generation step before proceeding to the next one and 2) maximum - number of trials running at once (max_parallelism) if enforced for the + number of trials running at once (concurrency) if enforced for the BayesOpt step. 
NOTE: ``max_parallelism_override`` and ``max_parallelism_cap`` settings will still take their effect on max - parallelism even if ``enforce_sequential_optimization=False``, so if those - settings are specified, max parallelism will be enforced. + concurrency even if ``enforce_sequential_optimization=False``, so if those + settings are specified, max concurrency will be enforced. random_seed: Fixed random seed for the Sobol generator. torch_device: The device to use for generation steps implemented in PyTorch (e.g. via BoTorch). Some generation steps (in particular EHVI-based ones @@ -360,21 +359,21 @@ def choose_generation_strategy_legacy( min_sobol_trials_observed: Minimum number of Sobol trials that must be observed before proceeding to the next generation step. Defaults to `ceil(num_initialization_trials / 2)`. - max_parallelism_cap: Integer cap on parallelism in this generation strategy. - If specified, ``max_parallelism`` setting in each generation step will be + max_parallelism_cap: Integer cap on concurrency in this generation strategy. + If specified, the concurrency setting in each generation step will be set to the minimum of the default setting for that step and the value of this cap. ``max_parallelism_cap`` is meant to just be a hard limit on - parallelism (e.g. to avoid overloading machine(s) that evaluate the + concurrency (e.g. to avoid overloading machine(s) that evaluate the experiment trials). Specify only if not specifying ``max_parallelism_override``. max_parallelism_override: Integer, with which to override the default max - parallelism setting for all steps in the generation strategy returned from + concurrency setting for all steps in the generation strategy returned from this function. Each generation step has a ``max_parallelism`` value, which - restricts how many trials can run simultaneously during a given generation - step. 
By default, the parallelism setting is chosen as appropriate for the + restricts how many trials can run concurrently during a given generation + step. By default, the concurrency setting is chosen as appropriate for the model in a given generation step. If ``max_parallelism_override`` is -1, - no max parallelism will be enforced for any step of the generation - strategy. Be aware that parallelism is limited to improve performance of + no max concurrency will be enforced for any step of the generation + strategy. Be aware that concurrency is limited to improve performance of Bayesian optimization, so only disable its limiting if necessary. optimization_config: Used to infer whether to use MOO. should_deduplicate: Whether to deduplicate the parameters of proposed arms @@ -416,30 +415,30 @@ def choose_generation_strategy_legacy( optimization_config=optimization_config, use_saasbo=use_saasbo, ) - # Determine max parallelism for the generation steps. + # Determine max concurrency for the generation steps. if max_parallelism_override == -1: - # `max_parallelism_override` of -1 means no max parallelism enforcement in + # `max_parallelism_override` of -1 means no max concurrency enforcement in # the generation strategy, which means `max_parallelism=None` in gen. steps. 
- sobol_parallelism = bo_parallelism = None + sobol_concurrency = bo_concurrency = None elif max_parallelism_override is not None: - sobol_parallelism = bo_parallelism = max_parallelism_override + sobol_concurrency = bo_concurrency = max_parallelism_override elif max_parallelism_cap is not None: # Max parallelism override is None by now - sobol_parallelism = max_parallelism_cap - bo_parallelism = min(max_parallelism_cap, DEFAULT_BAYESIAN_PARALLELISM) + sobol_concurrency = max_parallelism_cap + bo_concurrency = min(max_parallelism_cap, DEFAULT_BAYESIAN_CONCURRENCY) elif not enforce_sequential_optimization: - # If no max parallelism settings specified and not enforcing sequential - # optimization, do not limit parallelism. - sobol_parallelism = bo_parallelism = None - else: # No additional max parallelism settings, use defaults - sobol_parallelism = None # No restriction on Sobol phase - bo_parallelism = DEFAULT_BAYESIAN_PARALLELISM + # If no max concurrency settings specified and not enforcing sequential + # optimization, do not limit concurrency. + sobol_concurrency = bo_concurrency = None + else: # No additional max concurrency settings, use defaults + sobol_concurrency = None # No restriction on Sobol phase + bo_concurrency = DEFAULT_BAYESIAN_CONCURRENCY if not force_random_search and suggested_model is not None: if not enforce_sequential_optimization and ( max_parallelism_override is not None or max_parallelism_cap is not None ): logger.info( - "If `enforce_sequential_optimization` is False, max parallelism is " + "If `enforce_sequential_optimization` is False, max concurrency is " "not enforced and other max parallelism settings will be ignored." 
) if max_parallelism_override is not None and max_parallelism_cap is not None: @@ -503,7 +502,7 @@ def choose_generation_strategy_legacy( min_trials_observed=min_sobol_trials_observed, enforce_num_trials=enforce_sequential_optimization, seed=random_seed, - max_parallelism=sobol_parallelism, + max_concurrency=sobol_concurrency, should_deduplicate=should_deduplicate, ) ) @@ -512,7 +511,7 @@ def choose_generation_strategy_legacy( generator=suggested_model, winsorization_config=winsorization_config, derelativize_with_raw_status_quo=derelativize_with_raw_status_quo, - max_parallelism=bo_parallelism, + max_concurrency=bo_concurrency, generator_kwargs=generator_kwargs, should_deduplicate=should_deduplicate, disable_progbar=disable_progbar, @@ -544,7 +543,7 @@ def choose_generation_strategy_legacy( _make_sobol_step( seed=random_seed, should_deduplicate=should_deduplicate, - max_parallelism=sobol_parallelism, + max_concurrency=sobol_concurrency, ) ] ) diff --git a/ax/generation_strategy/generation_node.py b/ax/generation_strategy/generation_node.py index 435bccc9f8d..80aedbe0df8 100644 --- a/ax/generation_strategy/generation_node.py +++ b/ax/generation_strategy/generation_node.py @@ -972,7 +972,7 @@ class GenerationStep: observed` have not been completed, a call to `generation_strategy.gen` will fail with a `DataRequiredError`. max_parallelism: How many trials generated in the course of this step are - allowed to be run (i.e. have `trial.status` of `RUNNING`) simultaneously. + allowed to be run (i.e. have `trial.status` of `RUNNING`) concurrently. 
If `max_parallelism` trials from this step are already running, a call to `generation_strategy.gen` will fail with a `MaxParallelismReached Exception`, indicating that more trials need to be completed before @@ -1055,7 +1055,7 @@ def __new__( ) if max_parallelism is not None and max_parallelism < 1: raise UserInputError( - "Maximum parallelism should be None (if no limit) or " + "Maximum concurrency should be None (if no limit) or " f"a positive number. Got: {max_parallelism} for " f"step {generator_name}." ) diff --git a/ax/generation_strategy/tests/test_dispatch_utils.py b/ax/generation_strategy/tests/test_dispatch_utils.py index c382dd9fe3a..65725f7ece6 100644 --- a/ax/generation_strategy/tests/test_dispatch_utils.py +++ b/ax/generation_strategy/tests/test_dispatch_utils.py @@ -19,7 +19,7 @@ _make_botorch_step, calculate_num_initialization_trials, choose_generation_strategy_legacy, - DEFAULT_BAYESIAN_PARALLELISM, + DEFAULT_BAYESIAN_CONCURRENCY, ) from ax.generation_strategy.generation_node import GenerationNode from ax.generation_strategy.transition_criterion import ( @@ -621,14 +621,14 @@ def test_enforce_sequential_optimization(self) -> None: sobol_gpei._nodes[0].transition_criteria[0], MinTrials ) self.assertTrue(node0_min_trials.block_gen_if_met) - # Check that max_parallelism is set by verifying MaxGenerationParallelism + # Check that max_concurrency is set by verifying MaxGenerationParallelism # criterion exists on node 1 - node1_max_parallelism = [ + node1_max_concurrency = [ tc for tc in sobol_gpei._nodes[1].transition_criteria if isinstance(tc, MaxGenerationParallelism) ] - self.assertTrue(len(node1_max_parallelism) > 0) + self.assertTrue(len(node1_max_concurrency) > 0) with self.subTest("False"): sobol_gpei = choose_generation_strategy_legacy( search_space=get_branin_search_space(), @@ -646,14 +646,14 @@ def test_enforce_sequential_optimization(self) -> None: sobol_gpei._nodes[0].transition_criteria[0], MinTrials ) 
self.assertFalse(node0_min_trials.block_gen_if_met) - # Check that max_parallelism is None by verifying no + # Check that max_concurrency is None by verifying no # MaxGenerationParallelism criterion exists on node 1 - node1_max_parallelism = [ + node1_max_concurrency = [ tc for tc in sobol_gpei._nodes[1].transition_criteria if isinstance(tc, MaxGenerationParallelism) ] - self.assertEqual(len(node1_max_parallelism), 0) + self.assertEqual(len(node1_max_concurrency), 0) with self.subTest("False and max_parallelism_override"): with self.assertLogs( choose_generation_strategy_legacy.__module__, logging.INFO @@ -706,7 +706,7 @@ def test_max_parallelism_override(self) -> None: search_space=get_branin_search_space(), max_parallelism_override=10 ) self.assertTrue( - all(self._get_max_parallelism(s) == 10 for s in sobol_gpei._nodes) + all(self._get_max_concurrency(s) == 10 for s in sobol_gpei._nodes) ) def test_winsorization(self) -> None: @@ -817,47 +817,47 @@ def test_fixed_num_initialization_trials(self) -> None: 3, ) - def _get_max_parallelism(self, node: GenerationNode) -> int | None: - """Helper to extract max_parallelism from transition criteria.""" + def _get_max_concurrency(self, node: GenerationNode) -> int | None: + """Helper to extract max_concurrency from transition criteria.""" for tc in node.transition_criteria: if isinstance(tc, MaxGenerationParallelism): return tc.threshold return None - def test_max_parallelism_adjustments(self) -> None: + def test_max_concurrency_adjustments(self) -> None: # No adjustment. 
sobol_gpei = choose_generation_strategy_legacy( search_space=get_branin_search_space() ) - self.assertIsNone(self._get_max_parallelism(sobol_gpei._nodes[0])) + self.assertIsNone(self._get_max_concurrency(sobol_gpei._nodes[0])) self.assertEqual( - self._get_max_parallelism(sobol_gpei._nodes[1]), - DEFAULT_BAYESIAN_PARALLELISM, + self._get_max_concurrency(sobol_gpei._nodes[1]), + DEFAULT_BAYESIAN_CONCURRENCY, ) # Impose a cap of 1 on max parallelism for all steps. sobol_gpei = choose_generation_strategy_legacy( search_space=get_branin_search_space(), max_parallelism_cap=1 ) self.assertEqual( - self._get_max_parallelism(sobol_gpei._nodes[0]), + self._get_max_concurrency(sobol_gpei._nodes[0]), 1, ) self.assertEqual( - self._get_max_parallelism(sobol_gpei._nodes[1]), + self._get_max_concurrency(sobol_gpei._nodes[1]), 1, ) # Disable enforcing max parallelism for all steps. sobol_gpei = choose_generation_strategy_legacy( search_space=get_branin_search_space(), max_parallelism_override=-1 ) - self.assertIsNone(self._get_max_parallelism(sobol_gpei._nodes[0])) - self.assertIsNone(self._get_max_parallelism(sobol_gpei._nodes[1])) + self.assertIsNone(self._get_max_concurrency(sobol_gpei._nodes[0])) + self.assertIsNone(self._get_max_concurrency(sobol_gpei._nodes[1])) # Override max parallelism for all steps. 
sobol_gpei = choose_generation_strategy_legacy( search_space=get_branin_search_space(), max_parallelism_override=10 ) - self.assertEqual(self._get_max_parallelism(sobol_gpei._nodes[0]), 10) - self.assertEqual(self._get_max_parallelism(sobol_gpei._nodes[1]), 10) + self.assertEqual(self._get_max_concurrency(sobol_gpei._nodes[0]), 10) + self.assertEqual(self._get_max_concurrency(sobol_gpei._nodes[1]), 10) def test_set_should_deduplicate(self) -> None: sobol_gpei = choose_generation_strategy_legacy( diff --git a/ax/orchestration/orchestrator.py b/ax/orchestration/orchestrator.py index a56357a3b4c..19a07cd10a3 100644 --- a/ax/orchestration/orchestrator.py +++ b/ax/orchestration/orchestrator.py @@ -1645,7 +1645,7 @@ def _validate_options(self, options: OrchestratorOptions) -> None: ) def _get_max_pending_trials(self) -> int: - """Returns the maximum number of pending trials specified in the options, or + """Returns the maximum number of concurrent trials specified in the options, or zero, if the failure rate limit has been exceeded at any point during the optimization. """ @@ -1690,8 +1690,9 @@ def _prepare_trials( max_pending_upper_bound = max_pending_trials - num_pending_trials if max_pending_upper_bound < 1: self.logger.debug( - f"`max_pending_trials={max_pending_trials}` and {num_pending_trials} " - "trials are currently pending; not initiating any additional trials." + f"`max_pending_trials={max_pending_trials}` and " + f"{num_pending_trials} trials are currently pending; " + "not initiating any additional trials." ) return [], [] n = max_pending_upper_bound if n == -1 else min(max_pending_upper_bound, n) diff --git a/ax/orchestration/orchestrator_options.py b/ax/orchestration/orchestrator_options.py index 70ab3b04cf3..184db46fedc 100644 --- a/ax/orchestration/orchestrator_options.py +++ b/ax/orchestration/orchestrator_options.py @@ -90,7 +90,7 @@ class OrchestratorOptions: deployment. 
The size of the groups will be determined as the minimum of ``self.poll_available_capacity()`` and the number of generator runs that the generation strategy is able to produce - without more data or reaching its allowed max paralellism limit. + without more data or reaching its allowed max concurrency limit. debug_log_run_metadata: Whether to log run_metadata for debugging purposes. early_stopping_strategy: A ``BaseEarlyStoppingStrategy`` that determines whether a trial should be stopped given the current state of diff --git a/ax/service/ax_client.py b/ax/service/ax_client.py index cb49ed1e39c..f47f6f2368c 100644 --- a/ax/service/ax_client.py +++ b/ax/service/ax_client.py @@ -837,38 +837,38 @@ def get_trials_data_frame(self) -> pd.DataFrame: return self.experiment.to_df() def get_max_parallelism(self) -> list[tuple[int, int]]: - """Retrieves maximum number of trials that can be scheduled in parallel + """Retrieves maximum number of trials that can be scheduled concurrently at different stages of optimization. Some optimization algorithms profit significantly from sequential optimization (i.e. suggest a few points, get updated with data for them, repeat, see https://ax.dev/docs/bayesopt.html). - Parallelism setting indicates how many trials should be running simulteneously + Concurrency setting indicates how many trials should be running simultaneously (generated, but not yet completed with data). The output of this method is mapping of form - {num_trials -> max_parallelism_setting}, where the max_parallelism_setting - is used for num_trials trials. If max_parallelism_setting is -1, as - many of the trials can be ran in parallel, as necessary. If num_trials - in a tuple is -1, then the corresponding max_parallelism_setting + {num_trials -> max_concurrency_setting}, where the max_concurrency_setting + is used for num_trials trials. If max_concurrency_setting is -1, as + many of the trials can be run concurrently, as necessary. 
If num_trials + in a tuple is -1, then the corresponding max_concurrency_setting should be used for all subsequent trials. For example, if the returned list is [(5, -1), (12, 6), (-1, 3)], - the schedule could be: run 5 trials with any parallelism, run 6 trials in - parallel twice, run 3 trials in parallel for as long as needed. Here, + the schedule could be: run 5 trials with any concurrency, run 6 trials + concurrently twice, run 3 trials concurrently for as long as needed. Here, 'running' a trial means obtaining a next trial from `AxClient` through get_next_trials and completing it with data when available. Returns: - Mapping of form {num_trials -> max_parallelism_setting}. + Mapping of form {num_trials -> max_concurrency_setting}. """ - parallelism_settings = [] + concurrency_settings = [] for node in self.generation_strategy._nodes: - # Extract max_parallelism from MaxGenerationParallelism criterion - max_parallelism = None + # Extract max_concurrency from MaxGenerationParallelism criterion + max_concurrency = None for tc in node.transition_criteria: if isinstance(tc, MaxGenerationParallelism): - max_parallelism = tc.threshold + max_concurrency = tc.threshold break # Try to get num_trials from the node. If there's no MinTrials # criterion (unlimited trials), num_trials will raise UserInputError. 
@@ -877,13 +877,13 @@ def get_max_parallelism(self) -> list[tuple[int, int]]: num_trials = node.num_trials except UserInputError: num_trials = -1 - parallelism_settings.append( + concurrency_settings.append( ( num_trials, - max_parallelism if max_parallelism is not None else num_trials, + max_concurrency if max_concurrency is not None else num_trials, ) ) - return parallelism_settings + return concurrency_settings def get_optimization_trace( self, objective_optimum: float | None = None diff --git a/ax/service/tests/test_ax_client.py b/ax/service/tests/test_ax_client.py index afe81f05d01..5b4711b9be0 100644 --- a/ax/service/tests/test_ax_client.py +++ b/ax/service/tests/test_ax_client.py @@ -50,7 +50,7 @@ UserInputError, ) from ax.exceptions.generation_strategy import MaxParallelismReachedException -from ax.generation_strategy.dispatch_utils import DEFAULT_BAYESIAN_PARALLELISM +from ax.generation_strategy.dispatch_utils import DEFAULT_BAYESIAN_CONCURRENCY from ax.generation_strategy.generation_strategy import ( GenerationNode, GenerationStep, @@ -511,7 +511,7 @@ def test_default_generation_strategy_continuous(self) -> None: if i < 5: self.assertEqual(gen_limit, 5 - i) else: - self.assertEqual(gen_limit, DEFAULT_BAYESIAN_PARALLELISM) + self.assertEqual(gen_limit, DEFAULT_BAYESIAN_CONCURRENCY) parameterization, trial_index = ax_client.get_next_trial() x, y = parameterization.get("x"), parameterization.get("y") ax_client.complete_trial( @@ -1616,14 +1616,14 @@ def test_keep_generating_without_data(self) -> None: self.assertTrue(len(node0_min_trials) > 0) self.assertFalse(node0_min_trials[0].block_gen_if_met) - # Check that max_parallelism is None by verifying no MaxGenerationParallelism + # Check that max_concurrency is None by verifying no MaxGenerationParallelism # criterion exists on node 1 - node1_max_parallelism = [ + node1_max_concurrency = [ tc for tc in ax_client.generation_strategy._nodes[1].transition_criteria if isinstance(tc, MaxGenerationParallelism) ] - 
self.assertEqual(len(node1_max_parallelism), 0) + self.assertEqual(len(node1_max_concurrency), 0) for _ in range(10): ax_client.get_next_trial() @@ -2872,7 +2872,7 @@ def test_estimate_early_stopping_savings(self) -> None: self.assertEqual(ax_client.estimate_early_stopping_savings(), 0) - def test_max_parallelism_exception_when_early_stopping(self) -> None: + def test_max_concurrency_exception_when_early_stopping(self) -> None: ax_client = AxClient() ax_client.create_experiment( parameters=[ diff --git a/ax/utils/common/complexity_utils.py b/ax/utils/common/complexity_utils.py index a0edc391df2..9a3028ca76e 100644 --- a/ax/utils/common/complexity_utils.py +++ b/ax/utils/common/complexity_utils.py @@ -111,7 +111,7 @@ class OptimizationSummary: is True). tolerated_trial_failure_rate: Maximum tolerated trial failure rate (should be <= 0.9). - max_pending_trials: Maximum number of pending trials. + max_pending_trials: Maximum number of concurrent trials. min_failed_trials_for_failure_rate_check: Minimum failed trials before failure rate is checked. non_default_advanced_options: Whether non-default advanced options are set.