From 3df0d9162d81375b115458e4fa95b83ac23da57d Mon Sep 17 00:00:00 2001 From: "baogorek@gmail.com" Date: Mon, 16 Mar 2026 13:17:20 -0400 Subject: [PATCH 1/5] Blend entity values on would_file draws; remove wrong entity weights Matrix builder: precompute entity values with would_file=False alongside the all-True values, then blend per tax unit based on the would_file draw before applying target takeup draws. This ensures X@w matches sim.calculate for targets affected by non-target state variables. Fixes #609 publish_local_area: remove explicit sub-entity weight overrides (tax_unit_weight, spm_unit_weight, family_weight, marital_unit_weight, person_weight) that used incorrect person-count splitting. These are formula variables in policyengine-us that correctly derive from household_weight at runtime. Fixes #610 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../calibration/publish_local_area.py | 26 +- .../calibration/unified_matrix_builder.py | 260 +++++++++++++++++- 2 files changed, 258 insertions(+), 28 deletions(-) diff --git a/policyengine_us_data/calibration/publish_local_area.py b/policyengine_us_data/calibration/publish_local_area.py index 72594631..83e31ba6 100644 --- a/policyengine_us_data/calibration/publish_local_area.py +++ b/policyengine_us_data/calibration/publish_local_area.py @@ -311,22 +311,6 @@ def build_h5( unique_geo = derive_geography_from_blocks(unique_blocks) clone_geo = {k: v[block_inv] for k, v in unique_geo.items()} - # === Calculate weights for all entity levels === - person_weights = np.repeat(clone_weights, persons_per_clone) - per_person_wt = clone_weights / np.maximum(persons_per_clone, 1) - - entity_weights = {} - for ek in SUB_ENTITIES: - n_ents = len(entity_clone_idx[ek]) - ent_person_counts = np.zeros(n_ents, dtype=np.int32) - np.add.at( - ent_person_counts, - new_person_entity_ids[ek], - 1, - ) - clone_ids_e = np.repeat(np.arange(n_clones), entities_per_clone[ek]) - entity_weights[ek] = per_person_wt[clone_ids_e] * ent_person_counts - # === Determine variables to save === vars_to_save = set(sim.input_variables) vars_to_save.add("county") @@ -413,16 +397,12 @@ def build_h5( } # === Override weights === + # Only write household_weight; sub-entity weights (tax_unit_weight, + # spm_unit_weight, person_weight, etc.) are formula variables in + # policyengine-us that derive from household_weight at runtime. data["household_weight"] = { time_period: clone_weights.astype(np.float32), } - data["person_weight"] = { - time_period: person_weights.astype(np.float32), - } - for ek in SUB_ENTITIES: - data[f"{ek}_weight"] = { - time_period: entity_weights[ek].astype(np.float32), - } # === Override geography === data["state_fips"] = { diff --git a/policyengine_us_data/calibration/unified_matrix_builder.py b/policyengine_us_data/calibration/unified_matrix_builder.py index 04d785ff..62ff4bee 100644 --- a/policyengine_us_data/calibration/unified_matrix_builder.py +++ b/policyengine_us_data/calibration/unified_matrix_builder.py @@ -152,7 +152,38 @@ def _compute_single_state( exc, ) - return (state, {"hh": hh, "person": person, "entity": entity_vals}) + entity_wf_false = {} + if rerandomize_takeup: + has_tu_target = any( + info["entity"] == "tax_unit" for info in affected_targets.values() + ) + if has_tu_target: + n_tu = len(state_sim.calculate("tax_unit_id", map_to="tax_unit").values) + state_sim.set_input( + "would_file_taxes_voluntarily", + time_period, + np.zeros(n_tu, dtype=bool), + ) + for var in get_calculated_variables(state_sim): + state_sim.delete_arrays(var) + for tvar, info in affected_targets.items(): + if info["entity"] != "tax_unit": + continue + entity_wf_false[tvar] = state_sim.calculate( + tvar, + time_period, + map_to="tax_unit", + ).values.astype(np.float32) + + return ( + state, + { + "hh": hh, + "person": person, + "entity": entity_vals, + "entity_wf_false": entity_wf_false, + }, + ) def _compute_single_state_group_counties( @@ -278,7 +309,40 @@ def _compute_single_state_group_counties( exc, ) - results.append((county_fips, {"hh": hh, "entity": entity_vals})) + entity_wf_false = {} + if rerandomize_takeup: + has_tu_target = any( + info["entity"] == "tax_unit" for info in affected_targets.values() + ) + if has_tu_target: + n_tu = len(state_sim.calculate("tax_unit_id", map_to="tax_unit").values) + state_sim.set_input( + "would_file_taxes_voluntarily", + time_period, + np.zeros(n_tu, dtype=bool), + ) + for var in get_calculated_variables(state_sim): + if var != "county": + state_sim.delete_arrays(var) + for tvar, info in affected_targets.items(): + if info["entity"] != "tax_unit": + continue + entity_wf_false[tvar] = state_sim.calculate( + tvar, + time_period, + map_to="tax_unit", + ).values.astype(np.float32) + + results.append( + ( + county_fips, + { + "hh": hh, + "entity": entity_vals, + "entity_wf_false": entity_wf_false, + }, + ) + ) return results @@ -552,11 +616,37 @@ def _process_single_clone( # Takeup re-randomisation if do_takeup and affected_target_info: from policyengine_us_data.utils.takeup import ( + SIMPLE_TAKEUP_VARS, compute_block_takeup_for_entities, ) clone_blocks = geo_blocks[col_start:col_end] + # Phase 1: compute non-target draws (would_file) FIRST + wf_draws = {} + for spec in SIMPLE_TAKEUP_VARS: + if spec.get("target") is not None: + continue + var_name = spec["variable"] + entity = spec["entity"] + rate_key = spec["rate_key"] + if rate_key not in precomputed_rates: + continue + ent_hh = entity_hh_idx_map[entity] + ent_blocks = clone_blocks[ent_hh] + ent_hh_ids = household_ids[ent_hh] + draws = compute_block_takeup_for_entities( + var_name, + precomputed_rates[rate_key], + ent_blocks, + ent_hh_ids, + ) + wf_draws[entity] = draws + if var_name in person_vars: + pidx = entity_to_person_idx[entity] + person_vars[var_name] = draws[pidx].astype(np.float32) + + # Phase 2: target loop with would_file blending for tvar, info in affected_target_info.items(): if tvar.endswith("_count"): continue @@ -586,6 +676,34 @@ def _process_single_clone( if tvar in sv: ent_eligible[m] = sv[tvar][m] + # Blend: for tax_unit targets, select between + # all-takeup-true and would_file=false values + if entity_level == "tax_unit" and "tax_unit" in wf_draws: + ent_wf_false = np.zeros(n_ent, dtype=np.float32) + if tvar in county_dep_targets and county_values: + ent_counties = clone_counties[ent_hh] + for cfips in np.unique(ent_counties): + m = ent_counties == cfips + cv = county_values.get(cfips, {}).get("entity_wf_false", {}) + if tvar in cv: + ent_wf_false[m] = cv[tvar][m] + else: + st = int(cfips[:2]) + sv = state_values[st].get("entity_wf_false", {}) + if tvar in sv: + ent_wf_false[m] = sv[tvar][m] + else: + for st in np.unique(ent_states): + m = ent_states == st + sv = state_values[int(st)].get("entity_wf_false", {}) + if tvar in sv: + ent_wf_false[m] = sv[tvar][m] + ent_eligible = np.where( + wf_draws["tax_unit"], + ent_eligible, + ent_wf_false, + ) + ent_blocks = clone_blocks[ent_hh] ent_hh_ids = household_ids[ent_hh] @@ -950,10 +1068,43 @@ def _build_state_values( exc, ) + entity_wf_false = {} + if rerandomize_takeup: + has_tu_target = any( + info["entity"] == "tax_unit" + for info in affected_targets.values() + ) + if has_tu_target: + n_tu = len( + state_sim.calculate( + "tax_unit_id", + map_to="tax_unit", + ).values + ) + state_sim.set_input( + "would_file_taxes_voluntarily", + self.time_period, + np.zeros(n_tu, dtype=bool), + ) + for var in get_calculated_variables(state_sim): + state_sim.delete_arrays(var) + for ( + tvar, + info, + ) in affected_targets.items(): + if info["entity"] != "tax_unit": + continue + entity_wf_false[tvar] = state_sim.calculate( + tvar, + self.time_period, + map_to="tax_unit", + ).values.astype(np.float32) + state_values[state] = { "hh": hh, "person": person, "entity": entity_vals, + "entity_wf_false": entity_wf_false, } if (i + 1) % 10 == 0 or i == 0: logger.info( @@ -1216,9 +1367,43 @@ def _build_county_values( exc, ) + entity_wf_false = {} + if rerandomize_takeup: + has_tu_target = any( + info["entity"] == "tax_unit" + for info in affected_targets.values() + ) + if has_tu_target: + n_tu = len( + state_sim.calculate( + "tax_unit_id", + map_to="tax_unit", + ).values + ) + state_sim.set_input( + "would_file_taxes_voluntarily", + self.time_period, + np.zeros(n_tu, dtype=bool), + ) + for var in get_calculated_variables(state_sim): + if var != "county": + state_sim.delete_arrays(var) + for ( + tvar, + info, + ) in affected_targets.items(): + if info["entity"] != "tax_unit": + continue + entity_wf_false[tvar] = state_sim.calculate( + tvar, + self.time_period, + map_to="tax_unit", + ).values.astype(np.float32) + county_values[county_fips] = { "hh": hh, "entity": entity_vals, + "entity_wf_false": entity_wf_false, } county_count += 1 if county_count % 500 == 0 or county_count == 1: @@ -1928,10 +2113,14 @@ def build_matrix( len(affected_target_info), ) - # Pre-compute takeup rates (constant across clones) + # Pre-compute takeup rates for ALL takeup vars + from policyengine_us_data.utils.takeup import ( + SIMPLE_TAKEUP_VARS as _ALL_TAKEUP, + ) + precomputed_rates = {} - for tvar, info in affected_target_info.items(): - rk = info["rate_key"] + for spec in _ALL_TAKEUP: + rk = spec["rate_key"] if rk not in precomputed_rates: precomputed_rates[rk] = load_take_up_rate(rk, self.time_period) @@ -2083,6 +2272,36 @@ def build_matrix( # for affected target variables if rerandomize_takeup and affected_target_info: clone_blocks = geography.block_geoid[col_start:col_end] + + from policyengine_us_data.utils.takeup import ( + SIMPLE_TAKEUP_VARS as _SEQ_TAKEUP, + ) + + # Phase 1: non-target draws (would_file) FIRST + wf_draws = {} + for spec in _SEQ_TAKEUP: + if spec.get("target") is not None: + continue + var_name = spec["variable"] + entity = spec["entity"] + rate_key = spec["rate_key"] + if rate_key not in precomputed_rates: + continue + ent_hh = entity_hh_idx_map[entity] + ent_blocks = clone_blocks[ent_hh] + ent_hh_ids = household_ids[ent_hh] + draws = compute_block_takeup_for_entities( + var_name, + precomputed_rates[rate_key], + ent_blocks, + ent_hh_ids, + ) + wf_draws[entity] = draws + if var_name in person_vars: + pidx = entity_to_person_idx[entity] + person_vars[var_name] = draws[pidx].astype(np.float32) + + # Phase 2: target loop with would_file blending for ( tvar, info, @@ -2116,6 +2335,37 @@ def build_matrix( if tvar in sv: ent_eligible[m] = sv[tvar][m] + # Blend for tax_unit targets + if entity_level == "tax_unit" and "tax_unit" in wf_draws: + ent_wf_false = np.zeros(n_ent, dtype=np.float32) + if tvar in county_dep_targets and county_values: + ent_counties = clone_counties[ent_hh] + for cfips in np.unique(ent_counties): + m = ent_counties == cfips + cv = county_values.get(cfips, {}).get( + "entity_wf_false", {} + ) + if tvar in cv: + ent_wf_false[m] = cv[tvar][m] + else: + st = int(cfips[:2]) + sv = state_values[st].get("entity_wf_false", {}) + if tvar in sv: + ent_wf_false[m] = sv[tvar][m] + else: + for st in np.unique(ent_states): + m = ent_states == st + sv = state_values[int(st)].get( + "entity_wf_false", {} + ) + if tvar in sv: + ent_wf_false[m] = sv[tvar][m] + ent_eligible = np.where( + wf_draws["tax_unit"], + ent_eligible, + ent_wf_false, + ) + ent_blocks = clone_blocks[ent_hh] ent_hh_ids = household_ids[ent_hh] From 310bb7372016c312d99c93e9e4c11f815b27fb14 Mon Sep 17 00:00:00 2001 From: "baogorek@gmail.com" Date: Mon, 16 Mar 2026 13:53:56 -0400 Subject: [PATCH 2/5] Salt takeup draws with hh_id:clone_idx instead of block:hh_id Replace block-based RNG salting with (hh_id, clone_idx) salting. Draws are now tied to the donor household identity and independent across clones, eliminating the multi-clone-same-block collision issue (#597). Geographic variation comes through the rate threshold, not the draw. Closes #597 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../calibration/publish_local_area.py | 1 + .../calibration/unified_matrix_builder.py | 8 + .../test_unified_calibration.py | 148 ++++++++++++------ policyengine_us_data/utils/takeup.py | 135 ++++++---------- 4 files changed, 154 insertions(+), 138 deletions(-) diff --git a/policyengine_us_data/calibration/publish_local_area.py b/policyengine_us_data/calibration/publish_local_area.py index 83e31ba6..9ad22323 100644 --- a/policyengine_us_data/calibration/publish_local_area.py +++ b/policyengine_us_data/calibration/publish_local_area.py @@ -511,6 +511,7 @@ def build_h5( hh_blocks=active_blocks, hh_state_fips=hh_state_fips, hh_ids=original_hh_ids, + hh_clone_indices=active_geo.astype(np.int64), entity_hh_indices=entity_hh_indices, entity_counts=entity_counts, time_period=time_period, diff --git a/policyengine_us_data/calibration/unified_matrix_builder.py b/policyengine_us_data/calibration/unified_matrix_builder.py index 62ff4bee..e9ddb494 100644 --- a/policyengine_us_data/calibration/unified_matrix_builder.py +++ b/policyengine_us_data/calibration/unified_matrix_builder.py @@ -635,11 +635,13 @@ def _process_single_clone( ent_hh = entity_hh_idx_map[entity] ent_blocks = clone_blocks[ent_hh] ent_hh_ids = household_ids[ent_hh] + ent_ci = np.full(len(ent_hh), clone_idx, dtype=np.int64) draws = compute_block_takeup_for_entities( var_name, precomputed_rates[rate_key], ent_blocks, ent_hh_ids, + ent_ci, ) wf_draws[entity] = draws if var_name in person_vars: @@ -706,12 +708,14 @@ def _process_single_clone( ent_blocks = clone_blocks[ent_hh] ent_hh_ids = household_ids[ent_hh] + ent_ci = np.full(n_ent, clone_idx, dtype=np.int64) ent_takeup = compute_block_takeup_for_entities( takeup_var, precomputed_rates[info["rate_key"]], ent_blocks, ent_hh_ids, + ent_ci, ) ent_values = (ent_eligible * ent_takeup).astype(np.float32) @@ -2290,11 +2294,13 @@ def build_matrix( ent_hh = entity_hh_idx_map[entity] ent_blocks = clone_blocks[ent_hh] ent_hh_ids = household_ids[ent_hh] + ent_ci = np.full(len(ent_hh), clone_idx, dtype=np.int64) draws = compute_block_takeup_for_entities( var_name, precomputed_rates[rate_key], ent_blocks, ent_hh_ids, + ent_ci, ) wf_draws[entity] = draws if var_name in person_vars: @@ -2368,12 +2374,14 @@ def build_matrix( ent_blocks = clone_blocks[ent_hh] ent_hh_ids = household_ids[ent_hh] + ent_ci = np.full(n_ent, clone_idx, dtype=np.int64) ent_takeup = compute_block_takeup_for_entities( takeup_var, precomputed_rates[info["rate_key"]], ent_blocks, ent_hh_ids, + ent_ci, ) ent_values = (ent_eligible * ent_takeup).astype(np.float32) diff --git a/policyengine_us_data/tests/test_calibration/test_unified_calibration.py b/policyengine_us_data/tests/test_calibration/test_unified_calibration.py index 28a3c906..1283dabe 100644 --- a/policyengine_us_data/tests/test_calibration/test_unified_calibration.py +++ b/policyengine_us_data/tests/test_calibration/test_unified_calibration.py @@ -74,44 +74,61 @@ def test_rate_comparison_produces_booleans(self): class TestBlockSaltedDraws: """Verify compute_block_takeup_for_entities produces - reproducible, block-dependent draws.""" + reproducible, clone-dependent draws.""" - def test_same_block_same_results(self): - blocks = np.array(["370010001001001"] * 500) - d1 = compute_block_takeup_for_entities("takes_up_snap_if_eligible", 0.8, blocks) - d2 = compute_block_takeup_for_entities("takes_up_snap_if_eligible", 0.8, blocks) + def test_same_inputs_same_results(self): + n = 500 + blocks = np.array(["370010001001001"] * n) + hh_ids = np.arange(n, dtype=np.int64) + ci = np.zeros(n, dtype=np.int64) + d1 = compute_block_takeup_for_entities( + "takes_up_snap_if_eligible", 0.8, blocks, hh_ids, ci + ) + d2 = compute_block_takeup_for_entities( + "takes_up_snap_if_eligible", 0.8, blocks, hh_ids, ci + ) np.testing.assert_array_equal(d1, d2) - def test_different_blocks_different_results(self): + def test_different_clone_idx_different_results(self): n = 500 + blocks = np.array(["370010001001001"] * n) + hh_ids = np.arange(n, dtype=np.int64) + ci0 = np.zeros(n, dtype=np.int64) + ci1 = np.ones(n, dtype=np.int64) d1 = compute_block_takeup_for_entities( - "takes_up_snap_if_eligible", - 0.8, - np.array(["370010001001001"] * n), + "takes_up_snap_if_eligible", 0.8, blocks, hh_ids, ci0 ) d2 = compute_block_takeup_for_entities( - "takes_up_snap_if_eligible", - 0.8, - np.array(["480010002002002"] * n), + "takes_up_snap_if_eligible", 0.8, blocks, hh_ids, ci1 ) assert not np.array_equal(d1, d2) def test_different_vars_different_results(self): - blocks = np.array(["370010001001001"] * 500) - d1 = compute_block_takeup_for_entities("takes_up_snap_if_eligible", 0.8, blocks) - d2 = compute_block_takeup_for_entities("takes_up_aca_if_eligible", 0.8, blocks) + n = 500 + blocks = np.array(["370010001001001"] * n) + hh_ids = np.arange(n, dtype=np.int64) + ci = np.zeros(n, dtype=np.int64) + d1 = compute_block_takeup_for_entities( + "takes_up_snap_if_eligible", 0.8, blocks, hh_ids, ci + ) + d2 = compute_block_takeup_for_entities( + "takes_up_aca_if_eligible", 0.8, blocks, hh_ids, ci + ) assert not np.array_equal(d1, d2) - def test_hh_salt_differs_from_block_only(self): - blocks = np.array(["370010001001001"] * 500) - hh_ids = np.array([1] * 500) - d_block = compute_block_takeup_for_entities( - "takes_up_snap_if_eligible", 0.8, blocks + def test_different_hh_ids_different_results(self): + n = 500 + blocks = np.array(["370010001001001"] * n) + ci = np.zeros(n, dtype=np.int64) + hh_a = np.arange(n, dtype=np.int64) + hh_b = np.arange(n, dtype=np.int64) + 1000 + d1 = compute_block_takeup_for_entities( + "takes_up_snap_if_eligible", 0.8, blocks, hh_a, ci ) - d_hh = compute_block_takeup_for_entities( - "takes_up_snap_if_eligible", 0.8, blocks, hh_ids + d2 = compute_block_takeup_for_entities( + "takes_up_snap_if_eligible", 0.8, blocks, hh_b, ci ) - assert not np.array_equal(d_block, d_hh) + assert not np.array_equal(d1, d2) class TestApplyBlockTakeupToArrays: @@ -126,6 +143,7 @@ def _make_arrays(self, n_hh, persons_per_hh, tu_per_hh, spm_per_hh): hh_blocks = np.array(["370010001001001"] * n_hh) hh_state_fips = np.array([37] * n_hh, dtype=np.int32) hh_ids = np.arange(n_hh, dtype=np.int64) + hh_clone_indices = np.zeros(n_hh, dtype=np.int64) entity_hh_indices = { "person": np.repeat(np.arange(n_hh), persons_per_hh), "tax_unit": np.repeat(np.arange(n_hh), tu_per_hh), @@ -140,6 +158,7 @@ def _make_arrays(self, n_hh, persons_per_hh, tu_per_hh, spm_per_hh): hh_blocks, hh_state_fips, hh_ids, + hh_clone_indices, entity_hh_indices, entity_counts, ) @@ -336,38 +355,61 @@ def test_county_fips_length(self): class TestBlockTakeupSeeding: """Verify compute_block_takeup_for_entities is - reproducible and block-dependent.""" + reproducible and clone-dependent.""" def test_reproducible(self): + n = 100 blocks = np.array(["010010001001001"] * 50 + ["020010001001001"] * 50) - r1 = compute_block_takeup_for_entities("takes_up_snap_if_eligible", 0.8, blocks) - r2 = compute_block_takeup_for_entities("takes_up_snap_if_eligible", 0.8, blocks) + hh_ids = np.arange(n, dtype=np.int64) + ci = np.zeros(n, dtype=np.int64) + r1 = compute_block_takeup_for_entities( + "takes_up_snap_if_eligible", 0.8, blocks, hh_ids, ci + ) + r2 = compute_block_takeup_for_entities( + "takes_up_snap_if_eligible", 0.8, blocks, hh_ids, ci + ) np.testing.assert_array_equal(r1, r2) - def test_different_blocks_different_draws(self): + def test_different_blocks_different_rates(self): + """With state-dependent rates, different blocks yield + different takeup because rate thresholds differ.""" n = 500 - blocks_a = np.array(["010010001001001"] * n) - blocks_b = np.array(["020010001001001"] * n) + hh_ids = np.arange(n, dtype=np.int64) + ci = np.zeros(n, dtype=np.int64) + rate_dict = {"AL": 0.9, "AK": 0.3} r_a = compute_block_takeup_for_entities( - "takes_up_snap_if_eligible", 0.8, blocks_a + "takes_up_snap_if_eligible", + rate_dict, + np.array(["010010001001001"] * n), + hh_ids, + ci, ) r_b = compute_block_takeup_for_entities( - "takes_up_snap_if_eligible", 0.8, blocks_b + "takes_up_snap_if_eligible", + rate_dict, + np.array(["020010001001001"] * n), + hh_ids, + ci, ) assert not np.array_equal(r_a, r_b) def test_returns_booleans(self): - blocks = np.array(["370010001001001"] * 100) + n = 100 + blocks = np.array(["370010001001001"] * n) + hh_ids = np.arange(n, dtype=np.int64) + ci = np.zeros(n, dtype=np.int64) result = compute_block_takeup_for_entities( - "takes_up_snap_if_eligible", 0.8, blocks + "takes_up_snap_if_eligible", 0.8, blocks, hh_ids, ci ) assert result.dtype == bool def test_rate_respected(self): n = 10000 blocks = np.array(["370010001001001"] * n) + hh_ids = np.arange(n, dtype=np.int64) + ci = np.zeros(n, dtype=np.int64) result = compute_block_takeup_for_entities( - "takes_up_snap_if_eligible", 0.75, blocks + "takes_up_snap_if_eligible", 0.75, blocks, hh_ids, ci ) frac = result.mean() assert 0.70 < frac < 0.80 @@ -481,6 +523,7 @@ def test_matrix_and_stacked_identical_draws(self): """Both paths must produce identical boolean arrays.""" var = "takes_up_snap_if_eligible" rate = 0.75 + clone_idx = 5 # 2 blocks, 3 households, variable entity counts per HH # HH0 has 2 entities in block A @@ -497,20 +540,23 @@ def test_matrix_and_stacked_identical_draws(self): ] ) hh_ids = np.array([100, 100, 200, 200, 200, 300]) + ci = np.full(len(blocks), clone_idx, dtype=np.int64) - # Path 1: compute_block_takeup_for_entities (stacked) - stacked = compute_block_takeup_for_entities(var, rate, blocks, hh_ids) + # Path 1: compute_block_takeup_for_entities + stacked = compute_block_takeup_for_entities(var, rate, blocks, hh_ids, ci) - # Path 2: reproduce matrix builder inline logic + # Path 2: reproduce inline logic with hh_id:clone_idx salt n = len(blocks) inline_takeup = np.zeros(n, dtype=bool) - for blk in np.unique(blocks): - bm = blocks == blk - for hh_id in np.unique(hh_ids[bm]): - hh_mask = bm & (hh_ids == hh_id) - rng = seeded_rng(var, salt=f"{blk}:{int(hh_id)}") - draws = rng.random(int(hh_mask.sum())) - inline_takeup[hh_mask] = draws < rate + for hh_id in np.unique(hh_ids): + hh_mask = hh_ids == hh_id + rng = seeded_rng(var, salt=f"{int(hh_id)}:{clone_idx}") + draws = rng.random(int(hh_mask.sum())) + # Rate from block's state FIPS + blk = blocks[hh_mask][0] + sf = int(str(blk)[:2]) + r = _resolve_rate(rate, sf) + inline_takeup[hh_mask] = draws < r np.testing.assert_array_equal(stacked, inline_takeup) @@ -542,18 +588,22 @@ def test_state_specific_rate_resolved_from_block(self): n = 5000 blocks_nc = np.array(["370010001001001"] * n) - result_nc = compute_block_takeup_for_entities(var, rate_dict, blocks_nc) - # NC rate=0.9, expect ~90% + hh_ids_nc = np.arange(n, dtype=np.int64) + ci = np.zeros(n, dtype=np.int64) + result_nc = compute_block_takeup_for_entities( + var, rate_dict, blocks_nc, hh_ids_nc, ci + ) frac_nc = result_nc.mean() assert 0.85 < frac_nc < 0.95, f"NC frac={frac_nc}" blocks_tx = np.array(["480010002002002"] * n) - result_tx = compute_block_takeup_for_entities(var, rate_dict, blocks_tx) - # TX rate=0.6, expect ~60% + hh_ids_tx = np.arange(n, dtype=np.int64) + result_tx = compute_block_takeup_for_entities( + var, rate_dict, blocks_tx, hh_ids_tx, ci + ) frac_tx = result_tx.mean() assert 0.55 < frac_tx < 0.65, f"TX frac={frac_tx}" - # Verify _resolve_rate actually gives different rates assert _resolve_rate(rate_dict, 37) == 0.9 assert _resolve_rate(rate_dict, 48) == 0.6 diff --git a/policyengine_us_data/utils/takeup.py b/policyengine_us_data/utils/takeup.py index 5e49b20a..b8db8c90 100644 --- a/policyengine_us_data/utils/takeup.py +++ b/policyengine_us_data/utils/takeup.py @@ -22,90 +22,66 @@ "variable": "takes_up_snap_if_eligible", "entity": "spm_unit", "rate_key": "snap", + "target": "snap", }, { "variable": "takes_up_aca_if_eligible", "entity": "tax_unit", "rate_key": "aca", + "target": "aca_ptc", }, { "variable": "takes_up_dc_ptc", "entity": "tax_unit", "rate_key": "dc_ptc", + "target": "dc_property_tax_credit", }, { "variable": "takes_up_head_start_if_eligible", "entity": "person", "rate_key": "head_start", + "target": "head_start", }, { "variable": "takes_up_early_head_start_if_eligible", "entity": "person", "rate_key": "early_head_start", + "target": "early_head_start", }, { "variable": "takes_up_ssi_if_eligible", "entity": "person", "rate_key": "ssi", + "target": "ssi", }, { "variable": "would_file_taxes_voluntarily", "entity": "tax_unit", "rate_key": "voluntary_filing", + "target": None, }, { "variable": "takes_up_medicaid_if_eligible", "entity": "person", "rate_key": "medicaid", + "target": "medicaid", }, { "variable": "takes_up_tanf_if_eligible", "entity": "spm_unit", "rate_key": "tanf", + "target": "tanf", }, ] TAKEUP_AFFECTED_TARGETS: Dict[str, dict] = { - "snap": { - "takeup_var": "takes_up_snap_if_eligible", - "entity": "spm_unit", - "rate_key": "snap", - }, - "tanf": { - "takeup_var": "takes_up_tanf_if_eligible", - "entity": "spm_unit", - "rate_key": "tanf", - }, - "aca_ptc": { - "takeup_var": "takes_up_aca_if_eligible", - "entity": "tax_unit", - "rate_key": "aca", - }, - "ssi": { - "takeup_var": "takes_up_ssi_if_eligible", - "entity": "person", - "rate_key": "ssi", - }, - "medicaid": { - "takeup_var": "takes_up_medicaid_if_eligible", - "entity": "person", - "rate_key": "medicaid", - }, - "head_start": { - "takeup_var": "takes_up_head_start_if_eligible", - "entity": "person", - "rate_key": "head_start", - }, - "early_head_start": { - "takeup_var": "takes_up_early_head_start_if_eligible", - "entity": "person", - "rate_key": "early_head_start", - }, - "dc_property_tax_credit": { - "takeup_var": "takes_up_dc_ptc", - "entity": "tax_unit", - "rate_key": "dc_ptc", - }, + spec["target"]: { + "takeup_var": spec["variable"], + "entity": spec["entity"], + "rate_key": spec["rate_key"], + } + for spec in SIMPLE_TAKEUP_VARS + if spec.get("target") is not None } # FIPS -> 2-letter state code for Medicaid rate lookup @@ -182,34 +158,26 @@ def compute_block_takeup_for_entities( var_name: str, rate_or_dict, entity_blocks: np.ndarray, - entity_hh_ids: np.ndarray = None, - entity_clone_ids: np.ndarray = None, + entity_hh_ids: np.ndarray, + entity_clone_indices: np.ndarray, ) -> np.ndarray: - """Compute boolean takeup via block-level seeded draws. - - Each unique (block, household) pair gets its own seeded RNG, - producing reproducible draws regardless of how many households - share the same block across clones. + """Compute boolean takeup via clone-seeded draws. - When multiple clones share the same (block, hh_id), the draws - are generated once for a single clone's entity count and tiled - so every clone gets identical draws — matching the matrix - builder, which processes each clone independently. - - State FIPS for rate resolution is derived from the first two - characters of each block GEOID. + Each unique (hh_id, clone_idx) pair gets its own seeded RNG, + producing reproducible draws tied to the donor household and + independent across clones. The rate varies by state (derived + from the block GEOID). Args: var_name: Takeup variable name. rate_or_dict: Scalar rate or {state_code: rate} dict. - entity_blocks: Block GEOID per entity (str array). - entity_hh_ids: Household ID per entity (int array). - When provided, seeds per (block, household) for - clone-independent draws. - entity_clone_ids: Clone index per entity (int array). - When provided, draws are tiled across clones sharing - the same (block, hh_id) so each clone gets identical - takeup decisions. + entity_blocks: Block GEOID per entity (str array), + used only for state FIPS rate resolution. + entity_hh_ids: Original household ID per entity. + entity_clone_indices: Clone index per entity. For the + matrix builder (single clone), a scalar broadcast + via np.full. For the H5 builder (all clones), + a per-entity array. Returns: Boolean array of shape (n_entities,). @@ -218,35 +186,22 @@ def compute_block_takeup_for_entities( draws = np.zeros(n, dtype=np.float64) rates = np.ones(n, dtype=np.float64) + # Resolve rates from block state FIPS for block in np.unique(entity_blocks): if block == "": continue blk_mask = entity_blocks == block sf = int(str(block)[:2]) - rate = _resolve_rate(rate_or_dict, sf) - rates[blk_mask] = rate - - if entity_hh_ids is not None: - for hh_id in np.unique(entity_hh_ids[blk_mask]): - hh_mask = blk_mask & (entity_hh_ids == hh_id) - n_total = int(hh_mask.sum()) - rng = seeded_rng(var_name, salt=f"{block}:{int(hh_id)}") + rates[blk_mask] = _resolve_rate(rate_or_dict, sf) - if entity_clone_ids is not None and n_total > 1: - clone_ids = entity_clone_ids[hh_mask] - first_clone = clone_ids[0] - n_per_clone = int((clone_ids == first_clone).sum()) - if n_per_clone < n_total: - base_draws = rng.random(n_per_clone) - n_copies = n_total // n_per_clone - draws[hh_mask] = np.tile(base_draws, n_copies) - else: - draws[hh_mask] = rng.random(n_total) - else: - draws[hh_mask] = rng.random(n_total) - else: - rng = seeded_rng(var_name, salt=str(block)) - draws[blk_mask] = rng.random(int(blk_mask.sum())) + # Draw per (hh_id, clone_idx) pair + for hh_id in np.unique(entity_hh_ids): + hh_mask = entity_hh_ids == hh_id + for ci in np.unique(entity_clone_indices[hh_mask]): + ci_mask = hh_mask & (entity_clone_indices == ci) + n_ent = int(ci_mask.sum()) + rng = seeded_rng(var_name, salt=f"{int(hh_id)}:{int(ci)}") + draws[ci_mask] = rng.random(n_ent) return draws < rates @@ -255,13 +210,14 @@ def apply_block_takeup_to_arrays( hh_blocks: np.ndarray, hh_state_fips: np.ndarray, hh_ids: np.ndarray, + hh_clone_indices: np.ndarray, entity_hh_indices: Dict[str, np.ndarray], entity_counts: Dict[str, int], time_period: int, takeup_filter: List[str] = None, precomputed_rates: Optional[Dict[str, Any]] = None, ) -> Dict[str, np.ndarray]: - """Compute block-level takeup draws from raw arrays. + """Compute takeup draws from raw arrays. Works without a Microsimulation instance. For each takeup variable, maps entity-level arrays from household-level block/ @@ -271,7 +227,8 @@ def apply_block_takeup_to_arrays( Args: hh_blocks: Block GEOID per cloned household (str array). hh_state_fips: State FIPS per cloned household (int array). - hh_ids: Household ID per cloned household (int array). + hh_ids: Original household ID per cloned household. + hh_clone_indices: Clone index per cloned household. entity_hh_indices: {entity_key: array} mapping each entity instance to its household index. Keys: "person", "tax_unit", "spm_unit". @@ -304,7 +261,7 @@ def apply_block_takeup_to_arrays( ent_hh_idx = entity_hh_indices[entity] ent_blocks = hh_blocks[ent_hh_idx].astype(str) ent_hh_ids = hh_ids[ent_hh_idx] - ent_clone_ids = ent_hh_idx + ent_clone_indices = hh_clone_indices[ent_hh_idx] if precomputed_rates is not None and rate_key in precomputed_rates: rate_or_dict = precomputed_rates[rate_key] @@ -315,7 +272,7 @@ def apply_block_takeup_to_arrays( rate_or_dict, ent_blocks, ent_hh_ids, - entity_clone_ids=ent_clone_ids, + ent_clone_indices, ) result[var_name] = bools From 01ad39b639c2f3bff7fc8720fe6f132365a08992 Mon Sep 17 00:00:00 2001 From: "baogorek@gmail.com" Date: Mon, 16 Mar 2026 16:08:32 -0400 Subject: [PATCH 3/5] Fix LA County crash in county precomputation by setting zip_code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit County precomputation crashes on LA County (06037) because aca_ptc → slcsp_rating_area_la_county → three_digit_zip_code calls zip_code.astype(int) on 'UNKNOWN'. Set zip_code='90001' for LA County in both precomputation and publish_local_area so X @ w matches sim.calculate("aca_ptc").sum(). Fixes #612 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../calibration/publish_local_area.py | 81 ++++++-- .../calibration/unified_matrix_builder.py | 176 +++++++++++++----- 2 files changed, 192 insertions(+), 65 deletions(-) diff --git a/policyengine_us_data/calibration/publish_local_area.py b/policyengine_us_data/calibration/publish_local_area.py index 9ad22323..40926686 100644 --- a/policyengine_us_data/calibration/publish_local_area.py +++ b/policyengine_us_data/calibration/publish_local_area.py @@ -161,14 +161,17 @@ def build_h5( # CD subset filtering: zero out cells whose CD isn't in subset if cd_subset is not None: cd_subset_set = set(cd_subset) - cd_mask = np.vectorize(lambda cd: cd in cd_subset_set)(clone_cds_matrix) + cd_mask = np.vectorize(lambda cd: cd in cd_subset_set)( + clone_cds_matrix + ) W[~cd_mask] = 0 # County filtering: scale weights by P(target_counties | CD) if county_filter is not None: unique_cds = np.unique(clone_cds_matrix) cd_prob = { - cd: get_county_filter_probability(cd, county_filter) for cd in unique_cds + cd: get_county_filter_probability(cd, county_filter) + for cd in unique_cds } p_matrix = np.vectorize( cd_prob.__getitem__, @@ -195,11 +198,15 @@ def build_h5( ) clone_weights = W[active_geo, active_hh] active_blocks = blocks.reshape(n_clones_total, n_hh)[active_geo, active_hh] - active_clone_cds = clone_cds.reshape(n_clones_total, n_hh)[active_geo, active_hh] + active_clone_cds = clone_cds.reshape(n_clones_total, n_hh)[ + active_geo, active_hh + ] empty_count = np.sum(active_blocks == "") if empty_count > 0: - raise ValueError(f"{empty_count} active clones have empty block GEOIDs") + raise ValueError( + f"{empty_count} active clones have empty block GEOIDs" + ) print(f"Active clones: {n_clones:,}") print(f"Total weight: {clone_weights.sum():,.0f}") @@ -244,12 +251,16 @@ def build_h5( # === Build clone index arrays === hh_clone_idx = active_hh - persons_per_clone = np.array([len(hh_to_persons.get(h, [])) for h in active_hh]) + persons_per_clone = np.array( + [len(hh_to_persons.get(h, [])) for h in active_hh] + ) person_parts = [ np.array(hh_to_persons.get(h, []), dtype=np.int64) for h in active_hh ] person_clone_idx = ( - np.concatenate(person_parts) if person_parts else np.array([], dtype=np.int64) + np.concatenate(person_parts) + if person_parts + else np.array([], dtype=np.int64) ) entity_clone_idx = {} @@ -258,7 +269,8 @@ def build_h5( epc = np.array([len(hh_to_entity[ek].get(h, [])) for h in active_hh]) entities_per_clone[ek] = epc parts = [ - np.array(hh_to_entity[ek].get(h, []), dtype=np.int64) for h in active_hh + np.array(hh_to_entity[ek].get(h, []), dtype=np.int64) + for h in active_hh ] entity_clone_idx[ek] = ( np.concatenate(parts) if parts else np.array([], dtype=np.int64) @@ -297,7 +309,9 @@ def build_h5( sorted_keys = entity_keys[sorted_order] sorted_new = new_entity_ids[ek][sorted_order] - p_old_eids = person_entity_id_arrays[ek][person_clone_idx].astype(np.int64) + p_old_eids = person_entity_id_arrays[ek][person_clone_idx].astype( + np.int64 + ) person_keys = clone_ids_for_persons * offset + p_old_eids positions = np.searchsorted(sorted_keys, person_keys) @@ -431,8 +445,17 @@ def build_h5( time_period: clone_geo[gv].astype("S"), } + # === Set zip_code for LA County clones (ACA rating area fix) === + la_mask = clone_geo["county_fips"].astype(str) == "06037" + if la_mask.any(): + zip_codes = np.full(len(la_mask), "UNKNOWN") + zip_codes[la_mask] = "90001" + data["zip_code"] = {time_period: zip_codes.astype("S")} + # === Gap 4: Congressional district GEOID === - clone_cd_geoids = np.array([int(cd) for cd in active_clone_cds], dtype=np.int32) + clone_cd_geoids = np.array( + [int(cd) for cd in active_clone_cds], dtype=np.int32 + ) data["congressional_district_geoid"] = { time_period: clone_cd_geoids, } @@ -452,7 +475,9 @@ def build_h5( ) # Get cloned person ages and SPM unit IDs - person_ages = sim.calculate("age", map_to="person").values[person_clone_idx] + person_ages = sim.calculate("age", map_to="person").values[ + person_clone_idx + ] # Get cloned tenure types spm_tenure_holder = sim.get_holder("spm_unit_tenure_type") @@ -608,14 +633,18 @@ def build_states( if upload: print(f"Uploading {state_code}.h5 to GCP...") - upload_local_area_file(str(output_path), "states", skip_hf=True) + upload_local_area_file( + str(output_path), "states", skip_hf=True + ) hf_queue.append((str(output_path), "states")) record_completed_state(state_code) print(f"Completed {state_code}") if upload and len(hf_queue) >= hf_batch_size: - print(f"\nUploading batch of {len(hf_queue)} files to HuggingFace...") + print( + f"\nUploading batch of {len(hf_queue)} files to HuggingFace..." + ) upload_local_area_batch_to_hf(hf_queue) hf_queue = [] @@ -624,7 +653,9 @@ def build_states( raise if upload and hf_queue: - print(f"\nUploading final batch of {len(hf_queue)} files to HuggingFace...") + print( + f"\nUploading final batch of {len(hf_queue)} files to HuggingFace..." + ) upload_local_area_batch_to_hf(hf_queue) @@ -676,14 +707,18 @@ def build_districts( if upload: print(f"Uploading {friendly_name}.h5 to GCP...") - upload_local_area_file(str(output_path), "districts", skip_hf=True) + upload_local_area_file( + str(output_path), "districts", skip_hf=True + ) hf_queue.append((str(output_path), "districts")) record_completed_district(friendly_name) print(f"Completed {friendly_name}") if upload and len(hf_queue) >= hf_batch_size: - print(f"\nUploading batch of {len(hf_queue)} files to HuggingFace...") + print( + f"\nUploading batch of {len(hf_queue)} files to HuggingFace..." + ) upload_local_area_batch_to_hf(hf_queue) hf_queue = [] @@ -692,7 +727,9 @@ def build_districts( raise if upload and hf_queue: - print(f"\nUploading final batch of {len(hf_queue)} files to HuggingFace...") + print( + f"\nUploading final batch of {len(hf_queue)} files to HuggingFace..." + ) upload_local_area_batch_to_hf(hf_queue) @@ -739,7 +776,9 @@ def build_cities( if upload: print("Uploading NYC.h5 to GCP...") - upload_local_area_file(str(output_path), "cities", skip_hf=True) + upload_local_area_file( + str(output_path), "cities", skip_hf=True + ) hf_queue.append((str(output_path), "cities")) record_completed_city("NYC") @@ -750,7 +789,9 @@ def build_cities( raise if upload and hf_queue: - print(f"\nUploading batch of {len(hf_queue)} city files to HuggingFace...") + print( + f"\nUploading batch of {len(hf_queue)} city files to HuggingFace..." + ) upload_local_area_batch_to_hf(hf_queue) @@ -827,7 +868,9 @@ def main(): elif args.skip_download: inputs = { "weights": WORK_DIR / "calibration_weights.npy", - "dataset": (WORK_DIR / "source_imputed_stratified_extended_cps.h5"), + "dataset": ( + WORK_DIR / "source_imputed_stratified_extended_cps.h5" + ), } print("Using existing files in work directory:") for key, path in inputs.items(): diff --git a/policyengine_us_data/calibration/unified_matrix_builder.py b/policyengine_us_data/calibration/unified_matrix_builder.py index e9ddb494..9a3db18b 100644 --- a/policyengine_us_data/calibration/unified_matrix_builder.py +++ b/policyengine_us_data/calibration/unified_matrix_builder.py @@ -124,7 +124,9 @@ def _compute_single_state( if rerandomize_takeup: for spec in SIMPLE_TAKEUP_VARS: entity = spec["entity"] - n_ent = len(state_sim.calculate(f"{entity}_id", map_to=entity).values) + n_ent = len( + state_sim.calculate(f"{entity}_id", map_to=entity).values + ) state_sim.set_input( spec["variable"], time_period, @@ -158,7 +160,9 @@ def _compute_single_state( info["entity"] == "tax_unit" for info in affected_targets.values() ) if has_tu_target: - n_tu = len(state_sim.calculate("tax_unit_id", map_to="tax_unit").values) + n_tu = len( + state_sim.calculate("tax_unit_id", map_to="tax_unit").values + ) state_sim.set_input( "would_file_taxes_voluntarily", time_period, @@ -253,6 +257,12 @@ def _compute_single_state_group_counties( time_period, np.full(n_hh, county_idx, dtype=np.int32), ) + if county_fips == "06037": + state_sim.set_input( + "zip_code", + time_period, + np.full(n_hh, "90001"), + ) if rerandomize_takeup: for vname, (ent, orig) in original_takeup.items(): state_sim.set_input(vname, time_period, orig) @@ -281,7 +291,9 @@ def _compute_single_state_group_counties( if rerandomize_takeup: for spec in SIMPLE_TAKEUP_VARS: entity = spec["entity"] - n_ent = len(state_sim.calculate(f"{entity}_id", map_to=entity).values) + n_ent = len( + state_sim.calculate(f"{entity}_id", map_to=entity).values + ) state_sim.set_input( spec["variable"], time_period, @@ -312,10 +324,15 @@ def _compute_single_state_group_counties( entity_wf_false = {} if rerandomize_takeup: has_tu_target = any( - info["entity"] == "tax_unit" for info in affected_targets.values() + info["entity"] == "tax_unit" + for info in affected_targets.values() ) if has_tu_target: - n_tu = len(state_sim.calculate("tax_unit_id", map_to="tax_unit").values) + n_tu = len( + state_sim.calculate( + "tax_unit_id", map_to="tax_unit" + ).values + ) state_sim.set_input( "would_file_taxes_voluntarily", time_period, @@ -387,7 +404,9 @@ def _assemble_clone_values_standalone( state_masks = {int(s): clone_states == s for s in unique_clone_states} unique_person_states = np.unique(person_states) - person_state_masks = {int(s): person_states == s for s in unique_person_states} + person_state_masks = { + int(s): person_states == s for s in unique_person_states + } county_masks = {} unique_counties = None if clone_counties is not None and county_values: @@ -686,7 +705,9 @@ def _process_single_clone( ent_counties = clone_counties[ent_hh] for cfips in np.unique(ent_counties): m = ent_counties == cfips - cv = county_values.get(cfips, {}).get("entity_wf_false", {}) + cv = county_values.get(cfips, {}).get( + "entity_wf_false", {} + ) if tvar in cv: ent_wf_false[m] = cv[tvar][m] else: @@ -862,10 +883,18 @@ def _build_entity_relationship(self, sim) -> pd.DataFrame: self._entity_rel_cache = pd.DataFrame( { - "person_id": sim.calculate("person_id", map_to="person").values, - "household_id": sim.calculate("household_id", map_to="person").values, - "tax_unit_id": sim.calculate("tax_unit_id", map_to="person").values, - "spm_unit_id": sim.calculate("spm_unit_id", map_to="person").values, + "person_id": sim.calculate( + "person_id", map_to="person" + ).values, + "household_id": sim.calculate( + "household_id", map_to="person" + ).values, + "tax_unit_id": sim.calculate( + "tax_unit_id", map_to="person" + ).values, + "spm_unit_id": sim.calculate( + "spm_unit_id", map_to="person" + ).values, } ) return self._entity_rel_cache @@ -985,7 +1014,9 @@ def _build_state_values( except Exception as exc: for f in futures: f.cancel() - raise RuntimeError(f"State {st} failed: {exc}") from exc + raise RuntimeError( + f"State {st} failed: {exc}" + ) from exc else: from policyengine_us import Microsimulation from policyengine_us_data.utils.takeup import ( @@ -1041,7 +1072,9 @@ def _build_state_values( for spec in SIMPLE_TAKEUP_VARS: entity = spec["entity"] n_ent = len( - state_sim.calculate(f"{entity}_id", map_to=entity).values + state_sim.calculate( + f"{entity}_id", map_to=entity + ).values ) state_sim.set_input( spec["variable"], @@ -1257,7 +1290,9 @@ def _build_county_values( except Exception as exc: for f in futures: f.cancel() - raise RuntimeError(f"State group {sf} failed: {exc}") from exc + raise RuntimeError( + f"State group {sf} failed: {exc}" + ) from exc else: from policyengine_us import Microsimulation from policyengine_us_data.utils.takeup import ( @@ -1467,7 +1502,9 @@ def _assemble_clone_values( # Pre-compute masks to avoid recomputing per variable state_masks = {int(s): clone_states == s for s in unique_clone_states} unique_person_states = np.unique(person_states) - person_state_masks = {int(s): person_states == s for s in unique_person_states} + person_state_masks = { + int(s): person_states == s for s in unique_person_states + } county_masks = {} unique_counties = None if clone_counties is not None and county_values: @@ -1480,7 +1517,9 @@ def _assemble_clone_values( continue if var in cdv and county_values and clone_counties is not None: first_county = unique_counties[0] - if var not in county_values.get(first_county, {}).get("hh", {}): + if var not in county_values.get(first_county, {}).get( + "hh", {} + ): continue arr = np.empty(n_records, dtype=np.float32) for county in unique_counties: @@ -1622,7 +1661,9 @@ def _calculate_uprating_factors(self, params) -> dict: factors[(from_year, "cpi")] = 1.0 try: - pop_from = params.calibration.gov.census.populations.total(from_year) + pop_from = params.calibration.gov.census.populations.total( + from_year + ) pop_to = params.calibration.gov.census.populations.total( self.time_period ) @@ -1699,7 +1740,9 @@ def _get_state_uprating_factors( var_factors[var] = 1.0 continue period = row.iloc[0]["period"] - factor, _ = self._get_uprating_info(var, period, national_factors) + factor, _ = self._get_uprating_info( + var, period, national_factors + ) var_factors[var] = factor result[state_int] = var_factors @@ -1834,7 +1877,9 @@ def _make_target_name( non_geo = [c for c in constraints if c["variable"] not in _GEO_VARS] if non_geo: - strs = [f"{c['variable']}{c['operation']}{c['value']}" for c in non_geo] + strs = [ + f"{c['variable']}{c['operation']}{c['value']}" for c in non_geo + ] parts.append("[" + ",".join(strs) + "]") return "/".join(parts) @@ -1978,9 +2023,15 @@ def build_matrix( n_targets = len(targets_df) # 2. Sort targets by geographic level - targets_df["_geo_level"] = targets_df["geographic_id"].apply(get_geo_level) - targets_df = targets_df.sort_values(["_geo_level", "variable", "geographic_id"]) - targets_df = targets_df.drop(columns=["_geo_level"]).reset_index(drop=True) + targets_df["_geo_level"] = targets_df["geographic_id"].apply( + get_geo_level + ) + targets_df = targets_df.sort_values( + ["_geo_level", "variable", "geographic_id"] + ) + targets_df = targets_df.drop(columns=["_geo_level"]).reset_index( + drop=True + ) # 3. Build column index structures from geography state_col_lists: Dict[int, list] = defaultdict(list) @@ -2007,7 +2058,9 @@ def build_matrix( geo_id = row["geographic_id"] target_geo_info.append((geo_level, geo_id)) - non_geo = [c for c in constraints if c["variable"] not in _GEO_VARS] + non_geo = [ + c for c in constraints if c["variable"] not in _GEO_VARS + ] non_geo_constraints_list.append(non_geo) target_names.append( @@ -2046,10 +2099,14 @@ def build_matrix( # 5c. State-independent structures (computed once) entity_rel = self._build_entity_relationship(sim) - household_ids = sim.calculate("household_id", map_to="household").values + household_ids = sim.calculate( + "household_id", map_to="household" + ).values person_hh_ids = sim.calculate("household_id", map_to="person").values hh_id_to_idx = {int(hid): idx for idx, hid in enumerate(household_ids)} - person_hh_indices = np.array([hh_id_to_idx[int(hid)] for hid in person_hh_ids]) + person_hh_indices = np.array( + [hh_id_to_idx[int(hid)] for hid in person_hh_ids] + ) tax_benefit_system = sim.tax_benefit_system # Pre-extract entity keys so workers don't need @@ -2057,7 +2114,9 @@ def build_matrix( variable_entity_map: Dict[str, str] = {} for var in unique_variables: if var.endswith("_count") and var in tax_benefit_system.variables: - variable_entity_map[var] = tax_benefit_system.variables[var].entity.key + variable_entity_map[var] = tax_benefit_system.variables[ + var + ].entity.key # 5c-extra: Entity-to-household index maps for takeup affected_target_info = {} @@ -2072,7 +2131,9 @@ def build_matrix( # Build entity-to-household index arrays spm_to_hh_id = ( - entity_rel.groupby("spm_unit_id")["household_id"].first().to_dict() + entity_rel.groupby("spm_unit_id")["household_id"] + .first() + .to_dict() ) spm_ids = sim.calculate("spm_unit_id", map_to="spm_unit").values spm_hh_idx = np.array( @@ -2080,7 +2141,9 @@ def build_matrix( ) tu_to_hh_id = ( - entity_rel.groupby("tax_unit_id")["household_id"].first().to_dict() + entity_rel.groupby("tax_unit_id")["household_id"] + .first() + .to_dict() ) tu_ids = sim.calculate("tax_unit_id", map_to="tax_unit").values tu_hh_idx = np.array( @@ -2099,7 +2162,9 @@ def build_matrix( f"{entity_level}_id", map_to=entity_level, ).values - ent_id_to_idx = {int(eid): idx for idx, eid in enumerate(ent_ids)} + ent_id_to_idx = { + int(eid): idx for idx, eid in enumerate(ent_ids) + } person_ent_ids = entity_rel[f"{entity_level}_id"].values entity_to_person_idx[entity_level] = np.array( [ent_id_to_idx[int(eid)] for eid in person_ent_ids] @@ -2126,7 +2191,9 @@ def build_matrix( for spec in _ALL_TAKEUP: rk = spec["rate_key"] if rk not in precomputed_rates: - precomputed_rates[rk] = load_take_up_rate(rk, self.time_period) + precomputed_rates[rk] = load_take_up_rate( + rk, self.time_period + ) # Store for post-optimization stacked takeup self.entity_hh_idx_map = entity_hh_idx_map @@ -2227,7 +2294,9 @@ def build_matrix( except Exception as exc: for f in futures: f.cancel() - raise RuntimeError(f"Clone {ci} failed: {exc}") from exc + raise RuntimeError( + f"Clone {ci} failed: {exc}" + ) from exc else: # ---- Sequential clone processing (unchanged) ---- @@ -2294,7 +2363,9 @@ def build_matrix( ent_hh = entity_hh_idx_map[entity] ent_blocks = clone_blocks[ent_hh] ent_hh_ids = household_ids[ent_hh] - ent_ci = np.full(len(ent_hh), clone_idx, dtype=np.int64) + ent_ci = np.full( + len(ent_hh), clone_idx, dtype=np.int64 + ) draws = compute_block_takeup_for_entities( var_name, precomputed_rates[rate_key], @@ -2305,7 +2376,9 @@ def build_matrix( wf_draws[entity] = draws if var_name in person_vars: pidx = entity_to_person_idx[entity] - person_vars[var_name] = draws[pidx].astype(np.float32) + person_vars[var_name] = draws[pidx].astype( + np.float32 + ) # Phase 2: target loop with would_file blending for ( @@ -2326,7 +2399,9 @@ def build_matrix( ent_counties = clone_counties[ent_hh] for cfips in np.unique(ent_counties): m = ent_counties == cfips - cv = county_values.get(cfips, {}).get("entity", {}) + cv = county_values.get(cfips, {}).get( + "entity", {} + ) if tvar in cv: ent_eligible[m] = cv[tvar][m] else: @@ -2342,7 +2417,10 @@ def build_matrix( ent_eligible[m] = sv[tvar][m] # Blend for tax_unit targets - if entity_level == "tax_unit" and "tax_unit" in wf_draws: + if ( + entity_level == "tax_unit" + and "tax_unit" in wf_draws + ): ent_wf_false = np.zeros(n_ent, dtype=np.float32) if tvar in county_dep_targets and county_values: ent_counties = clone_counties[ent_hh] @@ -2355,7 +2433,9 @@ def build_matrix( ent_wf_false[m] = cv[tvar][m] else: st = int(cfips[:2]) - sv = state_values[st].get("entity_wf_false", {}) + sv = state_values[st].get( + "entity_wf_false", {} + ) if tvar in sv: ent_wf_false[m] = sv[tvar][m] else: @@ -2384,7 +2464,9 @@ def build_matrix( ent_ci, ) - ent_values = (ent_eligible * ent_takeup).astype(np.float32) + ent_values = (ent_eligible * ent_takeup).astype( + np.float32 + ) hh_result = np.zeros(n_records, dtype=np.float32) np.add.at(hh_result, ent_hh, ent_values) @@ -2444,15 +2526,17 @@ def build_matrix( constraint_key, ) if vkey not in count_cache: - count_cache[vkey] = _calculate_target_values_standalone( - target_variable=variable, - non_geo_constraints=non_geo, - n_households=n_records, - hh_vars=hh_vars, - person_vars=person_vars, - entity_rel=entity_rel, - household_ids=household_ids, - variable_entity_map=variable_entity_map, + count_cache[vkey] = ( + _calculate_target_values_standalone( + target_variable=variable, + non_geo_constraints=non_geo, + n_households=n_records, + hh_vars=hh_vars, + person_vars=person_vars, + entity_rel=entity_rel, + household_ids=household_ids, + variable_entity_map=variable_entity_map, + ) ) values = count_cache[vkey] else: From 07f602263c274bf80d053572886022e3670a8741 Mon Sep 17 00:00:00 2001 From: "baogorek@gmail.com" Date: Mon, 16 Mar 2026 16:38:20 -0400 Subject: [PATCH 4/5] Preserve zip_code across delete_arrays in county precomputation The zip_code set for LA County (06037) was being wiped by delete_arrays which only preserved "county". Also apply the 06037 zip_code fix to the in-process county precomputation path (not just the parallel worker function). Fixes #612 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../calibration/unified_matrix_builder.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/policyengine_us_data/calibration/unified_matrix_builder.py b/policyengine_us_data/calibration/unified_matrix_builder.py index 9a3db18b..d574f7a3 100644 --- a/policyengine_us_data/calibration/unified_matrix_builder.py +++ b/policyengine_us_data/calibration/unified_matrix_builder.py @@ -267,7 +267,7 @@ def _compute_single_state_group_counties( for vname, (ent, orig) in original_takeup.items(): state_sim.set_input(vname, time_period, orig) for var in get_calculated_variables(state_sim): - if var != "county": + if var not in ("county", "zip_code"): state_sim.delete_arrays(var) hh = {} @@ -300,7 +300,7 @@ def _compute_single_state_group_counties( np.ones(n_ent, dtype=bool), ) for var in get_calculated_variables(state_sim): - if var != "county": + if var not in ("county", "zip_code"): state_sim.delete_arrays(var) entity_vals = {} @@ -339,7 +339,7 @@ def _compute_single_state_group_counties( np.zeros(n_tu, dtype=bool), ) for var in get_calculated_variables(state_sim): - if var != "county": + if var not in ("county", "zip_code"): state_sim.delete_arrays(var) for tvar, info in affected_targets.items(): if info["entity"] != "tax_unit": @@ -1333,6 +1333,12 @@ def _build_county_values( dtype=np.int32, ), ) + if county_fips == "06037": + state_sim.set_input( + "zip_code", + self.time_period, + np.full(n_hh, "90001"), + ) if rerandomize_takeup: for vname, ( ent, @@ -1344,7 +1350,7 @@ def _build_county_values( orig, ) for var in get_calculated_variables(state_sim): - if var != "county": + if var not in ("county", "zip_code"): state_sim.delete_arrays(var) hh = {} @@ -1380,7 +1386,7 @@ def _build_county_values( np.ones(n_ent, dtype=bool), ) for var in get_calculated_variables(state_sim): - if var != "county": + if var not in ("county", "zip_code"): state_sim.delete_arrays(var) entity_vals = {} @@ -1425,7 +1431,7 @@ def _build_county_values( np.zeros(n_tu, dtype=bool), ) for var in get_calculated_variables(state_sim): - if var != "county": + if var not in ("county", "zip_code"): state_sim.delete_arrays(var) for ( tvar, From 080d30979af13346c62320119472fa5866e32dc1 Mon Sep 17 00:00:00 2001 From: "baogorek@gmail.com" Date: Mon, 16 Mar 2026 16:54:02 -0400 Subject: [PATCH 5/5] Remove no-op would_file pass from county precomputation The only county-dependent variable (aca_ptc) does not depend on would_file_taxes_voluntarily, so the entity_wf_false pass was computing identical values. Removing it eliminates ~2,977 extra simulation passes during --county-level builds. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../calibration/unified_matrix_builder.py | 64 ------------------- 1 file changed, 64 deletions(-) diff --git a/policyengine_us_data/calibration/unified_matrix_builder.py b/policyengine_us_data/calibration/unified_matrix_builder.py index d574f7a3..f2ba3587 100644 --- a/policyengine_us_data/calibration/unified_matrix_builder.py +++ b/policyengine_us_data/calibration/unified_matrix_builder.py @@ -321,42 +321,12 @@ def _compute_single_state_group_counties( exc, ) - entity_wf_false = {} - if rerandomize_takeup: - has_tu_target = any( - info["entity"] == "tax_unit" - for info in affected_targets.values() - ) - if has_tu_target: - n_tu = len( - state_sim.calculate( - "tax_unit_id", map_to="tax_unit" - ).values - ) - state_sim.set_input( - "would_file_taxes_voluntarily", - time_period, - np.zeros(n_tu, dtype=bool), - ) - for var in get_calculated_variables(state_sim): - if var not in ("county", "zip_code"): - state_sim.delete_arrays(var) - for tvar, info in affected_targets.items(): - if info["entity"] != "tax_unit": - continue - entity_wf_false[tvar] = state_sim.calculate( - tvar, - time_period, - map_to="tax_unit", - ).values.astype(np.float32) - results.append( ( county_fips, { "hh": hh, "entity": entity_vals, - "entity_wf_false": entity_wf_false, }, ) ) @@ -1412,43 +1382,9 @@ def _build_county_values( exc, ) - entity_wf_false = {} - if rerandomize_takeup: - has_tu_target = any( - info["entity"] == "tax_unit" - for info in affected_targets.values() - ) - if has_tu_target: - n_tu = len( - state_sim.calculate( - "tax_unit_id", - map_to="tax_unit", - ).values - ) - state_sim.set_input( - "would_file_taxes_voluntarily", - self.time_period, - np.zeros(n_tu, dtype=bool), - ) - for var in get_calculated_variables(state_sim): - if var not in ("county", "zip_code"): - state_sim.delete_arrays(var) - for ( - tvar, - info, - ) in affected_targets.items(): - if info["entity"] != "tax_unit": - continue - entity_wf_false[tvar] = state_sim.calculate( - tvar, - self.time_period, - map_to="tax_unit", - ).values.astype(np.float32) - county_values[county_fips] = { "hh": hh, "entity": entity_vals, - "entity_wf_false": entity_wf_false, } county_count += 1 if county_count % 500 == 0 or county_count == 1: