Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 32 additions & 33 deletions ax/generation_strategy/dispatch_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
logger: logging.Logger = get_logger(__name__)


DEFAULT_BAYESIAN_PARALLELISM = 3
DEFAULT_BAYESIAN_CONCURRENCY = 3
# `BO_MIXED` optimizes all range parameters once for each combination of choice
# parameters, then takes the optimum of those optima. The cost associated with this
# method grows with the number of combinations, and so it is only used when the
Expand All @@ -49,7 +49,7 @@ def _make_sobol_step(
num_trials: int = -1,
min_trials_observed: int | None = None,
enforce_num_trials: bool = True,
max_parallelism: int | None = None,
max_concurrency: int | None = None,
seed: int | None = None,
should_deduplicate: bool = False,
) -> GenerationStep:
Expand All @@ -62,9 +62,8 @@ def _make_sobol_step(
ceil(num_trials / 2) if min_trials_observed is None else min_trials_observed
),
enforce_num_trials=enforce_num_trials,
max_parallelism=max_parallelism,
max_parallelism=max_concurrency,
generator_kwargs={"deduplicate": True, "seed": seed},
should_deduplicate=should_deduplicate,
use_all_trials_in_exp=True,
)

Expand All @@ -73,7 +72,7 @@ def _make_botorch_step(
num_trials: int = -1,
min_trials_observed: int | None = None,
enforce_num_trials: bool = True,
max_parallelism: int | None = None,
max_concurrency: int | None = None,
generator: GeneratorRegistryBase = Generators.BOTORCH_MODULAR,
generator_kwargs: dict[str, Any] | None = None,
winsorization_config: None
Expand Down Expand Up @@ -130,7 +129,7 @@ def _make_botorch_step(
ceil(num_trials / 2) if min_trials_observed is None else min_trials_observed
),
enforce_num_trials=enforce_num_trials,
max_parallelism=max_parallelism,
max_parallelism=max_concurrency,
generator_kwargs=generator_kwargs,
should_deduplicate=should_deduplicate,
)
Expand Down Expand Up @@ -325,11 +324,11 @@ def choose_generation_strategy_legacy(
enforce_sequential_optimization: Whether to enforce that 1) the generation
strategy needs to be updated with ``min_trials_observed`` observations for
a given generation step before proceeding to the next one and 2) maximum
number of trials running at once (max_parallelism) if enforced for the
number of trials running at once (concurrency) if enforced for the
BayesOpt step. NOTE: ``max_parallelism_override`` and
``max_parallelism_cap`` settings will still take their effect on max
parallelism even if ``enforce_sequential_optimization=False``, so if those
settings are specified, max parallelism will be enforced.
concurrency even if ``enforce_sequential_optimization=False``, so if those
settings are specified, max concurrency will be enforced.
random_seed: Fixed random seed for the Sobol generator.
torch_device: The device to use for generation steps implemented in PyTorch
(e.g. via BoTorch). Some generation steps (in particular EHVI-based ones
Expand Down Expand Up @@ -360,21 +359,21 @@ def choose_generation_strategy_legacy(
min_sobol_trials_observed: Minimum number of Sobol trials that must be
observed before proceeding to the next generation step. Defaults to
`ceil(num_initialization_trials / 2)`.
max_parallelism_cap: Integer cap on parallelism in this generation strategy.
If specified, ``max_parallelism`` setting in each generation step will be
max_parallelism_cap: Integer cap on concurrency in this generation strategy.
If specified, the concurrency setting in each generation step will be
set to the minimum of the default setting for that step and the value of
this cap. ``max_parallelism_cap`` is meant to just be a hard limit on
parallelism (e.g. to avoid overloading machine(s) that evaluate the
concurrency (e.g. to avoid overloading machine(s) that evaluate the
experiment trials). Specify only if not specifying
``max_parallelism_override``.
max_parallelism_override: Integer, with which to override the default max
parallelism setting for all steps in the generation strategy returned from
concurrency setting for all steps in the generation strategy returned from
this function. Each generation step has a ``max_parallelism`` value, which
restricts how many trials can run simultaneously during a given generation
step. By default, the parallelism setting is chosen as appropriate for the
restricts how many trials can run concurrently during a given generation
step. By default, the concurrency setting is chosen as appropriate for the
model in a given generation step. If ``max_parallelism_override`` is -1,
no max parallelism will be enforced for any step of the generation
strategy. Be aware that parallelism is limited to improve performance of
no max concurrency will be enforced for any step of the generation
strategy. Be aware that concurrency is limited to improve performance of
Bayesian optimization, so only disable its limiting if necessary.
optimization_config: Used to infer whether to use MOO.
should_deduplicate: Whether to deduplicate the parameters of proposed arms
Expand Down Expand Up @@ -416,30 +415,30 @@ def choose_generation_strategy_legacy(
optimization_config=optimization_config,
use_saasbo=use_saasbo,
)
# Determine max parallelism for the generation steps.
# Determine max concurrency for the generation steps.
if max_parallelism_override == -1:
# `max_parallelism_override` of -1 means no max parallelism enforcement in
# `max_parallelism_override` of -1 means no max concurrency enforcement in
# the generation strategy, which means `max_parallelism=None` in gen. steps.
sobol_parallelism = bo_parallelism = None
sobol_concurrency = bo_concurrency = None
elif max_parallelism_override is not None:
sobol_parallelism = bo_parallelism = max_parallelism_override
sobol_concurrency = bo_concurrency = max_parallelism_override
elif max_parallelism_cap is not None: # Max parallelism override is None by now
sobol_parallelism = max_parallelism_cap
bo_parallelism = min(max_parallelism_cap, DEFAULT_BAYESIAN_PARALLELISM)
sobol_concurrency = max_parallelism_cap
bo_concurrency = min(max_parallelism_cap, DEFAULT_BAYESIAN_CONCURRENCY)
elif not enforce_sequential_optimization:
# If no max parallelism settings specified and not enforcing sequential
# optimization, do not limit parallelism.
sobol_parallelism = bo_parallelism = None
else: # No additional max parallelism settings, use defaults
sobol_parallelism = None # No restriction on Sobol phase
bo_parallelism = DEFAULT_BAYESIAN_PARALLELISM
# If no max concurrency settings specified and not enforcing sequential
# optimization, do not limit concurrency.
sobol_concurrency = bo_concurrency = None
else: # No additional max concurrency settings, use defaults
sobol_concurrency = None # No restriction on Sobol phase
bo_concurrency = DEFAULT_BAYESIAN_CONCURRENCY

if not force_random_search and suggested_model is not None:
if not enforce_sequential_optimization and (
max_parallelism_override is not None or max_parallelism_cap is not None
):
logger.info(
"If `enforce_sequential_optimization` is False, max parallelism is "
"If `enforce_sequential_optimization` is False, max concurrency is "
"not enforced and other max concurrency settings will be ignored."
)
if max_parallelism_override is not None and max_parallelism_cap is not None:
Expand Down Expand Up @@ -503,7 +502,7 @@ def choose_generation_strategy_legacy(
min_trials_observed=min_sobol_trials_observed,
enforce_num_trials=enforce_sequential_optimization,
seed=random_seed,
max_parallelism=sobol_parallelism,
max_concurrency=sobol_concurrency,
should_deduplicate=should_deduplicate,
)
)
Expand All @@ -512,7 +511,7 @@ def choose_generation_strategy_legacy(
generator=suggested_model,
winsorization_config=winsorization_config,
derelativize_with_raw_status_quo=derelativize_with_raw_status_quo,
max_parallelism=bo_parallelism,
max_concurrency=bo_concurrency,
generator_kwargs=generator_kwargs,
should_deduplicate=should_deduplicate,
disable_progbar=disable_progbar,
Expand Down Expand Up @@ -544,7 +543,7 @@ def choose_generation_strategy_legacy(
_make_sobol_step(
seed=random_seed,
should_deduplicate=should_deduplicate,
max_parallelism=sobol_parallelism,
max_concurrency=sobol_concurrency,
)
]
)
Expand Down
4 changes: 2 additions & 2 deletions ax/generation_strategy/generation_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ class GenerationStep:
observed` have not been completed, a call to `generation_strategy.gen`
will fail with a `DataRequiredError`.
max_parallelism: How many trials generated in the course of this step are
allowed to be run (i.e. have `trial.status` of `RUNNING`) simultaneously.
allowed to be run (i.e. have `trial.status` of `RUNNING`) concurrently.
If `max_parallelism` trials from this step are already running, a call
to `generation_strategy.gen` will fail with a `MaxParallelismReached
Exception`, indicating that more trials need to be completed before
Expand Down Expand Up @@ -1055,7 +1055,7 @@ def __new__(
)
if max_parallelism is not None and max_parallelism < 1:
raise UserInputError(
"Maximum parallelism should be None (if no limit) or "
"Maximum concurrency should be None (if no limit) or "
f"a positive number. Got: {max_parallelism} for "
f"step {generator_name}."
)
Expand Down
40 changes: 20 additions & 20 deletions ax/generation_strategy/tests/test_dispatch_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
_make_botorch_step,
calculate_num_initialization_trials,
choose_generation_strategy_legacy,
DEFAULT_BAYESIAN_PARALLELISM,
DEFAULT_BAYESIAN_CONCURRENCY,
)
from ax.generation_strategy.generation_node import GenerationNode
from ax.generation_strategy.transition_criterion import (
Expand Down Expand Up @@ -621,14 +621,14 @@ def test_enforce_sequential_optimization(self) -> None:
sobol_gpei._nodes[0].transition_criteria[0], MinTrials
)
self.assertTrue(node0_min_trials.block_gen_if_met)
# Check that max_parallelism is set by verifying MaxGenerationParallelism
# Check that max_concurrency is set by verifying MaxGenerationParallelism
# criterion exists on node 1
node1_max_parallelism = [
node1_max_concurrency = [
tc
for tc in sobol_gpei._nodes[1].transition_criteria
if isinstance(tc, MaxGenerationParallelism)
]
self.assertTrue(len(node1_max_parallelism) > 0)
self.assertTrue(len(node1_max_concurrency) > 0)
with self.subTest("False"):
sobol_gpei = choose_generation_strategy_legacy(
search_space=get_branin_search_space(),
Expand All @@ -646,14 +646,14 @@ def test_enforce_sequential_optimization(self) -> None:
sobol_gpei._nodes[0].transition_criteria[0], MinTrials
)
self.assertFalse(node0_min_trials.block_gen_if_met)
# Check that max_parallelism is None by verifying no
# Check that max_concurrency is None by verifying no
# MaxGenerationParallelism criterion exists on node 1
node1_max_parallelism = [
node1_max_concurrency = [
tc
for tc in sobol_gpei._nodes[1].transition_criteria
if isinstance(tc, MaxGenerationParallelism)
]
self.assertEqual(len(node1_max_parallelism), 0)
self.assertEqual(len(node1_max_concurrency), 0)
with self.subTest("False and max_parallelism_override"):
with self.assertLogs(
choose_generation_strategy_legacy.__module__, logging.INFO
Expand Down Expand Up @@ -706,7 +706,7 @@ def test_max_parallelism_override(self) -> None:
search_space=get_branin_search_space(), max_parallelism_override=10
)
self.assertTrue(
all(self._get_max_parallelism(s) == 10 for s in sobol_gpei._nodes)
all(self._get_max_concurrency(s) == 10 for s in sobol_gpei._nodes)
)

def test_winsorization(self) -> None:
Expand Down Expand Up @@ -817,47 +817,47 @@ def test_fixed_num_initialization_trials(self) -> None:
3,
)

def _get_max_parallelism(self, node: GenerationNode) -> int | None:
"""Helper to extract max_parallelism from transition criteria."""
def _get_max_concurrency(self, node: GenerationNode) -> int | None:
"""Helper to extract max_concurrency from transition criteria."""
for tc in node.transition_criteria:
if isinstance(tc, MaxGenerationParallelism):
return tc.threshold
return None

def test_max_parallelism_adjustments(self) -> None:
def test_max_concurrency_adjustments(self) -> None:
# No adjustment.
sobol_gpei = choose_generation_strategy_legacy(
search_space=get_branin_search_space()
)
self.assertIsNone(self._get_max_parallelism(sobol_gpei._nodes[0]))
self.assertIsNone(self._get_max_concurrency(sobol_gpei._nodes[0]))
self.assertEqual(
self._get_max_parallelism(sobol_gpei._nodes[1]),
DEFAULT_BAYESIAN_PARALLELISM,
self._get_max_concurrency(sobol_gpei._nodes[1]),
DEFAULT_BAYESIAN_CONCURRENCY,
)
# Impose a cap of 1 on max parallelism for all steps.
sobol_gpei = choose_generation_strategy_legacy(
search_space=get_branin_search_space(), max_parallelism_cap=1
)
self.assertEqual(
self._get_max_parallelism(sobol_gpei._nodes[0]),
self._get_max_concurrency(sobol_gpei._nodes[0]),
1,
)
self.assertEqual(
self._get_max_parallelism(sobol_gpei._nodes[1]),
self._get_max_concurrency(sobol_gpei._nodes[1]),
1,
)
# Disable enforcing max parallelism for all steps.
sobol_gpei = choose_generation_strategy_legacy(
search_space=get_branin_search_space(), max_parallelism_override=-1
)
self.assertIsNone(self._get_max_parallelism(sobol_gpei._nodes[0]))
self.assertIsNone(self._get_max_parallelism(sobol_gpei._nodes[1]))
self.assertIsNone(self._get_max_concurrency(sobol_gpei._nodes[0]))
self.assertIsNone(self._get_max_concurrency(sobol_gpei._nodes[1]))
# Override max parallelism for all steps.
sobol_gpei = choose_generation_strategy_legacy(
search_space=get_branin_search_space(), max_parallelism_override=10
)
self.assertEqual(self._get_max_parallelism(sobol_gpei._nodes[0]), 10)
self.assertEqual(self._get_max_parallelism(sobol_gpei._nodes[1]), 10)
self.assertEqual(self._get_max_concurrency(sobol_gpei._nodes[0]), 10)
self.assertEqual(self._get_max_concurrency(sobol_gpei._nodes[1]), 10)

def test_set_should_deduplicate(self) -> None:
sobol_gpei = choose_generation_strategy_legacy(
Expand Down
32 changes: 16 additions & 16 deletions ax/service/ax_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,38 +837,38 @@ def get_trials_data_frame(self) -> pd.DataFrame:
return self.experiment.to_df()

def get_max_parallelism(self) -> list[tuple[int, int]]:
"""Retrieves maximum number of trials that can be scheduled in parallel
"""Retrieves maximum number of trials that can be scheduled concurrently
at different stages of optimization.

Some optimization algorithms profit significantly from sequential
optimization (i.e. suggest a few points, get updated with data for them,
repeat, see https://ax.dev/docs/bayesopt.html).
Parallelism setting indicates how many trials should be running simulteneously
Concurrency setting indicates how many trials should be running simultaneously
(generated, but not yet completed with data).

The output of this method is mapping of form
{num_trials -> max_parallelism_setting}, where the max_parallelism_setting
is used for num_trials trials. If max_parallelism_setting is -1, as
many of the trials can be ran in parallel, as necessary. If num_trials
in a tuple is -1, then the corresponding max_parallelism_setting
{num_trials -> max_concurrency_setting}, where the max_concurrency_setting
is used for num_trials trials. If max_concurrency_setting is -1, as
many of the trials can be run concurrently as necessary. If num_trials
in a tuple is -1, then the corresponding max_concurrency_setting
should be used for all subsequent trials.

For example, if the returned list is [(5, -1), (12, 6), (-1, 3)],
the schedule could be: run 5 trials with any parallelism, run 6 trials in
parallel twice, run 3 trials in parallel for as long as needed. Here,
the schedule could be: run 5 trials with any concurrency, run 6 trials
concurrently twice, run 3 trials concurrently for as long as needed. Here,
'running' a trial means obtaining a next trial from `AxClient` through
get_next_trials and completing it with data when available.

Returns:
Mapping of form {num_trials -> max_parallelism_setting}.
Mapping of form {num_trials -> max_concurrency_setting}.
"""
parallelism_settings = []
concurrency_settings = []
for node in self.generation_strategy._nodes:
# Extract max_parallelism from MaxGenerationParallelism criterion
max_parallelism = None
# Extract max_concurrency from MaxGenerationParallelism criterion
max_concurrency = None
for tc in node.transition_criteria:
if isinstance(tc, MaxGenerationParallelism):
max_parallelism = tc.threshold
max_concurrency = tc.threshold
break
# Try to get num_trials from the node. If there's no MinTrials
# criterion (unlimited trials), num_trials will raise UserInputError.
Expand All @@ -877,13 +877,13 @@ def get_max_parallelism(self) -> list[tuple[int, int]]:
num_trials = node.num_trials
except UserInputError:
num_trials = -1
parallelism_settings.append(
concurrency_settings.append(
(
num_trials,
max_parallelism if max_parallelism is not None else num_trials,
max_concurrency if max_concurrency is not None else num_trials,
)
)
return parallelism_settings
return concurrency_settings

def get_optimization_trace(
self, objective_optimum: float | None = None
Expand Down
Loading
Loading