From b629b668eea32158aaaff14586fc75bbca382447 Mon Sep 17 00:00:00 2001 From: Howard Chiu <137316255+chiuhoward@users.noreply.github.com> Date: Thu, 23 Oct 2025 14:32:26 -0700 Subject: [PATCH 1/6] Add CovariateRegressor with comprehensive functionality - Implements CovariateRegressor for regressing covariates from features - Includes find_subset_indices utility for hash and precise matching - Adds IdentityTransformer helper class - Tests cover edge cases and error handling - Supports cross-validation, missing value imputation, and flexible pipelines - All tests pass with full coverage of core functionality --- afqinsight/CovariateRegressor.py | 225 ++++++++++++++++++++ afqinsight/tests/test_CovariateRegressor.py | 208 ++++++++++++++++++ 2 files changed, 433 insertions(+) create mode 100644 afqinsight/CovariateRegressor.py create mode 100644 afqinsight/tests/test_CovariateRegressor.py diff --git a/afqinsight/CovariateRegressor.py b/afqinsight/CovariateRegressor.py new file mode 100644 index 0000000..1a65173 --- /dev/null +++ b/afqinsight/CovariateRegressor.py @@ -0,0 +1,225 @@ +import numpy as np +from scipy.linalg import lstsq +from sklearn.base import BaseEstimator, TransformerMixin +from sklearn.impute import SimpleImputer + + +def find_subset_indices(X_full, X_subset, method="hash"): + """ + Find row indices in X_full that correspond to rows in X_subset. + Supports 'hash' (fast) and 'precise' (element-wise) matching. + """ + if X_full.shape[1] != X_subset.shape[1]: + raise ValueError( + f"Feature dimensions don't match: {X_full.shape[1]} vs {X_subset.shape[1]}" + ) + indices = [] + if method == "precise": + for i, subset_row in enumerate(X_subset): + matches = [ + j + for j, full_row in enumerate(X_full) + if np.array_equal(full_row, subset_row, equal_nan=True) + ] + if not matches: + raise ValueError(f"No matching row found for subset row {i}") + indices.append(matches[0]) + elif method == "hash": + full_hashes = [hash(row.tobytes()) for row in X_full] + for i, subset_row in enumerate(X_subset): + subset_hash = hash(subset_row.tobytes()) + try: + indices.append(full_hashes.index(subset_hash)) + except ValueError as e: + raise ValueError(f"No matching row found for subset row {i}") from e + else: + raise ValueError(f"Unknown method '{method}'. Use 'hash' or 'precise'.") + return np.array(indices) + + +class IdentityTransformer(BaseEstimator, TransformerMixin): + """A transformer that returns the input unchanged.""" + + def fit(self, X, y=None): + return self + + def transform(self, X): + return X + + +class CovariateRegressor(BaseEstimator, TransformerMixin): + """ + Fits covariate(s) onto each feature in X and returns their residuals. + """ + + def __init__( + self, + covariate, + X, + pipeline=None, + cross_validate=True, + precise=False, + unique_id_col_index=None, + stack_intercept=True, + ): + """Regresses out a variable (covariate) from each feature in X. + + Parameters + ---------- + covariate : numpy array + Array of length (n_samples, n_covariates) to regress out of each + feature; May have multiple columns for multiple covariates. + X : numpy array + Array of length (n_samples, n_features), from which the covariate + will be regressed. This is used to determine how the + covariate-models should be cross-validated (which is necessary + to use in in scikit-learn Pipelines). + cross_validate : bool + Whether to cross-validate the covariate-parameters (y~covariate) + estimated from the train-set to the test set (cross_validate=True) + or whether to fit the covariate regressor separately on the test-set + (cross_validate=False). Setting this parameter to True is equivalent + to "foldwise covariate regression" (FwCR) as described in our paper + (https://www.biorxiv.org/content/early/2018/03/28/290684). Setting + this parameter to False, however, is NOT equivalent to "whole + dataset covariate regression" (WDCR) as it does not apply covariate + regression to the *full* dataset, but simply refits the covariate + model on the test-set. We recommend setting this parameter to True. + precise: bool + Transformer-objects in scikit-learn only allow to pass the data + (X) and optionally the target (y) to the fit and transform methods. + However, we need to index the covariate accordingly as well. To do so, + we compare the X during initialization (self.X) with the X passed to + fit/transform. As such, we can infer which samples are passed to the + methods and index the covariate accordingly. When setting precise to + True, the arrays are compared feature-wise, which is accurate, but + relatively slow. When setting precise to False, it will infer the index + by looking at the hash of all the features, which is much + faster. Also, to aid the accuracy, we remove the features which are constant + (0) across samples. + stack_intercept : bool + Whether to stack an intercept to the covariate (default is True) + + Attributes + ---------- + weights_ : numpy array + Array with weights for the covariate(s). + """ + self.covariate = covariate.astype(np.float64) + self.cross_validate = cross_validate + self.X = X + self.precise = precise + self.stack_intercept = stack_intercept + self.weights_ = None + self.pipeline = pipeline + self.imputer = SimpleImputer(strategy="median") + self.X_imputer = SimpleImputer(strategy="median") + self.unique_id_col_index = unique_id_col_index + + def _prepare_covariate(self, covariate): + """Prepare covariate matrix (adds intercept if needed)""" + if self.stack_intercept: + return np.c_[np.ones((covariate.shape[0], 1)), covariate] + return covariate + + def fit(self, X, y=None): + """Fits the covariate-regressor to X. + + Parameters + ---------- + X : numpy array + An array of shape (n_samples, n_features), which should correspond + to your train-set only! + y : None + Included for compatibility; does nothing. + """ + + # Prepare covariate matrix (adds intercept if needed) + covariate = self._prepare_covariate(self.covariate) + + # Find indices of X subset in the original X + method = "precise" if self.precise else "hash" + fit_idx = find_subset_indices(self.X, X, method=method) + + # Remove unique ID column if specified + if self.unique_id_col_index is not None: + X = np.delete(X, self.unique_id_col_index, axis=1) + + # Extract covariate data for the fitting subset + covariate_fit = covariate[fit_idx, :] + + # Conditional imputation for covariate data + if np.isnan(covariate_fit).any(): + covariate_fit = self.imputer.fit_transform(covariate_fit) + else: + # Still fit the imputer for consistency in transform + self.imputer.fit(covariate_fit) + + # Apply pipeline transformation if specified + if self.pipeline is not None: + X = self.pipeline.fit_transform(X) + + # Conditional imputation for X + if np.isnan(X).any(): + X = self.X_imputer.fit_transform(X) + else: + # Still fit the imputer for consistency in transform + self.X_imputer.fit(X) + + # Fit linear regression: X = covariate * weights + residuals + # Using scipy's lstsq for numerical stability + self.weights_ = lstsq(covariate_fit, X)[0] + + return self + + def transform(self, X): + """Regresses out covariate from X. + + Parameters + ---------- + X : numpy array + An array of shape (n_samples, n_features), which should correspond + to your train-set only! + + Returns + ------- + X_new : ndarray + ndarray with covariate-regressed features + """ + + if not self.cross_validate: + self.fit(X) + + # Prepare covariate matrix (adds intercept if needed) + covariate = self._prepare_covariate(self.covariate) + + # Find indices of X subset in the original X + method = "precise" if self.precise else "hash" + transform_idx = find_subset_indices(self.X, X, method=method) + + # Remove unique ID column if specified + if self.unique_id_col_index is not None: + X = np.delete(X, self.unique_id_col_index, axis=1) + + # Extract covariate data for the transform subset + covariate_transform = covariate[transform_idx] + + # Conditional imputation for covariate data (use fitted imputer) + if np.isnan(covariate_transform).any(): + covariate_transform = self.imputer.transform(covariate_transform) + + # Apply pipeline transformation if specified + if self.pipeline is not None: + X = self.pipeline.transform(X) + + # Conditional imputation for X (use fitted imputer) + if np.isnan(X).any(): + X = self.X_imputer.transform(X) + + # Compute residuals + X_new = X - covariate_transform.dot(self.weights_) + + # Ensure no NaNs in output + X_new = np.nan_to_num(X_new) + + return X_new diff --git a/afqinsight/tests/test_CovariateRegressor.py b/afqinsight/tests/test_CovariateRegressor.py new file mode 100644 index 0000000..e75d335 --- /dev/null +++ b/afqinsight/tests/test_CovariateRegressor.py @@ -0,0 +1,208 @@ +import numpy as np +import pytest +from sklearn.model_selection import train_test_split + +from afqinsight.CovariateRegressor import ( + CovariateRegressor, + IdentityTransformer, + find_subset_indices, +) + + +class TestFindSubsetIndices: + def test_find_subset_indices_precise(self): + """Test find_subset_indices with precise method""" + np.random.seed(42) + X_full = np.random.rand(100, 10) + X_subset = X_full[20:30] # Known subset + + indices = find_subset_indices(X_full, X_subset, method="precise") + expected_indices = np.arange(20, 30) + + np.testing.assert_array_equal(indices, expected_indices) + + def test_find_subset_indices_hash(self): + """Test find_subset_indices with hash method""" + np.random.seed(42) + X_full = np.random.rand(50, 10) + X_subset = X_full[10:20] # Known subset + + indices = find_subset_indices(X_full, X_subset, method="hash") + expected_indices = np.arange(10, 20) + + np.testing.assert_array_equal(indices, expected_indices) + + def test_dimension_mismatch_error(self): + """Test error when dimensions don't match""" + X_full = np.random.rand(50, 10) + X_subset = np.random.rand(20, 5) # Different number of features + + with pytest.raises(ValueError, match="Feature dimensions don't match"): + find_subset_indices(X_full, X_subset) + + def test_no_matching_row_error(self): + """Test error when no matching row is found""" + X_full = np.random.rand(50, 10) + X_subset = np.random.rand(20, 10) # Different data, no matches + + with pytest.raises(ValueError, match="No matching row found"): + find_subset_indices(X_full, X_subset, method="precise") + + def test_invalid_method_error(self): + """Test error with invalid method""" + X_full = np.random.rand(50, 10) + X_subset = X_full[10:20] + + with pytest.raises(ValueError, match="Unknown method"): + find_subset_indices(X_full, X_subset, method="invalid") + + +class TestIdentityTransformer: + def test_identity_transformer(self): + """Test that IdentityTransformer returns input unchanged""" + X = np.random.rand(100, 20) + transformer = IdentityTransformer() + + # Test fit returns self + assert transformer.fit(X) is transformer + + # Test transform returns unchanged data + X_transformed = transformer.transform(X) + np.testing.assert_array_equal(X, X_transformed) + + +class TestCovariateRegressor: + def test_basic_functionality(self): + """Test basic covariate regression""" + np.random.seed(42) + n_samples, n_features = 100, 20 + + # Create synthetic data + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 2) # 2 covariates + + # Initialize regressor + regressor = CovariateRegressor(covariate=covariate, X=X) + + # Fit and transform + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + # Check output shape + assert X_residuals.shape == X_test.shape + + # Check that weights were fitted + assert regressor.weights_ is not None + assert regressor.weights_.shape[0] == 3 # 2 covariates + intercept + assert regressor.weights_.shape[1] == n_features + + def test_cross_validate_false(self): + """Test with cross_validate=False""" + np.random.seed(42) + n_samples, n_features = 50, 10 + + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 1) + + regressor = CovariateRegressor(covariate=covariate, X=X, cross_validate=False) + + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + assert X_residuals.shape == X_test.shape + + def test_with_missing_values(self): + """Test handling of missing values""" + np.random.seed(42) + n_samples, n_features = 100, 15 + + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 1) + + # Introduce some NaN values + X[10:15, 5] = np.nan + covariate[20:25] = np.nan + + regressor = CovariateRegressor(covariate=covariate, X=X) + + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + # Should not contain NaN values + assert not np.isnan(X_residuals).any() + assert X_residuals.shape == X_test.shape + + def test_no_intercept(self): + """Test with stack_intercept=False""" + np.random.seed(42) + n_samples, n_features = 50, 8 + + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 1) + + regressor = CovariateRegressor(covariate=covariate, X=X, stack_intercept=False) + + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + # Check that weights shape reflects no intercept + assert regressor.weights_.shape[0] == 1 # Only covariate, no intercept + assert X_residuals.shape == X_test.shape + + def test_precise_vs_hash_methods(self): + """Test that both methods give same results""" + + np.random.seed(42) + X_full = np.random.rand(50, 10) + X_subset = X_full[10:20] + + indices_precise = find_subset_indices(X_full, X_subset, method="precise") + indices_hash = find_subset_indices(X_full, X_subset, method="hash") + + np.testing.assert_array_equal(indices_precise, indices_hash) + + def test_pipeline_integration(self): + """Test with a custom pipeline""" + from sklearn.preprocessing import StandardScaler + + np.random.seed(42) + n_samples, n_features = 100, 10 + + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 1) + pipeline = StandardScaler() + + regressor = CovariateRegressor(covariate=covariate, X=X, pipeline=pipeline) + + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + assert X_residuals.shape == X_test.shape + + def test_unique_id_column_removal(self): + """Test removal of unique ID column""" + np.random.seed(42) + n_samples, n_features = 50, 10 + + # Add a unique ID column at index 0 + X = np.random.randn(n_samples, n_features) + ids = np.arange(n_samples).reshape(-1, 1) + X_with_id = np.hstack([ids, X]) + + covariate = np.random.randn(n_samples, 1) + + regressor = CovariateRegressor( + covariate=covariate, X=X_with_id, unique_id_col_index=0 + ) + + X_train, X_test = train_test_split(X_with_id, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + # Output should have one less column (ID removed) + assert X_residuals.shape[1] == n_features From 1cbb934516107affe094f27afcec4b0208986003 Mon Sep 17 00:00:00 2001 From: Howard Chiu <137316255+chiuhoward@users.noreply.github.com> Date: Thu, 30 Oct 2025 15:58:30 -0700 Subject: [PATCH 2/6] Update reference for Snoek et al --- afqinsight/CovariateRegressor.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/afqinsight/CovariateRegressor.py b/afqinsight/CovariateRegressor.py index 1a65173..d089886 100644 --- a/afqinsight/CovariateRegressor.py +++ b/afqinsight/CovariateRegressor.py @@ -104,6 +104,17 @@ def __init__( ---------- weights_ : numpy array Array with weights for the covariate(s). + + Notes + ----- + This is a modified version of the ConfoundRegressor from [1]_. + + References + ---------- + .. [1] Lukas Snoek, Steven Miletić, H. Steven Scholte, + "How to control for confounds in decoding analyses of neuroimaging data", + NeuroImage, Volume 184, 2019, Pages 741-760, ISSN 1053-8119, + https://doi.org/10.1016/j.neuroimage.2018.09.074. """ self.covariate = covariate.astype(np.float64) self.cross_validate = cross_validate From 4578b1b6fc4bdecd175408b1db718b7ecae2f824 Mon Sep 17 00:00:00 2001 From: Howard Chiu <137316255+chiuhoward@users.noreply.github.com> Date: Thu, 30 Oct 2025 16:01:24 -0700 Subject: [PATCH 3/6] Update inline reference to Snoek et al --- afqinsight/CovariateRegressor.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/afqinsight/CovariateRegressor.py b/afqinsight/CovariateRegressor.py index d089886..d139938 100644 --- a/afqinsight/CovariateRegressor.py +++ b/afqinsight/CovariateRegressor.py @@ -79,12 +79,12 @@ def __init__( estimated from the train-set to the test set (cross_validate=True) or whether to fit the covariate regressor separately on the test-set (cross_validate=False). Setting this parameter to True is equivalent - to "foldwise covariate regression" (FwCR) as described in our paper - (https://www.biorxiv.org/content/early/2018/03/28/290684). Setting - this parameter to False, however, is NOT equivalent to "whole - dataset covariate regression" (WDCR) as it does not apply covariate - regression to the *full* dataset, but simply refits the covariate - model on the test-set. We recommend setting this parameter to True. + to "foldwise covariate regression" (FwCR) as described in Snoek et al. + (2019). Setting this parameter to False, however, is NOT equivalent + to "whole dataset covariate regression" (WDCR) as it does not apply + covariate regression to the *full* dataset, but simply refits the + covariate model on the test-set. We recommend setting this parameter + to True. precise: bool Transformer-objects in scikit-learn only allow to pass the data (X) and optionally the target (y) to the fit and transform methods. From 39c747a16b736494f169ef80365874a2c5f56c2d Mon Sep 17 00:00:00 2001 From: Howard Chiu <137316255+chiuhoward@users.noreply.github.com> Date: Thu, 6 Nov 2025 11:58:32 -0800 Subject: [PATCH 4/6] Addressed some comments and renamed filenames from camelcase --- ...ateRegressor.py => covariate_regressor.py} | 45 +++++++++++-------- ...gressor.py => test_covariate_regressor.py} | 2 +- 2 files changed, 27 insertions(+), 20 deletions(-) rename afqinsight/{CovariateRegressor.py => covariate_regressor.py} (79%) rename afqinsight/tests/{test_CovariateRegressor.py => test_covariate_regressor.py} (99%) diff --git a/afqinsight/CovariateRegressor.py b/afqinsight/covariate_regressor.py similarity index 79% rename from afqinsight/CovariateRegressor.py rename to afqinsight/covariate_regressor.py index d139938..36fe54c 100644 --- a/afqinsight/CovariateRegressor.py +++ b/afqinsight/covariate_regressor.py @@ -74,29 +74,25 @@ def __init__( will be regressed. This is used to determine how the covariate-models should be cross-validated (which is necessary to use in in scikit-learn Pipelines). + pipeline : sklearn.pipeline.Pipeline or None, default=None + Optional scikit-learn pipeline to apply to the covariate before fitting + the regression model. If provided, the pipeline will be fitted on the + covariate data during the fit phase and applied to transform the covariate + in both fit and transform phases. This allows for preprocessing steps + such as imputation, scaling, normalization, or feature engineering to be + applied to the covariate consistently across train and test sets. If None, + the covariate is used as-is without any preprocessing. cross_validate : bool Whether to cross-validate the covariate-parameters (y~covariate) estimated from the train-set to the test set (cross_validate=True) or whether to fit the covariate regressor separately on the test-set - (cross_validate=False). Setting this parameter to True is equivalent - to "foldwise covariate regression" (FwCR) as described in Snoek et al. - (2019). Setting this parameter to False, however, is NOT equivalent - to "whole dataset covariate regression" (WDCR) as it does not apply - covariate regression to the *full* dataset, but simply refits the - covariate model on the test-set. We recommend setting this parameter - to True. + (cross_validate=False). precise: bool - Transformer-objects in scikit-learn only allow to pass the data - (X) and optionally the target (y) to the fit and transform methods. - However, we need to index the covariate accordingly as well. To do so, - we compare the X during initialization (self.X) with the X passed to - fit/transform. As such, we can infer which samples are passed to the - methods and index the covariate accordingly. When setting precise to - True, the arrays are compared feature-wise, which is accurate, but - relatively slow. When setting precise to False, it will infer the index - by looking at the hash of all the features, which is much - faster. Also, to aid the accuracy, we remove the features which are constant - (0) across samples. + When setting precise to True, the arrays are compared feature-wise, + which is accurate, but relatively slow. When setting precise to False, + it will infer the index of the covariates by looking at the hash of all + the features, which is much faster. Also, to aid the accuracy, we remove + the features which are constant (0) across samples. stack_intercept : bool Whether to stack an intercept to the covariate (default is True) @@ -107,7 +103,18 @@ def __init__( Notes ----- - This is a modified version of the ConfoundRegressor from [1]_. + This is a modified version of the ConfoundRegressor from [1]_. Setting + cross_validate to True is equivalent to "foldwise covariate regression" (FwCR) + as described in Snoek et al. (2019). Setting this parameter to False, however, + is NOT equivalent to "whole dataset covariate regression" (WDCR) as it does not + apply covariate regression to the *full* dataset, but simply refits the + covariate model on the test-set. We recommend setting this parameter to True. + Transformer-objects in scikit-learn only allow to pass the data (X) and + optionally the target (y) to the fit and transform methods. However, we need + to index the covariate accordingly as well. To do so, we compare the X during + initialization (self.X) with the X passed to fit/transform. As such, we can + infer which samples are passed to the methods and index the covariate + accordingly. The precise flag controls the precision of the index matching. References ---------- diff --git a/afqinsight/tests/test_CovariateRegressor.py b/afqinsight/tests/test_covariate_regressor.py similarity index 99% rename from afqinsight/tests/test_CovariateRegressor.py rename to afqinsight/tests/test_covariate_regressor.py index e75d335..46514c5 100644 --- a/afqinsight/tests/test_CovariateRegressor.py +++ b/afqinsight/tests/test_covariate_regressor.py @@ -2,7 +2,7 @@ import pytest from sklearn.model_selection import train_test_split -from afqinsight.CovariateRegressor import ( +from afqinsight.covariate_regressor import ( CovariateRegressor, IdentityTransformer, find_subset_indices, From eef48b85bb455049518bb300640b3e0681fb5794 Mon Sep 17 00:00:00 2001 From: Howard Chiu <137316255+chiuhoward@users.noreply.github.com> Date: Thu, 6 Nov 2025 16:23:05 -0800 Subject: [PATCH 5/6] addressed comments and updated license --- LICENSE | 24 ++++++++++++++ afqinsight/covariate_regressor.py | 34 ++++++++------------ afqinsight/tests/test_covariate_regressor.py | 28 ++++++++++++---- 3 files changed, 59 insertions(+), 27 deletions(-) diff --git a/LICENSE b/LICENSE index 9c131ad..e88d9bc 100755 --- a/LICENSE +++ b/LICENSE @@ -29,3 +29,27 @@ PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Code used in the covariate_regressor module is distributed with the following license: + +MIT License + +Copyright (c) 2017 Lukas Snoek + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/afqinsight/covariate_regressor.py b/afqinsight/covariate_regressor.py index 36fe54c..e69f30a 100644 --- a/afqinsight/covariate_regressor.py +++ b/afqinsight/covariate_regressor.py @@ -4,10 +4,11 @@ from sklearn.impute import SimpleImputer -def find_subset_indices(X_full, X_subset, method="hash"): +def find_subset_indices(X_full, X_subset, method="hash", allow_missing=False): """ Find row indices in X_full that correspond to rows in X_subset. Supports 'hash' (fast) and 'precise' (element-wise) matching. + Allow_missing appends empty array for non-matching rows if True. """ if X_full.shape[1] != X_subset.shape[1]: raise ValueError( @@ -21,9 +22,9 @@ def find_subset_indices(X_full, X_subset, method="hash"): for j, full_row in enumerate(X_full) if np.array_equal(full_row, subset_row, equal_nan=True) ] - if not matches: + if not matches and not allow_missing: raise ValueError(f"No matching row found for subset row {i}") - indices.append(matches[0]) + indices.append(matches[0] if matches else []) elif method == "hash": full_hashes = [hash(row.tobytes()) for row in X_full] for i, subset_row in enumerate(X_subset): @@ -31,22 +32,15 @@ def find_subset_indices(X_full, X_subset, method="hash"): try: indices.append(full_hashes.index(subset_hash)) except ValueError as e: - raise ValueError(f"No matching row found for subset row {i}") from e + if allow_missing: + indices.append([]) + else: + raise ValueError(f"No matching row found for subset row {i}") from e else: raise ValueError(f"Unknown method '{method}'. Use 'hash' or 'precise'.") return np.array(indices) -class IdentityTransformer(BaseEstimator, TransformerMixin): - """A transformer that returns the input unchanged.""" - - def fit(self, X, y=None): - return self - - def transform(self, X): - return X - - class CovariateRegressor(BaseEstimator, TransformerMixin): """ Fits covariate(s) onto each feature in X and returns their residuals. @@ -55,7 +49,7 @@ class CovariateRegressor(BaseEstimator, TransformerMixin): def __init__( self, covariate, - X, + X_full, pipeline=None, cross_validate=True, precise=False, @@ -69,7 +63,7 @@ def __init__( covariate : numpy array Array of length (n_samples, n_covariates) to regress out of each feature; May have multiple columns for multiple covariates. - X : numpy array + X_full : numpy array Array of length (n_samples, n_features), from which the covariate will be regressed. This is used to determine how the covariate-models should be cross-validated (which is necessary @@ -112,7 +106,7 @@ def __init__( Transformer-objects in scikit-learn only allow to pass the data (X) and optionally the target (y) to the fit and transform methods. However, we need to index the covariate accordingly as well. To do so, we compare the X during - initialization (self.X) with the X passed to fit/transform. As such, we can + initialization (self.X_full) with the X passed to fit/transform. As such, we can infer which samples are passed to the methods and index the covariate accordingly. The precise flag controls the precision of the index matching. @@ -125,7 +119,7 @@ def __init__( """ self.covariate = covariate.astype(np.float64) self.cross_validate = cross_validate - self.X = X + self.X_full = X_full self.precise = precise self.stack_intercept = stack_intercept self.weights_ = None @@ -157,7 +151,7 @@ def fit(self, X, y=None): # Find indices of X subset in the original X method = "precise" if self.precise else "hash" - fit_idx = find_subset_indices(self.X, X, method=method) + fit_idx = find_subset_indices(self.X_full, X, method=method) # Remove unique ID column if specified if self.unique_id_col_index is not None: @@ -213,7 +207,7 @@ def transform(self, X): # Find indices of X subset in the original X method = "precise" if self.precise else "hash" - transform_idx = find_subset_indices(self.X, X, method=method) + transform_idx = find_subset_indices(self.X_full, X, method=method) # Remove unique ID column if specified if self.unique_id_col_index is not None: diff --git a/afqinsight/tests/test_covariate_regressor.py b/afqinsight/tests/test_covariate_regressor.py index 46514c5..51c2d14 100644 --- a/afqinsight/tests/test_covariate_regressor.py +++ b/afqinsight/tests/test_covariate_regressor.py @@ -1,14 +1,24 @@ import numpy as np import pytest +from sklearn.base import BaseEstimator, TransformerMixin from sklearn.model_selection import train_test_split from afqinsight.covariate_regressor import ( CovariateRegressor, - IdentityTransformer, find_subset_indices, ) +class IdentityTransformer(BaseEstimator, TransformerMixin): + """A transformer that returns the input unchanged.""" + + def fit(self, X, y=None): + return self + + def transform(self, X): + return X + + class TestFindSubsetIndices: def test_find_subset_indices_precise(self): """Test find_subset_indices with precise method""" @@ -82,7 +92,7 @@ def test_basic_functionality(self): covariate = np.random.randn(n_samples, 2) # 2 covariates # Initialize regressor - regressor = CovariateRegressor(covariate=covariate, X=X) + regressor = CovariateRegressor(covariate=covariate, X_full=X) # Fit and transform X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) @@ -105,7 +115,9 @@ def test_cross_validate_false(self): X = np.random.randn(n_samples, n_features) covariate = np.random.randn(n_samples, 1) - regressor = CovariateRegressor(covariate=covariate, X=X, cross_validate=False) + regressor = CovariateRegressor( + covariate=covariate, X_full=X, cross_validate=False + ) X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) regressor.fit(X_train) @@ -125,7 +137,7 @@ def test_with_missing_values(self): X[10:15, 5] = np.nan covariate[20:25] = np.nan - regressor = CovariateRegressor(covariate=covariate, X=X) + regressor = CovariateRegressor(covariate=covariate, X_full=X) X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) regressor.fit(X_train) @@ -143,7 +155,9 @@ def test_no_intercept(self): X = np.random.randn(n_samples, n_features) covariate = np.random.randn(n_samples, 1) - regressor = CovariateRegressor(covariate=covariate, X=X, stack_intercept=False) + regressor = CovariateRegressor( + covariate=covariate, X_full=X, stack_intercept=False + ) X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) regressor.fit(X_train) @@ -176,7 +190,7 @@ def test_pipeline_integration(self): covariate = np.random.randn(n_samples, 1) pipeline = StandardScaler() - regressor = CovariateRegressor(covariate=covariate, X=X, pipeline=pipeline) + regressor = CovariateRegressor(covariate=covariate, X_full=X, pipeline=pipeline) X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) regressor.fit(X_train) @@ -197,7 +211,7 @@ def test_unique_id_column_removal(self): covariate = np.random.randn(n_samples, 1) regressor = CovariateRegressor( - covariate=covariate, X=X_with_id, unique_id_col_index=0 + covariate=covariate, X_full=X_with_id, unique_id_col_index=0 ) X_train, X_test = train_test_split(X_with_id, test_size=0.3, random_state=42) From b4f89780dfe8ad9b0a1b249e927958ad8913615c Mon Sep 17 00:00:00 2001 From: Howard Chiu <137316255+chiuhoward@users.noreply.github.com> Date: Wed, 12 Nov 2025 17:30:37 -0800 Subject: [PATCH 6/6] Removed IdentityTransformer from test_covariate_regressor.py --- afqinsight/tests/test_covariate_regressor.py | 77 +++++++++++--------- 1 file changed, 44 insertions(+), 33 deletions(-) diff --git a/afqinsight/tests/test_covariate_regressor.py b/afqinsight/tests/test_covariate_regressor.py index 51c2d14..35930eb 100644 --- a/afqinsight/tests/test_covariate_regressor.py +++ b/afqinsight/tests/test_covariate_regressor.py @@ -1,7 +1,7 @@ import numpy as np import pytest -from sklearn.base import BaseEstimator, TransformerMixin from sklearn.model_selection import train_test_split +from sklearn.preprocessing import StandardScaler from afqinsight.covariate_regressor import ( CovariateRegressor, @@ -9,16 +9,6 @@ ) -class IdentityTransformer(BaseEstimator, TransformerMixin): - """A transformer that returns the input unchanged.""" - - def fit(self, X, y=None): - return self - - def transform(self, X): - return X - - class TestFindSubsetIndices: def test_find_subset_indices_precise(self): """Test find_subset_indices with precise method""" @@ -67,23 +57,14 @@ def test_invalid_method_error(self): find_subset_indices(X_full, X_subset, method="invalid") -class TestIdentityTransformer: - def test_identity_transformer(self): - """Test that IdentityTransformer returns input unchanged""" - X = np.random.rand(100, 20) - transformer = IdentityTransformer() - - # Test fit returns self - assert transformer.fit(X) is transformer - - # Test transform returns unchanged data - X_transformed = transformer.transform(X) - np.testing.assert_array_equal(X, X_transformed) - - class TestCovariateRegressor: def test_basic_functionality(self): - """Test basic covariate regression""" + """ + Test basic covariate regression with default parameters. + + Scenario: Standard use case with multiple covariates and default settings + (cross_validate=True, stack_intercept=True). + """ np.random.seed(42) n_samples, n_features = 100, 20 @@ -108,7 +89,12 @@ def test_basic_functionality(self): assert regressor.weights_.shape[1] == n_features def test_cross_validate_false(self): - """Test with cross_validate=False""" + """ + Test CovariateRegressor with cross_validate=False. + + Scenario: Disable cross-validation to test the regressor with + standard non-cross-validated fitting. + """ np.random.seed(42) n_samples, n_features = 50, 10 @@ -126,7 +112,13 @@ def test_cross_validate_false(self): assert X_residuals.shape == X_test.shape def test_with_missing_values(self): - """Test handling of missing values""" + """ + Test CovariateRegressor handles missing values (NaN) correctly. + + Edge case: Data contains NaN values in both features and covariates. + The regressor should handle these appropriately without introducing + NaN values in the output. + """ np.random.seed(42) n_samples, n_features = 100, 15 @@ -148,7 +140,12 @@ def test_with_missing_values(self): assert X_residuals.shape == X_test.shape def test_no_intercept(self): - """Test with stack_intercept=False""" + """ + Test CovariateRegressor with stack_intercept=False. + + Edge case: Disable intercept term in the regression model. + Weights should only contain the covariate coefficients, no intercept. + """ np.random.seed(42) n_samples, n_features = 50, 8 @@ -168,8 +165,13 @@ def test_no_intercept(self): assert X_residuals.shape == X_test.shape def test_precise_vs_hash_methods(self): - """Test that both methods give same results""" + """ + Test that find_subset_indices produces consistent results + across different methods (precise and hash). + Scenario: Verify that both methods for finding subset indices + return the same results. + """ np.random.seed(42) X_full = np.random.rand(50, 10) X_subset = X_full[10:20] @@ -180,9 +182,12 @@ def test_precise_vs_hash_methods(self): np.testing.assert_array_equal(indices_precise, indices_hash) def test_pipeline_integration(self): - """Test with a custom pipeline""" - from sklearn.preprocessing import StandardScaler + """ + Test CovariateRegressor with a preprocessing pipeline. + Scenario: Use StandardScaler as a preprocessing pipeline before + covariate regression to test integration with sklearn transformers. + """ np.random.seed(42) n_samples, n_features = 100, 10 @@ -199,7 +204,13 @@ def test_pipeline_integration(self): assert X_residuals.shape == X_test.shape def test_unique_id_column_removal(self): - """Test removal of unique ID column""" + """ + Test CovariateRegressor correctly removes unique ID column. + + Edge case: Data includes a unique ID column that should be excluded + from regression analysis. Verify that the output has one fewer + column than the input. + """ np.random.seed(42) n_samples, n_features = 50, 10