diff --git a/.github/workflows/local_area_publish.yaml b/.github/workflows/local_area_publish.yaml index 44675e63..a7fcae7f 100644 --- a/.github/workflows/local_area_publish.yaml +++ b/.github/workflows/local_area_publish.yaml @@ -1,14 +1,15 @@ name: Publish Local Area H5 Files on: - push: - branches: [main] - paths: - - 'policyengine_us_data/datasets/cps/local_area_calibration/**' - - '.github/workflows/local_area_publish.yaml' - - 'modal_app/**' - repository_dispatch: - types: [calibration-updated] + # TEMPORARILY DISABLED - re-enable push/repository_dispatch triggers when ready + # push: + # branches: [main] + # paths: + # - 'policyengine_us_data/datasets/cps/local_area_calibration/**' + # - '.github/workflows/local_area_publish.yaml' + # - 'modal_app/**' + # repository_dispatch: + # types: [calibration-updated] workflow_dispatch: inputs: num_workers: @@ -55,7 +56,7 @@ jobs: SKIP_UPLOAD="${{ github.event.inputs.skip_upload || 'false' }}" BRANCH="${{ github.head_ref || github.ref_name }}" - CMD="modal run modal_app/local_area.py --branch=${BRANCH} --num-workers=${NUM_WORKERS}" + CMD="modal run modal_app/local_area.py::main --branch=${BRANCH} --num-workers=${NUM_WORKERS}" if [ "$SKIP_UPLOAD" = "true" ]; then CMD="${CMD} --skip-upload" diff --git a/changelog.d/microimpute-fit-predict.changed.md b/changelog.d/microimpute-fit-predict.changed.md new file mode 100644 index 00000000..2dbf75bc --- /dev/null +++ b/changelog.d/microimpute-fit-predict.changed.md @@ -0,0 +1 @@ +Replaced batched QRF imputation with single sequential QRF via microimpute's fit_predict() API, preserving full covariance across all 85+ PUF income variables. diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index bf835583..445bd758 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -613,24 +613,18 @@ def _impute_weeks_unemployed( del cps_sim - qrf = QRF(log_level="INFO", memory_efficient=True) - # Subsample to 5000 for QRF training speed: CPS has ~200K person - # records; QRF fitting is O(n log n) per tree, so 5K keeps - # training under ~30s while retaining adequate distributional - # coverage. Empirical testing showed diminishing accuracy gains - # beyond ~5K–10K records for these predictors. - if len(X_train) > 5000: - X_train_sampled = X_train.sample(n=5000, random_state=42) - else: - X_train_sampled = X_train - - fitted = qrf.fit( - X_train=X_train_sampled, + qrf = QRF( + log_level="INFO", + memory_efficient=True, + max_train_samples=5000, + ) + predictions = qrf.fit_predict( + X_train=X_train, + X_test=X_test, predictors=WEEKS_PREDICTORS, imputed_variables=["weeks_unemployed"], n_jobs=1, ) - predictions = fitted.predict(X_test=X_test) imputed_weeks = predictions["weeks_unemployed"].values imputed_weeks = np.clip(imputed_weeks, 0, 52) @@ -647,8 +641,6 @@ def _impute_weeks_unemployed( (imputed_weeks[imputed_weeks > 0].mean() if (imputed_weeks > 0).any() else 0), ) - del fitted, predictions - gc.collect() return imputed_weeks @@ -706,23 +698,19 @@ def _impute_retirement_contributions( del cps_sim - # Subsample to 5000 for speed (see comment in - # _impute_weeks_unemployed for rationale). - if len(X_train) > 5000: - X_train_sampled = X_train.sample(n=5000, random_state=42) - else: - X_train_sampled = X_train - - # Train QRF - qrf = QRF(log_level="INFO", memory_efficient=True) + qrf = QRF( + log_level="INFO", + memory_efficient=True, + max_train_samples=5000, + ) try: - fitted = qrf.fit( - X_train=X_train_sampled, + predictions = qrf.fit_predict( + X_train=X_train, + X_test=X_test, predictors=RETIREMENT_PREDICTORS, imputed_variables=CPS_RETIREMENT_VARIABLES, n_jobs=1, ) - predictions = fitted.predict(X_test=X_test) except Exception: logger.warning( "QRF retirement imputation failed, returning zeros", @@ -779,8 +767,6 @@ def _impute_retirement_contributions( result["self_employed_pension_contributions"].mean(), ) - del fitted, predictions - gc.collect() return result @@ -849,13 +835,15 @@ def _run_qrf_imputation( X_test[pred] = data[pred][time_period].astype(np.float32) logger.info("Imputing %d PUF variables (full)", len(IMPUTED_VARIABLES)) - y_full = _batch_qrf(X_train_full, X_test, DEMOGRAPHIC_PREDICTORS, IMPUTED_VARIABLES) + y_full = _sequential_qrf( + X_train_full, X_test, DEMOGRAPHIC_PREDICTORS, IMPUTED_VARIABLES + ) logger.info( "Imputing %d PUF variables (override)", len(OVERRIDDEN_IMPUTED_VARIABLES), ) - y_override = _batch_qrf( + y_override = _sequential_qrf( X_train_override, X_test, DEMOGRAPHIC_PREDICTORS, @@ -896,70 +884,47 @@ def _stratified_subsample_index( return selected -def _batch_qrf( +def _sequential_qrf( X_train: pd.DataFrame, X_test: pd.DataFrame, predictors: List[str], output_vars: List[str], - batch_size: int = 10, ) -> Dict[str, np.ndarray]: - """Run QRF in batches to control memory. + """Run a single sequential QRF preserving covariance. + + Uses microimpute's fit_predict() which handles missing variable + detection, gc cleanup, and zero-fill internally. Each variable + is conditioned on all previously imputed variables, preserving + the full joint distribution. Args: X_train: Training data with predictors + output vars. X_test: Test data with predictors only. predictors: Predictor column names. output_vars: Output variable names to impute. - batch_size: Variables per batch. Returns: Dict mapping variable name to imputed values. """ from microimpute.models.qrf import QRF - available = [c for c in output_vars if c in X_train.columns] - missing = [c for c in output_vars if c not in X_train.columns] + qrf = QRF( + log_level="INFO", + memory_efficient=True, + ) + predictions = qrf.fit_predict( + X_train=X_train, + X_test=X_test, + predictors=predictors, + imputed_variables=output_vars, + n_jobs=1, + ) + result = {var: predictions[var].values for var in predictions.columns} + missing = set(output_vars) - set(result) if missing: - logger.warning( - "%d variables missing from training: %s", - len(missing), - missing[:5], + raise ValueError( + f"{len(missing)} variables requested but not returned " + f"by fit_predict(): {sorted(missing)[:10]}" ) - - result = {} - - for batch_start in range(0, len(available), batch_size): - batch_vars = available[batch_start : batch_start + batch_size] - - gc.collect() - - qrf = QRF( - log_level="INFO", - memory_efficient=True, - batch_size=10, - cleanup_interval=5, - ) - - batch_X_train = X_train[predictors + batch_vars].copy() - - fitted = qrf.fit( - X_train=batch_X_train, - predictors=predictors, - imputed_variables=batch_vars, - n_jobs=1, - ) - - predictions = fitted.predict(X_test=X_test) - - for var in batch_vars: - result[var] = predictions[var].values - - del fitted, predictions, batch_X_train - gc.collect() - - n_test = len(X_test) - for var in missing: - result[var] = np.zeros(n_test) - return result diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 6775fa16..418d7396 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -501,8 +501,8 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int): # Assign CPS variables. cps["employment_income"] = person.WSAL_VAL - cps["weekly_hours_worked"] = person.HRSWK * person.WKSWORK / 52 - cps["hours_worked_last_week"] = person.A_HRS1 * person.WKSWORK / 52 + cps["weekly_hours_worked"] = person.HRSWK + cps["hours_worked_last_week"] = person.A_HRS1 cps["taxable_interest_income"] = person.INT_VAL * (p["taxable_interest_fraction"]) cps["tax_exempt_interest_income"] = person.INT_VAL * ( diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 4d24810f..35147ff2 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -50,12 +50,49 @@ def generate(self): dataset_path=str(self.cps.file_path), ) + new_data = self._rename_imputed_to_inputs(new_data) new_data = self._drop_formula_variables(new_data) self.save_dataset(new_data) - # Variables with formulas that must still be stored (e.g. IDs - # needed by the dataset loader before formulas can run). - _KEEP_FORMULA_VARS = {"person_id"} + @classmethod + def _rename_imputed_to_inputs(cls, data): + """Rename QRF-imputed formula vars to their leaf inputs. + + The QRF imputes formula-level aggregates (e.g. + taxable_pension_income) but the engine needs leaf inputs + (e.g. taxable_private_pension_income) so formulas work. + """ + for formula_var, input_var in cls._IMPUTED_TO_INPUT.items(): + if formula_var in data: + logger.info( + "Renaming %s -> %s (leaf input)", + formula_var, + input_var, + ) + data[input_var] = data.pop(formula_var) + return data + + # Variables with formulas/adds that must still be stored. + # Includes IDs needed before formulas run and tax-unit-level + # QRF-imputed vars that can't be renamed to person-level leaves + # due to entity shape mismatch. + _KEEP_FORMULA_VARS = { + "person_id", + "interest_deduction", + "self_employed_pension_contribution_ald", + "self_employed_health_insurance_ald", + } + + # QRF imputes formula-level variables (e.g. taxable_pension_income) + # but we must store them under leaf input names so + # _drop_formula_variables doesn't discard them. The engine then + # recomputes the formula var from its adds. + # NOTE: only same-entity renames here; cross-entity vars + # (tax_unit -> person) go in _KEEP_FORMULA_VARS instead. + _IMPUTED_TO_INPUT = { + "taxable_pension_income": "taxable_private_pension_income", + "tax_exempt_pension_income": "tax_exempt_private_pension_income", + } @classmethod def _drop_formula_variables(cls, data): diff --git a/pyproject.toml b/pyproject.toml index 2085d93c..628315f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ dependencies = [ "tqdm>=4.60.0", "microdf_python>=1.2.1", "setuptools>=60", - "microimpute>=1.1.4", + "microimpute>=1.15.1", "pip-system-certs>=3.0", "google-cloud-storage>=2.0.0", "google-auth>=2.0.0", diff --git a/uv.lock b/uv.lock index 0290432b..741ec1e9 100644 --- a/uv.lock +++ b/uv.lock @@ -610,6 +610,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f8/0a/a3871375c7b9727edaeeea994bfff7c63ff7804c9829c19309ba2e058807/greenlet-3.3.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:b01548f6e0b9e9784a2c99c5651e5dc89ffcbe870bc5fb2e5ef864e9cc6b5dcb", size = 276379, upload-time = "2025-12-04T14:23:30.498Z" }, { url = "https://files.pythonhosted.org/packages/43/ab/7ebfe34dce8b87be0d11dae91acbf76f7b8246bf9d6b319c741f99fa59c6/greenlet-3.3.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:349345b770dc88f81506c6861d22a6ccd422207829d2c854ae2af8025af303e3", size = 597294, upload-time = "2025-12-04T14:50:06.847Z" }, { url = "https://files.pythonhosted.org/packages/a4/39/f1c8da50024feecd0793dbd5e08f526809b8ab5609224a2da40aad3a7641/greenlet-3.3.0-cp312-cp312-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e8e18ed6995e9e2c0b4ed264d2cf89260ab3ac7e13555b8032b25a74c6d18655", size = 607742, upload-time = "2025-12-04T14:57:42.349Z" }, + { url = "https://files.pythonhosted.org/packages/77/cb/43692bcd5f7a0da6ec0ec6d58ee7cddb606d055ce94a62ac9b1aa481e969/greenlet-3.3.0-cp312-cp312-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:c024b1e5696626890038e34f76140ed1daf858e37496d33f2af57f06189e70d7", size = 622297, upload-time = "2025-12-04T15:07:13.552Z" }, { url = "https://files.pythonhosted.org/packages/75/b0/6bde0b1011a60782108c01de5913c588cf51a839174538d266de15e4bf4d/greenlet-3.3.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:047ab3df20ede6a57c35c14bf5200fcf04039d50f908270d3f9a7a82064f543b", size = 609885, upload-time = "2025-12-04T14:26:02.368Z" }, { url = "https://files.pythonhosted.org/packages/49/0e/49b46ac39f931f59f987b7cd9f34bfec8ef81d2a1e6e00682f55be5de9f4/greenlet-3.3.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2d9ad37fc657b1102ec880e637cccf20191581f75c64087a549e66c57e1ceb53", size = 1567424, upload-time = "2025-12-04T15:04:23.757Z" }, { url = "https://files.pythonhosted.org/packages/05/f5/49a9ac2dff7f10091935def9165c90236d8f175afb27cbed38fb1d61ab6b/greenlet-3.3.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:83cd0e36932e0e7f36a64b732a6f60c2fc2df28c351bae79fbaf4f8092fe7614", size = 1636017, upload-time = "2025-12-04T14:27:29.688Z" }, @@ -617,6 +618,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/2f/28592176381b9ab2cafa12829ba7b472d177f3acc35d8fbcf3673d966fff/greenlet-3.3.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:a1e41a81c7e2825822f4e068c48cb2196002362619e2d70b148f20a831c00739", size = 275140, upload-time = "2025-12-04T14:23:01.282Z" }, { url = "https://files.pythonhosted.org/packages/2c/80/fbe937bf81e9fca98c981fe499e59a3f45df2a04da0baa5c2be0dca0d329/greenlet-3.3.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9f515a47d02da4d30caaa85b69474cec77b7929b2e936ff7fb853d42f4bf8808", size = 599219, upload-time = "2025-12-04T14:50:08.309Z" }, { url = "https://files.pythonhosted.org/packages/c2/ff/7c985128f0514271b8268476af89aee6866df5eec04ac17dcfbc676213df/greenlet-3.3.0-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:7d2d9fd66bfadf230b385fdc90426fcd6eb64db54b40c495b72ac0feb5766c54", size = 610211, upload-time = "2025-12-04T14:57:43.968Z" }, + { url = "https://files.pythonhosted.org/packages/79/07/c47a82d881319ec18a4510bb30463ed6891f2ad2c1901ed5ec23d3de351f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:30a6e28487a790417d036088b3bcb3f3ac7d8babaa7d0139edbaddebf3af9492", size = 624311, upload-time = "2025-12-04T15:07:14.697Z" }, { url = "https://files.pythonhosted.org/packages/fd/8e/424b8c6e78bd9837d14ff7df01a9829fc883ba2ab4ea787d4f848435f23f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:087ea5e004437321508a8d6f20efc4cfec5e3c30118e1417ea96ed1d93950527", size = 612833, upload-time = "2025-12-04T14:26:03.669Z" }, { url = "https://files.pythonhosted.org/packages/b5/ba/56699ff9b7c76ca12f1cdc27a886d0f81f2189c3455ff9f65246780f713d/greenlet-3.3.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ab97cf74045343f6c60a39913fa59710e4bd26a536ce7ab2397adf8b27e67c39", size = 1567256, upload-time = "2025-12-04T15:04:25.276Z" }, { url = "https://files.pythonhosted.org/packages/1e/37/f31136132967982d698c71a281a8901daf1a8fbab935dce7c0cf15f942cc/greenlet-3.3.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5375d2e23184629112ca1ea89a53389dddbffcf417dad40125713d88eb5f96e8", size = 1636483, upload-time = "2025-12-04T14:27:30.804Z" }, @@ -1159,7 +1161,7 @@ wheels = [ [[package]] name = "microimpute" -version = "1.12.0" +version = "1.15.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "joblib" }, @@ -1176,9 +1178,9 @@ dependencies = [ { name = "statsmodels" }, { name = "tqdm" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/8e/f1/b3e407ddadea69198b36f87b855416684d99631a1f62fb952ceb820f755c/microimpute-1.12.0.tar.gz", hash = "sha256:f8554b2f40d0d11b079860e7b32af04acb7910a8632dc5a6a8c469990c4aa225", size = 125271, upload-time = "2025-12-11T14:05:13.249Z" } +sdist = { url = "https://files.pythonhosted.org/packages/97/17/d621d4ed40e0afac6f1a2c4dea423783576613820d1460ae30d65c48309e/microimpute-1.15.1.tar.gz", hash = "sha256:af409525d475efeb8c8526e9630834c4f16563e15cd42665117d2a1397fcf404", size = 128669, upload-time = "2026-03-09T15:59:33.885Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/53/5f/6fb8a1058c6e06670f6cea56b49300cf169e685e254ff2455a97afc3f64b/microimpute-1.12.0-py3-none-any.whl", hash = "sha256:76433c4927a2140ab217e1da503b1e5c2fff03c4b6dfd940d8d7d5ccfc2df9fd", size = 108702, upload-time = "2025-12-11T14:05:12.005Z" }, + { url = "https://files.pythonhosted.org/packages/42/f1/1d80dbb8cc9e85962524a4233cfe42ac1a78e6f2cc0ca479ed1817f6d8ae/microimpute-1.15.1-py3-none-any.whl", hash = "sha256:f5f2de91eeedea28ddae42d42757b558d6eb85c1a1fd6a9097b53e309f19369c", size = 111313, upload-time = "2026-03-09T15:59:32.553Z" }, ] [[package]] @@ -1869,7 +1871,7 @@ requires-dist = [ { name = "google-cloud-storage", specifier = ">=2.0.0" }, { name = "l0-python", marker = "extra == 'l0'" }, { name = "microdf-python", specifier = ">=1.2.1" }, - { name = "microimpute", specifier = ">=1.1.4" }, + { name = "microimpute", specifier = ">=1.15.1" }, { name = "openpyxl", specifier = ">=3.1.5" }, { name = "pandas", specifier = ">=2.3.1" }, { name = "pip-system-certs", specifier = ">=3.0" },