From 6c87b1f976edd940bd9f15881e8492d1a44bf5b8 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 14 Mar 2026 12:47:55 -0700 Subject: [PATCH] Sequential share-of-remainder imputation for SS sub-components MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of predicting all 4 SS sub-components in the main QRF batch and then normalizing them to sum to total social_security, predict shares of the remaining total sequentially: 1. retirement_share = retirement / total → retirement = share × total 2. survivors_share = survivors / (total − retirement) 3. disability_share = disability / (total − retirement − survivors) 4. dependents = remainder Each QRF predicts a fraction in [0, 1], so sub-components are guaranteed non-negative and sum to the total by construction — no post-hoc normalization or fallback shares needed. Co-Authored-By: Claude Opus 4.6 --- changelog.d/sequential-ss-shares.changed.md | 1 + .../datasets/cps/extended_cps.py | 187 +++++++++++++----- .../tests/test_extended_cps.py | 145 ++++++++------ 3 files changed, 225 insertions(+), 108 deletions(-) create mode 100644 changelog.d/sequential-ss-shares.changed.md diff --git a/changelog.d/sequential-ss-shares.changed.md b/changelog.d/sequential-ss-shares.changed.md new file mode 100644 index 00000000..beff16c3 --- /dev/null +++ b/changelog.d/sequential-ss-shares.changed.md @@ -0,0 +1 @@ +Use sequential share-of-remainder imputation for SS sub-components. 
diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index f38d5746..26400e5f 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -35,11 +35,9 @@ "traditional_ira_contributions", "roth_ira_contributions", "self_employed_pension_contributions", - # Social Security sub-components - "social_security_retirement", - "social_security_disability", - "social_security_dependents", - "social_security_survivors", + # Social Security sub-components are handled separately via + # sequential share-of-remainder imputation -- see + # _impute_ss_subcomponents_sequential(). # Transfer income "unemployment_compensation", "tanf_reported", @@ -149,8 +147,13 @@ def _impute_cps_only_variables( ) # Load original (non-doubled) CPS for training data. + # Include SS sub-component columns for sequential imputation + # even though they're not in CPS_ONLY_IMPUTED_VARIABLES. + ss_train_cols = [v for v in SS_SUBCOMPONENT_SEQUENCE if v not in valid_outputs] cps_sim = Microsimulation(dataset=dataset_path) - X_train = cps_sim.calculate_dataframe(all_predictors + valid_outputs) + X_train = cps_sim.calculate_dataframe( + all_predictors + valid_outputs + ss_train_cols + ) available_outputs = [col for col in valid_outputs if col in X_train.columns] missing_outputs = [col for col in valid_outputs if col not in X_train.columns] @@ -198,9 +201,32 @@ def _impute_cps_only_variables( for var in missing_outputs: predictions[var] = 0 - # Apply domain constraints to retirement and SS variables. + # Apply domain constraints to retirement variables. predictions = _apply_post_processing(predictions, X_test, time_period, data) + # SS sub-components: sequential share-of-remainder imputation. + # Each stage predicts "share of what's left", so the four + # components sum to total social_security by construction. 
+ ss_vars_in_train = [v for v in SS_SUBCOMPONENT_SEQUENCE if v in X_train.columns] + if len(ss_vars_in_train) == len(SS_SUBCOMPONENT_SEQUENCE): + n_half = len(data["person_id"][time_period]) // 2 + total_ss = data["social_security"][time_period][n_half:] + logger.info("Imputing SS sub-components via sequential shares") + ss_predictions = _impute_ss_subcomponents_sequential( + X_train=X_train, + X_test=X_test, + total_ss=total_ss, + predictors=all_predictors, + ) + for col in SS_SUBCOMPONENT_SEQUENCE: + predictions[col] = ss_predictions[col].values + else: + logger.warning( + "SS sub-component vars missing from CPS training data, " + "skipping sequential imputation: %s", + set(SS_SUBCOMPONENT_SEQUENCE) - set(ss_vars_in_train), + ) + logger.info( "Stage-2 CPS-only imputation took %.2fs total", time.time() - total_start, @@ -259,38 +285,6 @@ def apply_retirement_constraints(predictions, X_test, time_period): return result -def reconcile_ss_subcomponents(predictions, total_ss): - """Normalize Social Security sub-components to sum to total. - - Args: - predictions: DataFrame with columns for each SS - sub-component (retirement, disability, dependents, - survivors). - total_ss: numpy array of total social_security per record. - - Returns: - DataFrame with reconciled dollar values. - """ - values = np.maximum(predictions.values, 0) - row_sums = values.sum(axis=1) - positive_mask = total_ss > 0 - - shares = np.zeros_like(values) - nonzero_rows = row_sums > 0 - both = positive_mask & nonzero_rows - shares[both] = values[both] / row_sums[both, np.newaxis] - # If row_sum == 0 but total_ss > 0, distribute equally. 
- equal_rows = positive_mask & ~nonzero_rows - shares[equal_rows] = 1.0 / values.shape[1] - - out = np.where( - positive_mask[:, np.newaxis], - shares * total_ss[:, np.newaxis], - 0.0, - ) - return pd.DataFrame(out, columns=predictions.columns) - - _RETIREMENT_VARS = { "traditional_401k_contributions", "roth_401k_contributions", @@ -299,16 +293,112 @@ def reconcile_ss_subcomponents(predictions, total_ss): "self_employed_pension_contributions", } -_SS_SUBCOMPONENT_VARS = { +# Ordered largest-to-smallest so early predictions carry +# the most signal and the smallest component absorbs rounding. +SS_SUBCOMPONENT_SEQUENCE = [ "social_security_retirement", + "social_security_survivors", "social_security_disability", "social_security_dependents", - "social_security_survivors", -} +] + + +def _impute_ss_subcomponents_sequential(X_train, X_test, total_ss, predictors): + """Impute SS sub-components via sequential share-of-remainder. + + Instead of predicting all four dollar amounts and normalizing, + we predict *shares of the remaining total* sequentially: + + 1. retirement_share = retirement / total (predict in [0,1]) + retirement = retirement_share * total + 2. survivors_share = survivors / (total - retirement) + survivors = survivors_share * remaining + 3. disability_share = disability / (total - retirement - survivors) + disability = disability_share * remaining + 4. dependents = remaining (whatever is left) + + This guarantees the four components sum to ``total_ss`` by + construction with no post-hoc normalization needed. + + Args: + X_train: CPS training data with predictors + raw SS + sub-component columns. + X_test: PUF clone test data with predictors. + total_ss: 1-D array of total social_security per PUF clone. + predictors: list of predictor column names. + + Returns: + DataFrame with one column per SS sub-component (dollar values). 
+ """ + from microimpute.models.qrf import QRF + + n = len(X_test) + results = {var: np.zeros(n) for var in SS_SUBCOMPONENT_SEQUENCE} + has_ss = total_ss > 0 + + if not has_ss.any(): + return pd.DataFrame(results, index=X_test.index) + + remaining_train = X_train[SS_SUBCOMPONENT_SEQUENCE].sum(axis=1).values + remaining_test = total_ss.copy() + + # Augment predictors with running remaining-total so each + # stage conditions on what's left. + X_train_aug = X_train[predictors].copy() + X_test_aug = X_test[predictors].copy() + + for i, var in enumerate(SS_SUBCOMPONENT_SEQUENCE[:-1]): + share_col = f"_share_{var}" + + # Compute training shares: var / remaining, clipped to [0, 1]. + raw_train = X_train[var].values + safe_remaining = np.where(remaining_train > 0, remaining_train, 1.0) + train_share = np.clip(raw_train / safe_remaining, 0, 1) + X_train_aug[share_col] = train_share + X_train_aug["_ss_remaining"] = remaining_train + + X_test_aug["_ss_remaining"] = remaining_test + + qrf = QRF( + log_level="WARNING", + memory_efficient=True, + max_train_samples=5000, + ) + preds = qrf.fit_predict( + X_train=X_train_aug[predictors + ["_ss_remaining", share_col]], + X_test=X_test_aug[predictors + ["_ss_remaining"]], + predictors=predictors + ["_ss_remaining"], + imputed_variables=[share_col], + n_jobs=1, + ) + + share = np.clip(preds[share_col].values, 0, 1) + dollar = share * remaining_test + results[var] = np.where(has_ss, dollar, 0) + + # Update remaining totals for next stage. + remaining_train = remaining_train - raw_train + remaining_train = np.maximum(remaining_train, 0) + remaining_test = remaining_test - dollar + remaining_test = np.maximum(remaining_test, 0) + + # Last component is the remainder — no QRF needed. 
+ last_var = SS_SUBCOMPONENT_SEQUENCE[-1] + results[last_var] = np.where(has_ss, remaining_test, 0) + + logger.info( + "SS sequential imputation: shares %.1f%% / %.1f%% / %.1f%% / %.1f%%", + *( + np.sum(results[v][has_ss]) / np.sum(total_ss[has_ss]) * 100 + for v in SS_SUBCOMPONENT_SEQUENCE + ), + ) + + return pd.DataFrame(results, index=X_test.index) def _apply_post_processing(predictions, X_test, time_period, data): - """Apply retirement constraints and SS reconciliation.""" + """Apply retirement constraints (SS handled separately).""" ret_cols = [c for c in predictions.columns if c in _RETIREMENT_VARS] if ret_cols: constrained = apply_retirement_constraints( @@ -317,14 +407,6 @@ def _apply_post_processing(predictions, X_test, time_period, data): for col in ret_cols: predictions[col] = constrained[col] - ss_cols = [c for c in predictions.columns if c in _SS_SUBCOMPONENT_VARS] - if ss_cols: - n_half = len(data["person_id"][time_period]) // 2 - total_ss = data["social_security"][time_period][n_half:] - reconciled = reconcile_ss_subcomponents(predictions[ss_cols], total_ss) - for col in ss_cols: - predictions[col] = reconciled[col] - return predictions @@ -363,7 +445,8 @@ def _splice_cps_only_predictions( if id_var in data: entity_half_lengths[entity_key] = len(data[id_var][time_period]) // 2 - for var in CPS_ONLY_IMPUTED_VARIABLES: + splice_vars = list(CPS_ONLY_IMPUTED_VARIABLES) + list(SS_SUBCOMPONENT_SEQUENCE) + for var in splice_vars: if var not in data or var not in predictions.columns: continue diff --git a/policyengine_us_data/tests/test_extended_cps.py b/policyengine_us_data/tests/test_extended_cps.py index 5ddf4692..91335fe1 100644 --- a/policyengine_us_data/tests/test_extended_cps.py +++ b/policyengine_us_data/tests/test_extended_cps.py @@ -18,8 +18,9 @@ from policyengine_us_data.datasets.cps.extended_cps import ( CPS_ONLY_IMPUTED_VARIABLES, CPS_STAGE2_INCOME_PREDICTORS, + SS_SUBCOMPONENT_SEQUENCE, + _impute_ss_subcomponents_sequential, 
apply_retirement_constraints, - reconcile_ss_subcomponents, ) @@ -74,17 +75,14 @@ def test_retirement_contributions_in_cps_only(self): f"Retirement contribution vars missing from CPS_ONLY: {missing}" ) - def test_ss_subcomponents_in_cps_only(self): - """All 4 SS sub-component vars should be in CPS_ONLY.""" - expected = { - "social_security_retirement", - "social_security_disability", - "social_security_dependents", - "social_security_survivors", - } - missing = expected - set(CPS_ONLY_IMPUTED_VARIABLES) - assert missing == set(), ( - f"SS sub-component vars missing from CPS_ONLY: {missing}" + def test_ss_subcomponents_not_in_cps_only(self): + """SS sub-components are handled by sequential imputation, + not the main CPS_ONLY batch.""" + ss_vars = set(SS_SUBCOMPONENT_SEQUENCE) + overlap = ss_vars & set(CPS_ONLY_IMPUTED_VARIABLES) + assert overlap == set(), ( + f"SS sub-component vars should NOT be in CPS_ONLY " + f"(handled sequentially): {overlap}" ) def test_nonexistent_vars_not_in_cps_only(self): @@ -190,65 +188,100 @@ def test_se_pension_zeroed_without_se_income( ).all(), "SE pension should be zero without SE income" -class TestSSReconciliation: - """Post-processing SS normalization ensures sub-components sum to total.""" +class TestSequentialSSImputation: + """Sequential share-of-remainder SS imputation guarantees + sub-components sum to total by construction.""" - def test_subcomponents_sum_to_total(self): - predictions = pd.DataFrame( + @pytest.fixture + def synthetic_ss_data(self): + """Create synthetic CPS-like training data with correlated + SS sub-components that sum to total social_security.""" + rng = np.random.default_rng(42) + n = 500 + age = rng.normal(68, 10, n).clip(18, 95) + is_male = rng.binomial(1, 0.45, n).astype(float) + emp_income = rng.exponential(20000, n) * (age < 65) + se_income = rng.exponential(5000, n) * (rng.random(n) < 0.1) + total_ss = rng.exponential(15000, n) * (age > 50) + + # Split total into sub-components with realistic shares. 
+ ret_share = rng.beta(8, 3, n) # ~73% + rem1 = 1 - ret_share + dis_share = rng.beta(2, 8, n) * rem1 + rem2 = rem1 - dis_share + sur_share = rng.beta(2, 7, n) * rem2 + dep = rem2 - sur_share * rem2 + + X_train = pd.DataFrame( { - "social_security_retirement": [0.6, 0.0, 0.8, 0.3], - "social_security_disability": [0.3, 0.0, 0.1, 0.5], - "social_security_dependents": [0.05, 0.0, 0.05, 0.1], - "social_security_survivors": [0.05, 0.0, 0.05, 0.1], + "age": age, + "is_male": is_male, + "employment_income": emp_income, + "self_employment_income": se_income, + "social_security": total_ss, + "social_security_retirement": ret_share * total_ss, + "social_security_survivors": sur_share * rem2 * total_ss, + "social_security_disability": dis_share * total_ss, + "social_security_dependents": dep * total_ss, } ) - total_ss = np.array([20000, 0, 15000, 10000]) - result = reconcile_ss_subcomponents(predictions, total_ss) + + predictors = [ + "age", + "is_male", + "employment_income", + "self_employment_income", + "social_security", + ] + return X_train, predictors + + def test_subcomponents_sum_to_total(self, synthetic_ss_data): + X_train, predictors = synthetic_ss_data + X_test = X_train[predictors].iloc[:50].copy() + total_ss = X_test["social_security"].values + + result = _impute_ss_subcomponents_sequential( + X_train, X_test, total_ss, predictors + ) sums = sum(result[col].values for col in result.columns) np.testing.assert_allclose(sums, total_ss, atol=0.01) - def test_zero_ss_zeroes_all_subcomponents(self): - predictions = pd.DataFrame( - { - "social_security_retirement": [0.5, 0.7], - "social_security_disability": [0.3, 0.2], - "social_security_dependents": [0.1, 0.05], - "social_security_survivors": [0.1, 0.05], - } + def test_zero_ss_zeroes_all_subcomponents(self, synthetic_ss_data): + X_train, predictors = synthetic_ss_data + X_test = X_train[predictors].iloc[:10].copy() + total_ss = np.zeros(10) + + result = _impute_ss_subcomponents_sequential( + X_train, X_test, 
total_ss, predictors ) - total_ss = np.array([0, 0]) - result = reconcile_ss_subcomponents(predictions, total_ss) for col in result.columns: assert (result[col].values == 0).all(), f"{col} should be zero" - def test_shares_are_non_negative(self): - predictions = pd.DataFrame( - { - "social_security_retirement": [-0.5, 0.8], - "social_security_disability": [1.2, 0.2], - "social_security_dependents": [0.1, 0.0], - "social_security_survivors": [0.2, 0.0], - } + def test_all_components_non_negative(self, synthetic_ss_data): + X_train, predictors = synthetic_ss_data + X_test = X_train[predictors].iloc[:50].copy() + total_ss = X_test["social_security"].values + + result = _impute_ss_subcomponents_sequential( + X_train, X_test, total_ss, predictors ) - total_ss = np.array([10000, 5000]) - result = reconcile_ss_subcomponents(predictions, total_ss) for col in result.columns: - assert (result[col].values >= 0).all(), f"{col} has negative values" + assert (result[col].values >= -0.01).all(), f"{col} has negative values" - def test_single_component_gets_full_total(self): - predictions = pd.DataFrame( - { - "social_security_retirement": [1.0], - "social_security_disability": [0.0], - "social_security_dependents": [0.0], - "social_security_survivors": [0.0], - } - ) - total_ss = np.array([25000]) - result = reconcile_ss_subcomponents(predictions, total_ss) - assert result["social_security_retirement"].values[0] == pytest.approx( - 25000, abs=0.01 + def test_retirement_dominates(self, synthetic_ss_data): + """Retirement should get the largest share on average.""" + X_train, predictors = synthetic_ss_data + X_test = X_train[predictors].iloc[:100].copy() + total_ss = X_test["social_security"].values + + result = _impute_ss_subcomponents_sequential( + X_train, X_test, total_ss, predictors ) + has_ss = total_ss > 0 + ret_total = result["social_security_retirement"].values[has_ss].sum() + ss_total = total_ss[has_ss].sum() + ret_share = ret_total / ss_total + assert ret_share > 0.5, 
f"Retirement share {ret_share:.3f} should be > 0.5" class TestSequentialQRF: