diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index f4d4d67a..8c70c910 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -38,6 +38,7 @@ jobs: "onnx>=1.15.0,<2.0.0" \ "onnxruntime>=1.17" \ "onnxscript>=0.5.4" \ + "optuna>=2.10.0" \ "pyyaml>=6.0,<7.0" \ "pydantic>=2.0,<3.0" \ "pyarrow>=16.1" \ diff --git a/docs/source/api.rst b/docs/source/api.rst index 35ac6312..71f79ada 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -19,7 +19,7 @@ Inference Config Hyperparameter Search Config --------------------------------- .. automodule:: sequifier.config.hyperparameter_search_config - :members: HyperparameterSearch, ModelSpecHyperparameterSampling, TrainingSpecHyperparameterSampling + :members: HyperparameterSearchConfig, ModelSpecHyperparameterSampling, TrainingSpecHyperparameterSampling Non-standard Optimizers -------------------------- diff --git a/pyproject.toml b/pyproject.toml index d7b054b8..612666e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "onnx>=1.15.0,<2.0.0", "onnxruntime>=1.17", "onnxscript>=0.5.4", + "optuna>=2.10.0", "polars>= 1.0.0,<2.0.0", "pyyaml>=6.0,<7.0", "pydantic>=2.0,<3.0", diff --git a/src/sequifier/config/hyperparameter_search_config.py b/src/sequifier/config/hyperparameter_search_config.py index 9f6f01ff..549a143a 100644 --- a/src/sequifier/config/hyperparameter_search_config.py +++ b/src/sequifier/config/hyperparameter_search_config.py @@ -1,12 +1,12 @@ import json -from itertools import product -from typing import Optional, Union +import os +import warnings +from typing import Any, Optional, Union -import numpy as np import yaml from beartype import beartype from loguru import logger -from pydantic import BaseModel, ConfigDict, Field, field_validator +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator from sequifier.config.train_config import ( DotDict, @@ -17,14 +17,67 @@ from sequifier.helpers import normalize_path, try_catch_excess_keys +class FloatDistribution(BaseModel): + """Pydantic model representing a floating-point hyperparameter distribution for Optuna. + + Attributes: + low (float): The lower bound of the distribution. + high (float): The upper bound of the distribution. + log (bool): If True, sample from the distribution in the log domain. Defaults to False. + """ + + low: float + high: float + step: Optional[float] = None + log: bool = False + + @model_validator(mode="after") + def validate_step_and_log(self): + if self.log and self.step is not None and self.step != 1: + raise ValueError( + f"Optuna does not support setting step != 1 when log=True. " + f"Got step={self.step} and log={self.log}." + ) + return self + + +class IntDistribution(BaseModel): + """Pydantic model representing an integer hyperparameter distribution for Optuna. + + Attributes: + low (int): The lower bound of the distribution. + high (int): The upper bound of the distribution. + step (int): The spacing between valid integer values. Defaults to 1. + log (bool): If True, sample from the distribution in the log domain. Defaults to False. + """ + + low: int + high: int + step: int = 1 + log: bool = False + + @model_validator(mode="after") + def validate_step_and_log(self): + if self.log and self.step != 1: + raise ValueError( + f"Optuna does not support setting step != 1 when log=True. " + f"Got step={self.step} and log={self.log}." + ) + return self + + +OptunaFloat = Union[list[float], FloatDistribution] +OptunaInt = Union[list[int], IntDistribution] + + @beartype def load_hyperparameter_search_config( config_path: str, skip_metadata: bool -) -> "HyperparameterSearch": +) -> "HyperparameterSearchConfig": """Load a hyperparameter search configuration from a YAML file. This function reads a YAML configuration file, processes it to include - data-driven configurations if needed, and returns a HyperparameterSearch + data-driven configurations if needed, and returns a HyperparameterSearchConfig object. Args: @@ -34,7 +87,7 @@ def load_hyperparameter_search_config( data-driven configurations. Returns: - An instance of the HyperparameterSearch class, populated with the + An instance of the HyperparameterSearchConfig class, populated with the configuration from the file. """ with open(config_path, "r") as f: @@ -95,7 +148,7 @@ def load_hyperparameter_search_config( config_values["id_maps"] = metadata_config["id_maps"] - return try_catch_excess_keys(config_path, HyperparameterSearch, config_values) + return try_catch_excess_keys(config_path, HyperparameterSearchConfig, config_values) class TrainingSpecHyperparameterSampling(BaseModel): @@ -144,12 +197,14 @@ class TrainingSpecHyperparameterSampling(BaseModel): save_batch_interval_minutes: Optional[float] = None save_batch_interval_minutes_val_loss: bool = True calculate_validation_loss_on_initialization: bool = False - batch_size: list[int] - learning_rate: list[float] + + batch_size: OptunaInt + learning_rate: list[float] # Kept as list to preserve coupling with epochs criterion: dict[str, str] class_weights: Optional[dict[str, list[float]]] = None - accumulation_steps: list[int] - dropout: list[float] = [0.0] + accumulation_steps: OptunaInt + dropout: OptunaFloat = [0.0] + loss_weights: Optional[dict[str, float]] = None optimizer: list[DotDict] = Field( default_factory=lambda: [DotDict({"name": "Adam"})] @@ -263,99 +318,48 @@ def validate_scheduler_config(cls, v, info_dict): ) return v - def random_sample(self): - """Randomly sample a set of training hyperparameters. - - This method selects a random combination of hyperparameters from the - defined lists of possibilities. It ensures that learning rates and - schedulers are paired correctly. + def sample_trial(self, trial: Any) -> TrainingSpecModel: + """Samples training hyperparameters using an Optuna trial. - Returns: - A TrainingSpecModel instance populated with a randomly sampled set of - hyperparameters. - """ - learning_rate_and_scheduler_index = np.random.randint(len(self.learning_rate)) - optimizer_index = np.random.randint(len(self.optimizer)) - batch_size = np.random.choice(self.batch_size) - dropout = np.random.choice(self.dropout) - accumulation_steps = np.random.choice(self.accumulation_steps) - optimizer = self.optimizer[optimizer_index] - learning_rate = self.learning_rate[learning_rate_and_scheduler_index] - - logger.info( - f"{learning_rate = } - {batch_size = } - {dropout = } - {optimizer = }" - ) - - return TrainingSpecModel( - device=self.device, - epochs=self.epochs[learning_rate_and_scheduler_index], - log_interval=self.log_interval, - class_share_log_columns=self.class_share_log_columns, - early_stopping_epochs=self.early_stopping_epochs, - save_interval_epochs=self.save_interval_epochs, - save_latest_interval_minutes=self.save_latest_interval_minutes, - save_batch_interval_minutes=self.save_batch_interval_minutes, - save_batch_interval_minutes_val_loss=self.save_batch_interval_minutes_val_loss, - calculate_validation_loss_on_initialization=self.calculate_validation_loss_on_initialization, - batch_size=batch_size, - learning_rate=learning_rate, - criterion=self.criterion, - class_weights=self.class_weights, - accumulation_steps=accumulation_steps, - dropout=dropout, - loss_weights=self.loss_weights, - optimizer=optimizer, - scheduler=self.scheduler[learning_rate_and_scheduler_index], - continue_training=self.continue_training, - enforce_determinism=True, - scheduler_step_on=self.scheduler_step_on, - distributed=self.distributed, - load_full_data_to_ram=self.load_full_data_to_ram, - max_ram_gb=self.max_ram_gb, - device_max_concat_length=self.device_max_concat_length, - world_size=self.world_size, - num_workers=self.num_workers, - backend=self.backend, - layer_type_dtypes=self.layer_type_dtypes, - layer_autocast=self.layer_autocast, - sampling_strategy=self.sampling_strategy, - data_parallelism=self.data_parallelism, - fsdp_cpu_offload=self.fsdp_cpu_offload, - torch_compile=self.torch_compile, - float32_matmul_precision=self.float32_matmul_precision, - ) - - def grid_sample(self, i): - """Select a set of training hyperparameters based on a grid search index. - - This method generates a grid of all possible hyperparameter combinations - and selects the combination at the given index. + This method leverages the provided Optuna trial to suggest values for + hyperparameters like batch size, dropout, and learning rate based on the + defined search spaces (categorical lists or distributions). Args: - i: The index of the hyperparameter combination to select from the grid. + trial (Any): The Optuna trial object used for suggesting hyperparameters. Returns: - A TrainingSpecModel instance populated with the selected set of - hyperparameters. + TrainingSpecModel: A populated training specification model with the sampled hyperparameters. """ - hyperparameter_combinations = list( - product( - np.arange(len(self.learning_rate)), - self.batch_size, - self.dropout, - self.optimizer, - self.accumulation_steps, - ) + lr_sched_index = trial.suggest_categorical( + "lr_sched_index", list(range(len(self.learning_rate))) ) - ( - learning_rate_and_scheduler_index, - batch_size, - dropout, - optimizer, - accumulation_steps, - ) = hyperparameter_combinations[i] + epochs = self.epochs[lr_sched_index] + learning_rate = self.learning_rate[lr_sched_index] + scheduler = self.scheduler[lr_sched_index] - learning_rate = self.learning_rate[learning_rate_and_scheduler_index] + opt_index = trial.suggest_categorical( + "optimizer_index", list(range(len(self.optimizer))) + ) + optimizer = self.optimizer[opt_index] + + def sample_param( + name: str, space: Union[list, FloatDistribution, IntDistribution] + ): + if isinstance(space, list): + return trial.suggest_categorical(name, space) + elif isinstance(space, FloatDistribution): + return trial.suggest_float( + name, space.low, space.high, step=space.step, log=space.log + ) + elif isinstance(space, IntDistribution): + return trial.suggest_int( + name, space.low, space.high, step=space.step, log=space.log + ) + + batch_size = sample_param("batch_size", self.batch_size) + dropout = sample_param("dropout", self.dropout) + accumulation_steps = sample_param("accumulation_steps", self.accumulation_steps) logger.info( f"{learning_rate = } - {batch_size = } - {dropout = } - {optimizer = }" @@ -363,7 +367,7 @@ def grid_sample(self, i): return TrainingSpecModel( device=self.device, - epochs=self.epochs[learning_rate_and_scheduler_index], + epochs=epochs, log_interval=self.log_interval, class_share_log_columns=self.class_share_log_columns, early_stopping_epochs=self.early_stopping_epochs, @@ -380,7 +384,7 @@ def grid_sample(self, i): dropout=dropout, loss_weights=self.loss_weights, optimizer=optimizer, - scheduler=self.scheduler[learning_rate_and_scheduler_index], + scheduler=scheduler, continue_training=self.continue_training, enforce_determinism=True, scheduler_step_on=self.scheduler_step_on, @@ -400,23 +404,6 @@ def grid_sample(self, i): float32_matmul_precision=self.float32_matmul_precision, ) - def n_combinations(self): - """Calculate the total number of hyperparameter combinations. - - This method computes the total number of unique hyperparameter sets that - can be generated by the grid search. - - Returns: - The total number of possible hyperparameter combinations. - """ - return ( - len(self.learning_rate) - * len(self.batch_size) - * len(self.dropout) - * len(self.optimizer) - * len(self.accumulation_steps) - ) - class ModelSpecHyperparameterSampling(BaseModel): """Pydantic model for model specification hyperparameter sampling. @@ -436,8 +423,9 @@ class ModelSpecHyperparameterSampling(BaseModel): dim_model: list[int] feature_embedding_dims: Optional[list[dict[str, int]]] n_head: list[int] - dim_feedforward: list[int] - num_layers: list[int] + + dim_feedforward: OptunaInt + num_layers: OptunaInt prediction_length: int activation_fn: list[str] @@ -447,7 +435,7 @@ class ModelSpecHyperparameterSampling(BaseModel): norm_first: list[bool] n_kv_heads: list[Optional[int]] - rope_theta: list[float] + rope_theta: OptunaFloat @field_validator("n_head") @classmethod @@ -482,36 +470,61 @@ def validate_model_spec(cls, v, info): return v - def random_sample(self): - """Randomly sample a set of model hyperparameters. + def sample_trial(self, trial: Any) -> ModelSpecModel: + """Samples model architecture hyperparameters using an Optuna trial. + + This method uses the Optuna trial to suggest structural parameters such as + the number of layers, feedforward dimensions, and attention heads. It ensures + that dependent dimensions (like `n_head` and `dim_model`) stay correctly paired + and that invalid key-value head combinations are filtered out. - This method selects a random combination of model hyperparameters from the - defined lists of possibilities. It ensures that dim_model, feature_embedding_dims, - and n_head are paired correctly, and that n_kv_heads is a valid divisor of n_head. + Args: + trial (Any): The Optuna trial object used for suggesting hyperparameters. Returns: - A ModelSpecModel instance populated with a randomly sampled set of - hyperparameters. + ModelSpecModel: A populated model specification model with the sampled architecture parameters. """ - dim_model_index = np.random.randint(len(self.dim_model)) + dim_model_idx = trial.suggest_categorical( + "dim_model_idx", list(range(len(self.dim_model))) + ) + + initial_embedding_dim = self.initial_embedding_dim[dim_model_idx] + joint_embedding_dim = self.joint_embedding_dim[dim_model_idx] + dim_model = self.dim_model[dim_model_idx] + n_head = self.n_head[dim_model_idx] feature_embedding_dims = ( None if self.feature_embedding_dims is None - else self.feature_embedding_dims[dim_model_index] + else self.feature_embedding_dims[dim_model_idx] + ) + + def sample_param( + name: str, space: Union[list, FloatDistribution, IntDistribution] + ): + if isinstance(space, list): + return trial.suggest_categorical(name, space) + elif isinstance(space, FloatDistribution): + return trial.suggest_float( + name, space.low, space.high, step=space.step, log=space.log + ) + elif isinstance(space, IntDistribution): + return trial.suggest_int( + name, space.low, space.high, step=space.step, log=space.log + ) + + dim_feedforward = sample_param("dim_feedforward", self.dim_feedforward) + num_layers = sample_param("num_layers", self.num_layers) + rope_theta = sample_param("rope_theta", self.rope_theta) + + activation_fn = trial.suggest_categorical("activation_fn", self.activation_fn) + normalization = trial.suggest_categorical("normalization", self.normalization) + positional_encoding = trial.suggest_categorical( + "positional_encoding", self.positional_encoding + ) + attention_type = trial.suggest_categorical( + "attention_type", self.attention_type ) - initial_embedding_dim = self.initial_embedding_dim[dim_model_index] - joint_embedding_dim = self.joint_embedding_dim[dim_model_index] - dim_model = self.dim_model[dim_model_index] - n_head = self.n_head[dim_model_index] - dim_feedforward = np.random.choice(self.dim_feedforward) - num_layers = np.random.choice(self.num_layers) - - activation_fn = np.random.choice(self.activation_fn) - normalization = np.random.choice(self.normalization) - positional_encoding = np.random.choice(self.positional_encoding) - attention_type = np.random.choice(self.attention_type) - norm_first = np.random.choice(self.norm_first) - rope_theta = np.random.choice(self.rope_theta) + norm_first = trial.suggest_categorical("norm_first", self.norm_first) valid_kv_heads = [ kv @@ -525,9 +538,7 @@ def random_sample(self): ) n_kv_heads = None else: - # Use random.choice because valid_kv_heads might contain None - # and np.random.choice behaves weirdly with mixed None types. - n_kv_heads = np.random.choice(np.array(valid_kv_heads)) + n_kv_heads = trial.suggest_categorical("n_kv_heads", valid_kv_heads) logger.info( f"{initial_embedding_dim} - {joint_embedding_dim = } - {dim_model = } - {dim_feedforward = } - {num_layers = } - {activation_fn = } - {normalization = } - {positional_encoding = } - {attention_type = } - {norm_first = } - {n_kv_heads = } - {rope_theta = } " @@ -551,113 +562,8 @@ def random_sample(self): prediction_length=self.prediction_length, ) - def grid_sample(self, i): - """Select a set of model hyperparameters based on a grid search index. - - This method generates a grid of all possible model hyperparameter - combinations and selects the combination at the given index. - Includes sanitation logic to prevent invalid n_kv_heads combinations. - - Args: - i: The index of the hyperparameter combination to select from the grid. - - Returns: - A ModelSpecModel instance populated with the selected set of - hyperparameters. - """ - hyperparameter_combinations = list( - product( - np.arange(len(self.dim_model)), - self.dim_feedforward, - self.num_layers, - self.activation_fn, - self.normalization, - self.positional_encoding, - self.attention_type, - self.norm_first, - self.n_kv_heads, - self.rope_theta, - ) - ) - - ( - dim_model_index, - dim_feedforward, - num_layers, - activation_fn, - normalization, - positional_encoding, - attention_type, - norm_first, - n_kv_heads, - rope_theta, - ) = hyperparameter_combinations[i] - - initial_embedding_dim = self.initial_embedding_dim[dim_model_index] - joint_embedding_dim = self.joint_embedding_dim[dim_model_index] - dim_model = self.dim_model[dim_model_index] - n_head = self.n_head[dim_model_index] - - if n_kv_heads is not None: - if n_head % n_kv_heads != 0 or n_kv_heads > n_head: - logger.debug( - f"Grid sample index {i}: forcing n_kv_heads=None because {n_kv_heads} does not divide {n_head}" - ) - n_kv_heads = None - - logger.info( - f"{dim_model = } - {dim_feedforward = } - {joint_embedding_dim = } - {num_layers = } - {activation_fn = } - {normalization = } - {positional_encoding = } - {attention_type = } - {norm_first = } - {n_kv_heads = } - {rope_theta = } " - ) - - feature_embedding_dims = ( - None - if self.feature_embedding_dims is None - else self.feature_embedding_dims[dim_model_index] - ) - return ModelSpecModel( - initial_embedding_dim=initial_embedding_dim, - feature_embedding_dims=feature_embedding_dims, - joint_embedding_dim=joint_embedding_dim, - dim_model=dim_model, - n_head=n_head, - dim_feedforward=dim_feedforward, - num_layers=num_layers, - activation_fn=activation_fn, - normalization=normalization, - positional_encoding=positional_encoding, - attention_type=attention_type, - norm_first=norm_first, - n_kv_heads=n_kv_heads, - rope_theta=rope_theta, - prediction_length=self.prediction_length, - ) - - def n_combinations(self): - """Calculate the total number of model hyperparameter combinations. - - This method computes the total number of unique model hyperparameter sets - that can be generated by the grid search. - - Returns: - The total number of possible model hyperparameter combinations. - """ - return ( - len(self.dim_model) - * len(self.dim_feedforward) - * len(self.joint_embedding_dim) - * len(self.num_layers) - * len(self.activation_fn) - * len(self.normalization) - * len(self.positional_encoding) - * len(self.attention_type) - * len(self.norm_first) - * len(self.n_kv_heads) - * len(self.rope_theta) - ) - - -class HyperparameterSearch(BaseModel): +class HyperparameterSearchConfig(BaseModel): """Pydantic model for hyperparameter search configuration. Attributes: @@ -685,13 +591,17 @@ class HyperparameterSearch(BaseModel): export_with_dropout: If True, exports the model with dropout enabled. model_hyperparameter_sampling: The sampling configuration for model hyperparameters. training_hyperparameter_sampling: The sampling configuration for training hyperparameters. + evaluation_inference_config: The inference config to infer on for hyperparameter search optimization + evaluation_script: The script that outputs the evaluation metrics, typically from the inference output + evaluation_metrics: The evaluation metrics to optimize during hyperparameter search + evaluation_metric_directions: The direction to optimize evaluation_metrics in. Only 'minimize' and 'maximize' are allowed """ project_root: str metadata_config_path: str hp_search_name: str - search_strategy: str = "sample" # "sample" or "grid" - n_samples: Optional[int] + search_strategy: str = "bayesian" + n_trials: Optional[int] = Field(None, alias="n_samples") model_config_write_path: str training_data_path: str validation_data_path: str @@ -715,11 +625,73 @@ class HyperparameterSearch(BaseModel): export_pt: bool = False export_with_dropout: bool = False + evaluation_inference_config: Optional[str] = None + evaluation_script: Optional[str] = None + evaluation_metric_directions: Optional[list[str]] = None + evaluation_metrics: Optional[list[str]] = None + model_hyperparameter_sampling: ModelSpecHyperparameterSampling training_hyperparameter_sampling: TrainingSpecHyperparameterSampling override_input: bool = False + @field_validator("evaluation_metrics") + @classmethod + def validate_evaluation_metrics(cls, v, info): + if v is not None and info.data.get("evaluation_script") is None: + raise ValueError( + "evaluation_script must be provided if evaluation_metrics is defined." + ) + if v is not None: + if info.data.get("evaluation_metric_directions") is None: + raise ValueError( + "evaluation_metric_directions must be provided if evaluation_metrics is defined." + ) + else: + evaluation_metric_directions = info.data.get( + "evaluation_metric_directions" + ) + if len(v) != len(evaluation_metric_directions): + raise ValueError( + f"evaluation_metrics and evaluation_metric_directions must have the same number of values, len(evaluation_metrics) = {len(v)}, {len(evaluation_metric_directions) = }" + ) + if v is not None and info.data.get("evaluation_inference_config") is None: + warnings.warn( + "Please provide evaluation_inference_config if your evaluation_script requires inference outputs" + ) + return v + + @field_validator("evaluation_metric_directions") + @classmethod + def validate_evaluation_metric_directions(cls, v): + if v is not None: + allowed_vals = {"minimize", "maximize"} + diff = set(v).difference(allowed_vals) + if len(diff): + raise ValueError( + f"In evaluation_metric_directions, only 'minimize' and 'maximize' are allowed, found: {diff}" + ) + return v + + @field_validator("evaluation_script") + @classmethod + def validate_evaluation_script(cls, v, info): + if v is not None: + project_root = info.data.get("project_root") + if not os.path.exists(os.path.join(project_root, v)): + raise ValueError( + f"evaluation_script '{v}' does not exist at '{project_root}'" + ) + return v + + @field_validator("evaluation_inference_config") + @classmethod + def validate_evaluation_inference_config(cls, v, info): + if v is not None: + if not os.path.exists(v): + raise ValueError(f"evaluation_inference_config '{v}' does not exist") + return v + @field_validator("column_types") @classmethod def validate_model_spec(cls, v, info): @@ -730,86 +702,42 @@ def validate_model_spec(cls, v, info): ) return v - def random_sample(self, i): - """Randomly sample a full training configuration. - - This method generates a complete training configuration by randomly - sampling model and training hyperparameters, as well as selecting a - column set and sequence length. - - Args: - i: The index of the sample, used to create a unique model name. - - Returns: - A TrainModel instance populated with a randomly sampled configuration. - """ - model_spec = self.model_hyperparameter_sampling.random_sample() - training_spec = self.training_hyperparameter_sampling.random_sample() - input_columns_index = np.random.randint(len(self.input_columns)) - seq_length = np.random.choice(self.seq_length) - logger.info(f"{input_columns_index = } - {seq_length = }") - return TrainModel( - project_root=self.project_root, - metadata_config_path=self.metadata_config_path, - model_name=self.hp_search_name + f"-run-{i}", - training_data_path=self.training_data_path, - validation_data_path=self.validation_data_path, - read_format=self.read_format, - input_columns=self.input_columns[input_columns_index], - column_types=self.column_types[input_columns_index], - categorical_columns=self.categorical_columns[input_columns_index], - real_columns=self.real_columns[input_columns_index], - target_columns=self.target_columns, - target_column_types=self.target_column_types, - id_maps=self.id_maps, - seq_length=seq_length, - n_classes=self.n_classes, - inference_batch_size=self.inference_batch_size, - seed=101, - export_embedding_model=self.export_embedding_model, - export_generative_model=self.export_generative_model, - export_onnx=self.export_onnx, - export_pt=self.export_pt, - export_with_dropout=self.export_with_dropout, - model_spec=model_spec, - training_spec=training_spec, - ) + @field_validator("search_strategy") + @classmethod + def validate_search_strategy(cls, v: str) -> str: + allowed = ["sample", "grid", "bayesian"] + if v not in allowed: + raise ValueError(f"search_strategy must be one of {allowed}, got '{v}'") + return v - def grid_sample(self, i): - """Select a full training configuration based on a grid search index. + def sample_trial(self, trial: Any, run_index: int) -> TrainModel: + """Generates a complete training configuration using an Optuna trial. - This method generates a grid of all possible configurations and selects - the configuration at the given index. + This method orchestrates the sampling of both model and training specifications, + as well as data sequence parameters, combining them into a final configuration + ready for model execution. Args: - i: The index of the configuration to select from the grid. + trial (Any): The Optuna trial object used for suggesting hyperparameters. + run_index (int): The current run/trial index, used to assign a unique name to the model. Returns: - A TrainModel instance populated with the selected configuration. + TrainModel: A fully populated configuration instance for the current trial. """ - model_hyperparamter_sample = self.model_hyperparameter_sampling.n_combinations() - training_hyperparamter_sample = ( - self.training_hyperparameter_sampling.n_combinations() - ) - inner_combinations = model_hyperparamter_sample * training_hyperparamter_sample + model_spec = self.model_hyperparameter_sampling.sample_trial(trial) + training_spec = self.training_hyperparameter_sampling.sample_trial(trial) - i_model = i % model_hyperparamter_sample - i_training = (i // model_hyperparamter_sample) % training_hyperparamter_sample - i_outer = i // inner_combinations - - model_spec = self.model_hyperparameter_sampling.grid_sample(i_model) - training_spec = self.training_hyperparameter_sampling.grid_sample(i_training) - - hyperparameter_combinations = list( - product(np.arange(len(self.input_columns)), self.seq_length) + input_columns_index = trial.suggest_categorical( + "input_columns_index", list(range(len(self.input_columns))) ) + seq_length = trial.suggest_categorical("seq_length", self.seq_length) - input_columns_index, seq_length = hyperparameter_combinations[i_outer] + logger.info(f"{input_columns_index = } - {seq_length = }") return TrainModel( project_root=self.project_root, metadata_config_path=self.metadata_config_path, - model_name=self.hp_search_name + f"-run-{i}", + model_name=f"{self.hp_search_name}-run-{run_index}", training_data_path=self.training_data_path, validation_data_path=self.validation_data_path, read_format=self.read_format, @@ -824,49 +752,11 @@ def grid_sample(self, i): n_classes=self.n_classes, inference_batch_size=self.inference_batch_size, seed=101, - export_embedding_model=False, - export_generative_model=True, + export_embedding_model=self.export_embedding_model, + export_generative_model=self.export_generative_model, export_onnx=self.export_onnx, export_pt=self.export_pt, export_with_dropout=self.export_with_dropout, model_spec=model_spec, training_spec=training_spec, ) - - def sample(self, i): - """Sample a configuration based on the specified search strategy. - - This method delegates to either random_sample or grid_sample based on - the `search_strategy` attribute. - - Args: - i: The index of the sample or grid combination to generate. - - Returns: - A TrainModel instance with a generated configuration. - - Raises: - Exception: If the search_strategy is not 'sample' or 'grid'. - """ - if self.search_strategy == "sample": - return self.random_sample(i) - elif self.search_strategy == "grid": - return self.grid_sample(i) - else: - raise Exception(f"{self.search_strategy} invalid") - - def n_combinations(self): - """Calculate the total number of possible configurations. - - This method computes the total number of unique configurations that can be - generated by a grid search over all defined hyperparameters. - - Returns: - The total number of possible hyperparameter configurations. - """ - return ( - len(self.input_columns) - * len(self.seq_length) - * self.model_hyperparameter_sampling.n_combinations() - * self.training_hyperparameter_sampling.n_combinations() - ) diff --git a/src/sequifier/helpers.py b/src/sequifier/helpers.py index df7cbecc..f2097548 100644 --- a/src/sequifier/helpers.py +++ b/src/sequifier/helpers.py @@ -1,6 +1,9 @@ +import glob import os import random +import re import sys +from datetime import datetime from typing import Any, Optional, Union import numpy as np @@ -423,3 +426,84 @@ def get_torch_dtype(dtype_str: str) -> torch.dtype: ) return dtype_map[dtype_str] + + +def get_best_model_path( + project_root: str, run_name: str, model_type: str +) -> tuple[str, int]: + """ + Searches for the exported 'best' model file for a given run and returns its path and epoch. + + Args: + project_root: The root directory of the project. + run_name: The unique identifier for the hyperparameter search run. + model_type: The extension of the exported model (e.g., 'onnx' or 'pt'). + + Returns: + A tuple containing: + - The file path to the best model (str). + - The actual epoch at which this model was saved (int). + + Raises: + FileNotFoundError: If no matching model files are found. + """ + search_pattern = os.path.join( + project_root, "models", f"sequifier-{run_name}-best-*.{model_type}" + ) + + matching_models = glob.glob(search_pattern) + + if not matching_models: + raise FileNotFoundError( + f"Could not find an exported 'best' model matching: {search_pattern}" + ) + + # Find the file with the highest epoch number in its name + best_model_path = max( + matching_models, + key=lambda p: int(os.path.splitext(os.path.basename(p))[0].split("-")[-1]), + ) + last_epoch = int(best_model_path.split("-")[-1].split(".")[0]) + return best_model_path, last_epoch + + +def get_last_training_batch_timedelta( + model_name: str, rank: int, project_root: str = "." +) -> float: + """ + Reads the level 2 log file, finds the last two mid-epoch training logs, + and returns the timedelta between them in seconds. + """ + # Construct the path to the level 2 log file based on configure_logger() + log_path = os.path.join( + project_root, "logs", f"sequifier-{model_name}-rank{rank}-2.txt" + ) + + if not os.path.exists(log_path): + raise FileNotFoundError(f"Log file not found: {log_path}") + + # Regex to capture the timestamp of mid-epoch training batch logs + # Matches lines like: "2026-05-26 15:15:39 | INFO | [INFO] Epoch 1 | Batch 10/... | Loss: ..." + train_log_pattern = re.compile( + r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+\|.*?\[INFO\] Epoch.*?Batch" + ) + + timestamps = [] + + with open(log_path, "r", encoding="utf-8") as file: + for line in file: + match = train_log_pattern.search(line) + if match: + timestamps.append( + datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S") + ) + + if len(timestamps) < 2: + raise ValueError( + "Not enough mid-epoch training logs found in the file to calculate a timedelta." + ) + + # Get the last two chronologically recorded batch timestamps + t1, t2 = timestamps[-2], timestamps[-1] + + return (t2 - t1).total_seconds() diff --git a/src/sequifier/hyperparameter_search.py b/src/sequifier/hyperparameter_search.py index 79b57d97..78a6413c 100644 --- a/src/sequifier/hyperparameter_search.py +++ b/src/sequifier/hyperparameter_search.py @@ -1,11 +1,14 @@ -import glob +import ctypes +import json import os +import signal import subprocess -from datetime import datetime -from typing import Optional +import sys +import time +import warnings +from typing import Union -import numpy as np -import torch +import optuna import torch._dynamo import yaml from beartype import beartype @@ -14,241 +17,290 @@ from sequifier.config.hyperparameter_search_config import ( # noqa: E402 load_hyperparameter_search_config, ) -from sequifier.config.train_config import TrainModel # noqa: E402 -from sequifier.helpers import configure_logger # noqa: E402 -from sequifier.helpers import normalize_path # noqa: E402 +from sequifier.helpers import ( # noqa: E402 + get_best_model_path, + get_last_training_batch_timedelta, +) from sequifier.io.yaml import TrainModelDumper # noqa: E402 -@beartype -def hyperparameter_search(config_path, skip_metadata) -> None: - """Main function for initiating a hyperparameter search process. +def set_pdeathsig(): + """Binds child process lifecycle to the parent orchestrator via Linux prctl.""" + if sys.platform.startswith("linux"): + libc = ctypes.CDLL("libc.so.6") + libc.prctl(1, signal.SIGTERM) # PR_SET_PDEATHSIG = 1 + - This function loads the hyperparameter search configuration, initializes - the searcher, and starts the search. +def objective(trial: optuna.Trial, config) -> Union[float, tuple[float, ...]]: + """The central objective engine bridging Optuna to pure CLI execution. + + This function handles generating the YAML configuration for the specific + trial, dynamically allocating a port for distributed training, launching the + training subprocess, asynchronously polling the validation metrics, and reporting + them back to Optuna for potential pruning. Args: - config_path (str): Path to the hyperparameter search YAML - configuration file. - skip_metadata (bool): Flag indicating whether to run the search - on unprocessed data. + trial (optuna.Trial): The Optuna trial object managing the current hyperparameter combination. + config (HyperparameterSearchConfig): The parsed hyperparameter search configuration. Returns: - None + float: The best validation loss achieved during the trial. + + Raises: + optuna.TrialPruned: If the trial is pruned by the Optuna orchestrator. + RuntimeError: If the training subprocess fails or is externally preempted. """ - hyperparameter_search_config = load_hyperparameter_search_config( - config_path, skip_metadata + run_config = config.sample_trial(trial, trial.number) + run_name = run_config.model_name + + # 1. YAML Generation + config_path = os.path.join( + config.project_root, config.model_config_write_path, f"{run_name}.yaml" + ) + os.makedirs(os.path.dirname(config_path), exist_ok=True) + with open(config_path, "w") as f: + yaml.dump(run_config, f, Dumper=TrainModelDumper, sort_keys=False) + + os.environ["SEQUIFIER_HYPERPARAMETER_SEARCH_RUN"] = "1" + + env = os.environ.copy() + cmd = ["sequifier", "train", f"--config-path={config_path}"] + process = subprocess.Popen( + cmd, + env=env, + preexec_fn=set_pdeathsig if sys.platform.startswith("linux") else None, ) - hyperparameter_searcher = HyperparameterSearcher(hyperparameter_search_config) + metrics_path = os.path.join( + config.project_root, "logs", f"sequifier-{run_name}-metrics.jsonl" + ) + prune_path = os.path.join( + config.project_root, "logs", f"sequifier-{run_name}.prune" + ) - hyperparameter_searcher.hyperparameter_search() + last_read_pos = 0 + best_val_loss = float("inf") + + def consume_metrics(last_read_pos: int, best_val_loss: float) -> tuple[int, float]: + """Helper closure to read written metrics and evaluate pruning.""" + if os.path.exists(metrics_path): + with open(metrics_path, "r") as f: + f.seek(last_read_pos) + while True: + line = f.readline() + if not line or (not line.endswith("\n")): + break # Reached end of currently written data + try: + data = json.loads(line) + val_loss = data.get("val_loss") + global_step = data.get("global_step") + + if global_step is not None and val_loss is not None: + # 5. Cooperative Pruning Evaluation + is_multi_objective = ( + config.evaluation_metrics is not None + and len(config.evaluation_metrics) > 1 + ) + if not is_multi_objective: + trial.report(val_loss, global_step) + best_val_loss = min(best_val_loss, val_loss) + + if trial.should_prune(): + open(prune_path, "w").close() + try: + try: + timedelta = ( + get_last_training_batch_timedelta( + run_name, 0, config.project_root + ) + ) + timeout_val = (timedelta * 2) + 30 + except (ValueError, FileNotFoundError): + timeout_val = 60.0 # Safe default fallback + + process.wait(timeout=timeout_val) + except subprocess.TimeoutExpired: + process.kill() # Escalation + raise optuna.TrialPruned() + + last_read_pos = f.tell() + + except json.JSONDecodeError: + break + return last_read_pos, best_val_loss + + # 4. Asynchronous Polling & Caching Mitigation + while process.poll() is None: + last_read_pos, best_val_loss = consume_metrics(last_read_pos, best_val_loss) + time.sleep(2) + + _, best_val_loss = consume_metrics(last_read_pos, best_val_loss) + + exit_code = process.returncode + if exit_code == 143: + if os.path.exists(prune_path): + raise optuna.TrialPruned() + else: + raise RuntimeError( + f"Trial pre-empted externally by cluster (SIGTERM). Exit code: {exit_code}" + ) + elif exit_code != 0: + raise RuntimeError(f"Training failed with exit code {exit_code}") + model_type = "onnx" if run_config.export_onnx else "pt" + model_path, last_epoch = get_best_model_path( + config.project_root, run_name, model_type + ) -class HyperparameterSearcher: - """A class for performing hyperparameter search. + if config.evaluation_inference_config: + subprocess.run( + [ + "sequifier", + "infer", + f"--config-path={config.evaluation_inference_config}", + f"--model-path={model_path}", + ], + check=True, + ) - Manages the hyperparameter search process based on a given configuration. - This class handles sampling hyperparameters, creating training configurations, - launching training subprocesses, and logging results. - """ + if config.evaluation_script and config.evaluation_metrics: + eval_script_path = config.evaluation_script + cmd = [sys.executable, eval_script_path, f"{run_name}-best-{last_epoch}"] - def __init__(self, hyperparameter_search_config): - """Initializes the HyperparameterSearcher instance. - - Args: - hyperparameter_search_config (HyperparameterSearchConfig): An object - containing the configuration for the hyperparameter search, - loaded via `load_hyperparameter_search_config`. - """ - self.config = hyperparameter_search_config - self.normalized_config_path = normalize_path( - self.config.model_config_write_path, - self.config.project_root, - ) - self.start_run = self._get_start_run() - self._initialize_log_file() - self.n_samples = self._calculate_n_samples( - hyperparameter_search_config.override_input + eval_process = subprocess.run( + cmd, capture_output=True, text=True, cwd=config.project_root ) - @beartype - def _get_start_run(self) -> int: - """Determines the starting run number by checking existing config files. - - This allows for resuming a search. It finds the highest existing run - number (e.g., 'hp_search_name-run-10.yaml') and returns the next - integer (e.g., 11). If no previous runs are found, it starts from 0. - - Returns: - int: The integer run number to start the search from (e.g., 1 for a - new search, or `n+1` if `n` is the last completed run). - """ - file_root = f"{self.config.hp_search_name}-run-" - search_pattern = os.path.join(self.normalized_config_path, f"{file_root}*.yaml") - files = [os.path.split(file)[1] for file in glob.glob(search_pattern)] - files.sort( - key=lambda filename: int(filename.replace(file_root, "").split(".")[0]) + if eval_process.returncode != 0: + raise RuntimeError( + f"Evaluation script failed (exit code {eval_process.returncode}):\n{eval_process.stderr}" + ) + + eval_json_path = os.path.join( + config.project_root, + "outputs", + "evaluations", + f"{run_name}-best-{last_epoch}.json", ) + if not os.path.exists(eval_json_path): + raise FileNotFoundError( + f"Evaluation JSON not found at expected path: {eval_json_path}" + ) + + with open(eval_json_path, "r") as f: + eval_results = json.load(f) + eval_results_keys = set(list(eval_results.keys())) + evaluation_metrics = set(config.evaluation_metrics) + missing_metrics = evaluation_metrics.difference(eval_results_keys) + excess_metrics = eval_results_keys.difference(evaluation_metrics) + if len(missing_metrics): + raise ValueError( + f"Some of the configured evaluation metrics are not in the script output: {missing_metrics}" + ) + if len(excess_metrics): + warnings.warn( + f"Some metrics output by the script are not used in hyperparameter optimization: {excess_metrics}" + ) + + metrics = [] + for metric in config.evaluation_metrics: + if metric not in eval_results: + raise KeyError( + f"Metric '{metric}' missing in {eval_json_path}. Found keys: {list(eval_results.keys())}" + ) + metrics.append(float(eval_results[metric])) - if len(files) > 0: - last_iter = int(files[-1].split(".")[0].replace(file_root, "")) - return last_iter + 1 + if len(metrics) == 1: + return metrics[0] else: - return 0 + return tuple(metrics) - @beartype - def _initialize_log_file(self) -> None: - """Sets up the log file for the hyperparameter search. + return best_val_loss - It creates the 'logs' directory if it doesn't exist and opens a log file. - If starting from run 1, it overwrites (mode 'w'); otherwise, it appends - (mode 'a') to the existing log. - Returns: - None - """ - self.logger = configure_logger( - self.config.project_root, self.config.hp_search_name - ) +@beartype +def hyperparameter_search(config_path: str, skip_metadata: bool) -> None: + """Main function for initiating an Optuna-based hyperparameter search process. - @beartype - def _calculate_n_samples(self, override_input: bool) -> int: - """Calculates the total number of hyperparameter combinations to sample. - - Based on the `search_strategy` ('grid' or 'sample'), it either - calculates the total number of combinations or uses the specified - `n_samples`. It includes interactive prompts for user confirmation if - the strategy is 'grid' or if 'sample' exceeds the total combinations. - - Returns: - int: The total number of samples (runs) to execute. - - Raises: - Exception: If the `search_strategy` in the config is not 'grid' - or 'sample'. - AssertionError: If `n_samples` is not set when `search_strategy` - is 'sample'. - """ - n_combinations = self.config.n_combinations() - self.logger.info(f"Found {n_combinations} hyperparameter combinations") - if self.config.search_strategy == "sample": - n_samples = self.config.n_samples - if n_samples is None: - raise ValueError("n_samples must be defined for 'sample' strategy") - if n_samples > self.config.n_combinations(): - if not override_input: - input( - f"{n_samples} is above the number of combinations of hyperparameters. Press any key to continue with grid search or abort to reconfigure" - ) - n_samples = self.config.n_combinations() - self.config.search_strategy = "grid" - elif self.config.search_strategy == "grid": - n_samples = self.config.n_combinations() - if not override_input: - input( - f"Found {n_samples} hyperparameter combinations. Please enter any key to confirm, or change search strategy to 'sample'" - ) + This function loads the configuration, initializes the Optuna study with a + minimization direction, and kicks off the optimization loop. Once the configured + number of trials is complete, it prints out the best trial's value and hyperparameters. + + Args: + config_path (str): Path to the hyperparameter search YAML configuration file. + skip_metadata (bool): Flag indicating whether to skip loading/processing data metadata. + + Raises: + ValueError: If `n_trials` is not defined in the configuration. + """ + config = load_hyperparameter_search_config(config_path, skip_metadata) + + os.makedirs(os.path.join(config.project_root, "state", "optuna"), exist_ok=True) + strategy = getattr(config, "search_strategy", "bayesian") + if strategy in ["sample"]: + sampler = optuna.samplers.RandomSampler() + elif strategy == "grid": + if hasattr(optuna.samplers, "BruteForceSampler"): + sampler = optuna.samplers.BruteForceSampler() else: - raise Exception( - f"search strategy '{self.config.search_strategy}' is not valid. Allowed values are 'grid' and 'sample'" + raise RuntimeError( + "Grid search requires Optuna >= 3.1 for BruteForceSampler." ) + else: # "bayesian" + sampler = optuna.samplers.TPESampler() - if n_samples is None: - raise ValueError("n_samples must be defined for 'sample' strategy") - - return n_samples - - def _create_config_and_run( - self, i: int, seed: int, config: Optional[TrainModel] = None, attempt=0 - ): - """Creates a specific training configuration file and executes the run. - - This method samples a configuration (if not provided), writes it to a - YAML file, and then launches the `sequifier train` command as a - subprocess. It includes retry logic: if a run fails (e.g., CUDA out - of memory), it recursively calls itself with a halved batch size, - retrying up to 3 times. - - Args: - i (int): The current run number (e.g., 1, 2, 3...). - seed (int): The random seed to use for this specific run. - config (Optional[TrainModel]): A specific `TrainModel` config to - use. If `None`, a new config will be sampled using - `self.config.sample(i)`. Defaults to `None`. - attempt (int): The current retry attempt number (0 for the first - try). Defaults to 0. - - Returns: - None - - Raises: - AssertionError: If the batch size becomes non-positive after - halving during a retry. - """ - if config is None: - config = self.config.sample(i) - full_config_path = os.path.join( - self.normalized_config_path, - f"{self.config.hp_search_name}-run-{i}.yaml", + storage_path = os.path.join( + config.project_root, "state", "optuna", f"{config.hp_search_name}.db" + ) + + is_multivariate = ( + config.evaluation_metrics is not None and len(config.evaluation_metrics) > 1 + ) + + if is_multivariate: + study = optuna.create_study( + study_name=config.hp_search_name, + directions=config.evaluation_metric_directions, + sampler=sampler, + storage=f"sqlite:///{storage_path}", + load_if_exists=True, ) - with open(full_config_path, "w") as f: - f.write( - yaml.dump( - config, - Dumper=TrainModelDumper, - sort_keys=False, - default_flow_style=False, - ) + else: + direction = ( + config.evaluation_metric_directions[0] + if ( + config.evaluation_metric_directions + and len(config.evaluation_metric_directions) == 1 ) + else "minimize" + ) + study = optuna.create_study( + study_name=config.hp_search_name, + direction=direction, + sampler=sampler, + storage=f"sqlite:///{storage_path}", + load_if_exists=True, + ) - self.logger.info( - f"--- Starting Hyperparameter Search Run {i} with seed {seed} ---" + n_trials = config.n_trials + if n_trials is None and config.search_strategy != "grid": + raise ValueError( + "n_trials/n_samples must be specified for hyperparameter search." ) - try: - subprocess.run( - [ - "sequifier", - "train", - f"--config-path={full_config_path}", - f"--seed={seed}", - ], - check=True, - ) - self.logger.info(f"--- Finished Hyperparameter Search Run {i} ---") - - except subprocess.CalledProcessError as e: - if attempt < 3: - if config is None: - raise RuntimeError("Config object lost during retry logic.") - new_batch_size = int(config.training_spec.batch_size / 2) - - if new_batch_size <= 0: - raise ValueError( - "Batch size reduced to 0 or less during retry logic." - ) - config.training_spec.batch_size = new_batch_size - self.logger.info( - f"ERROR: Run {i} failed with exit code {e.returncode}. Halving batch size to {new_batch_size} in attempt {attempt + 1}" - ) - self._create_config_and_run(i, seed, config, attempt=attempt + 1) - else: - self.logger.info( - f"ERROR: Run {i} failed with exit code {e.returncode}. Stopping run {i}" - ) - @beartype - def hyperparameter_search(self) -> None: - """Performs the hyperparameter search loop. - - It iterates from the `start_run` number up to the total `n_samples`, - generating a unique seed for each run and calling - `_create_config_and_run` to execute it. - - Returns: - None - """ - for i in range(self.start_run, self.n_samples): - seed = int(datetime.now().timestamp() * 1e6) % (2**32) - np.random.seed(seed) - self._create_config_and_run(i, seed=seed) + study.optimize(lambda trial: objective(trial, config), n_trials=n_trials) + + if is_multivariate: + print("\nBest trials (Pareto front):") + for trial in study.best_trials: + print(f" Values: {trial.values}") + print(" Params: ") + for key, value in trial.params.items(): + print(f" {key}: {value}") + else: + print("\nBest trial:") + trial = study.best_trial + print(f" Value: {trial.value}") + print(" Params: ") + for key, value in trial.params.items(): + print(f" {key}: {value}") diff --git a/src/sequifier/make.py b/src/sequifier/make.py index 89a3a65c..fdc29a7a 100644 --- a/src/sequifier/make.py +++ b/src/sequifier/make.py @@ -95,6 +95,7 @@ checkpoints/ outputs/ data/ +state/ .DS_Store""" @@ -110,6 +111,8 @@ def make(args): raise ValueError(f"project_name '{project_name}' is not admissible") os.makedirs(f"{project_name}/configs") + os.makedirs(f"{project_name}/state/optuna") + os.makedirs(f"{project_name}/scripts") with open(f"{project_name}/.gitignore", "w") as f: f.write(gitignore_string) diff --git a/src/sequifier/train.py b/src/sequifier/train.py index 841d2cd7..e958f78c 100644 --- a/src/sequifier/train.py +++ b/src/sequifier/train.py @@ -1,25 +1,29 @@ import contextlib import copy import glob +import json import logging import math import os -import time -import uuid -import warnings -from typing import Any, Optional, Union - -import numpy as np -import polars as pl -import torch -import torch._dynamo -import torch.distributed as dist -import torch.multiprocessing as mp -from beartype import beartype -from packaging import version -from torch import Tensor, nn -from torch.amp import GradScaler -from torch.distributed.checkpoint.state_dict import ( +import sys + +os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1" +import time # noqa: E402 +import uuid # noqa: E402 +import warnings # noqa: E402 +from typing import Any, Optional, Union # noqa: E402 + +import numpy as np # noqa: E402 +import polars as pl # noqa: E402 +import torch # noqa: E402 +import torch._dynamo # noqa: E402 +import torch.distributed as dist # noqa: E402 +import torch.multiprocessing as mp # noqa: E402 +from beartype import beartype # noqa: E402 +from packaging import version # noqa: E402 +from torch import Tensor, nn # noqa: E402 +from torch.amp import GradScaler # noqa: E402 +from torch.distributed.checkpoint.state_dict import ( # noqa: E402 StateDictOptions, get_model_state_dict, get_optimizer_state_dict, @@ -28,19 +32,23 @@ ) if version.parse(torch.__version__) >= version.parse("2.6.0"): - from torch.distributed.fsdp import MixedPrecisionPolicy, OffloadPolicy, fully_shard + from torch.distributed.fsdp import ( # noqa: E402 + MixedPrecisionPolicy, + OffloadPolicy, + fully_shard, + ) else: - from torch.distributed._composable.fsdp import ( + from torch.distributed._composable.fsdp import ( # noqa: E402 MixedPrecisionPolicy, OffloadPolicy, fully_shard, ) -from torch.distributed.device_mesh import init_device_mesh -from torch.nn import ModuleDict -from torch.nn.functional import one_hot -from torch.nn.parallel import DistributedDataParallel as DDP -from torch.utils.data import DataLoader +from torch.distributed.device_mesh import init_device_mesh # noqa: E402 +from torch.nn import ModuleDict # noqa: E402 +from torch.nn.functional import one_hot # noqa: E402 +from torch.nn.parallel import DistributedDataParallel as DDP # noqa: E402 +from torch.utils.data import DataLoader # noqa: E402 torch._dynamo.config.suppress_errors = True @@ -432,17 +440,24 @@ def train(args: Any, args_config: dict[str, Any]) -> None: ) else: # Single-node multi-GPU fallback using mp.spawn - mp.spawn( - _mp_train_worker_wrapper, - args=( - world_size, - config, - from_folder, - config.training_spec.torch_compile, - ), - nprocs=world_size, - join=True, - ) + try: + mp.spawn( + _mp_train_worker_wrapper, + args=( + world_size, + config, + from_folder, + config.training_spec.torch_compile, + ), + nprocs=world_size, + join=True, + ) + except mp.ProcessExitedException as e: + # Catch the specific PyTorch exception and check the exit_code attribute + if e.exit_code == 143: + sys.exit(143) + else: + raise e else: train_worker(0, 1, config, from_folder, 0, config.training_spec.torch_compile) @@ -1156,6 +1171,44 @@ def _get_full_state_dict( for k, v in self.state_dict().items() } + @beartype + def _check_and_terminate(self): + """Checks for an external pruning signal and terminates the process if required. + + This method looks for a specific `.prune` file generated by the Optuna orchestrator. + If running in a distributed setting, the rank 0 process checks for the file and + broadcasts a termination signal to all other ranks. If the signal is received, + the process cleans up its distributed process group, clears the GPU cache, and + gracefully exits with code 143 (SIGTERM) to allow Optuna to prune the trial. + """ + if os.getenv("SEQUIFIER_HYPERPARAMETER_SEARCH_RUN") is not None: + should_prune = 0 + if self.rank == 0: + prune_file = os.path.join( + self.project_root, "logs", f"sequifier-{self.model_name}.prune" + ) + if os.path.exists(prune_file): + should_prune = 1 + + if self.hparams.training_spec.distributed: + signal_tensor = torch.tensor( + [should_prune], dtype=torch.int32, device=self.device + ) + dist.broadcast(signal_tensor, src=0) + should_prune = signal_tensor.item() + + if should_prune: + if self.rank == 0: + self.logger.info( + "[INFO] Pruning signal received from Optuna orchestrator. Tearing down cooperatively." + ) + if self.hparams.training_spec.distributed: + cleanup() + if self.device.startswith("cuda"): + torch.cuda.empty_cache() + + sys.exit(143) + @beartype def train_model( self, @@ -1193,7 +1246,9 @@ def train_model( ) elapsed = 0.0 - self._log_epoch_results(0, 0, elapsed, total_loss, total_losses, output) + self._log_epoch_results( + 0, 0, elapsed, total_loss, total_losses, output, 0 + ) for epoch in range(self.start_epoch, self.hparams.training_spec.epochs + 1): if ( self.early_stopping_epochs is None @@ -1215,6 +1270,7 @@ def train_model( ) elapsed = time.time() - epoch_start_time + total_expected_batches = epoch * len(train_loader) self._log_epoch_results( epoch, len(train_loader), @@ -1222,6 +1278,7 @@ def train_model( total_loss, total_losses, output, + total_expected_batches, ) if total_loss < best_val_loss: @@ -1248,6 +1305,7 @@ def train_model( ) last_epoch = epoch + self._check_and_terminate() except KeyboardInterrupt: self.logger.info("\n" + "=" * 89) self.logger.info("[WARNING] Training interrupted by user (Ctrl+C).") @@ -1413,19 +1471,21 @@ def _train_epoch( total_loss += loss.item() batches_aggregated += 1 - if (batch_count + 1) % self.log_interval == 0 and self.rank == 0: - learning_rate = self.scheduler.get_last_lr()[0] - s_per_batch = (time.time() - start_time) / max( - 1, batches_aggregated - ) - avg_train_loss = total_loss / max(1, batches_aggregated) - self.logger.info( - f"[INFO] Epoch {epoch:3d} | Batch {(batch_count+1):5d}/{num_batches:5d} | Loss: {format_number(avg_train_loss)} | LR: {format_number(learning_rate)} | S/Batch {format_number(s_per_batch)}" - ) - total_loss = 0.0 - batches_aggregated = 0 - self.start_batch = 0 - start_time = time.time() + if (batch_count + 1) % self.log_interval == 0: + if self.rank == 0: + learning_rate = self.scheduler.get_last_lr()[0] + s_per_batch = (time.time() - start_time) / max( + 1, batches_aggregated + ) + avg_train_loss = total_loss / max(1, batches_aggregated) + self.logger.info( + f"[INFO] Epoch {epoch:3d} | Batch {(batch_count+1):5d}/{num_batches:5d} | Loss: {format_number(avg_train_loss)} | LR: {format_number(learning_rate)} | S/Batch {format_number(s_per_batch)}" + ) + total_loss = 0.0 + batches_aggregated = 0 + self.start_batch = 0 + start_time = time.time() + self._check_and_terminate() del data, targets, output, loss, losses @@ -1474,6 +1534,9 @@ def _train_epoch( ) if not self.hparams.training_spec.distributed or self.rank == 0: + current_global_step = (epoch - 1) * num_batches + ( + batch_count + 1 + ) self._log_epoch_results( 0, batch_count + 1, @@ -1481,8 +1544,10 @@ def _train_epoch( val_loss, val_losses, output, + current_global_step, ) val_loss_batch[0] = float(val_loss) + self._check_and_terminate() else: val_loss_batch[0] = np.float32(np.nan) @@ -2153,6 +2218,7 @@ def _log_epoch_results( total_loss: np.float32, total_losses: dict[str, np.float32], output: dict[str, Tensor], + global_step: int, ) -> None: """Logs the results of an epoch. @@ -2177,6 +2243,25 @@ class share statistics (if configured) to the log file. self.logger.info("-" * 89) self.logger.info(log_string) + metrics_file = os.path.join( + self.project_root, "logs", f"sequifier-{self.model_name}-metrics.jsonl" + ) + with open(metrics_file, "a") as f: + f.write( + json.dumps( + { + "epoch": epoch, + "batch": batch, + "global_step": global_step, + "val_loss": float(total_loss), + "elapsed": elapsed, + } + ) + + "\n" + ) + f.flush() + os.fsync(f.fileno()) + if len(total_losses) > 1: loss_strs = [ f"{key}_loss: {format_number(value)}" diff --git a/tests/configs/hyperparameter-search-bayesian.yaml b/tests/configs/hyperparameter-search-bayesian.yaml new file mode 100644 index 00000000..990869c7 --- /dev/null +++ b/tests/configs/hyperparameter-search-bayesian.yaml @@ -0,0 +1,67 @@ +project_root: tests/project_folder +metadata_config_path: tests/project_folder/configs/metadata_configs/test-data-categorical-5.json +hp_search_name: test-hp-search-bayesian +model_config_write_path: configs + +read_format: pt +target_columns: [itemId] +target_column_types: + itemId: categorical +seq_length: [8] +inference_batch_size: 10 + +# Search Strategy +search_strategy: bayesian +n_samples: 4 + +# Configuration Loading Overrides (set to null to use values from metadata) +input_columns: null +# Export Settings +export_embedding_model: false +export_generative_model: true +export_onnx: false +export_pt: true +export_with_dropout: false + +# Model Hyperparameter Search Space +model_hyperparameter_sampling: + initial_embedding_dim: [40, 80] + joint_embedding_dim: [null, null] + feature_embedding_dims: null + dim_model: [40, 80] + n_head: [2, 4] + dim_feedforward: [10, 12] + num_layers: [2] + activation_fn: ["swiglu"] + normalization: ["rmsnorm"] + positional_encoding: ["rope"] + attention_type: ["mqa", "gqa"] + norm_first: [ true] + n_kv_heads: [1] + rope_theta: [10000.0] + prediction_length: 1 + +# Training Hyperparameter Search Space +training_hyperparameter_sampling: + device: cpu + epochs: [1, 1] + save_interval_epochs: 10 + batch_size: [5, 10] + learning_rate: [0.001, 0.01] + criterion: + itemId: CrossEntropyLoss + accumulation_steps: [1] + dropout: [0.0] + optimizer: + - name: Adam + scheduler: + - name: StepLR + step_size: 1 + gamma: 0.99 + - name: StepLR + step_size: 1 + gamma: 0.99 + log_interval: 5 + continue_training: false + +override_input: true diff --git a/tests/configs/hyperparameter-search-custom-eval-inference.yaml b/tests/configs/hyperparameter-search-custom-eval-inference.yaml new file mode 100644 index 00000000..6dd476d7 --- /dev/null +++ b/tests/configs/hyperparameter-search-custom-eval-inference.yaml @@ -0,0 +1,40 @@ +project_root: tests/project_folder +metadata_config_path: configs/metadata_configs/test-data-categorical-5.json + +model_type: generative +model_path: models/sequifier-test-hp-search-custom-eval-run-0-best-1.pt + +data_path: data/test-data-categorical-5-split2 +read_format: pt +write_format: csv + +input_columns: +- itemId +- supCat1 +- supCat2 +- supCat3 +- supCat4 +target_columns: +- itemId +- supCat1 +- supCat2 +- supCat3 +- supCat4 +target_column_types: + itemId: categorical + supCat1: categorical + supCat2: categorical + supCat3: categorical + supCat4: categorical + +output_probabilities: false +map_to_id: true +device: cpu +seq_length: 8 +prediction_length: 1 +inference_batch_size: 10 +enforce_determinism: true + +# Autoregression +autoregression: true +autoregression_extra_steps: 30 diff --git a/tests/configs/hyperparameter-search-custom-eval.yaml b/tests/configs/hyperparameter-search-custom-eval.yaml new file mode 100644 index 00000000..88a16baf --- /dev/null +++ b/tests/configs/hyperparameter-search-custom-eval.yaml @@ -0,0 +1,90 @@ +project_root: tests/project_folder +metadata_config_path: tests/project_folder/configs/metadata_configs/test-data-categorical-5.json +hp_search_name: test-hp-search-custom-eval +model_config_write_path: configs + +read_format: pt +target_columns: +- itemId +- supCat1 +- supCat2 +- supCat3 +- supCat4 +target_column_types: + itemId: categorical + supCat1: categorical + supCat2: categorical + supCat3: categorical + supCat4: categorical +seq_length: [8] +inference_batch_size: 10 + +# Search Strategy +search_strategy: bayesian +n_samples: 4 + +# Configuration Loading Overrides (set to null to use values from metadata) +input_columns: null +# Export Settings +export_embedding_model: false +export_generative_model: true +export_onnx: true +export_pt: false +export_with_dropout: false + +# Model Hyperparameter Search Space +model_hyperparameter_sampling: + initial_embedding_dim: [40, 80] + joint_embedding_dim: [null, null] + feature_embedding_dims: null + dim_model: [40, 80] + n_head: [2, 4] + dim_feedforward: [10, 12] + num_layers: [2] + activation_fn: ["swiglu"] + normalization: ["rmsnorm"] + positional_encoding: ["rope"] + attention_type: ["mqa", "gqa"] + norm_first: [ true] + n_kv_heads: [1] + rope_theta: [10000.0] + prediction_length: 1 + +# Training Hyperparameter Search Space +training_hyperparameter_sampling: + device: cpu + epochs: [1, 1] + save_interval_epochs: 10 + batch_size: [5, 10] + learning_rate: [0.001, 0.01] + criterion: + itemId: CrossEntropyLoss + supCat1: CrossEntropyLoss + supCat2: CrossEntropyLoss + supCat3: CrossEntropyLoss + supCat4: CrossEntropyLoss + accumulation_steps: [1] + dropout: [0.0] + optimizer: + - name: Adam + scheduler: + - name: StepLR + step_size: 1 + gamma: 0.99 + - name: StepLR + step_size: 1 + gamma: 0.99 + log_interval: 5 + continue_training: false + + +evaluation_metric_directions: + - minimize + - maximize +evaluation_metrics: + - max + - stdev +evaluation_inference_config: tests/configs/hyperparameter-search-custom-eval-inference.yaml +evaluation_script: scripts/hp_search_eval_script.py + +override_input: true diff --git a/tests/configs/hyperparameter-search-sample.yaml b/tests/configs/hyperparameter-search-sample.yaml index 8a819b64..87bf2db6 100644 --- a/tests/configs/hyperparameter-search-sample.yaml +++ b/tests/configs/hyperparameter-search-sample.yaml @@ -14,53 +14,71 @@ inference_batch_size: 10 search_strategy: sample n_samples: 4 -# Configuration Loading Overrides (set to null to use values from metadata) +# Configuration Loading Overrides input_columns: null -# Export Settings export_embedding_model: false export_generative_model: true export_onnx: false export_pt: true export_with_dropout: false -# Model Hyperparameter Search Space +# Model Hyperparameter Search Space using Distributions model_hyperparameter_sampling: - initial_embedding_dim: [40, 80] - joint_embedding_dim: [null, null] + initial_embedding_dim: [40] + joint_embedding_dim: [null] feature_embedding_dims: null - dim_model: [40, 80] - n_head: [2, 4] - dim_feedforward: [10, 12] - num_layers: [2] + dim_model: [40] + n_head: [2] + # IntDistribution test with step + dim_feedforward: + low: 10 + high: 20 + step: 2 + # IntDistribution test without step + num_layers: + low: 1 + high: 3 activation_fn: ["swiglu"] normalization: ["rmsnorm"] positional_encoding: ["rope"] - attention_type: ["mqa", "gqa"] - norm_first: [ true] + attention_type: ["mqa"] + norm_first: [true] n_kv_heads: [1] - rope_theta: [10000.0] + # FloatDistribution test with log + rope_theta: + low: 1000.0 + high: 10000.0 + log: true prediction_length: 1 -# Training Hyperparameter Search Space +# Training Hyperparameter Search Space using Distributions training_hyperparameter_sampling: device: cpu - epochs: [1, 1] + epochs: [1] save_interval_epochs: 10 - batch_size: [5, 10] - learning_rate: [0.001, 0.01] + # IntDistribution test + batch_size: + low: 5 + high: 15 + step: 5 + learning_rate: [0.001] criterion: itemId: CrossEntropyLoss - accumulation_steps: [1] - dropout: [0.0] + # IntDistribution test + accumulation_steps: + low: 1 + high: 2 + # FloatDistribution test without log + dropout: + low: 0.1 + high: 0.5 + log: true optimizer: - name: Adam scheduler: - name: StepLR step_size: 1 gamma: 0.99 - - name: StepLR - step_size: 1 - gamma: 0.99 log_interval: 5 continue_training: false diff --git a/tests/integration-test-log.txt b/tests/integration-test-log.txt index 3e23b85c..dec6c483 100644 --- a/tests/integration-test-log.txt +++ b/tests/integration-test-log.txt @@ -13,60 +13,5 @@ sequifier preprocess --config-path tests/configs/preprocess-test-categorical-exa sequifier preprocess --config-path tests/configs/preprocess-test-categorical-exact-pt.yaml sequifier hyperparameter-search --config-path tests/configs/hyperparameter-search-grid.yaml sequifier hyperparameter-search --config-path tests/configs/hyperparameter-search-sample.yaml -sequifier train --config-path tests/configs/train-test-categorical.yaml --metadata-config-path configs/metadata_configs/test-data-categorical-1.json --model-name model-categorical-1 --input-columns itemId -sequifier train --config-path tests/configs/train-test-real.yaml --metadata-config-path configs/metadata_configs/test-data-real-1.json --model-name model-real-1 --input-columns None -sequifier train --config-path tests/configs/train-test-categorical.yaml --metadata-config-path configs/metadata_configs/test-data-categorical-3.json --model-name model-categorical-3 --input-columns itemId supCat1 -sequifier train --config-path tests/configs/train-test-real.yaml --metadata-config-path configs/metadata_configs/test-data-real-3.json --model-name model-real-3 --input-columns None -sequifier train --config-path tests/configs/train-test-categorical.yaml --metadata-config-path configs/metadata_configs/test-data-categorical-5.json --model-name model-categorical-5 --input-columns itemId supCat1 supCat2 supCat4 -sequifier train --config-path tests/configs/train-test-real.yaml --metadata-config-path configs/metadata_configs/test-data-real-5.json --model-name model-real-5 --input-columns None -sequifier train --config-path tests/configs/train-test-categorical.yaml --metadata-config-path configs/metadata_configs/test-data-categorical-50.json --model-name model-categorical-50 --input-columns itemId supCat1 supCat2 supCat3 supCat4 supCat5 supCat6 supCat7 supCat8 supCat9 supCat10 supCat11 supCat12 supCat13 supCat14 supCat15 supCat16 supCat17 supCat18 supCat19 supCat20 supCat21 supCat22 supCat23 supCat24 supCat25 supCat26 supCat27 supCat28 supCat29 supCat30 supCat31 supCat32 supCat33 supCat34 supCat35 supCat36 supCat37 supCat38 supCat39 supCat40 supCat41 supCat42 supCat43 supCat44 supCat45 supCat46 supCat47 supCat48 supCat49 -sequifier train --config-path tests/configs/train-test-real.yaml --metadata-config-path configs/metadata_configs/test-data-real-50.json --model-name model-real-50 --input-columns None -sequifier train --config-path tests/configs/train-test-categorical-inf-size-1.yaml -sequifier train --config-path tests/configs/train-test-categorical-inf-size-3.yaml -sequifier train --config-path tests/configs/train-test-categorical-multitarget.yaml -sequifier train --config-path tests/configs/train-test-categorical-multitarget-eager.yaml -sequifier train --config-path tests/configs/train-test-distributed.yaml -sequifier train --config-path tests/configs/train-test-distributed-lazy-parquet.yaml -sequifier train --config-path tests/configs/train-test-lazy.yaml -sequifier infer --config-path tests/configs/infer-test-categorical.yaml --metadata-config-path configs/metadata_configs/test-data-categorical-1.json --model-path models/sequifier-model-categorical-1-best-3.onnx --data-path data/test-data-categorical-1-split2 --input-columns itemId -sequifier infer --config-path tests/configs/infer-test-real.yaml --metadata-config-path configs/metadata_configs/test-data-real-1.json --model-path models/sequifier-model-real-1-best-3.pt --data-path data/test-data-real-1-split1.parquet --input-columns None -sequifier infer --config-path tests/configs/infer-test-categorical.yaml --metadata-config-path configs/metadata_configs/test-data-categorical-3.json --model-path models/sequifier-model-categorical-3-best-3.onnx --data-path data/test-data-categorical-3-split2 --input-columns itemId supCat1 -sequifier infer --config-path tests/configs/infer-test-real.yaml --metadata-config-path configs/metadata_configs/test-data-real-3.json --model-path models/sequifier-model-real-3-best-3.pt --data-path data/test-data-real-3-split1.parquet --input-columns None -sequifier infer --config-path tests/configs/infer-test-categorical.yaml --metadata-config-path configs/metadata_configs/test-data-categorical-5.json --model-path models/sequifier-model-categorical-5-best-3.onnx --data-path data/test-data-categorical-5-split2 --input-columns itemId supCat1 supCat2 supCat4 -sequifier infer --config-path tests/configs/infer-test-real.yaml --metadata-config-path configs/metadata_configs/test-data-real-5.json --model-path models/sequifier-model-real-5-best-3.pt --data-path data/test-data-real-5-split1.parquet --input-columns None -sequifier infer --config-path tests/configs/infer-test-categorical.yaml --metadata-config-path configs/metadata_configs/test-data-categorical-50.json --model-path models/sequifier-model-categorical-50-best-3.onnx --data-path data/test-data-categorical-50-split2 --input-columns itemId supCat1 supCat2 supCat3 supCat4 supCat5 supCat6 supCat7 supCat8 supCat9 supCat10 supCat11 supCat12 supCat13 supCat14 supCat15 supCat16 supCat17 supCat18 supCat19 supCat20 supCat21 supCat22 supCat23 supCat24 supCat25 supCat26 supCat27 supCat28 supCat29 supCat30 supCat31 supCat32 supCat33 supCat34 supCat35 supCat36 supCat37 supCat38 supCat39 supCat40 supCat41 supCat42 supCat43 supCat44 supCat45 supCat46 supCat47 supCat48 supCat49 -sequifier infer --config-path tests/configs/infer-test-real.yaml --metadata-config-path configs/metadata_configs/test-data-real-50.json --model-path models/sequifier-model-real-50-best-3.pt --data-path data/test-data-real-50-split1.parquet --input-columns None -sequifier infer --config-path tests/configs/infer-test-categorical-multitarget.yaml -sequifier infer --config-path tests/configs/infer-test-real-autoregression.yaml --input-columns itemValue --randomize -sequifier infer --config-path tests/configs/infer-test-categorical-inf-size-1.yaml -sequifier infer --config-path tests/configs/infer-test-categorical-inf-size-3.yaml -sequifier infer --config-path tests/configs/infer-test-distributed.yaml -sequifier infer --config-path tests/configs/infer-test-distributed-parquet.yaml -sequifier infer --config-path tests/configs/infer-test-lazy.yaml -sequifier infer --config-path tests/configs/infer-test-categorical-autoregression.yaml --input-columns itemId -sequifier infer --config-path tests/configs/infer-test-categorical-embedding.yaml --input-columns itemId -sequifier infer --config-path tests/configs/infer-test-categorical-inf-size-3-embedding.yaml -sequifier preprocess --config-path tests/configs/preprocess-test-categorical-precomputed-stats.yaml -sequifier preprocess --config-path tests/configs/preprocess-test-categorical-precomputed-stats-negative.yaml -sequifier train --config-path tests/configs/train-test-resume-epoch.yaml -sequifier train --config-path tests/configs/train-test-resume-mid-epoch.yaml -sequifier hyperparameter-search --config-path tests/configs/hyperparameter-search-grid.yaml -sequifier hyperparameter-search --config-path tests/configs/hyperparameter-search-sample.yaml -sequifier visualize-training model-categorical-1 --project-root tests/project_folder -sequifier visualize-training model-categorical-1-inf-size --project-root tests/project_folder -sequifier visualize-training model-categorical-3 --project-root tests/project_folder -sequifier visualize-training model-categorical-3-from-mid-epoch-checkpoint --project-root tests/project_folder -sequifier visualize-training model-categorical-3-inf-size --project-root tests/project_folder -sequifier visualize-training model-categorical-5 --project-root tests/project_folder -sequifier visualize-training model-categorical-50 --project-root tests/project_folder -sequifier visualize-training model-categorical-distributed --project-root tests/project_folder -sequifier visualize-training model-categorical-distributed-lazy-parquet --project-root tests/project_folder -sequifier visualize-training model-categorical-lazy --project-root tests/project_folder -sequifier visualize-training model-categorical-multitarget-5 --project-root tests/project_folder -sequifier visualize-training model-categorical-multitarget-5-eager --project-root tests/project_folder -sequifier visualize-training model-real-1 --project-root tests/project_folder -sequifier visualize-training model-real-1-from-epoch-checkpoint --project-root tests/project_folder -sequifier visualize-training model-real-3 --project-root tests/project_folder -sequifier visualize-training model-real-5 --project-root tests/project_folder -sequifier visualize-training model-real-50 --project-root tests/project_folder -sequifier visualize-training test-hp-search-grid-run-0,test-hp-search-grid-run-1,test-hp-search-grid-run-2,test-hp-search-grid-run-3 --project-root tests/project_folder --log-scale --bucket-training-batches 5 +sequifier hyperparameter-search --config-path tests/configs/hyperparameter-search-bayesian.yaml +sequifier hyperparameter-search --config-path tests/configs/hyperparameter-search-custom-eval.yaml diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 01885337..9b193a7b 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -240,6 +240,12 @@ def hp_search_configs(): return { "grid": os.path.join("tests", "configs", "hyperparameter-search-grid.yaml"), "sample": os.path.join("tests", "configs", "hyperparameter-search-sample.yaml"), + "bayesian": os.path.join( + "tests", "configs", "hyperparameter-search-bayesian.yaml" + ), + "custom-eval": os.path.join( + "tests", "configs", "hyperparameter-search-custom-eval.yaml" + ), } @@ -315,6 +321,8 @@ def format_configs_locally( inference_config_path_lazy, hp_search_configs["grid"], hp_search_configs["sample"], + hp_search_configs["bayesian"], + hp_search_configs["custom-eval"], ] for config_path in config_paths: with open(config_path, "r") as f: @@ -432,6 +440,21 @@ def run_preprocessing( shutil.copyfile(source_path, target_path) + os.makedirs(os.path.join(project_root, "scripts")) + source_path = os.path.join( + "tests", "resources", "source_scripts", "hp_search_eval_script.py" + ) + target_path = os.path.join(project_root, "scripts", "hp_search_eval_script.py") + shutil.copyfile(source_path, target_path) + + source_path = os.path.join( + "tests", "configs", "hyperparameter-search-custom-eval-inference.yaml" + ) + target_path = os.path.join( + project_root, "configs", "hyperparameter-search-custom-eval-inference.yaml" + ) + shutil.copyfile(source_path, target_path) + @pytest.fixture(scope="session") def run_training( @@ -524,7 +547,7 @@ def run_training_from_checkpoint( ) -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def run_hp_search( project_root, hp_search_configs, format_configs_locally, run_preprocessing ): @@ -536,6 +559,14 @@ def run_hp_search( f"sequifier hyperparameter-search --config-path {hp_search_configs['sample']}" ) + run_and_log( + f"sequifier hyperparameter-search --config-path {hp_search_configs['bayesian']}" + ) + + run_and_log( + f"sequifier hyperparameter-search --config-path {hp_search_configs['custom-eval']}" + ) + @pytest.fixture(scope="session") def copy_autoregression_model(project_root, run_training): diff --git a/tests/integration/test_hyperparameter_search.py b/tests/integration/test_hyperparameter_search.py index b3a597e3..00f42d25 100644 --- a/tests/integration/test_hyperparameter_search.py +++ b/tests/integration/test_hyperparameter_search.py @@ -1,4 +1,5 @@ import glob +import json import os @@ -20,3 +21,46 @@ def test_hp_search_sample_outputs(run_hp_search, project_root): assert ( len(generated_configs) == 4 ), f"Expected 4 sample configs, found {len(generated_configs)}" + + +def test_hp_search_bayesian_outputs(run_hp_search, project_root): + hp_name = "test-hp-search-bayesian" + config_dir = os.path.join(project_root, "configs") + + generated_configs = glob.glob(os.path.join(config_dir, f"{hp_name}-run-*.yaml")) + assert ( + len(generated_configs) == 4 + ), f"Expected 4 bayesian configs, found {len(generated_configs)}" + + +def test_hp_search_state(run_hp_search, project_root): + state_dir = os.path.join(project_root, "state", "optuna") + + assert os.path.exists(os.path.join(state_dir, "test-hp-search-sample.db")) + assert os.path.exists(os.path.join(state_dir, "test-hp-search-grid.db")) + assert os.path.exists(os.path.join(state_dir, "test-hp-search-bayesian.db")) + assert os.path.exists(os.path.join(state_dir, "test-hp-search-custom-eval.db")) + + +def test_hp_search_inference_feedback_loop(run_hp_search, project_root): + # Verify that the evaluations directory was populated + eval_dir = os.path.join(project_root, "outputs", "evaluations") + assert os.path.exists(eval_dir), f"Evaluation directory {eval_dir} was not created." + + eval_files = [ + f + for f in os.listdir(eval_dir) + if f.startswith("test-hp-search-custom-eval-run-") and f.endswith(".json") + ] + + assert len(eval_files) == 4, f"Expected 4 evaluation JSONs, found {len(eval_files)}" + + for f in eval_files: + with open(os.path.join(eval_dir, f), "r") as fp: + metrics = json.load(fp) + assert "max" in metrics, f"'max' missing in {f}" + assert "stdev" in metrics, f"'stdev' missing in {f}" + + # Sanity check that metrics were actually calculated + assert isinstance(metrics["max"], int) + assert isinstance(metrics["stdev"], float) diff --git a/tests/integration/test_training.py b/tests/integration/test_training.py index 7f854f29..1fb9fc51 100644 --- a/tests/integration/test_training.py +++ b/tests/integration/test_training.py @@ -130,6 +130,16 @@ def test_model_files_exists(run_training, run_training_from_checkpoint, project_ for i in range(4) for suffix in ["best", "last"] ] + + [ + f"sequifier-test-hp-search-bayesian-run-{i}-{suffix}-1.pt" + for i in range(4) + for suffix in ["best", "last"] + ] + + [ + f"sequifier-test-hp-search-custom-eval-run-{i}-{suffix}-1.onnx" + for i in range(4) + for suffix in ["best", "last"] + ] ) ) diff --git a/tests/resources/source_scripts/hp_search_eval_script.py b/tests/resources/source_scripts/hp_search_eval_script.py new file mode 100644 index 00000000..9381766b --- /dev/null +++ b/tests/resources/source_scripts/hp_search_eval_script.py @@ -0,0 +1,42 @@ +import json +import os +import sys + +import polars as pl + + +def main(): + if len(sys.argv) < 2: + print("Error: Missing run_name argument.") + sys.exit(1) + + run_name = sys.argv[1] + + # 3. Load predictions and calculate mean & variance + preds_path = f"outputs/predictions/sequifier-{run_name}-predictions" + + dfs = [] + for root, dir, files in os.walk(preds_path): + for file in sorted(list(files)): + # 1. Read everything as strings to avoid read-time schema crashes + df = pl.read_csv(os.path.join(preds_path, file), infer_schema_length=0) + + # 2. Cast to Int64 (strict=False turns bad strings to null) & fill nulls with -1 + df = df.with_columns(pl.all().cast(pl.Int64, strict=False).fill_null(-1)) + dfs.append(df) + df = pl.concat(dfs) + + max_freqs = df["itemId"].value_counts()["count"].max() + stdev_freqs = df["itemId"].value_counts()["count"].std() + + # 4. Save metrics back for Optuna to ingest + eval_dir = "outputs/evaluations" + os.makedirs(eval_dir, exist_ok=True) + eval_json_path = os.path.join(eval_dir, f"{run_name}.json") + + with open(eval_json_path, "w") as f: + f.write(json.dumps({"max": max_freqs, "stdev": stdev_freqs}, indent=2)) + + +if __name__ == "__main__": + main()