diff --git a/CHANGELOG.md b/CHANGELOG.md index fa40f8a..95ef80f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## [4.0.0-alpha.1] ### Changed @@ -14,6 +14,7 @@ and this project adheres to - Switched deep learning framework from Tensorflow to PyTorch - Speed up predictions by removing ensemble method where output from three models with differing kernel sizes was averaged to one prediction - Separated calibration logic to dedicated reusable module with sklearn-like API. +- Improved computational efficiency of piece-wise linear calibration and set sensible default parameters - Built-in transfer learning functionality, instead of using external `deeplcretrainer` package. - Cleaned up package, removing legacy and unused code and files, and improving modularity - Modernized CI workflows to use `uv` diff --git a/MANIFEST.in b/MANIFEST.in index 49ee242..b05205c 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1 @@ -include deeplc/models/* include deeplc/package_data/**/* -include deeplc/baseline_performance/* diff --git a/deeplc/_features.py b/deeplc/_features.py index 3f01e5d..d10745b 100644 --- a/deeplc/_features.py +++ b/deeplc/_features.py @@ -1,5 +1,7 @@ """Feature extraction for DeepLC.""" +# TODO: Consider ProForma fixed modifications (that are not applied yet) for feature extraction. + from __future__ import annotations import logging @@ -26,6 +28,112 @@ # fmt: on +def encode_peptidoform( + peptidoform: Peptidoform | str, + add_ccs_features: bool = False, + padding_length: int = 60, + positions: set[int] | None = None, + positions_pos: set[int] | None = None, + positions_neg: set[int] | None = None, + dict_aa: dict[str, int] | None = None, + dict_index_pos: dict[str, int] | None = None, + dict_index: dict[str, int] | None = None, +) -> dict[str, np.ndarray]: + """ + Extract features from a single peptidoform. + + Parameters + ---------- + peptidoform + The peptidoform to encode, either as a Peptidoform object or a string. + add_ccs_features + Whether to include CCS features. Default is False. + padding_length + The maximum length of the sequence after padding. Default is 60. + positions + The positions to consider for feature extraction. Default is DEFAULT_POSITIONS. + positions_pos + The positive positions to consider for feature extraction. Default is + DEFAULT_POSITIONS_POS. + positions_neg + The negative positions to consider for feature extraction. Default is + DEFAULT_POSITIONS_NEG. + dict_aa + A dictionary mapping amino acids to indices. Default is DEFAULT_DICT_AA. + dict_index_pos + A dictionary mapping atoms to indices for the positional matrix. Default is + DEFAULT_DICT_INDEX_POS. + dict_index + A dictionary mapping atoms to indices. Default is DEFAULT_DICT_INDEX. + + Returns + ------- + dict[str, np.ndarray] + A dictionary of Numpy arrays containing the extracted features. + + """ + positions = positions or DEFAULT_POSITIONS + positions_pos = positions_pos or DEFAULT_POSITIONS_POS + positions_neg = positions_neg or DEFAULT_POSITIONS_NEG + dict_aa = dict_aa or DEFAULT_DICT_AA + dict_index_pos = dict_index_pos or DEFAULT_DICT_INDEX_POS + dict_index = dict_index or DEFAULT_DICT_INDEX + + if isinstance(peptidoform, str): + peptidoform = Peptidoform(peptidoform) + seq = peptidoform.sequence + charge = peptidoform.precursor_charge + seq, seq_len = _truncate_sequence(seq, padding_length) + + std_matrix = _fill_standard_matrix(seq, padding_length, dict_index) + onehot_matrix = _fill_onehot_matrix(peptidoform.parsed_sequence, padding_length, dict_aa) + pos_matrix = _fill_pos_matrix( + seq, seq_len, positions_pos, positions_neg, dict_index, dict_index_pos + ) + _apply_modifications( + std_matrix, + pos_matrix, + peptidoform.parsed_sequence, + seq_len, + dict_index, + dict_index_pos, + positions, + ) + _apply_terminal_modifications( + std_matrix, + pos_matrix, + peptidoform, + seq_len, + dict_index, + dict_index_pos, + positions, + ) + + matrix_all = np.sum(std_matrix, axis=0) + matrix_all = np.append(matrix_all, seq_len) + if add_ccs_features: + if not charge: + raise ValueError(f"Peptidoform has no charge: {peptidoform}") + matrix_all = np.append(matrix_all, (seq.count("H")) / seq_len) + matrix_all = np.append( + matrix_all, (seq.count("F") + seq.count("W") + seq.count("Y")) / seq_len + ) + matrix_all = np.append(matrix_all, (seq.count("D") + seq.count("E")) / seq_len) + matrix_all = np.append(matrix_all, (seq.count("K") + seq.count("R")) / seq_len) + matrix_all = np.append(matrix_all, charge) + + matrix_sum = _compute_rolling_sum(std_matrix.T, n=2)[:, ::2].T + + matrix_global = np.concatenate([matrix_all, pos_matrix.flatten()]) + + return { + "matrix": std_matrix, + "matrix_sum": matrix_sum, + "matrix_global": matrix_global, + "matrix_hc": onehot_matrix, + } + + def _truncate_sequence(seq: str, max_length: int) -> tuple[str, int]: """Truncate the sequence if it exceeds the max_length.""" if len(seq) > max_length: @@ -98,6 +206,40 @@ def _fill_pos_matrix( return pos_mat +def _apply_composition_to_matrices( + mat: np.ndarray, + pos_mat: np.ndarray, + composition: mass.Composition, + i: int, + seq_len: int, + dict_index: dict[str, int], + dict_index_pos: dict[str, int], + positions: set[int], +) -> None: + """Apply a composition delta to the standard and positional matrices.""" + for atom_comp, change in composition.items(): + try: + mat[i, dict_index[atom_comp]] += change + if i in positions: + pos_mat[i, dict_index_pos[atom_comp]] += change + elif (i - seq_len) in positions: + pos_mat[i - seq_len, dict_index_pos[atom_comp]] += change + except KeyError: + try: + warnings.warn(f"Replacing pattern for atom: {atom_comp}", stacklevel=2) + atom_comp_clean = sub(r"\[.*?\]", "", atom_comp) + mat[i, dict_index[atom_comp_clean]] += change + if i in positions: + pos_mat[i, dict_index_pos[atom_comp_clean]] += change + elif (i - seq_len) in positions: + pos_mat[i - seq_len, dict_index_pos[atom_comp_clean]] += change + except KeyError: + warnings.warn(f"Ignoring atom {atom_comp} at pos {i}", stacklevel=2) + continue + except IndexError: + warnings.warn(f"Index error for atom {atom_comp} at pos {i}", stacklevel=2) + + def _apply_modifications( mat: np.ndarray, pos_mat: np.ndarray, @@ -118,27 +260,54 @@ def _apply_modifications( f"Skipping modification without known composition: {token[1]}", stacklevel=2 ) continue - for atom_comp, change in mod_comp.items(): + _apply_composition_to_matrices( + mat, + pos_mat, + mod_comp, + i, + seq_len, + dict_index, + dict_index_pos, + positions, + ) + + +def _apply_terminal_modifications( + mat: np.ndarray, + pos_mat: np.ndarray, + peptidoform: Peptidoform, + seq_len: int, + dict_index: dict[str, int], + dict_index_pos: dict[str, int], + positions: set[int], +) -> None: + """Apply N- and C-terminal modification changes to the matrices.""" + terminal_mods = [ + (0, peptidoform.properties.get("n_term")), # N-terminus at position 0 + (seq_len - 1, peptidoform.properties.get("c_term")), # C-terminus at last position + ] + for i, mods in terminal_mods: + if not mods: + continue + for tag in mods: try: - mat[i, dict_index[atom_comp]] += change - if i in positions: - pos_mat[i, dict_index_pos[atom_comp]] += change - elif (i - seq_len) in positions: - pos_mat[i - seq_len, dict_index_pos[atom_comp]] += change - except KeyError: - try: - warnings.warn(f"Replacing pattern for atom: {atom_comp}", stacklevel=2) - atom_comp_clean = sub(r"\[.*?\]", "", atom_comp) - mat[i, dict_index[atom_comp_clean]] += change - if i in positions: - pos_mat[i, dict_index_pos[atom_comp_clean]] += change - elif (i - seq_len) in positions: - pos_mat[i - seq_len, dict_index_pos[atom_comp_clean]] += change - except KeyError: - warnings.warn(f"Ignoring atom {atom_comp} at pos {i}", stacklevel=2) - continue - except IndexError: - warnings.warn(f"Index error for atom {atom_comp} at pos {i}", stacklevel=2) + mod_comp = tag.composition + except Exception: + warnings.warn( + f"Skipping terminal modification without known composition: {tag}", + stacklevel=2, + ) + continue + _apply_composition_to_matrices( + mat, + pos_mat, + mod_comp, + i, + seq_len, + dict_index, + dict_index_pos, + positions, + ) def _compute_rolling_sum(matrix: np.ndarray, n: int = 2) -> np.ndarray: @@ -146,68 +315,3 @@ def _compute_rolling_sum(matrix: np.ndarray, n: int = 2) -> np.ndarray: ret = np.cumsum(matrix, axis=1, dtype=np.float32) ret[:, n:] = ret[:, n:] - ret[:, :-n] return ret[:, n - 1 :] - - -def encode_peptidoform( - peptidoform: Peptidoform | str, - add_ccs_features: bool = False, - padding_length: int = 60, - positions: set[int] | None = None, - positions_pos: set[int] | None = None, - positions_neg: set[int] | None = None, - dict_aa: dict[str, int] | None = None, - dict_index_pos: dict[str, int] | None = None, - dict_index: dict[str, int] | None = None, -) -> dict[str, np.ndarray]: - """Extract features from a single peptidoform.""" - positions = positions or DEFAULT_POSITIONS - positions_pos = positions_pos or DEFAULT_POSITIONS_POS - positions_neg = positions_neg or DEFAULT_POSITIONS_NEG - dict_aa = dict_aa or DEFAULT_DICT_AA - dict_index_pos = dict_index_pos or DEFAULT_DICT_INDEX_POS - dict_index = dict_index or DEFAULT_DICT_INDEX - - if isinstance(peptidoform, str): - peptidoform = Peptidoform(peptidoform) - seq = peptidoform.sequence - charge = peptidoform.precursor_charge - seq, seq_len = _truncate_sequence(seq, padding_length) - - std_matrix = _fill_standard_matrix(seq, padding_length, dict_index) - onehot_matrix = _fill_onehot_matrix(peptidoform.parsed_sequence, padding_length, dict_aa) - pos_matrix = _fill_pos_matrix( - seq, seq_len, positions_pos, positions_neg, dict_index, dict_index_pos - ) - _apply_modifications( - std_matrix, - pos_matrix, - peptidoform.parsed_sequence, - seq_len, - dict_index, - dict_index_pos, - positions, - ) - - matrix_all = np.sum(std_matrix, axis=0) - matrix_all = np.append(matrix_all, seq_len) - if add_ccs_features: - if not charge: - raise ValueError(f"Peptidoform has no charge: {peptidoform}") - matrix_all = np.append(matrix_all, (seq.count("H")) / seq_len) - matrix_all = np.append( - matrix_all, (seq.count("F") + seq.count("W") + seq.count("Y")) / seq_len - ) - matrix_all = np.append(matrix_all, (seq.count("D") + seq.count("E")) / seq_len) - matrix_all = np.append(matrix_all, (seq.count("K") + seq.count("R")) / seq_len) - matrix_all = np.append(matrix_all, charge) - - matrix_sum = _compute_rolling_sum(std_matrix.T, n=2)[:, ::2].T - - matrix_global = np.concatenate([matrix_all, pos_matrix.flatten()]) - - return { - "matrix": std_matrix, - "matrix_sum": matrix_sum, - "matrix_global": matrix_global, - "matrix_hc": onehot_matrix, - } diff --git a/deeplc/_model_ops.py b/deeplc/_model_ops.py index 1f80eec..0b334f2 100644 --- a/deeplc/_model_ops.py +++ b/deeplc/_model_ops.py @@ -7,7 +7,14 @@ from pathlib import Path import torch -from rich.progress import track +from rich.progress import ( + BarColumn, + MofNCompleteColumn, + Progress, + TextColumn, + TimeRemainingColumn, + track, +) from torch.utils.data import DataLoader, Dataset, Subset from deeplc._architecture import DeepLCModel @@ -16,83 +23,26 @@ logger = logging.getLogger(__name__) -# TODO: Implement Lightning? - - -def promote_buffers_to_parameters( - model: torch.nn.Module, - buffer_indices: list[int] | None = None, -) -> torch.nn.Module: - """ - Promote ONNX initializer buffers to nn.Parameters so they become trainable. - - ONNX-converted GraphModules (from onnx2torch) store dense/FC layer weights as - buffers on an ``initializers`` submodule, making them invisible to the optimizer. - This function converts selected buffers to nn.Parameters so they can be fine-tuned. - - Parameters - ---------- - model - The loaded GraphModule from onnx2torch. - buffer_indices - Indices of ``onnx_initializer_*`` buffers to promote. If None, promotes the - global feature branch (0-5) and the final dense head (34-45). - - Returns - ------- - torch.nn.Module - The same model with buffers promoted to parameters. - - """ - if buffer_indices is None: - # Dense head (34-45) + global feature branch (0-5) - buffer_indices = list(range(0, 6)) + list(range(34, 46)) - - init_mod = dict(model.named_modules()).get("initializers") - if init_mod is None: - logger.debug("No 'initializers' submodule found; skipping buffer promotion.") - return model - - promoted = 0 - for idx in buffer_indices: - name = f"onnx_initializer_{idx}" - if name in init_mod._buffers: - buf = init_mod._buffers.pop(name) - init_mod._parameters[name] = torch.nn.Parameter(buf) - promoted += 1 - - logger.info( - f"Promoted {promoted} buffers to parameters. " - f"Total trainable params: {sum(p.numel() for p in model.parameters() if p.requires_grad)}" - ) - return model - - def load_model( model: torch.nn.Module | PathLike | str | None = None, device: str | None = None, ) -> torch.nn.Module: """Load a model from a file or return a randomly initialized model if none is provided.""" # If device is not specified, use the default device (GPU if available, else CPU) - selected_device = device or torch.device( - "cuda" if torch.cuda.is_available() else "cpu" - ) + selected_device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu") # Load model from file if a path is provided if isinstance(model, str | Path): - loaded_model = torch.load( - model, weights_only=False, map_location=selected_device - ) + loaded_model = torch.load(model, weights_only=False, map_location=selected_device) elif isinstance(model, torch.nn.Module): loaded_model = model + logger.debug("Using provided PyTorch model instance") elif model is None: # Initialize a new model with default architecture loaded_model = DeepLCModel() logger.debug("Initialized new DeepLCModel with default architecture") else: - raise TypeError( - f"Expected a PyTorch Module or a file path, got {type(model)} instead." - ) + raise TypeError(f"Expected a PyTorch Module or a file path, got {type(model)} instead.") # Ensure the model is on the specified device loaded_model.to(selected_device) @@ -110,7 +60,7 @@ def train( epochs: int = 25, batch_size: int = 512, patience: int = 10, - trainable_layers: str | None = None, + show_progress: bool = True, ) -> torch.nn.Module: """ Train or fine-tune the model. @@ -135,9 +85,8 @@ def train( Batch size for training and validation. patience Number of epochs with no improvement before early stopping. - trainable_layers - If provided, only layers containing this keyword in their name will be trainable. - All other layers will be frozen. If None, all layers are trainable. + show_progress + If True, display a Rich progress bar during training. If False, run silently. Returns ------- @@ -147,16 +96,6 @@ def train( """ model = load_model(model, device) - # Promote ONNX initializer buffers (dense head) to trainable parameters - model = promote_buffers_to_parameters(model) - - # Freeze layers if requested - - # Freeze layers if requested - if trainable_layers is not None: - _freeze_layers(model, trainable_layers) - logger.debug(f"Frozen all layers except those containing '{trainable_layers}'") - # Parse datasets; setup loaders train_loader = DataLoader( train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers @@ -175,24 +114,28 @@ def train( best_val_loss = float("inf") epochs_no_improve = 0 - for epoch in range(epochs): - avg_loss = _train_epoch(model, train_loader, optimizer, loss_fn, device) - avg_val_loss = _validate_epoch(model, val_loader, loss_fn, device) - - logger.debug( - f"Epoch {epoch + 1}/{epochs}, " - f"Loss: {avg_loss:.4f}, " - f"Validation Loss: {avg_val_loss:.4f}" - ) - - if avg_val_loss < best_val_loss: - best_val_loss = avg_val_loss - best_model_wts = copy.deepcopy(model.state_dict()) - epochs_no_improve = 0 - else: - epochs_no_improve += 1 + with _create_progress(disable=not show_progress) as progress: + epoch_task = progress.add_task("Epochs", total=epochs, status="") + + for _epoch in range(epochs): + avg_loss = _train_epoch(model, train_loader, optimizer, loss_fn, device) + avg_val_loss = _validate_epoch(model, val_loader, loss_fn, device) + + # Early stopping check + if avg_val_loss < best_val_loss: + best_val_loss = avg_val_loss + best_model_wts = copy.deepcopy(model.state_dict()) + epochs_no_improve = 0 + else: + epochs_no_improve += 1 + + # Update epoch bar with loss info + status = f"loss={avg_loss:.4f} val_loss={avg_val_loss:.4f} best={best_val_loss:.4f}" + if epochs_no_improve >= patience: + status += " [yellow]early stop[/yellow]" + progress.update(epoch_task, advance=1, status=status) + if epochs_no_improve >= patience: - logger.debug(f"Early stopping triggered at epoch {epoch + 1}") break model.load_state_dict(best_model_wts) @@ -205,13 +148,12 @@ def predict( device: str = "cpu", batch_size: int = 512, num_workers: int = 0, + show_progress: bool = True, ) -> torch.Tensor: """Predict using the model for the given dataset.""" model = load_model(model, device) - data_loader = DataLoader( - data, batch_size=batch_size, shuffle=False, num_workers=num_workers - ) - predictions = _predict_epoch(model, data_loader, device) + data_loader = DataLoader(data, batch_size=batch_size, shuffle=False, num_workers=num_workers) + predictions = _predict_epoch(model, data_loader, device, show_progress=show_progress) return predictions.cpu().detach() @@ -224,23 +166,13 @@ def evaluate( ) -> float: """Evaluate the model on the given dataset.""" model = load_model(model, device) - data_loader = DataLoader( - data, batch_size=batch_size, shuffle=False, num_workers=num_workers - ) + data_loader = DataLoader(data, batch_size=batch_size, shuffle=False, num_workers=num_workers) loss_fn = torch.nn.L1Loss() avg_loss = _validate_epoch(model, data_loader, loss_fn, device) return avg_loss -def _freeze_layers(model: torch.nn.Module, unfreeze_keyword: str) -> None: - """Freeze all layers except those containing the unfreeze_keyword in their name.""" - for name, param in model.named_parameters(): - param.requires_grad = unfreeze_keyword in name - - -def _get_optimizer( - model: torch.nn.Module, learning_rate: float -) -> torch.optim.Optimizer: +def _get_optimizer(model: torch.nn.Module, learning_rate: float) -> torch.optim.Optimizer: return torch.optim.Adam( filter(lambda p: p.requires_grad, model.parameters()), lr=learning_rate, @@ -257,7 +189,7 @@ def _train_epoch( """Train the model for one epoch.""" model.train() running_loss = 0.0 - for features, targets in track(data_loader): + for features, targets in data_loader: features = [feature_tensor.to(device) for feature_tensor in features] targets = targets.to(device).view(-1, 1) optimizer.zero_grad() @@ -266,8 +198,7 @@ def _train_epoch( loss.backward() optimizer.step() running_loss += loss.item() - avg_loss = float(running_loss / len(data_loader)) - return avg_loss + return float(running_loss / len(data_loader)) def _validate_epoch( @@ -280,26 +211,43 @@ def _validate_epoch( model.eval() val_loss = 0.0 with torch.no_grad(): - for features, targets in track(data_loader): + for features, targets in data_loader: features = [feature_tensor.to(device) for feature_tensor in features] targets = targets.to(device).view(-1, 1) outputs = model(*features) val_loss += loss_fn(outputs, targets).item() - avg_val_loss = float(val_loss / len(data_loader)) - return avg_val_loss + return float(val_loss / len(data_loader)) def _predict_epoch( model: torch.nn.Module, data_loader: DataLoader, device: str, + show_progress: bool = False, ) -> torch.Tensor: """Predict using the model for one epoch.""" model.eval() predictions = [] with torch.no_grad(): - for features, _ in track(data_loader): + for features, _ in track( + data_loader, description="Predicting...", transient=True, disable=not show_progress + ): features = [feature_tensor.to(device) for feature_tensor in features] outputs = model(*features) predictions.append(outputs.cpu()) return torch.cat(predictions, dim=0).squeeze() + + +def _create_progress(disable: bool = False) -> Progress: + """Create a Rich progress bar for training.""" + return Progress( + TextColumn("[bold blue]{task.description}"), + BarColumn(), + MofNCompleteColumn(), + TextColumn("|"), + TimeRemainingColumn(), + TextColumn("|"), + TextColumn("{task.fields[status]}"), + disable=disable, + transient=True, + ) diff --git a/deeplc/calibration.py b/deeplc/calibration.py index 9621638..cf344cb 100644 --- a/deeplc/calibration.py +++ b/deeplc/calibration.py @@ -57,9 +57,10 @@ def transform(self, source: np.ndarray) -> np.ndarray: class PiecewiseLinearCalibration(Calibration): def __init__( self, - number_of_splits: int = 50, + number_of_splits: int = 10, extrapolate: bool = True, - use_median: bool = True, + use_median: bool = False, + min_samples_per_segment: int = 20, ) -> None: """ Piece-wise linear calibration based on per-split anchors. @@ -74,11 +75,19 @@ def __init__( If False, clips input values to the fitted range. use_median : bool If True, uses the median of each segment to define anchors. If False, uses the mean. + min_samples_per_segment : int + Minimum number of samples required for a segment to contribute an anchor. + Segments with fewer samples are skipped, which helps avoid unstable anchors in + sparse regions when using many splits. """ super().__init__() self.number_of_splits = int(number_of_splits) self.extrapolate = bool(extrapolate) self.use_median = bool(use_median) + self.min_samples_per_segment = int(min_samples_per_segment) + + if self.min_samples_per_segment < 1: + raise ValueError("`min_samples_per_segment` must be >= 1.") self._calibrate_min: float | None = None self._calibrate_max: float | None = None @@ -114,13 +123,20 @@ def fit(self, target: np.ndarray, source: np.ndarray) -> None: raise CalibrationError("Source values have zero or invalid range; cannot calibrate.") boundaries = np.linspace(cal_min, cal_max, self.number_of_splits + 1, dtype=np.float32) - starts: np.ndarray = np.searchsorted(source, boundaries[:-1], side="left") # type: ignore[var-annotated] - ends: np.ndarray = np.searchsorted(source, boundaries[1:], side="left") # type: ignore[var-annotated] + starts_raw: np.ndarray = np.searchsorted(source, boundaries[:-1], side="left") # type: ignore[var-annotated] + ends_raw: np.ndarray = np.searchsorted(source, boundaries[1:], side="left") # type: ignore[var-annotated] + + # Merge adjacent sparse segments by assigning each segment to a group based on + # how many min_samples-sized chunks the cumulative count has crossed so far. + # Segments whose cumulative count falls within the same chunk share a group id + # and are merged into a single anchor. + counts = ends_raw - starts_raw + group_ids = (np.cumsum(counts) - 1) // self.min_samples_per_segment + group_start_indices = np.concatenate(([0], np.flatnonzero(np.diff(group_ids)) + 1)) + group_end_indices = np.concatenate((group_start_indices[1:] - 1, [len(starts_raw) - 1])) - # Filter out empty segments - valid_segments = ends > starts - starts = starts[valid_segments] - ends = ends[valid_segments] + starts = starts_raw[group_start_indices] + ends = ends_raw[group_end_indices] # Compute anchors for all segments aggregate_func = np.median if self.use_median else np.mean @@ -234,33 +250,27 @@ def fit( self, target: np.ndarray, source: np.ndarray, - simplified: bool = False, ) -> None: """Fit a spline-based model mapping source to target values.""" target, source = _prepare_series(target, source) - # TODO: What's the use of `simplified`? Was taken from original code. - if simplified: - linear_model = LinearRegression() - linear_model.fit(source.reshape(-1, 1), target) - linear_model_left = linear_model - spline_model = linear_model - linear_model_right = linear_model - else: - spline = SplineTransformer(degree=4, n_knots=int(len(source) / 500) + 5) - spline_model = make_pipeline(spline, LinearRegression()) - spline_model.fit(source.reshape(-1, 1), target) - - n_top = int(len(source) * 0.1) - X_left = source[:n_top] - y_left = target[:n_top] - linear_model_left = LinearRegression() - linear_model_left.fit(X_left.reshape(-1, 1), y_left) - - X_right = source[-n_top:] - y_right = target[-n_top:] - linear_model_right = LinearRegression() - linear_model_right.fit(X_right.reshape(-1, 1), y_right) + # Spline model + spline = SplineTransformer(degree=4, n_knots=int(len(source) / 500) + 5) + spline_model = make_pipeline(spline, LinearRegression()) + spline_model.fit(source.reshape(-1, 1), target) + + # Linear fit for left trail + n_top = int(len(source) * 0.1) + X_left = source[:n_top] + y_left = target[:n_top] + linear_model_left = LinearRegression() + linear_model_left.fit(X_left.reshape(-1, 1), y_left) + + # Linear fit for right trail + X_right = source[-n_top:] + y_right = target[-n_top:] + linear_model_right = LinearRegression() + linear_model_right.fit(X_right.reshape(-1, 1), y_right) self._calibrate_min = float(np.min(source)) self._calibrate_max = float(np.max(source)) diff --git a/deeplc/core.py b/deeplc/core.py index ab6d0ce..0c93f8d 100644 --- a/deeplc/core.py +++ b/deeplc/core.py @@ -13,7 +13,6 @@ from deeplc import _model_ops from deeplc.calibration import ( Calibration, - PiecewiseLinearCalibration, SplineTransformerCalibration, ) from deeplc.data import DeepLCDataset, split_datasets @@ -48,7 +47,6 @@ def predict( Retention time predictions. """ - LOGGER.info("Predicting retention times...") return _model_ops.predict( model=model or DEFAULT_MODEL, data=DeepLCDataset.from_psm_list(psm_list), @@ -56,6 +54,67 @@ def predict( ).numpy() +def calibrate( + psm_list_reference: PSMList, + model: torch.nn.Module | PathLike | str | None = None, + calibration: Calibration | None = None, + predict_kwargs: dict | None = None, +) -> Calibration: + """ + Return a `Calibration` instance fitted to the reference dataset. + + Parameters + ---------- + psm_list_reference + List of PSMs to use as reference for calibration. + model + Trained model or path to model file. + calibration + Calibration instance to use. If None, SplineTransformerCalibration is used. + predict_kwargs + Additional keyword arguments to pass to the prediction function. + + Returns + ------- + Calibration + Fitted calibration instance. + + """ + # Get calibration + if calibration is None: + LOGGER.debug("No calibration provided, using SplineTransformerCalibration by default.") + calibration = SplineTransformerCalibration() + elif not isinstance(calibration, Calibration): + raise ValueError( + f"Expected calibration to be of type `Calibration`, got {type(calibration)}" + ) + if calibration.is_fitted: + LOGGER.warning( + "Provided Calibration is already fitted. Refitting will overwrite existing fit." + ) + + if any(psm_list_reference["is_decoy"]): + LOGGER.warning( + "Reference PSM list contains decoy PSMs. " + "These will be included in the calibration fitting." + ) + + # Predict initial retention times for the reference dataset + LOGGER.debug("Predicting retention times for reference...") + source_rt_cal = predict( + psm_list=psm_list_reference, + model=model, + predict_kwargs=predict_kwargs, + ) + + # Fit calibration + LOGGER.debug("Fitting calibration...") + target_rt_cal = np.array(psm_list_reference["retention_time"], dtype=np.float32) + calibration.fit(target=target_rt_cal, source=source_rt_cal) + + return calibration + + def predict_and_calibrate( psm_list: PSMList, psm_list_reference: PSMList, @@ -93,31 +152,19 @@ def predict_and_calibrate( predict_kwargs=predict_kwargs, ) - # Fit calibration - # Validate passed calibration instance or create default if None - if calibration is None: - LOGGER.debug("No calibration provided, using SplineTransformerCalibration by default.") - calibration = SplineTransformerCalibration() - elif not isinstance(calibration, Calibration): + if calibration is not None and not isinstance(calibration, Calibration): raise ValueError( - f"Expected calibration to be of type Calibration, got {type(calibration)}" + f"Expected calibration to be of type `Calibration`, got {type(calibration)}" ) # Fit calibration if not already fitted - if not calibration.is_fitted: - LOGGER.info("Fitting calibration...") - if any(psm_list_reference["is_decoy"]): - LOGGER.warning( - "Reference PSM list contains decoy PSMs. " - "These will be included in the calibration fitting." - ) - target_rt_cal = np.array(psm_list_reference["retention_time"], dtype=np.float32) - source_rt_cal = predict( - psm_list=psm_list_reference, + if calibration is None or not calibration.is_fitted: + calibration = calibrate( + psm_list_reference=psm_list_reference, model=model, + calibration=calibration, predict_kwargs=predict_kwargs, ) - calibration.fit(target=target_rt_cal, source=source_rt_cal) else: LOGGER.info("Calibration is already fitted, skipping fitting step.") @@ -131,8 +178,6 @@ def finetune_and_predict( psm_list: PSMList, psm_list_reference: PSMList, model: torch.nn.Module | PathLike | str | None = None, - calibration: Calibration | None = None, - partial_freeze: bool = False, train_kwargs: dict | None = None, predict_kwargs: dict | None = None, ) -> np.ndarray: @@ -147,11 +192,6 @@ def finetune_and_predict( List of PSMs to use as reference for fine-tuning and calibration. model Trained model or path to model file. - calibration - Calibration instance to use. If already fitted, it will be re-fitted after fine-tuning - using the same options. If None, a simple PiecewiseLinearCalibration is used. - partial_freeze - If True, only the final layer of the model will be finetuned. train_kwargs Additional keyword arguments to pass to the training function. predict_kwargs @@ -167,7 +207,6 @@ def finetune_and_predict( finetuned_model = finetune( psm_list=psm_list_reference, model=model, - partial_freeze=partial_freeze, train_kwargs=train_kwargs, ) @@ -179,31 +218,13 @@ def finetune_and_predict( predict_kwargs=predict_kwargs, ) - # Fit calibration - # Validate passed calibration instance or create default if None - if calibration is None: - LOGGER.info("No calibration provided, using PiecewiseLinearCalibration by default.") - calibration = PiecewiseLinearCalibration() - elif not isinstance(calibration, Calibration): - raise ValueError( - f"Expected calibration to be of type Calibration, got {type(calibration)}" - ) - - # TODO: Is this necessary? Should it work equally well without calibration? - # Fit calibration if not already fitted + # Fit calibration with simple PiecewiseLinearCalibration to the fine-tuned model predictions LOGGER.info("Fitting calibration with fine-tuned model predictions...") - if any(psm_list_reference["is_decoy"]): # remove this one since already in finetune? - LOGGER.warning( - "Reference PSM list contains decoy PSMs. " - "These will be included in the calibration fitting." - ) - target_rt_cal = np.array(psm_list_reference["retention_time"], dtype=np.float32) - source_rt_cal = predict( - psm_list=psm_list_reference, + calibration = calibrate( + psm_list_reference=psm_list_reference, model=finetuned_model, predict_kwargs=predict_kwargs, ) - calibration.fit(target=target_rt_cal, source=source_rt_cal) # Apply calibration to predictions calibrated_rt = calibration.transform(predicted_rt) @@ -216,7 +237,6 @@ def finetune( psm_list_validation: PSMList | None = None, validation_split: float = 0.1, model: torch.nn.Module | PathLike | str | None = None, - partial_freeze: bool = False, train_kwargs: dict | None = None, ) -> torch.nn.Module: """ @@ -231,8 +251,6 @@ def finetune( used. model Trained model or path to model file. - partial_freeze - If True, only the final layer of the model will be finetuned. train_kwargs Additional keyword arguments to pass to the training function. @@ -253,12 +271,10 @@ def finetune( training_dataset, validation_dataset = split_datasets( training_data, validation_data=validation_data, validation_split=validation_split ) - LOGGER.info("Training new model...") finetuned_model = _model_ops.train( model=model or DEFAULT_MODEL, train_dataset=training_dataset, validation_dataset=validation_dataset, - trainable_layers="33_1" if partial_freeze else None, # TODO: Don't hardcode **(train_kwargs or {}), ) return finetuned_model diff --git a/deeplc/package_data/models/full_hc_PXD005573_pub_1fd8363d9af9dcad3be7553c39396960.pt b/deeplc/package_data/models/full_hc_PXD005573_pub_1fd8363d9af9dcad3be7553c39396960.pt index 82399a4..9f2a7db 100644 Binary files a/deeplc/package_data/models/full_hc_PXD005573_pub_1fd8363d9af9dcad3be7553c39396960.pt and b/deeplc/package_data/models/full_hc_PXD005573_pub_1fd8363d9af9dcad3be7553c39396960.pt differ diff --git a/deeplc/package_data/models/full_hc_PXD005573_pub_8c22d89667368f2f02ad996469ba157e.pt b/deeplc/package_data/models/full_hc_PXD005573_pub_8c22d89667368f2f02ad996469ba157e.pt index 2d67b07..660b830 100644 Binary files a/deeplc/package_data/models/full_hc_PXD005573_pub_8c22d89667368f2f02ad996469ba157e.pt and b/deeplc/package_data/models/full_hc_PXD005573_pub_8c22d89667368f2f02ad996469ba157e.pt differ diff --git a/deeplc/package_data/models/full_hc_PXD005573_pub_cb975cfdd4105f97efa0b3afffe075cc.pt b/deeplc/package_data/models/full_hc_PXD005573_pub_cb975cfdd4105f97efa0b3afffe075cc.pt index 292a21a..db7e953 100644 Binary files a/deeplc/package_data/models/full_hc_PXD005573_pub_cb975cfdd4105f97efa0b3afffe075cc.pt and b/deeplc/package_data/models/full_hc_PXD005573_pub_cb975cfdd4105f97efa0b3afffe075cc.pt differ diff --git a/tests/test_calibration.py b/tests/test_calibration.py index 15b854d..229e0d5 100644 --- a/tests/test_calibration.py +++ b/tests/test_calibration.py @@ -77,3 +77,27 @@ def test_zero_range_predicted_raises(): cal = PiecewiseLinearCalibration(number_of_splits=10) with pytest.raises(CalibrationError): cal.fit(target=y, source=x) + + +def test_piecewise_skips_sparse_segments_with_min_samples_threshold(): + source_dense = np.linspace(0.0, 80.0, 1000, dtype=np.float32) + source_sparse = np.array([95.0, 97.0, 99.0], dtype=np.float32) + source = np.concatenate([source_dense, source_sparse]).astype(np.float32) + target = (1.2 * source) + 3.0 + + cal_no_threshold = PiecewiseLinearCalibration( + number_of_splits=100, + min_samples_per_segment=1, + ) + cal_no_threshold.fit(target=target, source=source) + x_no_threshold, _ = cal_no_threshold.get_calibration_curve() + + cal_threshold = PiecewiseLinearCalibration( + number_of_splits=100, + min_samples_per_segment=10, + ) + cal_threshold.fit(target=target, source=source) + x_threshold, _ = cal_threshold.get_calibration_curve() + + assert x_threshold.size > 1 + assert x_threshold.size < x_no_threshold.size diff --git a/tests/test_features.py b/tests/test_features.py new file mode 100644 index 0000000..a396874 --- /dev/null +++ b/tests/test_features.py @@ -0,0 +1,264 @@ +"""Tests for deeplc._features.encode_peptidoform.""" + +from __future__ import annotations + +import warnings + +import numpy as np +import pytest +from psm_utils import Peptidoform +from pyteomics import mass + +from deeplc._features import ( + DEFAULT_DICT_AA, + DEFAULT_DICT_INDEX, + DEFAULT_DICT_INDEX_POS, + DEFAULT_POSITIONS, + DEFAULT_POSITIONS_NEG, + DEFAULT_POSITIONS_POS, + encode_peptidoform, +) + +# HELPERS + +PADDING = 60 + +# Number of rows in pos_matrix: max(positions) - min(positions) + 1 +# DEFAULT_POSITIONS = {0,1,2,3,-1,-2,-3,-4} → 3 - (-4) + 1 = 8 +_POS_ROWS = max(DEFAULT_POSITIONS) - min(DEFAULT_POSITIONS) + 1 +# matrix_global = sum(std_matrix, axis=0) [6] + seq_len [1] + pos_matrix.flatten() [8*6] +_GLOBAL_BASE_LEN = len(DEFAULT_DICT_INDEX) + 1 + _POS_ROWS * len(DEFAULT_DICT_INDEX_POS) +# _compute_rolling_sum(std_matrix.T, n=2)[:, ::2].T → (30, 6) +_SUM_ROWS = (PADDING - 1) // 2 # == 29 for n=2, stride 2 on 59 cols + + +class TestReturnStructure: + """Tests that encode_peptidoform returns the expected keys and shapes.""" + + def test_returns_four_keys(self): + result = encode_peptidoform("ACDE") + assert set(result.keys()) == {"matrix", "matrix_sum", "matrix_global", "matrix_hc"} + + def test_matrix_shape(self): + result = encode_peptidoform("ACDE") + assert result["matrix"].shape == (PADDING, len(DEFAULT_DICT_INDEX)) + + def test_matrix_hc_shape(self): + result = encode_peptidoform("ACDE") + assert result["matrix_hc"].shape == (PADDING, len(DEFAULT_DICT_AA)) + + def test_matrix_global_shape_no_ccs(self): + result = encode_peptidoform("ACDE") + assert result["matrix_global"].shape == (_GLOBAL_BASE_LEN,) + + def test_matrix_global_shape_with_ccs(self): + result = encode_peptidoform("ACDE/2", add_ccs_features=True) + # add_ccs_features appends 5 extra values (H%, FWY%, DE%, KR%, charge) + assert result["matrix_global"].shape == (_GLOBAL_BASE_LEN + 5,) + + def test_matrix_sum_shape(self): + result = encode_peptidoform("ACDE") + assert result["matrix_sum"].ndim == 2 + assert result["matrix_sum"].shape[1] == len(DEFAULT_DICT_INDEX) + + def test_matrix_dtype(self): + result = encode_peptidoform("ACDE") + assert result["matrix"].dtype == np.float16 + + def test_matrix_hc_dtype(self): + result = encode_peptidoform("ACDE") + assert result["matrix_hc"].dtype == np.float16 + + +class TestStringInput: + """Tests that both str and Peptidoform inputs are accepted and equivalent.""" + + def test_str_and_peptidoform_are_equivalent(self): + str_result = encode_peptidoform("ACDE") + pf_result = encode_peptidoform(Peptidoform("ACDE")) + for key in str_result: + np.testing.assert_array_equal(str_result[key], pf_result[key]) + + +class TestPaddingAndSeqLen: + """Tests that padding and sequence length are handled correctly.""" + + def test_padded_rows_are_zero(self): + seq = "ACDE" + result = encode_peptidoform(seq) + # Rows beyond seq length should be all zeros in standard matrix + assert np.all(result["matrix"][len(seq) :] == 0) + + def test_padded_rows_are_zero_onehot(self): + seq = "ACDE" + result = encode_peptidoform(seq) + assert np.all(result["matrix_hc"][len(seq) :] == 0) + + def test_seq_len_encoded_in_matrix_global(self): + seq = "ACDE" + result = encode_peptidoform(seq) + # matrix_global[len(DEFAULT_DICT_INDEX)] holds seq_len + assert result["matrix_global"][len(DEFAULT_DICT_INDEX)] == len(seq) + + def test_truncation_warns(self): + long_seq = "A" * (PADDING + 5) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + result = encode_peptidoform(long_seq) + assert any("Truncating" in str(warning.message) for warning in w) + # After truncation seq_len == PADDING + assert result["matrix_global"][len(DEFAULT_DICT_INDEX)] == PADDING + + +class TestOneHotEncoding: + """Tests the one-hot (matrix_hc) component.""" + + def test_first_residue_one_hot(self): + # "A" is index 5 in DEFAULT_DICT_AA + result = encode_peptidoform("ACDE") + assert result["matrix_hc"][0, DEFAULT_DICT_AA["A"]] == 1.0 + + def test_second_residue_one_hot(self): + result = encode_peptidoform("ACDE") + assert result["matrix_hc"][1, DEFAULT_DICT_AA["C"]] == 1.0 + + def test_each_residue_has_exactly_one_hot(self): + seq = "ACDE" + result = encode_peptidoform(seq) + for i in range(len(seq)): + assert result["matrix_hc"][i].sum() == 1.0 + + def test_padded_rows_are_zero_and_no_hot(self): + seq = "AC" + result = encode_peptidoform(seq) + assert result["matrix_hc"][2:].sum() == 0.0 + + +class TestStandardMatrixComposition: + """Tests that atomic composition in std_matrix is correct.""" + + def test_glycine_carbon_count(self): + # Glycine (G): C2 H3 N1 O1 — check carbon at index 0 + result = encode_peptidoform("G") + c_idx = DEFAULT_DICT_INDEX["C"] + expected_c = mass.std_aa_comp["G"]["C"] + assert result["matrix"][0, c_idx] == expected_c + + def test_unmodified_and_modified_differ_in_affected_residue(self): + # Oxidized methionine adds one O + unmod = encode_peptidoform("ACMDE") + mod = encode_peptidoform("ACM[Oxidation]DE") + o_idx = DEFAULT_DICT_INDEX["O"] + assert mod["matrix"][2, o_idx] > unmod["matrix"][2, o_idx] + + def test_modification_does_not_affect_other_residues(self): + unmod = encode_peptidoform("ACMDE") + mod = encode_peptidoform("ACM[Oxidation]DE") + for i in [0, 1, 3, 4]: + np.testing.assert_array_equal(mod["matrix"][i], unmod["matrix"][i]) + + +class TestNTerminalModification: + """Tests that N-terminal modifications are applied to position 0.""" + + def test_nterm_mod_changes_position_zero(self): + unmod = encode_peptidoform("ACDE") + mod = encode_peptidoform("[Acetyl]-ACDE") + # Acetyl adds C2H2O to position 0; at least carbon should increase + c_idx = DEFAULT_DICT_INDEX["C"] + assert mod["matrix"][0, c_idx] > unmod["matrix"][0, c_idx] + + def test_nterm_mod_does_not_affect_other_positions(self): + unmod = encode_peptidoform("ACDE") + mod = encode_peptidoform("[Acetyl]-ACDE") + for i in range(1, 4): + np.testing.assert_array_equal(mod["matrix"][i], unmod["matrix"][i]) + + def test_nterm_mod_reflected_in_matrix_global(self): + unmod = encode_peptidoform("ACDE") + mod = encode_peptidoform("[Acetyl]-ACDE") + # matrix_global contains the column sums so modification must change it + assert not np.array_equal(mod["matrix_global"], unmod["matrix_global"]) + + def test_nterm_mod_reflected_in_pos_matrix_part(self): + # Position 0 is in DEFAULT_POSITIONS_POS so pos_matrix row 0 must change + unmod = encode_peptidoform("ACDE") + mod = encode_peptidoform("[Acetyl]-ACDE") + # pos_matrix is concatenated at the end of matrix_global after the base part + base = len(DEFAULT_DICT_INDEX) + 1 # col sums + seq_len + pos_flat_unmod = unmod["matrix_global"][base:] + pos_flat_mod = mod["matrix_global"][base:] + assert not np.array_equal(pos_flat_unmod, pos_flat_mod) + + +class TestCTerminalModification: + """Tests that C-terminal modifications are applied to the last residue position.""" + + def test_cterm_mod_changes_last_residue_position(self): + seq = "ACDE" + unmod = encode_peptidoform(seq) + mod = encode_peptidoform("ACDE-[Amidation]") + last = len(seq) - 1 + # Amidation changes N count (replaces O with NH2) + assert not np.array_equal(mod["matrix"][last], unmod["matrix"][last]) + + def test_cterm_mod_does_not_affect_other_positions(self): + unmod = encode_peptidoform("ACDE") + mod = encode_peptidoform("ACDE-[Amidation]") + for i in range(0, 3): + np.testing.assert_array_equal(mod["matrix"][i], unmod["matrix"][i]) + + def test_cterm_mod_reflected_in_matrix_global(self): + unmod = encode_peptidoform("ACDE") + mod = encode_peptidoform("ACDE-[Amidation]") + assert not np.array_equal(mod["matrix_global"], unmod["matrix_global"]) + + +class TestBothTerminalModifications: + """Tests a peptide carrying both N- and C-terminal modifications.""" + + def test_both_term_mods_change_both_ends(self): + unmod = encode_peptidoform("ACDE") + mod = encode_peptidoform("[Acetyl]-ACDE-[Amidation]") + assert not np.array_equal(mod["matrix"][0], unmod["matrix"][0]) + assert not np.array_equal(mod["matrix"][3], unmod["matrix"][3]) + + def test_middle_residues_unchanged(self): + unmod = encode_peptidoform("ACDE") + mod = encode_peptidoform("[Acetyl]-ACDE-[Amidation]") + for i in [1, 2]: + np.testing.assert_array_equal(mod["matrix"][i], unmod["matrix"][i]) + + +class TestCCSFeatures: + """Tests the add_ccs_features flag.""" + + def test_ccs_features_requires_charge(self): + with pytest.raises(ValueError, match="no charge"): + encode_peptidoform("ACDE", add_ccs_features=True) + + def test_ccs_features_appends_five_values(self): + base = encode_peptidoform("ACDE/2") + ccs = encode_peptidoform("ACDE/2", add_ccs_features=True) + assert ccs["matrix_global"].shape[0] == base["matrix_global"].shape[0] + 5 + + def test_ccs_charge_value_position(self): + # matrix_global layout with CCS: + # [col_sums(6), seq_len(1), H%(1), FWY%(1), DE%(1), KR%(1), charge(1), pos_flat(48)] + charge = 3 + result = encode_peptidoform(f"ACDE/{charge}", add_ccs_features=True) + charge_idx = len(DEFAULT_DICT_INDEX) + 1 + 4 # 6 col sums + seq_len + 4 ratios + assert result["matrix_global"][charge_idx] == charge + + +class TestShortPeptide: + """Tests edge cases for short peptides.""" + + def test_single_residue(self): + result = encode_peptidoform("A") + assert result["matrix"].shape == (PADDING, len(DEFAULT_DICT_INDEX)) + assert result["matrix_hc"][0, DEFAULT_DICT_AA["A"]] == 1.0 + + def test_two_residues_no_crash(self): + result = encode_peptidoform("AC") + assert result["matrix_global"].shape == (_GLOBAL_BASE_LEN,)