Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 66 additions & 42 deletions policyengine_us_data/calibration/publish_local_area.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,17 @@ def build_h5(
# CD subset filtering: zero out cells whose CD isn't in subset
if cd_subset is not None:
cd_subset_set = set(cd_subset)
cd_mask = np.vectorize(lambda cd: cd in cd_subset_set)(clone_cds_matrix)
cd_mask = np.vectorize(lambda cd: cd in cd_subset_set)(
clone_cds_matrix
)
W[~cd_mask] = 0

# County filtering: scale weights by P(target_counties | CD)
if county_filter is not None:
unique_cds = np.unique(clone_cds_matrix)
cd_prob = {
cd: get_county_filter_probability(cd, county_filter) for cd in unique_cds
cd: get_county_filter_probability(cd, county_filter)
for cd in unique_cds
}
p_matrix = np.vectorize(
cd_prob.__getitem__,
Expand All @@ -195,11 +198,15 @@ def build_h5(
)
clone_weights = W[active_geo, active_hh]
active_blocks = blocks.reshape(n_clones_total, n_hh)[active_geo, active_hh]
active_clone_cds = clone_cds.reshape(n_clones_total, n_hh)[active_geo, active_hh]
active_clone_cds = clone_cds.reshape(n_clones_total, n_hh)[
active_geo, active_hh
]

empty_count = np.sum(active_blocks == "")
if empty_count > 0:
raise ValueError(f"{empty_count} active clones have empty block GEOIDs")
raise ValueError(
f"{empty_count} active clones have empty block GEOIDs"
)

print(f"Active clones: {n_clones:,}")
print(f"Total weight: {clone_weights.sum():,.0f}")
Expand Down Expand Up @@ -244,12 +251,16 @@ def build_h5(
# === Build clone index arrays ===
hh_clone_idx = active_hh

persons_per_clone = np.array([len(hh_to_persons.get(h, [])) for h in active_hh])
persons_per_clone = np.array(
[len(hh_to_persons.get(h, [])) for h in active_hh]
)
person_parts = [
np.array(hh_to_persons.get(h, []), dtype=np.int64) for h in active_hh
]
person_clone_idx = (
np.concatenate(person_parts) if person_parts else np.array([], dtype=np.int64)
np.concatenate(person_parts)
if person_parts
else np.array([], dtype=np.int64)
)

entity_clone_idx = {}
Expand All @@ -258,7 +269,8 @@ def build_h5(
epc = np.array([len(hh_to_entity[ek].get(h, [])) for h in active_hh])
entities_per_clone[ek] = epc
parts = [
np.array(hh_to_entity[ek].get(h, []), dtype=np.int64) for h in active_hh
np.array(hh_to_entity[ek].get(h, []), dtype=np.int64)
for h in active_hh
]
entity_clone_idx[ek] = (
np.concatenate(parts) if parts else np.array([], dtype=np.int64)
Expand Down Expand Up @@ -297,7 +309,9 @@ def build_h5(
sorted_keys = entity_keys[sorted_order]
sorted_new = new_entity_ids[ek][sorted_order]

p_old_eids = person_entity_id_arrays[ek][person_clone_idx].astype(np.int64)
p_old_eids = person_entity_id_arrays[ek][person_clone_idx].astype(
np.int64
)
person_keys = clone_ids_for_persons * offset + p_old_eids

positions = np.searchsorted(sorted_keys, person_keys)
Expand All @@ -311,22 +325,6 @@ def build_h5(
unique_geo = derive_geography_from_blocks(unique_blocks)
clone_geo = {k: v[block_inv] for k, v in unique_geo.items()}

# === Calculate weights for all entity levels ===
person_weights = np.repeat(clone_weights, persons_per_clone)
per_person_wt = clone_weights / np.maximum(persons_per_clone, 1)

entity_weights = {}
for ek in SUB_ENTITIES:
n_ents = len(entity_clone_idx[ek])
ent_person_counts = np.zeros(n_ents, dtype=np.int32)
np.add.at(
ent_person_counts,
new_person_entity_ids[ek],
1,
)
clone_ids_e = np.repeat(np.arange(n_clones), entities_per_clone[ek])
entity_weights[ek] = per_person_wt[clone_ids_e] * ent_person_counts

# === Determine variables to save ===
vars_to_save = set(sim.input_variables)
vars_to_save.add("county")
Expand Down Expand Up @@ -413,16 +411,12 @@ def build_h5(
}

# === Override weights ===
# Only write household_weight; sub-entity weights (tax_unit_weight,
# spm_unit_weight, person_weight, etc.) are formula variables in
# policyengine-us that derive from household_weight at runtime.
data["household_weight"] = {
time_period: clone_weights.astype(np.float32),
}
data["person_weight"] = {
time_period: person_weights.astype(np.float32),
}
for ek in SUB_ENTITIES:
data[f"{ek}_weight"] = {
time_period: entity_weights[ek].astype(np.float32),
}

# === Override geography ===
data["state_fips"] = {
Expand Down Expand Up @@ -451,8 +445,17 @@ def build_h5(
time_period: clone_geo[gv].astype("S"),
}

# === Set zip_code for LA County clones (ACA rating area fix) ===
la_mask = clone_geo["county_fips"].astype(str) == "06037"
if la_mask.any():
zip_codes = np.full(len(la_mask), "UNKNOWN")
zip_codes[la_mask] = "90001"
data["zip_code"] = {time_period: zip_codes.astype("S")}

# === Gap 4: Congressional district GEOID ===
clone_cd_geoids = np.array([int(cd) for cd in active_clone_cds], dtype=np.int32)
clone_cd_geoids = np.array(
[int(cd) for cd in active_clone_cds], dtype=np.int32
)
data["congressional_district_geoid"] = {
time_period: clone_cd_geoids,
}
Expand All @@ -472,7 +475,9 @@ def build_h5(
)

# Get cloned person ages and SPM unit IDs
person_ages = sim.calculate("age", map_to="person").values[person_clone_idx]
person_ages = sim.calculate("age", map_to="person").values[
person_clone_idx
]

# Get cloned tenure types
spm_tenure_holder = sim.get_holder("spm_unit_tenure_type")
Expand Down Expand Up @@ -531,6 +536,7 @@ def build_h5(
hh_blocks=active_blocks,
hh_state_fips=hh_state_fips,
hh_ids=original_hh_ids,
hh_clone_indices=active_geo.astype(np.int64),
entity_hh_indices=entity_hh_indices,
entity_counts=entity_counts,
time_period=time_period,
Expand Down Expand Up @@ -627,14 +633,18 @@ def build_states(

if upload:
print(f"Uploading {state_code}.h5 to GCP...")
upload_local_area_file(str(output_path), "states", skip_hf=True)
upload_local_area_file(
str(output_path), "states", skip_hf=True
)
hf_queue.append((str(output_path), "states"))

record_completed_state(state_code)
print(f"Completed {state_code}")

if upload and len(hf_queue) >= hf_batch_size:
print(f"\nUploading batch of {len(hf_queue)} files to HuggingFace...")
print(
f"\nUploading batch of {len(hf_queue)} files to HuggingFace..."
)
upload_local_area_batch_to_hf(hf_queue)
hf_queue = []

Expand All @@ -643,7 +653,9 @@ def build_states(
raise

if upload and hf_queue:
print(f"\nUploading final batch of {len(hf_queue)} files to HuggingFace...")
print(
f"\nUploading final batch of {len(hf_queue)} files to HuggingFace..."
)
upload_local_area_batch_to_hf(hf_queue)


Expand Down Expand Up @@ -695,14 +707,18 @@ def build_districts(

if upload:
print(f"Uploading {friendly_name}.h5 to GCP...")
upload_local_area_file(str(output_path), "districts", skip_hf=True)
upload_local_area_file(
str(output_path), "districts", skip_hf=True
)
hf_queue.append((str(output_path), "districts"))

record_completed_district(friendly_name)
print(f"Completed {friendly_name}")

if upload and len(hf_queue) >= hf_batch_size:
print(f"\nUploading batch of {len(hf_queue)} files to HuggingFace...")
print(
f"\nUploading batch of {len(hf_queue)} files to HuggingFace..."
)
upload_local_area_batch_to_hf(hf_queue)
hf_queue = []

Expand All @@ -711,7 +727,9 @@ def build_districts(
raise

if upload and hf_queue:
print(f"\nUploading final batch of {len(hf_queue)} files to HuggingFace...")
print(
f"\nUploading final batch of {len(hf_queue)} files to HuggingFace..."
)
upload_local_area_batch_to_hf(hf_queue)


Expand Down Expand Up @@ -758,7 +776,9 @@ def build_cities(

if upload:
print("Uploading NYC.h5 to GCP...")
upload_local_area_file(str(output_path), "cities", skip_hf=True)
upload_local_area_file(
str(output_path), "cities", skip_hf=True
)
hf_queue.append((str(output_path), "cities"))

record_completed_city("NYC")
Expand All @@ -769,7 +789,9 @@ def build_cities(
raise

if upload and hf_queue:
print(f"\nUploading batch of {len(hf_queue)} city files to HuggingFace...")
print(
f"\nUploading batch of {len(hf_queue)} city files to HuggingFace..."
)
upload_local_area_batch_to_hf(hf_queue)


Expand Down Expand Up @@ -846,7 +868,9 @@ def main():
elif args.skip_download:
inputs = {
"weights": WORK_DIR / "calibration_weights.npy",
"dataset": (WORK_DIR / "source_imputed_stratified_extended_cps.h5"),
"dataset": (
WORK_DIR / "source_imputed_stratified_extended_cps.h5"
),
}
print("Using existing files in work directory:")
for key, path in inputs.items():
Expand Down
Loading
Loading