Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions .github/workflows/local_area_publish.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
name: Publish Local Area H5 Files

on:
push:
branches: [main]
paths:
- 'policyengine_us_data/datasets/cps/local_area_calibration/**'
- '.github/workflows/local_area_publish.yaml'
- 'modal_app/**'
repository_dispatch:
types: [calibration-updated]
# TEMPORARILY DISABLED - re-enable push/repository_dispatch triggers when ready
# push:
# branches: [main]
# paths:
# - 'policyengine_us_data/datasets/cps/local_area_calibration/**'
# - '.github/workflows/local_area_publish.yaml'
# - 'modal_app/**'
# repository_dispatch:
# types: [calibration-updated]
workflow_dispatch:
inputs:
num_workers:
Expand Down Expand Up @@ -55,7 +56,7 @@ jobs:
SKIP_UPLOAD="${{ github.event.inputs.skip_upload || 'false' }}"
BRANCH="${{ github.head_ref || github.ref_name }}"

CMD="modal run modal_app/local_area.py --branch=${BRANCH} --num-workers=${NUM_WORKERS}"
CMD="modal run modal_app/local_area.py::main --branch=${BRANCH} --num-workers=${NUM_WORKERS}"

if [ "$SKIP_UPLOAD" = "true" ]; then
CMD="${CMD} --skip-upload"
Expand Down
1 change: 1 addition & 0 deletions changelog.d/microimpute-fit-predict.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Replaced batched QRF imputation with a single sequential QRF via microimpute's fit_predict() API, preserving the full joint distribution across all 85+ PUF income variables.
121 changes: 43 additions & 78 deletions policyengine_us_data/calibration/puf_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,24 +613,18 @@ def _impute_weeks_unemployed(

del cps_sim

qrf = QRF(log_level="INFO", memory_efficient=True)
# Subsample to 5000 for QRF training speed: CPS has ~200K person
# records; QRF fitting is O(n log n) per tree, so 5K keeps
# training under ~30s while retaining adequate distributional
# coverage. Empirical testing showed diminishing accuracy gains
# beyond ~5K–10K records for these predictors.
if len(X_train) > 5000:
X_train_sampled = X_train.sample(n=5000, random_state=42)
else:
X_train_sampled = X_train

fitted = qrf.fit(
X_train=X_train_sampled,
qrf = QRF(
log_level="INFO",
memory_efficient=True,
max_train_samples=5000,
)
predictions = qrf.fit_predict(
X_train=X_train,
X_test=X_test,
predictors=WEEKS_PREDICTORS,
imputed_variables=["weeks_unemployed"],
n_jobs=1,
)
predictions = fitted.predict(X_test=X_test)
imputed_weeks = predictions["weeks_unemployed"].values

imputed_weeks = np.clip(imputed_weeks, 0, 52)
Expand All @@ -647,8 +641,6 @@ def _impute_weeks_unemployed(
(imputed_weeks[imputed_weeks > 0].mean() if (imputed_weeks > 0).any() else 0),
)

del fitted, predictions
gc.collect()
return imputed_weeks


Expand Down Expand Up @@ -706,23 +698,19 @@ def _impute_retirement_contributions(

del cps_sim

# Subsample to 5000 for speed (see comment in
# _impute_weeks_unemployed for rationale).
if len(X_train) > 5000:
X_train_sampled = X_train.sample(n=5000, random_state=42)
else:
X_train_sampled = X_train

# Train QRF
qrf = QRF(log_level="INFO", memory_efficient=True)
qrf = QRF(
log_level="INFO",
memory_efficient=True,
max_train_samples=5000,
)
try:
fitted = qrf.fit(
X_train=X_train_sampled,
predictions = qrf.fit_predict(
X_train=X_train,
X_test=X_test,
predictors=RETIREMENT_PREDICTORS,
imputed_variables=CPS_RETIREMENT_VARIABLES,
n_jobs=1,
)
predictions = fitted.predict(X_test=X_test)
except Exception:
logger.warning(
"QRF retirement imputation failed, returning zeros",
Expand Down Expand Up @@ -779,8 +767,6 @@ def _impute_retirement_contributions(
result["self_employed_pension_contributions"].mean(),
)

del fitted, predictions
gc.collect()
return result


Expand Down Expand Up @@ -849,13 +835,15 @@ def _run_qrf_imputation(
X_test[pred] = data[pred][time_period].astype(np.float32)

logger.info("Imputing %d PUF variables (full)", len(IMPUTED_VARIABLES))
y_full = _batch_qrf(X_train_full, X_test, DEMOGRAPHIC_PREDICTORS, IMPUTED_VARIABLES)
y_full = _sequential_qrf(
X_train_full, X_test, DEMOGRAPHIC_PREDICTORS, IMPUTED_VARIABLES
)

logger.info(
"Imputing %d PUF variables (override)",
len(OVERRIDDEN_IMPUTED_VARIABLES),
)
y_override = _batch_qrf(
y_override = _sequential_qrf(
X_train_override,
X_test,
DEMOGRAPHIC_PREDICTORS,
Expand Down Expand Up @@ -896,70 +884,47 @@ def _stratified_subsample_index(
return selected


def _batch_qrf(
def _sequential_qrf(
X_train: pd.DataFrame,
X_test: pd.DataFrame,
predictors: List[str],
output_vars: List[str],
batch_size: int = 10,
) -> Dict[str, np.ndarray]:
"""Run QRF in batches to control memory.
"""Run a single sequential QRF preserving covariance.

Uses microimpute's fit_predict() which handles missing variable
detection, gc cleanup, and zero-fill internally. Each variable
is conditioned on all previously imputed variables, preserving
the full joint distribution.

Args:
X_train: Training data with predictors + output vars.
X_test: Test data with predictors only.
predictors: Predictor column names.
output_vars: Output variable names to impute.
batch_size: Variables per batch.

Returns:
Dict mapping variable name to imputed values.
"""
from microimpute.models.qrf import QRF

available = [c for c in output_vars if c in X_train.columns]
missing = [c for c in output_vars if c not in X_train.columns]
qrf = QRF(
log_level="INFO",
memory_efficient=True,
)
predictions = qrf.fit_predict(
X_train=X_train,
X_test=X_test,
predictors=predictors,
imputed_variables=output_vars,
n_jobs=1,
)

result = {var: predictions[var].values for var in predictions.columns}
missing = set(output_vars) - set(result)
if missing:
logger.warning(
"%d variables missing from training: %s",
len(missing),
missing[:5],
raise ValueError(
f"{len(missing)} variables requested but not returned "
f"by fit_predict(): {sorted(missing)[:10]}"
)

result = {}

for batch_start in range(0, len(available), batch_size):
batch_vars = available[batch_start : batch_start + batch_size]

gc.collect()

qrf = QRF(
log_level="INFO",
memory_efficient=True,
batch_size=10,
cleanup_interval=5,
)

batch_X_train = X_train[predictors + batch_vars].copy()

fitted = qrf.fit(
X_train=batch_X_train,
predictors=predictors,
imputed_variables=batch_vars,
n_jobs=1,
)

predictions = fitted.predict(X_test=X_test)

for var in batch_vars:
result[var] = predictions[var].values

del fitted, predictions, batch_X_train
gc.collect()

n_test = len(X_test)
for var in missing:
result[var] = np.zeros(n_test)

return result
4 changes: 2 additions & 2 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ def add_personal_income_variables(cps: h5py.File, person: DataFrame, year: int):
# Assign CPS variables.
cps["employment_income"] = person.WSAL_VAL

cps["weekly_hours_worked"] = person.HRSWK * person.WKSWORK / 52
cps["hours_worked_last_week"] = person.A_HRS1 * person.WKSWORK / 52
cps["weekly_hours_worked"] = person.HRSWK
cps["hours_worked_last_week"] = person.A_HRS1

cps["taxable_interest_income"] = person.INT_VAL * (p["taxable_interest_fraction"])
cps["tax_exempt_interest_income"] = person.INT_VAL * (
Expand Down
43 changes: 40 additions & 3 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,49 @@ def generate(self):
dataset_path=str(self.cps.file_path),
)

new_data = self._rename_imputed_to_inputs(new_data)
new_data = self._drop_formula_variables(new_data)
self.save_dataset(new_data)

# Variables with formulas that must still be stored (e.g. IDs
# needed by the dataset loader before formulas can run).
_KEEP_FORMULA_VARS = {"person_id"}
@classmethod
def _rename_imputed_to_inputs(cls, data):
"""Rename QRF-imputed formula vars to their leaf inputs.

The QRF imputes formula-level aggregates (e.g.
taxable_pension_income) but the engine needs leaf inputs
(e.g. taxable_private_pension_income) so formulas work.
"""
for formula_var, input_var in cls._IMPUTED_TO_INPUT.items():
if formula_var in data:
logger.info(
"Renaming %s -> %s (leaf input)",
formula_var,
input_var,
)
data[input_var] = data.pop(formula_var)
return data

# Variables with formulas/adds that must still be stored.
# Includes IDs needed before formulas run and tax-unit-level
# QRF-imputed vars that can't be renamed to person-level leaves
# due to entity shape mismatch.
_KEEP_FORMULA_VARS = {
"person_id",
"interest_deduction",
"self_employed_pension_contribution_ald",
"self_employed_health_insurance_ald",
}

# QRF imputes formula-level variables (e.g. taxable_pension_income)
# but we must store them under leaf input names so
# _drop_formula_variables doesn't discard them. The engine then
# recomputes the formula var from its adds.
# NOTE: only same-entity renames here; cross-entity vars
# (tax_unit -> person) go in _KEEP_FORMULA_VARS instead.
_IMPUTED_TO_INPUT = {
"taxable_pension_income": "taxable_private_pension_income",
"tax_exempt_pension_income": "tax_exempt_private_pension_income",
}

@classmethod
def _drop_formula_variables(cls, data):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ dependencies = [
"tqdm>=4.60.0",
"microdf_python>=1.2.1",
"setuptools>=60",
"microimpute>=1.1.4",
"microimpute>=1.15.1",
"pip-system-certs>=3.0",
"google-cloud-storage>=2.0.0",
"google-auth>=2.0.0",
Expand Down
10 changes: 6 additions & 4 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.