Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/top-tail-income-representation.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Include PUF aggregate records and inject high-income PUF records into ExtendedCPS to improve top-tail income representation.
163 changes: 163 additions & 0 deletions policyengine_us_data/datasets/cps/extended_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

logger = logging.getLogger(__name__)

# Minimum AGI for PUF records to be injected into the ExtendedCPS
AGI_INJECTION_THRESHOLD = 1_000_000

# CPS-only variables that should be QRF-imputed for the PUF clone half
# instead of naively duplicated from the CPS donor. These are
# income-correlated variables that exist only in the CPS; demographics,
Expand Down Expand Up @@ -446,8 +449,168 @@ def generate(self):

new_data = self._rename_imputed_to_inputs(new_data)
new_data = self._drop_formula_variables(new_data)
new_data = self._inject_high_income_puf_records(new_data)
self.save_dataset(new_data)

def _inject_high_income_puf_records(self, new_data):
"""Inject ultra-high-income PUF records into the extended CPS.

The CPS severely under-represents the top of the income
distribution ($5M+ AGI brackets have -95% to -99% calibration
errors). This directly appends PUF records with AGI above
AGI_INJECTION_THRESHOLD as additional households, giving the
reweighter actual high-income observations.

Uses raw PUF arrays for speed — only creates a Microsimulation
briefly to identify high-AGI households, then reads stored
arrays directly instead of calling calculate() per variable.
"""
from policyengine_us import Microsimulation

logger.info("Loading PUF for high-income record injection...")
puf_sim = Microsimulation(dataset=self.puf)

# Identify high-AGI households (only calculation needed)
hh_agi = puf_sim.calculate("adjusted_gross_income", map_to="household").values
tbs = puf_sim.tax_benefit_system
del puf_sim # Free simulation memory immediately

# Load raw PUF arrays (fast, no simulation)
puf_data = self.puf(require=True).load_dataset()
all_hh_ids = puf_data["household_id"]
high_mask_hh = hh_agi > AGI_INJECTION_THRESHOLD
high_hh_id_set = set(all_hh_ids[high_mask_hh])

n_high_hh = int(high_mask_hh.sum())
if n_high_hh == 0:
logger.info("No high-income PUF households found")
return new_data

logger.info(
"Injecting %d PUF households with AGI > $%d",
n_high_hh,
AGI_INJECTION_THRESHOLD,
)

# Build entity-level masks from raw arrays
high_hh_id_arr = np.fromiter(high_hh_id_set, dtype=float)
person_mask = np.isin(
puf_data["person_household_id"],
high_hh_id_arr,
)
# In PUF, household = tax_unit = spm_unit = family
group_mask = np.isin(
puf_data["household_id"],
high_hh_id_arr,
)
# Marital units: find which belong to high-income persons
high_marital_ids = np.unique(puf_data["person_marital_unit_id"][person_mask])
marital_mask = np.isin(
puf_data["marital_unit_id"],
high_marital_ids,
)

entity_masks = {
"person": person_mask,
"household": group_mask,
"tax_unit": group_mask,
"spm_unit": group_mask,
"family": group_mask,
"marital_unit": marital_mask,
}

logger.info(
"High-income record counts: persons=%d, households=%d, marital_units=%d",
person_mask.sum(),
group_mask.sum(),
marital_mask.sum(),
)

# Compute ID offset to avoid collisions with existing data
id_offset = 0
for key in new_data:
if key.endswith("_id"):
vals = new_data[key][self.time_period]
if vals.dtype.kind in ("f", "i", "u"):
id_offset = max(id_offset, int(vals.max()))
id_offset += 1_000_000

# Build a lookup: existing array length → entity, so we can
# handle variables not in the tax-benefit system (e.g. raw
# dataset variables) by matching their array length.
entity_lengths = {}
for entity_key, m in entity_masks.items():
# Pre-injection length for this entity in the CPS data
for key in new_data:
arr = new_data[key][self.time_period]
if len(arr) not in entity_lengths:
entity_lengths[len(arr)] = entity_key

# For each variable in new_data, read from raw PUF arrays
# (no calculate() calls — uses stored values directly)
n_appended = 0
for variable in list(new_data.keys()):
var_meta = tbs.variables.get(variable)

if var_meta is not None:
entity = var_meta.entity.key
else:
# Infer entity from array length
arr_len = len(new_data[variable][self.time_period])
entity = entity_lengths.get(arr_len)

if entity is None or entity not in entity_masks:
continue

mask = entity_masks[entity]

# Read from raw PUF arrays if available; pad with
# defaults for variables not in PUF (CPS-only variables)
existing = new_data[variable][self.time_period]
n_inject = int(mask.sum())

# For string/enum vars, use the most common existing value
# as default (empty string is not a valid enum value)
if existing.dtype.kind in ("S", "U"):
default_val = np.full(n_inject, existing[0], dtype=existing.dtype)
else:
default_val = np.zeros(n_inject, dtype=existing.dtype)

use_puf = (
variable in puf_data
and len(np.asarray(puf_data[variable])) == len(mask)
and np.asarray(puf_data[variable]).dtype.kind == existing.dtype.kind
)

if use_puf:
puf_values = np.asarray(puf_data[variable])
puf_subset = puf_values[mask]
else:
puf_subset = default_val

# Offset IDs to avoid collisions
if variable.endswith("_id") and puf_subset.dtype.kind in (
"f",
"i",
"u",
):
puf_subset = puf_subset + id_offset

# Ensure dtype matches existing array
if puf_subset.dtype != existing.dtype:
try:
puf_subset = puf_subset.astype(existing.dtype)
except (ValueError, TypeError):
puf_subset = default_val

new_data[variable][self.time_period] = np.concatenate(
[existing, puf_subset]
)
n_appended += 1

logger.info("Appended PUF values for %d variables", n_appended)
return new_data

@classmethod
def _rename_imputed_to_inputs(cls, data):
"""Rename QRF-imputed formula vars to their leaf inputs.
Expand Down
77 changes: 76 additions & 1 deletion policyengine_us_data/datasets/puf/puf.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,81 @@ def decode_age_dependent(age_range: int) -> int:
return rng.integers(low=lower, high=upper, endpoint=False)


MARS_IMPUTATION_PREDICTORS = [
"E00200",
"E00300",
"E00600",
"E01000",
"E00900",
"E26270",
"E02400",
"E01500",
"XTOT",
]


def impute_aggregate_mars(puf: pd.DataFrame) -> pd.DataFrame:
"""Impute MARS for aggregate records using QRF on income variables.

PUF aggregate records (MARS=0) have complete income/deduction data
but no demographics. MARS is needed as a predictor for the
downstream demographic QRF in impute_missing_demographics().

We train a QRF on regular records' income profiles to predict
MARS, then impute it for the aggregate records. All other
demographics (AGERANGE, GENDER, EARNSPLIT, AGEDP1-3) are left
for impute_missing_demographics() to handle.
"""
from microimpute.models.qrf import QRF

agg_mask = puf.MARS == 0
n_agg = agg_mask.sum()
if n_agg == 0:
return puf

reg = puf[puf.MARS != 0].copy()

# Use available income variables as predictors for MARS
predictors = [c for c in MARS_IMPUTATION_PREDICTORS if c in puf.columns]

# Train on a sample of regular records (sample() returns a copy)
train = reg.sample(n=min(10_000, len(reg)), random_state=42)[
predictors + ["MARS"]
].fillna(0)

qrf = QRF()
fitted = qrf.fit(
X_train=train,
predictors=predictors,
imputed_variables=["MARS"],
)

# Predict MARS for aggregate records
agg_data = puf.loc[agg_mask, predictors].fillna(0)
predicted = fitted.predict(X_test=agg_data)
# QRF outputs continuous values; round to nearest valid MARS
mars_imputed = predicted["MARS"].values.round().astype(int)
mars_imputed = np.clip(mars_imputed, 1, 4)
puf.loc[agg_mask, "MARS"] = mars_imputed

# Adjust XTOT for joint filers (need at least 2 exemptions)
joint_agg = agg_mask & (puf.MARS == 2)
puf.loc[joint_agg, "XTOT"] = puf.loc[joint_agg, "XTOT"].clip(lower=2)

# DSI and EIC are predictors in the downstream demographic QRF:
# ultra-high-income filers are never dependents and never get EITC
if "DSI" in puf.columns:
puf.loc[agg_mask, "DSI"] = 0
if "EIC" in puf.columns:
puf.loc[agg_mask, "EIC"] = 0

# AGERANGE, GENDER, EARNSPLIT, AGEDP1-3 are left unset —
# impute_missing_demographics() handles them via QRF using
# [E00200, MARS, DSI, EIC, XTOT] as predictors.

return puf


def preprocess_puf(puf: pd.DataFrame) -> pd.DataFrame:
# Add variable renames
puf.S006 = puf.S006 / 100
Expand Down Expand Up @@ -552,7 +627,7 @@ def generate(self):
self.save_dataset(arrays)
return

puf = puf[puf.MARS != 0] # Remove aggregate records
puf = impute_aggregate_mars(puf)

original_recid = puf.RECID.values.copy()
puf = preprocess_puf(puf)
Expand Down
7 changes: 4 additions & 3 deletions policyengine_us_data/tests/test_datasets/test_enhanced_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,10 @@ def test_immigration_status_diversity():
"Immigration status not being read from data."
)

# Also check that we have a reasonable percentage of citizens (should be 85-90%)
assert 80 < citizen_pct < 95, (
f"Citizen percentage ({citizen_pct:.1f}%) outside expected range (80-95%)"
# Check reasonable percentage of citizens (85-90% for CPS, higher with
# injected PUF records since tax filers are almost all citizens)
assert 80 < citizen_pct < 98, (
f"Citizen percentage ({citizen_pct:.1f}%) outside expected range (80-98%)"
)

# Check that we have some non-citizens
Expand Down
Loading