diff --git a/LICENSE b/LICENSE index 9c131ad..e88d9bc 100755 --- a/LICENSE +++ b/LICENSE @@ -29,3 +29,27 @@ PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Code used in the covariate_regressor module is distributed with the following license: + +MIT License + +Copyright (c) 2017 Lukas Snoek + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/afqinsight/covariate_regressor.py b/afqinsight/covariate_regressor.py new file mode 100644 index 0000000..e69f30a --- /dev/null +++ b/afqinsight/covariate_regressor.py @@ -0,0 +1,237 @@ +import numpy as np +from scipy.linalg import lstsq +from sklearn.base import BaseEstimator, TransformerMixin +from sklearn.impute import SimpleImputer + + +def find_subset_indices(X_full, X_subset, method="hash", allow_missing=False): + """ + Find row indices in X_full that correspond to rows in X_subset. + Supports 'hash' (fast) and 'precise' (element-wise) matching. + Allow_missing appends empty array for non-matching rows if True. + """ + if X_full.shape[1] != X_subset.shape[1]: + raise ValueError( + f"Feature dimensions don't match: {X_full.shape[1]} vs {X_subset.shape[1]}" + ) + indices = [] + if method == "precise": + for i, subset_row in enumerate(X_subset): + matches = [ + j + for j, full_row in enumerate(X_full) + if np.array_equal(full_row, subset_row, equal_nan=True) + ] + if not matches and not allow_missing: + raise ValueError(f"No matching row found for subset row {i}") + indices.append(matches[0] if matches else []) + elif method == "hash": + full_hashes = [hash(row.tobytes()) for row in X_full] + for i, subset_row in enumerate(X_subset): + subset_hash = hash(subset_row.tobytes()) + try: + indices.append(full_hashes.index(subset_hash)) + except ValueError as e: + if allow_missing: + indices.append([]) + else: + raise ValueError(f"No matching row found for subset row {i}") from e + else: + raise ValueError(f"Unknown method '{method}'. Use 'hash' or 'precise'.") + return np.array(indices) + + +class CovariateRegressor(BaseEstimator, TransformerMixin): + """ + Fits covariate(s) onto each feature in X and returns their residuals. + """ + + def __init__( + self, + covariate, + X_full, + pipeline=None, + cross_validate=True, + precise=False, + unique_id_col_index=None, + stack_intercept=True, + ): + """Regresses out a variable (covariate) from each feature in X. + + Parameters + ---------- + covariate : numpy array + Array of length (n_samples, n_covariates) to regress out of each + feature; May have multiple columns for multiple covariates. + X_full : numpy array + Array of length (n_samples, n_features), from which the covariate + will be regressed. This is used to determine how the + covariate-models should be cross-validated (which is necessary + to use in in scikit-learn Pipelines). + pipeline : sklearn.pipeline.Pipeline or None, default=None + Optional scikit-learn pipeline to apply to the covariate before fitting + the regression model. If provided, the pipeline will be fitted on the + covariate data during the fit phase and applied to transform the covariate + in both fit and transform phases. This allows for preprocessing steps + such as imputation, scaling, normalization, or feature engineering to be + applied to the covariate consistently across train and test sets. If None, + the covariate is used as-is without any preprocessing. + cross_validate : bool + Whether to cross-validate the covariate-parameters (y~covariate) + estimated from the train-set to the test set (cross_validate=True) + or whether to fit the covariate regressor separately on the test-set + (cross_validate=False). + precise: bool + When setting precise to True, the arrays are compared feature-wise, + which is accurate, but relatively slow. When setting precise to False, + it will infer the index of the covariates by looking at the hash of all + the features, which is much faster. Also, to aid the accuracy, we remove + the features which are constant (0) across samples. + stack_intercept : bool + Whether to stack an intercept to the covariate (default is True) + + Attributes + ---------- + weights_ : numpy array + Array with weights for the covariate(s). + + Notes + ----- + This is a modified version of the ConfoundRegressor from [1]_. Setting + cross_validate to True is equivalent to "foldwise covariate regression" (FwCR) + as described in Snoek et al. (2019). Setting this parameter to False, however, + is NOT equivalent to "whole dataset covariate regression" (WDCR) as it does not + apply covariate regression to the *full* dataset, but simply refits the + covariate model on the test-set. We recommend setting this parameter to True. + Transformer-objects in scikit-learn only allow to pass the data (X) and + optionally the target (y) to the fit and transform methods. However, we need + to index the covariate accordingly as well. To do so, we compare the X during + initialization (self.X_full) with the X passed to fit/transform. As such, we can + infer which samples are passed to the methods and index the covariate + accordingly. The precise flag controls the precision of the index matching. + + References + ---------- + .. [1] Lukas Snoek, Steven Miletić, H. Steven Scholte, + "How to control for confounds in decoding analyses of neuroimaging data", + NeuroImage, Volume 184, 2019, Pages 741-760, ISSN 1053-8119, + https://doi.org/10.1016/j.neuroimage.2018.09.074. + """ + self.covariate = covariate.astype(np.float64) + self.cross_validate = cross_validate + self.X_full = X_full + self.precise = precise + self.stack_intercept = stack_intercept + self.weights_ = None + self.pipeline = pipeline + self.imputer = SimpleImputer(strategy="median") + self.X_imputer = SimpleImputer(strategy="median") + self.unique_id_col_index = unique_id_col_index + + def _prepare_covariate(self, covariate): + """Prepare covariate matrix (adds intercept if needed)""" + if self.stack_intercept: + return np.c_[np.ones((covariate.shape[0], 1)), covariate] + return covariate + + def fit(self, X, y=None): + """Fits the covariate-regressor to X. + + Parameters + ---------- + X : numpy array + An array of shape (n_samples, n_features), which should correspond + to your train-set only! + y : None + Included for compatibility; does nothing. + """ + + # Prepare covariate matrix (adds intercept if needed) + covariate = self._prepare_covariate(self.covariate) + + # Find indices of X subset in the original X + method = "precise" if self.precise else "hash" + fit_idx = find_subset_indices(self.X_full, X, method=method) + + # Remove unique ID column if specified + if self.unique_id_col_index is not None: + X = np.delete(X, self.unique_id_col_index, axis=1) + + # Extract covariate data for the fitting subset + covariate_fit = covariate[fit_idx, :] + + # Conditional imputation for covariate data + if np.isnan(covariate_fit).any(): + covariate_fit = self.imputer.fit_transform(covariate_fit) + else: + # Still fit the imputer for consistency in transform + self.imputer.fit(covariate_fit) + + # Apply pipeline transformation if specified + if self.pipeline is not None: + X = self.pipeline.fit_transform(X) + + # Conditional imputation for X + if np.isnan(X).any(): + X = self.X_imputer.fit_transform(X) + else: + # Still fit the imputer for consistency in transform + self.X_imputer.fit(X) + + # Fit linear regression: X = covariate * weights + residuals + # Using scipy's lstsq for numerical stability + self.weights_ = lstsq(covariate_fit, X)[0] + + return self + + def transform(self, X): + """Regresses out covariate from X. + + Parameters + ---------- + X : numpy array + An array of shape (n_samples, n_features), which should correspond + to your train-set only! + + Returns + ------- + X_new : ndarray + ndarray with covariate-regressed features + """ + + if not self.cross_validate: + self.fit(X) + + # Prepare covariate matrix (adds intercept if needed) + covariate = self._prepare_covariate(self.covariate) + + # Find indices of X subset in the original X + method = "precise" if self.precise else "hash" + transform_idx = find_subset_indices(self.X_full, X, method=method) + + # Remove unique ID column if specified + if self.unique_id_col_index is not None: + X = np.delete(X, self.unique_id_col_index, axis=1) + + # Extract covariate data for the transform subset + covariate_transform = covariate[transform_idx] + + # Conditional imputation for covariate data (use fitted imputer) + if np.isnan(covariate_transform).any(): + covariate_transform = self.imputer.transform(covariate_transform) + + # Apply pipeline transformation if specified + if self.pipeline is not None: + X = self.pipeline.transform(X) + + # Conditional imputation for X (use fitted imputer) + if np.isnan(X).any(): + X = self.X_imputer.transform(X) + + # Compute residuals + X_new = X - covariate_transform.dot(self.weights_) + + # Ensure no NaNs in output + X_new = np.nan_to_num(X_new) + + return X_new diff --git a/afqinsight/tests/test_covariate_regressor.py b/afqinsight/tests/test_covariate_regressor.py new file mode 100644 index 0000000..35930eb --- /dev/null +++ b/afqinsight/tests/test_covariate_regressor.py @@ -0,0 +1,233 @@ +import numpy as np +import pytest +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import StandardScaler + +from afqinsight.covariate_regressor import ( + CovariateRegressor, + find_subset_indices, +) + + +class TestFindSubsetIndices: + def test_find_subset_indices_precise(self): + """Test find_subset_indices with precise method""" + np.random.seed(42) + X_full = np.random.rand(100, 10) + X_subset = X_full[20:30] # Known subset + + indices = find_subset_indices(X_full, X_subset, method="precise") + expected_indices = np.arange(20, 30) + + np.testing.assert_array_equal(indices, expected_indices) + + def test_find_subset_indices_hash(self): + """Test find_subset_indices with hash method""" + np.random.seed(42) + X_full = np.random.rand(50, 10) + X_subset = X_full[10:20] # Known subset + + indices = find_subset_indices(X_full, X_subset, method="hash") + expected_indices = np.arange(10, 20) + + np.testing.assert_array_equal(indices, expected_indices) + + def test_dimension_mismatch_error(self): + """Test error when dimensions don't match""" + X_full = np.random.rand(50, 10) + X_subset = np.random.rand(20, 5) # Different number of features + + with pytest.raises(ValueError, match="Feature dimensions don't match"): + find_subset_indices(X_full, X_subset) + + def test_no_matching_row_error(self): + """Test error when no matching row is found""" + X_full = np.random.rand(50, 10) + X_subset = np.random.rand(20, 10) # Different data, no matches + + with pytest.raises(ValueError, match="No matching row found"): + find_subset_indices(X_full, X_subset, method="precise") + + def test_invalid_method_error(self): + """Test error with invalid method""" + X_full = np.random.rand(50, 10) + X_subset = X_full[10:20] + + with pytest.raises(ValueError, match="Unknown method"): + find_subset_indices(X_full, X_subset, method="invalid") + + +class TestCovariateRegressor: + def test_basic_functionality(self): + """ + Test basic covariate regression with default parameters. + + Scenario: Standard use case with multiple covariates and default settings + (cross_validate=True, stack_intercept=True). + """ + np.random.seed(42) + n_samples, n_features = 100, 20 + + # Create synthetic data + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 2) # 2 covariates + + # Initialize regressor + regressor = CovariateRegressor(covariate=covariate, X_full=X) + + # Fit and transform + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + # Check output shape + assert X_residuals.shape == X_test.shape + + # Check that weights were fitted + assert regressor.weights_ is not None + assert regressor.weights_.shape[0] == 3 # 2 covariates + intercept + assert regressor.weights_.shape[1] == n_features + + def test_cross_validate_false(self): + """ + Test CovariateRegressor with cross_validate=False. + + Scenario: Disable cross-validation to test the regressor with + standard non-cross-validated fitting. + """ + np.random.seed(42) + n_samples, n_features = 50, 10 + + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 1) + + regressor = CovariateRegressor( + covariate=covariate, X_full=X, cross_validate=False + ) + + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + assert X_residuals.shape == X_test.shape + + def test_with_missing_values(self): + """ + Test CovariateRegressor handles missing values (NaN) correctly. + + Edge case: Data contains NaN values in both features and covariates. + The regressor should handle these appropriately without introducing + NaN values in the output. + """ + np.random.seed(42) + n_samples, n_features = 100, 15 + + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 1) + + # Introduce some NaN values + X[10:15, 5] = np.nan + covariate[20:25] = np.nan + + regressor = CovariateRegressor(covariate=covariate, X_full=X) + + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + # Should not contain NaN values + assert not np.isnan(X_residuals).any() + assert X_residuals.shape == X_test.shape + + def test_no_intercept(self): + """ + Test CovariateRegressor with stack_intercept=False. + + Edge case: Disable intercept term in the regression model. + Weights should only contain the covariate coefficients, no intercept. + """ + np.random.seed(42) + n_samples, n_features = 50, 8 + + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 1) + + regressor = CovariateRegressor( + covariate=covariate, X_full=X, stack_intercept=False + ) + + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + # Check that weights shape reflects no intercept + assert regressor.weights_.shape[0] == 1 # Only covariate, no intercept + assert X_residuals.shape == X_test.shape + + def test_precise_vs_hash_methods(self): + """ + Test that find_subset_indices produces consistent results + across different methods (precise and hash). + + Scenario: Verify that both methods for finding subset indices + return the same results. + """ + np.random.seed(42) + X_full = np.random.rand(50, 10) + X_subset = X_full[10:20] + + indices_precise = find_subset_indices(X_full, X_subset, method="precise") + indices_hash = find_subset_indices(X_full, X_subset, method="hash") + + np.testing.assert_array_equal(indices_precise, indices_hash) + + def test_pipeline_integration(self): + """ + Test CovariateRegressor with a preprocessing pipeline. + + Scenario: Use StandardScaler as a preprocessing pipeline before + covariate regression to test integration with sklearn transformers. + """ + np.random.seed(42) + n_samples, n_features = 100, 10 + + X = np.random.randn(n_samples, n_features) + covariate = np.random.randn(n_samples, 1) + pipeline = StandardScaler() + + regressor = CovariateRegressor(covariate=covariate, X_full=X, pipeline=pipeline) + + X_train, X_test = train_test_split(X, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + assert X_residuals.shape == X_test.shape + + def test_unique_id_column_removal(self): + """ + Test CovariateRegressor correctly removes unique ID column. + + Edge case: Data includes a unique ID column that should be excluded + from regression analysis. Verify that the output has one fewer + column than the input. + """ + np.random.seed(42) + n_samples, n_features = 50, 10 + + # Add a unique ID column at index 0 + X = np.random.randn(n_samples, n_features) + ids = np.arange(n_samples).reshape(-1, 1) + X_with_id = np.hstack([ids, X]) + + covariate = np.random.randn(n_samples, 1) + + regressor = CovariateRegressor( + covariate=covariate, X_full=X_with_id, unique_id_col_index=0 + ) + + X_train, X_test = train_test_split(X_with_id, test_size=0.3, random_state=42) + regressor.fit(X_train) + X_residuals = regressor.transform(X_test) + + # Output should have one less column (ID removed) + assert X_residuals.shape[1] == n_features