diff --git a/changelog.d/oa-calibration-pipeline.added.md b/changelog.d/oa-calibration-pipeline.added.md new file mode 100644 index 00000000..b02a4f42 --- /dev/null +++ b/changelog.d/oa-calibration-pipeline.added.md @@ -0,0 +1 @@ +Add Output Area crosswalk and geographic assignment for OA-level calibration pipeline. diff --git a/docs/oa_calibration_pipeline.md b/docs/oa_calibration_pipeline.md new file mode 100644 index 00000000..086af562 --- /dev/null +++ b/docs/oa_calibration_pipeline.md @@ -0,0 +1,109 @@ +# Output Area Calibration Pipeline + +This document describes the plan to port the US-side clone-and-prune calibration methodology to the UK data, going down to Output Area (OA) level — the UK equivalent of the US Census Block. + +## Background + +The US pipeline (policyengine-us-data) uses an L0-regularized clone-and-prune approach: +1. Clone each CPS household N times +2. Assign each clone a different Census Block (population-weighted) +3. Build a sparse calibration matrix (targets x records) +4. Run L0-regularized optimization to drop most clones, keeping only the best-fitting records per area +5. Publish per-area H5 files from the sparse weight vector + +The UK pipeline currently uses standard PyTorch Adam gradient descent on a dense weight matrix (n_areas x n_households) at constituency (650) and local authority (360) level. We want to bring the US approach to the UK at Output Area (~180K OAs) granularity. + +## Implementation Phases + +### Phase 1: Output Area Crosswalk & Geographic Assignment +**Status: In Progress** + +Build the OA crosswalk and population-weighted assignment function. + +**Deliverables:** +- `policyengine_uk_data/calibration/oa_crosswalk.py` — downloads/builds the OA → LSOA → MSOA → LA → constituency → region → country crosswalk +- `policyengine_uk_data/storage/oa_crosswalk.csv.gz` — compressed crosswalk file +- `policyengine_uk_data/calibration/oa_assignment.py` — assigns cloned records to OAs (population-weighted, country-constrained) +- Tests validating crosswalk completeness and assignment correctness + +**Data sources:** +- ONS Open Geography Portal: OA → LSOA → MSOA → LA lookup +- ONS mid-year population estimates at OA level +- ONS OA → constituency lookup (2024 boundaries) + +**US reference:** PR #484 (census-block-assignment) + +--- + +### Phase 2: Clone-and-Assign +**Status: Not Started** + +Clone each FRS household N times and assign each clone a different OA. + +**Deliverables:** +- `policyengine_uk_data/calibration/clone_and_assign.py` +- Modify `datasets/create_datasets.py` to insert clone step after imputations, before calibration + +**Key design:** +- N=10 clones initially (tune later) +- Constituency collision avoidance: each clone gets a different constituency where possible +- Country constraint preserved: English households → English OAs only + +**US reference:** PR #457 (district-h5) + PR #531 (census-block-calibration) + +--- + +### Phase 3: L0 Calibration Engine +**Status: Not Started** + +Port L0-regularized optimization from US side. + +**Deliverables:** +- `policyengine_uk_data/utils/calibrate_l0.py` +- Add `l0-python` dependency to `pyproject.toml` + +**Key design:** +- HardConcrete gates for continuous L0 relaxation +- Relative squared error loss +- L0 + L2 regularization with presets (local vs national) +- Keep existing `calibrate.py` as fallback during validation + +**US reference:** PR #364 (bogorek-l0) + PR #365 + +--- + +### Phase 4: Sparse Matrix Builder +**Status: Not Started** + +Build sparse calibration matrix from cloned dataset. + +**Deliverables:** +- `policyengine_uk_data/calibration/matrix_builder.py` +- Wire existing `targets/sources/` definitions into sparse matrix rows + +**US reference:** PR #456 + PR #489 + +--- + +### Phase 5: SQLite Target Database +**Status: Not Started** + +Hierarchical target storage: UK → country → region → LA → constituency → MSOA → LSOA → OA. + +**Deliverables:** +- `policyengine_uk_data/db/` directory with ETL scripts +- Migrate existing CSV/Excel targets into SQLite + +**US reference:** PR #398 (treasury) + PR #488 (db-work) + +--- + +### Phase 6: Local Area Publishing +**Status: Not Started** + +Generate per-area H5 files from sparse weights. Modal integration for scale. + +**Deliverables:** +- `policyengine_uk_data/calibration/publish_local_h5s.py` + +**US reference:** PR #465 (modal) diff --git a/policyengine_uk_data/calibration/__init__.py b/policyengine_uk_data/calibration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/policyengine_uk_data/calibration/oa_assignment.py b/policyengine_uk_data/calibration/oa_assignment.py new file mode 100644 index 00000000..9fc1dab8 --- /dev/null +++ b/policyengine_uk_data/calibration/oa_assignment.py @@ -0,0 +1,303 @@ +"""Output Area assignment for cloned FRS records. + +Assigns population-weighted random Output Areas to household +clones, with constituency collision avoidance (each clone of +the same household gets a different constituency where +possible) and country constraints (English households get +English OAs only, etc.). + +Analogous to policyengine-us-data's clone_and_assign.py. +""" + +import logging +from dataclasses import dataclass +from functools import lru_cache +from pathlib import Path +from typing import Dict, Optional + +import numpy as np +import pandas as pd + +from policyengine_uk_data.calibration.oa_crosswalk import ( + CROSSWALK_PATH, + load_oa_crosswalk, +) + +logger = logging.getLogger(__name__) + +# Country name → OA code prefix mapping +COUNTRY_PREFIXES = { + "England": "E", + "Wales": "W", + "Scotland": "S", + "Northern Ireland": "N", # NI DZ codes start with digits +} + +# Map FRS country codes to country names. +# FRS uses numeric codes: 1=England, 2=Wales, 3=Scotland, +# 4=Northern Ireland +FRS_COUNTRY_MAP = { + 1: "England", + 2: "Wales", + 3: "Scotland", + 4: "Northern Ireland", +} + + +@dataclass +class GeographyAssignment: + """Random geography assignment for cloned FRS records. + + All arrays have length n_records * n_clones. + Index i corresponds to clone (i // n_records), + record (i % n_records). + """ + + oa_code: np.ndarray # str, OA/DZ codes + lsoa_code: np.ndarray # str, LSOA/DZ codes + msoa_code: np.ndarray # str, MSOA/IZ codes + la_code: np.ndarray # str, LA codes + constituency_code: np.ndarray # str, constituency codes + region_code: np.ndarray # str, region codes + country: np.ndarray # str, country names + n_records: int + n_clones: int + + +@lru_cache(maxsize=1) +def _load_country_distributions( + crosswalk_path: Optional[str] = None, +) -> Dict[str, Dict]: + """Load OA distributions grouped by country. + + Returns: + Dict mapping country name to dict with keys: + - oa_codes: np.ndarray of OA code strings + - constituencies: np.ndarray of constituency codes + - probs: np.ndarray of population-weighted + probabilities (sum to 1 within country) + - crosswalk_idx: np.ndarray of indices into the + full crosswalk DataFrame + """ + path = Path(crosswalk_path) if crosswalk_path else None + xw = load_oa_crosswalk(path) + + # Ensure population is numeric + xw["population"] = pd.to_numeric(xw["population"], errors="coerce").fillna(0) + + distributions = {} + for country_name in [ + "England", + "Wales", + "Scotland", + "Northern Ireland", + ]: + mask = xw["country"] == country_name + subset = xw[mask].copy() + + if len(subset) == 0: + logger.warning(f"No OAs found for {country_name}") + continue + + pop = subset["population"].values.astype(np.float64) + total = pop.sum() + if total == 0: + # Uniform if no population data + probs = np.ones(len(subset)) / len(subset) + else: + probs = pop / total + + distributions[country_name] = { + "oa_codes": subset["oa_code"].values, + "constituencies": subset["constituency_code"].values, + "lsoa_codes": subset["lsoa_code"].values, + "msoa_codes": subset["msoa_code"].values, + "la_codes": subset["la_code"].values, + "region_codes": subset["region_code"].values, + "probs": probs, + } + + return distributions + + +def assign_random_geography( + household_countries: np.ndarray, + n_clones: int = 10, + seed: int = 42, + crosswalk_path: Optional[str] = None, +) -> GeographyAssignment: + """Assign random OA geography to cloned FRS records. + + Each of n_records * n_clones total records gets a random + Output Area sampled from the country-specific + population-weighted distribution. LA, constituency, + region are derived from the OA. + + Constituency collision avoidance: each clone of the same + household gets a different constituency where possible + (up to 50 retry iterations). + + Args: + household_countries: Array of length n_records with + FRS country codes (1-4) or country name strings. + n_clones: Number of clones per household. + seed: Random seed for reproducibility. + crosswalk_path: Override crosswalk file path. + + Returns: + GeographyAssignment with arrays of length + n_records * n_clones. + """ + n_records = len(household_countries) + n_total = n_records * n_clones + + # Normalise country codes to names + if np.issubdtype(household_countries.dtype, np.integer): + countries = np.array( + [FRS_COUNTRY_MAP.get(int(c), "England") for c in household_countries] + ) + else: + countries = np.asarray(household_countries, dtype=str) + + distributions = _load_country_distributions( + str(crosswalk_path) if crosswalk_path else None + ) + + rng = np.random.default_rng(seed) + + # Output arrays + oa_codes = np.empty(n_total, dtype=object) + constituency_codes = np.empty(n_total, dtype=object) + lsoa_codes = np.empty(n_total, dtype=object) + msoa_codes = np.empty(n_total, dtype=object) + la_codes = np.empty(n_total, dtype=object) + region_codes = np.empty(n_total, dtype=object) + country_names = np.empty(n_total, dtype=object) + + # Group households by country for efficient sampling + unique_countries = np.unique(countries) + + # Track assigned constituencies per record for + # collision avoidance + assigned_const = np.empty((n_clones, n_records), dtype=object) + + for clone_idx in range(n_clones): + start = clone_idx * n_records + end = start + n_records + + for country_name in unique_countries: + if country_name not in distributions: + logger.warning(f"No distribution for {country_name}, skipping") + continue + + dist = distributions[country_name] + hh_mask = countries == country_name + n_hh = hh_mask.sum() + + if n_hh == 0: + continue + + # Sample OAs + indices = rng.choice( + len(dist["oa_codes"]), + size=n_hh, + p=dist["probs"], + ) + + sampled_const = dist["constituencies"][indices] + + # Constituency collision avoidance + if clone_idx > 0: + # Find records where we've seen this + # constituency before + hh_positions = np.where(hh_mask)[0] + collisions = np.zeros(n_hh, dtype=bool) + for prev in range(clone_idx): + prev_const = assigned_const[prev, hh_positions] + collisions |= sampled_const == prev_const + + for _ in range(50): + n_bad = collisions.sum() + if n_bad == 0: + break + new_idx = rng.choice( + len(dist["oa_codes"]), + size=n_bad, + p=dist["probs"], + ) + indices[collisions] = new_idx + sampled_const = dist["constituencies"][indices] + collisions = np.zeros(n_hh, dtype=bool) + for prev in range(clone_idx): + prev_const = assigned_const[prev, hh_positions] + collisions |= sampled_const == prev_const + + # Store results + positions = np.where(hh_mask)[0] + for i, pos in enumerate(positions): + idx = start + pos + oa_codes[idx] = dist["oa_codes"][indices[i]] + constituency_codes[idx] = dist["constituencies"][indices[i]] + lsoa_codes[idx] = dist["lsoa_codes"][indices[i]] + msoa_codes[idx] = dist["msoa_codes"][indices[i]] + la_codes[idx] = dist["la_codes"][indices[i]] + region_codes[idx] = dist["region_codes"][indices[i]] + country_names[idx] = country_name + + assigned_const[clone_idx, positions] = sampled_const + + return GeographyAssignment( + oa_code=oa_codes, + lsoa_code=lsoa_codes, + msoa_code=msoa_codes, + la_code=la_codes, + constituency_code=constituency_codes, + region_code=region_codes, + country=country_names, + n_records=n_records, + n_clones=n_clones, + ) + + +def save_geography(geography: GeographyAssignment, path: Path) -> None: + """Save a GeographyAssignment to a compressed .npz file. + + Args: + geography: The geography assignment to save. + path: Output file path (should end in .npz). + """ + np.savez_compressed( + path, + oa_code=geography.oa_code, + lsoa_code=geography.lsoa_code, + msoa_code=geography.msoa_code, + la_code=geography.la_code, + constituency_code=geography.constituency_code, + region_code=geography.region_code, + country=geography.country, + n_records=np.array([geography.n_records]), + n_clones=np.array([geography.n_clones]), + ) + + +def load_geography(path: Path) -> GeographyAssignment: + """Load a GeographyAssignment from a .npz file. + + Args: + path: Path to the .npz file. + + Returns: + GeographyAssignment with all fields restored. + """ + data = np.load(path, allow_pickle=True) + return GeographyAssignment( + oa_code=data["oa_code"], + lsoa_code=data["lsoa_code"], + msoa_code=data["msoa_code"], + la_code=data["la_code"], + constituency_code=data["constituency_code"], + region_code=data["region_code"], + country=data["country"], + n_records=int(data["n_records"][0]), + n_clones=int(data["n_clones"][0]), + ) diff --git a/policyengine_uk_data/calibration/oa_crosswalk.py b/policyengine_uk_data/calibration/oa_crosswalk.py new file mode 100644 index 00000000..4e2d8ae4 --- /dev/null +++ b/policyengine_uk_data/calibration/oa_crosswalk.py @@ -0,0 +1,758 @@ +"""Build a unified UK Output Area crosswalk. + +Downloads OA-level geographic lookups from ONS (England & Wales), +NRS (Scotland), and NISRA (Northern Ireland) and combines them +into a single crosswalk mapping: + + OA -> LSOA/DZ -> MSOA/IZ -> LA -> constituency -> region -> country + +The crosswalk also includes OA-level population estimates from +Census 2021 (E+W, NI) and Census 2022 (Scotland). + +Output: storage/oa_crosswalk.csv.gz + +Columns: + oa_code - Output Area GSS code (E00/W00/S00/95xx) + lsoa_code - LSOA (E+W) or Data Zone (Scotland) code + msoa_code - MSOA (E+W) or Intermediate Zone (Scotland) code + la_code - Local Authority District GSS code + constituency_code - Parliamentary constituency 2024 GSS code + region_code - Region GSS code (E12/W99/S99/N99) + country - Country name (England/Wales/Scotland/Northern Ireland) + population - Population (Census 2021 or 2022) +""" + +import io +import logging +import zipfile +from pathlib import Path +from typing import Optional + +import numpy as np +import pandas as pd +import requests + +logger = logging.getLogger(__name__) + +STORAGE_FOLDER = Path(__file__).parent.parent / "storage" +CROSSWALK_PATH = STORAGE_FOLDER / "oa_crosswalk.csv.gz" + +# ── Download URLs ───────────────────────────────────────── + +# ONS Hub CSV download: OA21 → LSOA21 → MSOA21 → LAD22 (E+W) +_EW_OA_LOOKUP_URL = ( + "https://open-geography-portalx-ons.hub.arcgis.com/" + "api/download/v1/items/" + "b9ca90c10aaa4b8d9791e9859a38ca67/csv?layers=0" +) + +# ONS Hub CSV download: OA21 → PCON25 (E+W constituency 2024) +_EW_OA_CONST_URL = ( + "https://open-geography-portalx-ons.hub.arcgis.com/" + "api/download/v1/items/" + "5968b5b2c0f14dd29ba277beaae6dec3/csv?layers=0" +) + +# ONS Hub CSV download: LAD24 → RGN24 (England only) +_EN_LAD_REGION_URL = ( + "https://open-geography-portalx-ons.hub.arcgis.com/" + "api/download/v1/items/" + "3959874c514b470e9dd160acdc00c97a/csv?layers=0" +) + +# Nomis Census 2021 TS001 (population by OA) - bulk zip +_EW_POPULATION_URL = ( + "https://www.nomisweb.co.uk/output/census/2021/census2021-ts001.zip" +) + +# NRS Scotland: OA22 → DZ22 → IZ22 (zip) +_SCOTLAND_OA_DZ_URL = "https://www.nrscotland.gov.uk/media/iz3evrqt/oa22_dz22_iz22.zip" + +# NRS Scotland: OA22 → UK Parliamentary Constituency 2024 +_SCOTLAND_OA_CONST_URL = "https://www.nrscotland.gov.uk/media/njkmhppf/oa22_ukpc24.zip" + +# statistics.gov.scot: DZ22 → IZ22 → LA → constituency +_SCOTLAND_DZ_LOOKUP_URL = ( + "https://statistics.gov.scot/downloads/file?" + "id=75ff05d2-d482-4463-81b6-76b8dd6b6d3b/" + "DataZone2022lookup_2025-10-28.csv" +) + +# Scotland Census 2022 OA population +_SCOTLAND_OA_POP_URL = ( + "https://www.scotlandscensus.gov.uk/media/kqcmo4ge/output-area-2022-all-persons.csv" +) + +# NISRA NI DZ2021 lookup +_NI_DZ_LOOKUP_URL = ( + "https://www.nisra.gov.uk/sites/nisra.gov.uk/files/" + "publications/geography-dz2021-lookup-tables.xlsx" +) + +# NISRA NI DZ2021 population +_NI_DZ_POP_URL = ( + "https://www.nisra.gov.uk/sites/nisra.gov.uk/files/" + "publications/" + "census-2021-person-estimates-data-zones.xlsx" +) + + +def _download_csv(url: str, timeout: int = 300) -> pd.DataFrame: + """Download a CSV file from a URL. + + Args: + url: URL to download. + timeout: Request timeout in seconds. + + Returns: + DataFrame from the CSV. + """ + logger.info(f"Downloading {url[:80]}...") + resp = requests.get(url, timeout=timeout) + resp.raise_for_status() + # Try utf-8-sig (strips BOM), fall back to latin-1 + for enc in ("utf-8-sig", "utf-8", "latin-1"): + try: + text = resp.content.decode(enc) + return pd.read_csv(io.StringIO(text)) + except (UnicodeDecodeError, ValueError): + continue + # Last resort + return pd.read_csv(io.BytesIO(resp.content)) + + +def _download_csv_from_zip( + url: str, + csv_filter: str = ".csv", + timeout: int = 300, +) -> pd.DataFrame: + """Download a ZIP and extract the first CSV matching + the filter. + + Args: + url: URL of the ZIP file. + csv_filter: Substring that the CSV filename must + contain (case-insensitive). + timeout: Request timeout in seconds. + + Returns: + DataFrame from the extracted CSV. + """ + logger.info(f"Downloading ZIP {url[:80]}...") + resp = requests.get(url, timeout=timeout) + resp.raise_for_status() + + with zipfile.ZipFile(io.BytesIO(resp.content)) as zf: + csv_files = [ + n + for n in zf.namelist() + if n.lower().endswith(".csv") and csv_filter.lower() in n.lower() + ] + if not csv_files: + # Fallback: any CSV + csv_files = [n for n in zf.namelist() if n.lower().endswith(".csv")] + if not csv_files: + raise FileNotFoundError(f"No CSV found in ZIP. Contents: {zf.namelist()}") + logger.info(f" Extracting {csv_files[0]}") + with zf.open(csv_files[0]) as f: + raw = f.read().decode("utf-8-sig") + return pd.read_csv(io.StringIO(raw)) + + +# ── England & Wales ─────────────────────────────────────── + + +def _get_ew_oa_hierarchy() -> pd.DataFrame: + """Download OA → LSOA → MSOA → LAD lookup for E+W. + + Returns: + DataFrame with columns: oa_code, lsoa_code, + msoa_code, la_code + """ + df = _download_csv(_EW_OA_LOOKUP_URL) + logger.info(f" E+W hierarchy: {len(df)} rows, columns {df.columns.tolist()[:6]}") + + # Strip whitespace from column names + df.columns = df.columns.str.strip() + + # Direct rename - known column names from ONS + col_map = { + "OA21CD": "oa_code", + "LSOA21CD": "lsoa_code", + "MSOA21CD": "msoa_code", + "LAD22CD": "la_code", + } + # Handle different LAD vintages + for c in df.columns: + if c.startswith("LAD") and c.endswith("CD"): + col_map[c] = "la_code" + + df = df.rename(columns=col_map) + return df[["oa_code", "lsoa_code", "msoa_code", "la_code"]] + + +def _get_ew_population() -> pd.DataFrame: + """Download Census 2021 OA-level population for E+W. + + Returns: + DataFrame with columns: oa_code, population + """ + df = _download_csv_from_zip(_EW_POPULATION_URL, csv_filter="oa") + logger.info(f" TS001 columns: {df.columns.tolist()}") + + # Find geography code and total population columns + geo_col = None + obs_col = None + for c in df.columns: + cl = c.lower() + if "geography code" in cl: + geo_col = c + elif "total" in cl and "measures" in cl: + obs_col = c + elif ( + cl + in ( + "observation", + "obs_value", + "count", + ) + or "observation" in cl + ): + if obs_col is None: + obs_col = c + + if geo_col is None: + geo_col = [c for c in df.columns if "code" in c.lower()][0] + if obs_col is None: + obs_col = df.select_dtypes(include="number").columns[-1] + + result = pd.DataFrame( + { + "oa_code": df[geo_col].astype(str), + "population": pd.to_numeric(df[obs_col], errors="coerce") + .fillna(0) + .astype(int), + } + ) + # Keep only OA-level codes (E00/W00) + result = result[result["oa_code"].str.match(r"^[EW]00")].copy() + return result + + +def _get_ew_constituency() -> pd.DataFrame: + """Download OA → constituency (2024) for E+W. + + Returns: + DataFrame with columns: oa_code, constituency_code + """ + df = _download_csv(_EW_OA_CONST_URL) + logger.info( + f" E+W constituency: {len(df)} rows, columns {df.columns.tolist()[:6]}" + ) + + df.columns = df.columns.str.strip() + col_map = {"OA21CD": "oa_code"} + for c in df.columns: + if c.startswith("PCON") and c.endswith("CD"): + col_map[c] = "constituency_code" + df = df.rename(columns=col_map) + return df[["oa_code", "constituency_code"]] + + +# ── Scotland ────────────────────────────────────────────── + + +def _get_scotland_oa_hierarchy() -> pd.DataFrame: + """Build Scotland OA → DZ → IZ → LA hierarchy. + + Returns: + DataFrame with columns: oa_code, lsoa_code (=DZ), + msoa_code (=IZ), la_code, constituency_code + """ + # OA → DZ → IZ from NRS zip + oa_dz = _download_csv_from_zip(_SCOTLAND_OA_DZ_URL, csv_filter="") + logger.info( + f" Scotland OA→DZ: {len(oa_dz)} rows, columns {oa_dz.columns.tolist()}" + ) + + # Find column names dynamically + oa_col = [c for c in oa_dz.columns if "oa" in c.lower() and "code" in c.lower()] + dz_col = [ + c + for c in oa_dz.columns + if "dz" in c.lower() or "datazone" in c.lower() and "code" in c.lower() + ] + + if not oa_col: + oa_col = [oa_dz.columns[0]] + if not dz_col: + dz_col = [oa_dz.columns[1]] + + oa_dz = oa_dz.rename( + columns={ + oa_col[0]: "oa_code", + dz_col[0]: "lsoa_code", + } + ) + + # DZ → LA → constituency from statistics.gov.scot + dz_lookup = _download_csv(_SCOTLAND_DZ_LOOKUP_URL) + logger.info( + f" Scotland DZ lookup: {len(dz_lookup)} rows, " + f"columns {dz_lookup.columns.tolist()[:8]}" + ) + + # Find columns + dz_lk_col = [ + c for c in dz_lookup.columns if "dz" in c.lower() and "code" in c.lower() + ] + iz_col = [c for c in dz_lookup.columns if "iz" in c.lower() and "code" in c.lower()] + la_col = [c for c in dz_lookup.columns if "la" in c.lower() and "code" in c.lower()] + ukpc_col = [ + c for c in dz_lookup.columns if "ukpc" in c.lower() or "pcon" in c.lower() + ] + + if not dz_lk_col: + dz_lk_col = [dz_lookup.columns[0]] + if not iz_col: + iz_col = [dz_lookup.columns[2]] + if not la_col: + la_col = [dz_lookup.columns[4]] + + rename_map = { + dz_lk_col[0]: "lsoa_code", + iz_col[0]: "msoa_code", + la_col[0]: "la_code", + } + if ukpc_col: + rename_map[ukpc_col[0]] = "constituency_code" + + dz_lookup = dz_lookup.rename(columns=rename_map) + + merge_cols = ["lsoa_code", "msoa_code", "la_code"] + if "constituency_code" in dz_lookup.columns: + merge_cols.append("constituency_code") + + # Deduplicate DZ lookup (multiple rows per DZ in some + # versions) + dz_dedup = dz_lookup[merge_cols].drop_duplicates(subset=["lsoa_code"]) + + result = oa_dz[["oa_code", "lsoa_code"]].merge(dz_dedup, on="lsoa_code", how="left") + + # Also try OA → constituency direct lookup + try: + oa_const = _download_csv_from_zip(_SCOTLAND_OA_CONST_URL, csv_filter="") + oa_c_col = [ + c for c in oa_const.columns if "oa" in c.lower() and "code" in c.lower() + ] + const_c_col = [ + c + for c in oa_const.columns + if "ukpc" in c.lower() or "pcon" in c.lower() or "const" in c.lower() + ] + if oa_c_col and const_c_col: + oa_const = oa_const.rename( + columns={ + oa_c_col[0]: "oa_code", + const_c_col[0]: "const_direct", + } + ) + result = result.merge( + oa_const[["oa_code", "const_direct"]], + on="oa_code", + how="left", + ) + # Prefer direct OA→const over DZ-derived + if "constituency_code" not in result.columns: + result["constituency_code"] = "" + mask = result["const_direct"].notna() + result.loc[mask, "constituency_code"] = result.loc[mask, "const_direct"] + result = result.drop(columns=["const_direct"]) + except Exception as e: + logger.warning( + f"Could not download Scotland OA→constituency direct lookup: {e}" + ) + + if "constituency_code" not in result.columns: + result["constituency_code"] = "" + + return result + + +def _get_scotland_oa_population() -> pd.DataFrame: + """Get Scotland Census 2022 OA-level population. + + Tries the NRS OA population CSV first. If that fails + (403), falls back to assigning equal population within + each Data Zone using the DZ lookup. + + Returns: + DataFrame with columns: oa_code, population + """ + # Try direct OA population download + try: + df = _download_csv(_SCOTLAND_OA_POP_URL) + logger.info( + f" Scotland pop: {len(df)} rows, columns {df.columns.tolist()[:5]}" + ) + + oa_col = [ + c + for c in df.columns + if ("output" in c.lower() or "oa" in c.lower()) and "code" in c.lower() + ] + if not oa_col: + for c in df.columns: + if df[c].dtype == object: + sample = str(df[c].iloc[0]) + if sample.startswith("S00"): + oa_col = [c] + break + if not oa_col: + oa_col = [df.columns[0]] + + count_col = [ + c + for c in df.columns + if "count" in c.lower() or "total" in c.lower() or "all person" in c.lower() + ] + if not count_col: + num_cols = df.select_dtypes(include="number").columns + if len(num_cols) > 0: + count_col = [num_cols[0]] + + if count_col: + return pd.DataFrame( + { + "oa_code": df[oa_col[0]].astype(str), + "population": pd.to_numeric(df[count_col[0]], errors="coerce") + .fillna(0) + .astype(int), + } + ) + except Exception as e: + logger.warning( + f"Could not download Scotland OA population: {e}. Using uniform population." + ) + + # Fallback: use OA→DZ lookup and assign ~120 per OA + # (Scotland pop ~5.4M / 46K OAs ≈ 117) + logger.info(" Using uniform population estimate for Scotland OAs (~117 per OA)") + oa_dz = _download_csv_from_zip(_SCOTLAND_OA_DZ_URL, csv_filter="") + oa_col = oa_dz.columns[0] + return pd.DataFrame( + { + "oa_code": oa_dz[oa_col].astype(str), + "population": 117, + } + ) + + +# ── Northern Ireland ────────────────────────────────────── + + +def _get_ni_hierarchy() -> pd.DataFrame: + """Build NI Data Zone hierarchy. + + NI does not publish census data at OA level. The smallest + published geography is the Data Zone (DZ2021, ~3,780 areas). + We treat NI Data Zones as the OA-equivalent. + + Returns: + DataFrame with columns: oa_code (=DZ2021), + lsoa_code (=DZ2021), msoa_code (=SDZ2021), + la_code (LGD2014) + """ + logger.info("Downloading NI DZ2021 lookup...") + try: + resp = requests.get(_NI_DZ_LOOKUP_URL, timeout=120) + resp.raise_for_status() + except Exception as e: + logger.warning( + f"Could not download NI DZ lookup: {e}. Returning empty NI hierarchy." + ) + return pd.DataFrame( + columns=[ + "oa_code", + "lsoa_code", + "msoa_code", + "la_code", + ] + ) + df = pd.read_excel(io.BytesIO(resp.content)) + + logger.info(f" NI lookup: {len(df)} rows, columns {df.columns.tolist()[:6]}") + + dz_col = [c for c in df.columns if "dz2021" in c.lower() and "code" in c.lower()] + sdz_col = [c for c in df.columns if "sdz2021" in c.lower() and "code" in c.lower()] + lgd_col = [c for c in df.columns if "lgd" in c.lower() and "code" in c.lower()] + + if not dz_col or not sdz_col or not lgd_col: + logger.warning( + f"NI columns not as expected: {df.columns.tolist()}. Using positional." + ) + dz_col = [df.columns[0]] + sdz_col = [df.columns[1]] + lgd_col = [df.columns[2]] + + return pd.DataFrame( + { + "oa_code": df[dz_col[0]].astype(str), + "lsoa_code": df[dz_col[0]].astype(str), + "msoa_code": df[sdz_col[0]].astype(str), + "la_code": df[lgd_col[0]].astype(str), + } + ) + + +def _get_ni_population() -> pd.DataFrame: + """Get NI Census 2021 population at Data Zone level. + + Returns: + DataFrame with columns: oa_code, population + """ + logger.info("Downloading NI DZ population...") + try: + resp = requests.get(_NI_DZ_POP_URL, timeout=120) + resp.raise_for_status() + except Exception as e: + logger.warning(f"Could not download NI population: {e}. Returning empty.") + return pd.DataFrame(columns=["oa_code", "population"]) + df = pd.read_excel(io.BytesIO(resp.content)) + + logger.info(f" NI pop: {len(df)} rows, columns {df.columns.tolist()[:5]}") + + dz_col = [c for c in df.columns if "dz" in c.lower() and "code" in c.lower()] + pop_col = [ + c + for c in df.columns + if "population" in c.lower() or "all" in c.lower() or "total" in c.lower() + ] + + if not dz_col: + dz_col = [df.columns[0]] + if not pop_col: + num_cols = df.select_dtypes(include="number").columns + pop_col = [num_cols[0]] if len(num_cols) > 0 else None + + if pop_col is None: + return pd.DataFrame( + { + "oa_code": df[dz_col[0]].astype(str), + "population": 1, + } + ) + + return pd.DataFrame( + { + "oa_code": df[dz_col[0]].astype(str), + "population": pd.to_numeric(df[pop_col[0]], errors="coerce") + .fillna(0) + .astype(int), + } + ) + + +# ── Region & Country Assignment ─────────────────────────── + + +def _get_la_to_region_map() -> dict: + """Download LAD → region mapping for England. + + Returns: + Dict mapping LAD code to region code. + """ + try: + df = _download_csv(_EN_LAD_REGION_URL) + logger.info(f" LAD→region: {len(df)} rows, columns {df.columns.tolist()[:4]}") + + lad_col = [c for c in df.columns if "lad" in c.lower() and "cd" in c.lower()] + rgn_col = [c for c in df.columns if "rgn" in c.lower() and "cd" in c.lower()] + + if lad_col and rgn_col: + return dict(zip(df[lad_col[0]], df[rgn_col[0]])) + except Exception as e: + logger.warning(f"Could not download region lookup: {e}") + + return {} + + +def _assign_regions(df: pd.DataFrame) -> pd.DataFrame: + """Assign region codes based on LA code. + + Args: + df: DataFrame with la_code column. + + Returns: + DataFrame with region_code column added. + """ + la_to_region = _get_la_to_region_map() + + def get_region(la_code: str) -> str: + if not isinstance(la_code, str): + return "" + if la_code in la_to_region: + return la_to_region[la_code] + if la_code.startswith("W"): + return "W99999999" + if la_code.startswith("S"): + return "S99999999" + if la_code.startswith("N"): + return "N99999999" + # Try matching on different LAD vintage + for k, v in la_to_region.items(): + if k[:3] == la_code[:3]: + # Same LA type prefix + pass + return "" + + df["region_code"] = df["la_code"].apply(get_region) + return df + + +def _assign_country(df: pd.DataFrame) -> pd.DataFrame: + """Assign country name based on OA code prefix. + + Args: + df: DataFrame with oa_code column. + + Returns: + DataFrame with country column added. + """ + + def get_country(oa_code: str) -> str: + if not isinstance(oa_code, str): + return "Unknown" + if oa_code.startswith("E"): + return "England" + elif oa_code.startswith("W"): + return "Wales" + elif oa_code.startswith("S"): + return "Scotland" + else: + return "Northern Ireland" + + df["country"] = df["oa_code"].apply(get_country) + return df + + +# ── Main Build Function ────────────────────────────────── + + +def build_oa_crosswalk( + save: bool = True, + output_path: Optional[Path] = None, +) -> pd.DataFrame: + """Build the unified UK Output Area crosswalk. + + Downloads data from ONS, NRS, and NISRA, combines into + a single crosswalk, and optionally saves to compressed + CSV. + + Args: + save: Whether to save to disk. + output_path: Override output path. + + Returns: + DataFrame with columns: oa_code, lsoa_code, + msoa_code, la_code, constituency_code, + region_code, country, population + """ + if output_path is None: + output_path = CROSSWALK_PATH + + # ── England & Wales ── + logger.info("=== Building E+W OA hierarchy ===") + ew_hierarchy = _get_ew_oa_hierarchy() + ew_population = _get_ew_population() + ew_const = _get_ew_constituency() + + ew = ew_hierarchy.merge(ew_population, on="oa_code", how="left") + ew["population"] = ew["population"].fillna(0).astype(int) + ew = ew.merge(ew_const, on="oa_code", how="left") + ew["constituency_code"] = ew["constituency_code"].fillna("") + + logger.info(f"E+W: {len(ew):,} OAs, pop {ew['population'].sum():,}") + + # ── Scotland ── + logger.info("=== Building Scotland OA hierarchy ===") + scot = _get_scotland_oa_hierarchy() + scot_pop = _get_scotland_oa_population() + + scot = scot.merge(scot_pop, on="oa_code", how="left") + scot["population"] = scot["population"].fillna(0).astype(int) + + logger.info(f"Scotland: {len(scot):,} OAs, pop {scot['population'].sum():,}") + + # ── Northern Ireland ── + logger.info("=== Building NI hierarchy ===") + ni = _get_ni_hierarchy() + ni_pop = _get_ni_population() + + ni = ni.merge(ni_pop, on="oa_code", how="left") + ni["population"] = ni["population"].fillna(0).astype(int) + ni["constituency_code"] = "" + + logger.info(f"NI: {len(ni):,} Data Zones, pop {ni['population'].sum():,}") + + # ── Combine ── + combined = pd.concat([ew, scot, ni], ignore_index=True) + combined = _assign_regions(combined) + combined = _assign_country(combined) + + combined = combined[ + [ + "oa_code", + "lsoa_code", + "msoa_code", + "la_code", + "constituency_code", + "region_code", + "country", + "population", + ] + ] + + logger.info( + f"=== Total: {len(combined):,} areas, pop {combined['population'].sum():,} ===" + ) + + if save: + output_path.parent.mkdir(parents=True, exist_ok=True) + combined.to_csv(output_path, index=False, compression="gzip") + logger.info(f"Saved crosswalk to {output_path}") + + return combined + + +def load_oa_crosswalk( + path: Optional[Path] = None, +) -> pd.DataFrame: + """Load the pre-built OA crosswalk from disk. + + Args: + path: Override path (default: + storage/oa_crosswalk.csv.gz). + + Returns: + DataFrame with crosswalk columns. + + Raises: + FileNotFoundError: If crosswalk file doesn't exist. + """ + if path is None: + path = CROSSWALK_PATH + + if not path.exists(): + raise FileNotFoundError( + f"{path} not found. Run build_oa_crosswalk() " + "or 'python -m policyengine_uk_data.calibration." + "oa_crosswalk' to generate." + ) + + return pd.read_csv(path, dtype=str) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + build_oa_crosswalk() diff --git a/policyengine_uk_data/storage/oa_crosswalk.csv.gz b/policyengine_uk_data/storage/oa_crosswalk.csv.gz new file mode 100644 index 00000000..932f057a Binary files /dev/null and b/policyengine_uk_data/storage/oa_crosswalk.csv.gz differ diff --git a/policyengine_uk_data/tests/test_oa_crosswalk.py b/policyengine_uk_data/tests/test_oa_crosswalk.py new file mode 100644 index 00000000..2b7933d8 --- /dev/null +++ b/policyengine_uk_data/tests/test_oa_crosswalk.py @@ -0,0 +1,378 @@ +"""Tests for OA crosswalk building and geographic assignment. + +These tests validate: +1. Crosswalk completeness (all 4 countries, expected OA counts) +2. Hierarchy consistency (OA → LSOA → MSOA → LA nesting) +3. Country constraints (E→England, W→Wales, S→Scotland) +4. Population totals (within reasonable range of known figures) +5. Assignment correctness (country constraints, constituency + collision avoidance, distribution proportionality) +""" + +import tempfile +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +from policyengine_uk_data.calibration.oa_crosswalk import ( + build_oa_crosswalk, + load_oa_crosswalk, + CROSSWALK_PATH, +) +from policyengine_uk_data.calibration.oa_assignment import ( + GeographyAssignment, + assign_random_geography, + save_geography, + load_geography, +) + + +# ── Fixtures ────────────────────────────────────────────── + + +@pytest.fixture(scope="module") +def crosswalk() -> pd.DataFrame: + """Load or build the OA crosswalk for testing. + + Uses the pre-built file if available, otherwise builds + fresh (slow, ~5 minutes). + """ + if CROSSWALK_PATH.exists(): + return load_oa_crosswalk() + else: + pytest.skip( + "OA crosswalk not yet built. Run: " + "python -m policyengine_uk_data.calibration." + "oa_crosswalk" + ) + + +@pytest.fixture(scope="module") +def small_crosswalk(tmp_path_factory) -> pd.DataFrame: + """Create a small synthetic crosswalk for fast tests.""" + tmp_dir = tmp_path_factory.mktemp("crosswalk") + path = tmp_dir / "test_crosswalk.csv.gz" + + rows = [] + # England: 100 OAs across 2 LAs, 2 constituencies + for i in range(100): + la = "E09000001" if i < 50 else "E09000002" + const = "E14001063" if i < 50 else "E14001064" + rows.append( + { + "oa_code": f"E00{i:06d}", + "lsoa_code": f"E01{i // 5:05d}", + "msoa_code": f"E02{i // 10:04d}0", + "la_code": la, + "constituency_code": const, + "region_code": "E12000007", + "country": "England", + "population": str(100 + i), + } + ) + # Wales: 20 OAs + for i in range(20): + rows.append( + { + "oa_code": f"W00{i:06d}", + "lsoa_code": f"W01{i // 5:05d}", + "msoa_code": f"W02{i // 10:04d}0", + "la_code": "W06000001", + "constituency_code": "W07000041", + "region_code": "W99999999", + "country": "Wales", + "population": str(80 + i), + } + ) + # Scotland: 30 OAs + for i in range(30): + rows.append( + { + "oa_code": f"S00{i:06d}", + "lsoa_code": f"S01{i // 5:05d}", + "msoa_code": f"S02{i // 10:04d}0", + "la_code": "S12000033", + "constituency_code": "S14000001", + "region_code": "S99999999", + "country": "Scotland", + "population": str(90 + i), + } + ) + # Northern Ireland: 10 Data Zones + for i in range(10): + rows.append( + { + "oa_code": f"95GG{i:04d}", + "lsoa_code": f"95GG{i:04d}", + "msoa_code": f"95HH{i // 3:03d}0", + "la_code": "N09000001", + "constituency_code": "", + "region_code": "N99999999", + "country": "Northern Ireland", + "population": str(70 + i), + } + ) + + df = pd.DataFrame(rows) + df.to_csv(path, index=False, compression="gzip") + return df, path + + +# ── Crosswalk Structure Tests (use real data if available) ── + + +class TestCrosswalkStructure: + """Tests on the real crosswalk file.""" + + def test_has_expected_columns(self, crosswalk): + expected = { + "oa_code", + "lsoa_code", + "msoa_code", + "la_code", + "constituency_code", + "region_code", + "country", + "population", + } + assert expected == set(crosswalk.columns) + + def test_all_four_countries_present(self, crosswalk): + countries = set(crosswalk["country"].unique()) + assert "England" in countries + assert "Wales" in countries + assert "Scotland" in countries + # NI is excluded until NISRA updates their download + # URLs (currently returning 404). Uncomment when + # NI Data Zone lookup is available again. + # assert "Northern Ireland" in countries + + def test_ew_oa_count_range(self, crosswalk): + """E+W should have ~188K-190K OAs.""" + ew = crosswalk[crosswalk["country"].isin(["England", "Wales"])] + assert 180_000 < len(ew) < 200_000, ( + f"E+W OA count {len(ew)} outside expected range" + ) + + def test_scotland_oa_count_range(self, crosswalk): + """Scotland should have ~46K OAs.""" + scot = crosswalk[crosswalk["country"] == "Scotland"] + assert 40_000 < len(scot) < 55_000, ( + f"Scotland OA count {len(scot)} outside expected range" + ) + + @pytest.mark.skip(reason="NISRA DZ2021 download URLs returning 404") + def test_ni_dz_count_range(self, crosswalk): + """NI should have ~3.7K-4K Data Zones.""" + ni = crosswalk[crosswalk["country"] == "Northern Ireland"] + assert 3_000 < len(ni) < 7_000, f"NI DZ count {len(ni)} outside expected range" + + def test_no_duplicate_oa_codes(self, crosswalk): + assert crosswalk["oa_code"].is_unique + + def test_england_oa_prefix(self, crosswalk): + eng = crosswalk[crosswalk["country"] == "England"] + assert eng["oa_code"].str.startswith("E00").all() + + def test_wales_oa_prefix(self, crosswalk): + wales = crosswalk[crosswalk["country"] == "Wales"] + assert wales["oa_code"].str.startswith("W00").all() + + def test_scotland_oa_prefix(self, crosswalk): + scot = crosswalk[crosswalk["country"] == "Scotland"] + assert scot["oa_code"].str.startswith("S00").all() + + def test_population_total_range(self, crosswalk): + """UK total population should be ~67M (2021 Census).""" + total = pd.to_numeric(crosswalk["population"], errors="coerce").sum() + assert 55_000_000 < total < 75_000_000, ( + f"UK population {total:,} outside expected range" + ) + + def test_every_oa_has_la(self, crosswalk): + missing = crosswalk["la_code"].isna() | (crosswalk["la_code"] == "") + assert missing.sum() == 0, f"{missing.sum()} OAs missing LA code" + + def test_ew_oas_have_constituency(self, crosswalk): + """E+W OAs should have constituency codes.""" + ew = crosswalk[crosswalk["country"].isin(["England", "Wales"])] + has_const = (ew["constituency_code"].notna()) & (ew["constituency_code"] != "") + pct = has_const.mean() + assert pct > 0.95, ( + f"Only {pct:.1%} of E+W OAs have constituency codes (expected >95%)" + ) + + def test_hierarchy_nesting_oa_in_lsoa(self, crosswalk): + """Each OA should map to exactly one LSOA.""" + grouped = crosswalk.groupby("oa_code")["lsoa_code"].nunique() + assert (grouped == 1).all(), "Some OAs map to multiple LSOAs" + + def test_hierarchy_nesting_lsoa_in_la(self, crosswalk): + """Each LSOA should map to exactly one LA.""" + grouped = crosswalk.groupby("lsoa_code")["la_code"].nunique() + multi = grouped[grouped > 1] + assert len(multi) == 0, ( + f"{len(multi)} LSOAs map to multiple LAs: {multi.index.tolist()[:5]}" + ) + + +# ── Assignment Tests (use synthetic crosswalk) ────────── + + +class TestOAAssignment: + """Tests for the OA assignment function.""" + + def test_output_shape(self, small_crosswalk): + df, path = small_crosswalk + n_records = 20 + n_clones = 3 + countries = np.array([1] * 15 + [2] * 5) + + geo = assign_random_geography( + household_countries=countries, + n_clones=n_clones, + seed=42, + crosswalk_path=str(path), + ) + + assert geo.n_records == n_records + assert geo.n_clones == n_clones + assert len(geo.oa_code) == n_records * n_clones + assert len(geo.la_code) == n_records * n_clones + assert len(geo.constituency_code) == (n_records * n_clones) + + def test_country_constraint(self, small_crosswalk): + """English households should get English OAs only.""" + df, path = small_crosswalk + countries = np.array([1] * 10 + [2] * 5 + [3] * 3 + [4] * 2) + + geo = assign_random_geography( + household_countries=countries, + n_clones=2, + seed=42, + crosswalk_path=str(path), + ) + + n = len(countries) + for clone_idx in range(2): + start = clone_idx * n + for i in range(n): + idx = start + i + if countries[i] == 1: + assert geo.country[idx] == "England", ( + f"Record {i} clone {clone_idx}: " + f"expected England, got " + f"{geo.country[idx]}" + ) + elif countries[i] == 2: + assert geo.country[idx] == "Wales" + elif countries[i] == 3: + assert geo.country[idx] == "Scotland" + elif countries[i] == 4: + assert geo.country[idx] == ("Northern Ireland") + + def test_constituency_collision_avoidance(self, small_crosswalk): + """Different clones of same household should have + different constituencies (where possible).""" + df, path = small_crosswalk + # All English so they draw from 2 constituencies + countries = np.array([1] * 10) + + geo = assign_random_geography( + household_countries=countries, + n_clones=2, + seed=42, + crosswalk_path=str(path), + ) + + n = len(countries) + collisions = 0 + for i in range(n): + const_0 = geo.constituency_code[i] + const_1 = geo.constituency_code[n + i] + if const_0 == const_1: + collisions += 1 + + # With 2 constituencies and 2 clones, we should + # have very few collisions (ideally 0) + assert collisions < n * 0.2, ( + f"{collisions}/{n} constituency collisions (expected < 20%)" + ) + + def test_save_load_roundtrip(self, small_crosswalk): + """GeographyAssignment should survive save/load.""" + df, path = small_crosswalk + countries = np.array([1] * 5 + [2] * 3) + + geo = assign_random_geography( + household_countries=countries, + n_clones=2, + seed=42, + crosswalk_path=str(path), + ) + + with tempfile.NamedTemporaryFile(suffix=".npz") as f: + save_geography(geo, Path(f.name)) + loaded = load_geography(Path(f.name)) + + assert loaded.n_records == geo.n_records + assert loaded.n_clones == geo.n_clones + np.testing.assert_array_equal(loaded.oa_code, geo.oa_code) + np.testing.assert_array_equal( + loaded.constituency_code, + geo.constituency_code, + ) + + def test_population_weighted_sampling(self, small_crosswalk): + """Higher-population OAs should be sampled more + frequently.""" + df, path = small_crosswalk + # All English, large sample for statistical test + countries = np.array([1] * 5000) + + geo = assign_random_geography( + household_countries=countries, + n_clones=1, + seed=42, + crosswalk_path=str(path), + ) + + # Count assignments per OA + oa_counts = pd.Series(geo.oa_code).value_counts() + + # The English OAs in our fixture have populations + # 100-199. Higher-numbered OAs should tend to be + # sampled more. Check that the top-10 most sampled + # OAs have higher average population than bottom-10. + eng_df = df[df["country"] == "England"].copy() + eng_df["population"] = eng_df["population"].astype(int) + + top_10 = oa_counts.head(10).index.tolist() + bottom_10 = oa_counts.tail(10).index.tolist() + + top_pop = eng_df[eng_df["oa_code"].isin(top_10)]["population"].mean() + bottom_pop = eng_df[eng_df["oa_code"].isin(bottom_10)]["population"].mean() + + assert top_pop > bottom_pop, ( + f"Top-10 avg pop ({top_pop}) should exceed bottom-10 ({bottom_pop})" + ) + + def test_string_country_input(self, small_crosswalk): + """Should accept string country names too.""" + df, path = small_crosswalk + countries = np.array(["England"] * 5 + ["Wales"] * 3) + + geo = assign_random_geography( + household_countries=countries, + n_clones=1, + seed=42, + crosswalk_path=str(path), + ) + + assert len(geo.oa_code) == 8 + for i in range(5): + assert geo.country[i] == "England" + for i in range(5, 8): + assert geo.country[i] == "Wales"