Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a0c011d
#185 initial commit or work done
bryce-sandag Jan 29, 2026
c66e9b8
#189 - block level naics72 split query
Jan 30, 2026
2c28142
#185 separate lodes data pull from join to mgra
bryce-sandag Jan 31, 2026
86019c7
#185 #188 #189 Update logic for employment estimates
bryce-sandag Feb 3, 2026
dec3eaf
#185 change "jobs" in output to "value" and fix year left in SQL query
bryce-sandag Feb 3, 2026
3243c07
#185 #186 Create output/input table in SQL and add ability to output …
bryce-sandag Feb 4, 2026
b7e8062
#188 change function read_sql_query_acs to read_sql_query_custom
bryce-sandag Feb 4, 2026
8410963
#188 update wiki for change in read_sql_query_acs to read_sql_query_c…
bryce-sandag Feb 4, 2026
3be99f5
#185 reset config
bryce-sandag Feb 4, 2026
a2b6275
#185 reset config
bryce-sandag Feb 4, 2026
3788834
#185 cleanup in a few spots
bryce-sandag Feb 4, 2026
6015a5a
#185 remove output folder used during testing
bryce-sandag Feb 4, 2026
0cc89b9
#185 Change connection when grabbing mgras
bryce-sandag Feb 4, 2026
827d066
#185 #188 addressed first 2 comments from @Eric-Liu-SANDAG in pull re…
bryce-sandag Feb 5, 2026
b8dfdb6
#185 address pull request feedback for get_lodes_data.sql
bryce-sandag Feb 5, 2026
f20e4c6
#185 Update utils.py and utility.md for update to read_sql_query_fall…
bryce-sandag Feb 5, 2026
26a00d6
#185 #189 addressed pull request feedback and added parenthesis for c…
bryce-sandag Feb 5, 2026
5f376d5
#185 remove get_mgra.sql and use as string directly
bryce-sandag Feb 5, 2026
824b8ef
#185 fix using only spaces vs tabs in sql files
bryce-sandag Feb 5, 2026
223fcc4
#185 remove year left in query from testing
bryce-sandag Feb 5, 2026
46cda0e
#185 fix table being called for [inputs].[mgra]
bryce-sandag Feb 5, 2026
f7a109b
#185 #188 update utility.md and utils.py
bryce-sandag Feb 5, 2026
b8bb79c
#185 #189 Better format based on feedback and fix year set when was d…
bryce-sandag Feb 5, 2026
73bca20
#185 better formatting to match rest of estimates program
bryce-sandag Feb 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ debug:
population: False
population_by_ase: False
household_characteristics: False
employment: False
staging: False
6 changes: 6 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import python.pop_type as pop
import python.ase as ase
import python.hh_characteristics as hh_characteristics
import python.employment as employment
import python.staging as staging

import python.utils as utils
Expand Down Expand Up @@ -57,6 +58,11 @@
logger.info("Running Household Characteristics module...")
hh_characteristics.run_hh_characteristics(year)

# Employment module
if utils.RUN_INSTRUCTIONS["employment"]:
logger.info("Running Employment module...")
employment.run_employment(year)

# Diagnostic print for this year
logger.info(f"Finished running {year}\n")

Expand Down
8 changes: 4 additions & 4 deletions python/ase.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def _get_controls_inputs(year: int) -> dict[str, pd.DataFrame]:

# Get regional age/sex/ethnicity group quarters distributions
with open(utils.SQL_FOLDER / "ase/get_region_gq_ase_dist.sql") as file:
region_gq_ase_dist = utils.read_sql_query_acs(
region_gq_ase_dist = utils.read_sql_query_fallback(
sql=sql.text(file.read()),
con=con,
params={
Expand Down Expand Up @@ -262,7 +262,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]:
with utils.ESTIMATES_ENGINE.connect() as con:
# Get the Age/Sex B010001 table data
with open(utils.SQL_FOLDER / "ase/get_B01001.sql") as file:
b01001 = utils.read_sql_query_acs(
b01001 = utils.read_sql_query_fallback(
sql=sql.text(file.read()),
con=con,
params={
Expand All @@ -272,7 +272,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]:

# Get the Ethnicity B03002 table data
with open(utils.SQL_FOLDER / "ase/get_B03002.sql") as file:
b03002 = utils.read_sql_query_acs(
b03002 = utils.read_sql_query_fallback(
sql=sql.text(file.read()),
con=con,
params={
Expand All @@ -282,7 +282,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]:

# Get Age/Sex/Ethnicity data from B01001(B-I) table data
with open(utils.SQL_FOLDER / "ase/get_B01001(B-I).sql") as file:
b01001_b_i = utils.read_sql_query_acs(
b01001_b_i = utils.read_sql_query_fallback(
sql=sql.text(file.read()),
con=con,
params={
Expand Down
252 changes: 252 additions & 0 deletions python/employment.py
Copy link
Contributor

@Eric-Liu-SANDAG Eric-Liu-SANDAG Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dude GitHub completely ate my comments on this file? I don't see them on my previous request changes...

  • Need to restructure employment.py to match the format of the other modules. For example, see how pop_type.py:run_pop() is structured, with explicit inputs, outputs, validation, and insertion
  • The validation part is super super important. Make sure to run on both inputs and outputs
  • A lot of the processing can be combined via chained operators to remove a ton of the intermediate variables. For example, in aggregate_lodes_to_mgra(), you have the variables lehd_to_mgra, lehd_to_mgra_summed, and final_lehd_to_mgra, which feels excessive
  • As a continuation of above, not a fan of variable names like jobs_frame (why not just jobs, we already know it's a pd.DataFrame) and final_etc
  • Remove self-explanatory comments like # Add run_id column
  • Be extremely careful with your usages of utils.integerize_1d(). If you look at other locations it is used, it is nearly always preceded by some kind of sort. This is because a single row or value being different can completely change the output of utils.integerize_1d() due to its random nature. I would recommend that you sort values before and additionally, do two consecutive runs back to back and write a script to ensure that outputs are the same between runs. Note, the output of SQL scripts is not guaranteed to output in the same order each time, unless you do an explicit ORDER BY and ensure that there are no ties
  • Add some comments at the top similar to other modules. See pop_type.py for example
  • Surely there's a better way to cross join in pandas without using that weird key thing

Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
import numpy as np
import pandas as pd
import sqlalchemy as sql

import python.utils as utils

generator = np.random.default_rng(utils.RANDOM_SEED)


def run_employment(year: int):
    """Control function to create jobs data by industry_code (NAICS) at the MGRA level.

    Get the LEHD LODES data, aggregate to the MGRA level using the block to MGRA
    crosswalk, then apply control totals from QCEW using integerization.

    Functionality is split apart for code encapsulation (function inputs not included):
        get_LODES_data - Get LEHD LODES data for a specified year, including
            special handling for industry_code 72 (Accommodation and Food Services)
        aggregate_lodes_to_mgra - Aggregate LODES data to MGRA level using allocation
            percentages from the block to MGRA crosswalk
        _get_jobs_inputs - Gather the LODES data, the block to MGRA crosswalk, and
            the QCEW county control totals into a single input dictionary
        _create_jobs_output - Apply control totals to employment data using
            utils.integerize_1d()
        _insert_jobs - Store both the control totals and controlled employment
            inputs/outputs to the production database

    Args:
        year (int): estimates year

    Raises:
        ValueError: if utils.MGRA_VERSION is anything other than 'mgra15'; the
            block to MGRA crosswalk only exists for that geography version.
    """
    # The block to MGRA crosswalk used downstream is only defined for mgra15
    if utils.MGRA_VERSION != "mgra15":
        raise ValueError(
            f"Employment module only works with MGRA_VERSION = 'mgra15'. "
            f"Current MGRA_VERSION is '{utils.MGRA_VERSION}'."
        )

    jobs_inputs = _get_jobs_inputs(year)
    # TODO: _validate_jobs_inputs here before proceeding

    jobs_outputs = _create_jobs_output(jobs_inputs)
    # TODO: _validate_jobs_outputs here before proceeding

    _insert_jobs(jobs_inputs, jobs_outputs)


def get_LODES_data(year: int) -> pd.DataFrame:
    """Retrieve LEHD LODES data for a specified year.

    Industry code 72 (Accommodation and Food Services) is not broken out at the
    three-digit level in LODES, so it is split into codes 721 and 722 using
    block-level percentages pulled from the GIS database.

    Args:
        year (int): The year for which to retrieve LEHD LODES data.

    Returns:
        pd.DataFrame: Block-level jobs with columns: year, block, industry_code,
            jobs. Rows with industry_code "72" are replaced by "721"/"722" rows.
    """
    with utils.LEHD_ENGINE.connect() as con:
        with open(utils.SQL_FOLDER / "employment/get_lodes_data.sql") as file:
            lodes_data = utils.read_sql_query_fallback(
                max_lookback=2,
                sql=sql.text(file.read()),
                con=con,
                params={"year": year},
            )

    with utils.GIS_ENGINE.connect() as con:
        with open(utils.SQL_FOLDER / "employment/get_naics72_split.sql") as file:
            split_naics_72 = utils.read_sql_query_fallback(
                max_lookback=3,
                sql=sql.text(file.read()),
                con=con,
                params={"year": year},
            )

    # Attach split percentages to industry_code 72 rows. A left merge keeps
    # blocks missing from the split table (their jobs become NaN, surfacing
    # crosswalk gaps rather than silently dropping jobs)
    lodes_72_split = lodes_data[lodes_data["industry_code"] == "72"].merge(
        split_naics_72, on="block", how="left"
    )

    # Build one frame per three-digit child code from the shared 72 rows
    split_frames = []
    for code in ("721", "722"):
        frame = lodes_72_split[["year", "block"]].copy()
        frame["industry_code"] = code
        frame["jobs"] = lodes_72_split["jobs"] * lodes_72_split[f"pct_{code}"]
        split_frames.append(frame)

    combined_data = pd.concat(
        [lodes_data[lodes_data["industry_code"] != "72"]] + split_frames,
        ignore_index=True,
    )
    return combined_data[["year", "block", "industry_code", "jobs"]]


def aggregate_lodes_to_mgra(
    combined_data: pd.DataFrame, xref: pd.DataFrame, year: int
) -> pd.DataFrame:
    """Aggregate LODES data to MGRA level using allocation percentages.

    Args:
        combined_data (pd.DataFrame): LODES data with columns: year, block,
            industry_code, jobs
        xref (pd.DataFrame): Crosswalk with columns: block, mgra, allocation_pct
        year (int): The year for which to aggregate data

    Returns:
        pd.DataFrame: Aggregated data at MGRA level with columns: run_id, year,
            mgra, industry_code, value. Rows are explicitly sorted by mgra and
            industry_code so the downstream utils.integerize_1d() calls, which
            are sensitive to row order due to random rounding, are reproducible
            between runs.
    """
    # Get every MGRA for this run so MGRAs with zero jobs are still present
    with utils.ESTIMATES_ENGINE.connect() as con:
        mgra_data = pd.read_sql_query(
            sql=sql.text(
                """
                SELECT DISTINCT [mgra]
                FROM [inputs].[mgra]
                WHERE run_id = :run_id
                ORDER BY [mgra]
                """
            ),
            con=con,
            params={"run_id": utils.RUN_ID},
        )

    # Cross join MGRAs with every industry code; sorted() fixes the industry
    # order, since SQL result order (and therefore unique() order) is not
    # guaranteed without an ORDER BY
    jobs = mgra_data.merge(
        pd.DataFrame(
            {"industry_code": sorted(combined_data["industry_code"].unique())}
        ),
        how="cross",
    )
    jobs["year"] = year
    jobs = jobs[["year", "mgra", "industry_code"]]

    # Allocate block-level jobs to MGRAs via the crosswalk percentages, then
    # sum within each MGRA/industry and join back to the full MGRA/industry grid
    allocated = combined_data.merge(xref, on="block", how="inner")
    allocated["value"] = allocated["jobs"] * allocated["allocation_pct"]
    jobs = jobs.merge(
        allocated.groupby(["year", "mgra", "industry_code"], as_index=False)[
            "value"
        ].sum(),
        on=["year", "mgra", "industry_code"],
        how="left",
    )
    jobs["value"] = jobs["value"].fillna(0)
    jobs["run_id"] = utils.RUN_ID

    # Deterministic row order so random integerization downstream is reproducible
    return (
        jobs[["run_id", "year", "mgra", "industry_code", "value"]]
        .sort_values(["mgra", "industry_code"])
        .reset_index(drop=True)
    )


def _get_jobs_inputs(year: int) -> dict[str, pd.DataFrame]:
    """Get input data related to jobs for a specified year.

    Args:
        year (int): The year for which to retrieve input data.
    Returns:
        dict[str, pd.DataFrame]: A dictionary containing input DataFrames related to jobs.
    """
    jobs_inputs = {"LODES_data": get_LODES_data(year)}

    with utils.LEHD_ENGINE.connect() as con:
        # Crosswalk from Census blocks to MGRAs
        with open(utils.SQL_FOLDER / "employment/xref_block_to_mgra.sql") as file:
            jobs_inputs["xref_block_to_mgra"] = pd.read_sql_query(
                sql=sql.text(file.read()),
                con=con,
                params={"mgra_version": utils.MGRA_VERSION},
            )

        # Regional employment control totals from QCEW
        with open(utils.SQL_FOLDER / "employment/QCEW_control.sql") as file:
            controls = utils.read_sql_query_fallback(
                sql=sql.text(file.read()),
                con=con,
                params={
                    "year": year,
                },
            )
        controls["run_id"] = utils.RUN_ID
        jobs_inputs["control_totals"] = controls

    jobs_inputs["lehd_jobs"] = aggregate_lodes_to_mgra(
        jobs_inputs["LODES_data"], jobs_inputs["xref_block_to_mgra"], year
    )

    return jobs_inputs


def _create_jobs_output(
    jobs_inputs: dict[str, pd.DataFrame],
) -> dict[str, pd.DataFrame]:
    """Apply control totals to employment data using utils.integerize_1d().

    Args:
        jobs_inputs (dict[str, pd.DataFrame]): Input data as built by
            _get_jobs_inputs(). Must contain the keys "lehd_jobs" (MGRA-level
            LODES employment) and "control_totals" (QCEW regional controls).

    Returns:
        dict[str, pd.DataFrame]: Dictionary with the key "results" holding the
            controlled, integerized employment data.

    Raises:
        ValueError: if an industry code present in "lehd_jobs" has no matching
            row in "control_totals".
    """
    lehd_jobs = jobs_inputs["lehd_jobs"]
    control_totals = jobs_inputs["control_totals"]

    jobs_outputs = {"results": lehd_jobs.copy()}

    # Scale each industry to its regional control total and integerize
    for industry_code in lehd_jobs["industry_code"].unique():
        industry_mask = lehd_jobs["industry_code"] == industry_code

        controls = control_totals.loc[
            control_totals["industry_code"] == industry_code, "value"
        ]
        if controls.empty:
            # Fail loudly instead of the opaque IndexError from .iloc[0]
            raise ValueError(
                f"No QCEW control total found for industry_code '{industry_code}'"
            )

        # NOTE: integerize_1d() rounds randomly, so its output depends on row
        # order; lehd_jobs is expected to arrive pre-sorted for reproducibility
        jobs_outputs["results"].loc[industry_mask, "value"] = utils.integerize_1d(
            data=lehd_jobs.loc[industry_mask, "value"],
            control=controls.iloc[0],
            methodology="weighted_random",
            generator=generator,
        )

    return jobs_outputs


def _insert_jobs(
    jobs_inputs: dict[str, pd.DataFrame], jobs_outputs: dict[str, pd.DataFrame]
) -> None:
    """Insert input and output data related to jobs to the database."""

    # (DataFrame, table name, schema) for each table written to production
    tables = [
        (jobs_inputs["control_totals"], "controls_jobs", "inputs"),
        (jobs_outputs["results"], "jobs", "outputs"),
    ]

    with utils.ESTIMATES_ENGINE.connect() as con:
        for frame, table_name, schema_name in tables:
            frame.to_sql(
                name=table_name,
                con=con,
                schema=schema_name,
                if_exists="append",
                index=False,
            )
12 changes: 7 additions & 5 deletions python/hh_characteristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ def _get_hh_income_inputs(year: int) -> dict[str, pd.DataFrame]:
with open(
utils.SQL_FOLDER / "hh_characteristics" / "get_tract_controls_hh_income.sql"
) as file:
hh_income_inputs["hh_income_tract_controls"] = utils.read_sql_query_acs(
sql=sql.text(file.read()), # type: ignore
con=con, # type: ignore
params={"run_id": utils.RUN_ID, "year": year},
hh_income_inputs["hh_income_tract_controls"] = (
utils.read_sql_query_fallback(
sql=sql.text(file.read()), # type: ignore
con=con, # type: ignore
params={"run_id": utils.RUN_ID, "year": year},
)
)

return hh_income_inputs
Expand Down Expand Up @@ -120,7 +122,7 @@ def _get_hh_size_inputs(year: int) -> dict[str, pd.DataFrame]:
/ "hh_characteristics"
/ "get_tract_controls_hh_by_size.sql"
) as file:
hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_acs(
hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_fallback(
sql=sql.text(file.read()), # type: ignore
con=con, # type: ignore
params={"run_id": utils.RUN_ID, "year": year},
Expand Down
2 changes: 1 addition & 1 deletion python/hs_hh.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def _get_hs_hh_inputs(year: int) -> dict[str, pd.DataFrame]:

# Get tract occupancy controls
with open(utils.SQL_FOLDER / "hs_hh/get_tract_controls_hh.sql") as file:
hs_hh_inputs["tract_controls"] = utils.read_sql_query_acs(
hs_hh_inputs["tract_controls"] = utils.read_sql_query_fallback(
sql=sql.text(file.read()),
con=con,
params={
Expand Down
2 changes: 2 additions & 0 deletions python/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def parse_config(self) -> None:
"population",
"population_by_ase",
"household_characteristics",
"employment",
"staging",
]:
self.run_instructions[key] = self._config["debug"][key]
Expand Down Expand Up @@ -161,6 +162,7 @@ def _validate_config(self) -> None:
"population": {"type": "boolean"},
"population_by_ase": {"type": "boolean"},
"household_characteristics": {"type": "boolean"},
"employment": {"type": "boolean"},
"staging": {"type": "boolean"},
},
},
Expand Down
2 changes: 1 addition & 1 deletion python/pop_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def _get_hhp_inputs(year: int) -> dict[str, pd.DataFrame]:

# Get tract level household size controls
with open(utils.SQL_FOLDER / "pop_type/get_tract_controls_hhs.sql") as file:
tract_controls = utils.read_sql_query_acs(
tract_controls = utils.read_sql_query_fallback(
sql=sql.text(file.read()),
con=con,
params={
Expand Down
Loading