From a0c011d31941246ad05e08b16298733ee10e86ce Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 29 Jan 2026 11:39:24 -0800 Subject: [PATCH 01/24] #185 initial commit or work done --- config.yml | 9 +- main.py | 6 ++ python/employment.py | 114 +++++++++++++++++++++++++ python/parsers.py | 2 + python/utils.py | 12 ++- sql/employment/LODES_to_MGRA.sql | 141 +++++++++++++++++++++++++++++++ sql/employment/QCEW_control.sql | 11 +++ 7 files changed, 290 insertions(+), 5 deletions(-) create mode 100644 python/employment.py create mode 100644 sql/employment/LODES_to_MGRA.sql create mode 100644 sql/employment/QCEW_control.sql diff --git a/config.yml b/config.yml index 659a083..cd09993 100644 --- a/config.yml +++ b/config.yml @@ -5,7 +5,7 @@ # The `run` section contains configuration for running every module of the Estimates # Program for a specified set of years run: - enabled: True + enabled: False mgra: mgra15 start_year: 2020 end_year: 2024 @@ -17,15 +17,16 @@ run: # `run_id` and `comments`. 
If `run_id` is `null`, then a new `run_id` will be # automatically created, similar to `run` mode debug: - enabled: False + enabled: True run_id: null start_year: 2022 - end_year: 2023 + end_year: 2024 version: 1.1.1-dev - comments: null + comments: Test run employment 1st time startup: False housing_and_households: False population: False population_by_ase: False household_characteristics: False + employment: True staging: False diff --git a/main.py b/main.py index 29f35e2..be83321 100644 --- a/main.py +++ b/main.py @@ -13,6 +13,7 @@ import python.pop_type as pop import python.ase as ase import python.hh_characteristics as hh_characteristics +import python.employment as employment import python.staging as staging import python.utils as utils @@ -57,6 +58,11 @@ logger.info("Running Household Characteristics module...") hh_characteristics.run_hh_characteristics(year) + # Employment module + if utils.RUN_INSTRUCTIONS["employment"]: + logger.info("Running Employment module...") + employment.run_employment(year) + # Diagnostic print for this year logger.info(f"Finished running {year}\n") diff --git a/python/employment.py b/python/employment.py new file mode 100644 index 0000000..2377d91 --- /dev/null +++ b/python/employment.py @@ -0,0 +1,114 @@ +import numpy as np +import pandas as pd +import sqlalchemy as sql + +import python.utils as utils + +generator = np.random.default_rng(utils.RANDOM_SEED) + +# Load configuration from YAML file +# with open("config.yaml", "r") as f: +# config = yaml.safe_load(f) + + +def run_employment(year): + """Run the Employment module for a specified year. + + This function processes employment data by applying control totals to + LEHD LODES data at the MGRA level using integerization. + + Args: + year (int): The year for which to run the Employment module. + """ + + # Check MGRA version and raise error if not 'mgra15' + if utils.MGRA_VERSION != "mgra15": + raise ValueError( + f"Employment module only works with MGRA_VERSION = 'mgra15'. 
" + f"Current MGRA_VERSION is '{utils.MGRA_VERSION}'." + ) + + # Load data from SQL + original_data, control_totals = run_employment_sql(year) + + # Apply controls + controlled_data = apply_employment_controls( + original_data, control_totals, generator + ) + + # Export results + output_filepath = utils.OUTPUT_FOLDER / f"controlled_data_{year}.csv" + controlled_data.to_csv(output_filepath, index=False) + + return controlled_data + + +def run_employment_sql(year): + """Load employment data from SQL queries. + + Args: + year (int): The year for which to load employment data. + + Returns: + tuple: A tuple containing (original_data, control_totals) as DataFrames. + """ + with utils.LEHD_ENGINE.connect() as con: + # Get LEHD LODES data at MGRA level + with open(utils.SQL_FOLDER / "employment/LODES_to_MGRA.sql") as file: + original_data = pd.read_sql( + sql=sql.text(file.read()), + con=con, + params={ + "year": year, + "mgra_version": utils.MGRA_VERSION, + }, + ) + + # Get employment control totals from QCEW + with open(utils.SQL_FOLDER / "employment/QCEW_control.sql") as file: + control_totals = pd.read_sql( + sql=sql.text(file.read()), + con=con, + params={ + "year": year, + }, + ) + return original_data, control_totals + + +def apply_employment_controls(original_data, control_totals, generator): + """Apply control totals to employment data using integerization. + + Args: + original_data (pd.DataFrame): LEHD LODES data at MGRA level. + control_totals (pd.DataFrame): Employment control totals from QCEW. + generator: NumPy random number generator. + + Returns: + pd.DataFrame: Controlled employment data. 
+ """ + # Create a copy of original_data for controlled results + controlled_data = original_data.copy() + + # Get unique industry codes + industry_codes = original_data["industry_code"].unique() + + # Apply integerize_1d to each industry code + for industry_code in industry_codes: + # Filter original data for this industry + industry_mask = original_data["industry_code"] == industry_code + + # Get control value for this industry + control_value = control_totals[ + control_totals["industry_code"] == industry_code + ]["jobs"].iloc[0] + + # Apply integerize_1d and update controlled_data + controlled_data.loc[industry_mask, "jobs"] = utils.integerize_1d( + data=original_data.loc[industry_mask, "jobs"], + control=control_value, + methodology="weighted_random", + generator=generator, + ) + + return controlled_data diff --git a/python/parsers.py b/python/parsers.py index 3cf2043..9a2a7c4 100644 --- a/python/parsers.py +++ b/python/parsers.py @@ -95,6 +95,7 @@ def parse_config(self) -> None: "population", "population_by_ase", "household_characteristics", + "employment", "staging", ]: self.run_instructions[key] = self._config["debug"][key] @@ -161,6 +162,7 @@ def _validate_config(self) -> None: "population": {"type": "boolean"}, "population_by_ase": {"type": "boolean"}, "household_characteristics": {"type": "boolean"}, + "employment": {"type": "boolean"}, "staging": {"type": "boolean"}, }, }, diff --git a/python/utils.py b/python/utils.py index c2436fe..c30a862 100644 --- a/python/utils.py +++ b/python/utils.py @@ -17,7 +17,7 @@ # Store project root folder ROOT_FOLDER = pathlib.Path(__file__).parent.resolve().parent SQL_FOLDER = ROOT_FOLDER / "sql" - +OUTPUT_FOLDER = ROOT_FOLDER / "output" ########### # LOGGING # @@ -78,6 +78,16 @@ fast_executemany=True, ) +LEHD_ENGINE = sql.create_engine( + "mssql+pyodbc://@" + + _secrets["sql"]["socioec"]["server"] + + "/" + + _secrets["sql"]["socioec"]["database"] + + "?trusted_connection=yes&driver=" + + "ODBC Driver 17 for SQL 
Server", + fast_executemany=True, +) + # Other SQL configuration GIS_SERVER = _secrets["sql"]["gis"]["server"] diff --git a/sql/employment/LODES_to_MGRA.sql b/sql/employment/LODES_to_MGRA.sql new file mode 100644 index 0000000..97ee1e6 --- /dev/null +++ b/sql/employment/LODES_to_MGRA.sql @@ -0,0 +1,141 @@ +-- Initialize parameters ----------------------------------------------------- +DECLARE @year integer = :year; +DECLARE @mgra_version nvarchar(10) = :mgra_version; + + +-- Send error message if no data exists -------------------------------------- +IF NOT EXISTS ( + SELECT TOP (1) * + FROM [socioec_data].[lehd].[lodes_8_wac] + WHERE [SEG] = 'S000' + AND [TYPE] = 'JT00' + AND [version] = 2 + AND [YEAR] = @year +) +SELECT 'Data does not exist' AS [msg] +ELSE +BEGIN + + + -- All MGRAs + WITH [AllMGRAs] AS ( + SELECT DISTINCT CAST([zone] AS INT) AS [MGRA] + FROM [GeoAnalyst].[geography].[zone] + WHERE [geography_id] = + CASE + WHEN @mgra_version = 'mgra15' THEN 4 -- which mgra list to grab from table, here is where to add more mgra versions to grab + ELSE NULL + END + ), + + -- All industry codes (2 - Digit NAICS) + [AllIndustries] AS ( + SELECT [industry_code] + FROM (VALUES + ('11'), + ('21'), + ('22'), + ('23'), + ('31-33'), + ('42'), + ('44-45'), + ('48-49'), + ('51'), + ('52'), + ('53'), + ('54'), + ('55'), + ('56'), + ('61'), + ('62'), + ('71'), + ('72'), + ('81'), + ('92') + ) AS i(industry_code) + ), + + -- Create the cross product = every MGRA × every industry_code + [AllCombinations] AS ( + SELECT + [MGRA], + [industry_code] + FROM [AllMGRAs] + CROSS JOIN [AllIndustries] + ), + + -- aggregated Jobs data from LEHD LODES into MGRAs that exist in xwalk from Census Block to MGRA + [AggregatedJobs] AS ( + SELECT + [YEAR], + [MGRA], + [industry_code], + SUM([value]) AS [jobs] + FROM [socioec_data].[lehd].[lodes_8_wac] + INNER JOIN ( + SELECT + [xref_id], + [from_zone_id], + [block].[zone] AS [CTblock], + [to_zone_id], + CAST([MGRA15].[zone] AS INT) AS [MGRA], + 
[allocation_pct] + FROM [GeoAnalyst].[geography].[xref_zone] + INNER JOIN [GeoAnalyst].[geography].[zone] AS [block] + ON [xref_zone].[from_zone_id] = [block].[zone_id] + INNER JOIN [GeoAnalyst].[geography].[zone] AS [MGRA15] + ON [xref_zone].[to_zone_id] = [MGRA15].[zone_id] + WHERE + -- This is where to specify which xref to use based on MGRA version + -- Curretly only works using mgra15 but this is where to add if additional mgra versions to be used + [xref_id] = + CASE + WHEN @mgra_version = 'mgra15' THEN 18 + ELSE NULL + END + ) AS [XREF] + ON [lodes_8_wac].[w_geocode] = [XREF].[CTblock] + CROSS APPLY ( + VALUES + ('11', [CNS01] * [allocation_pct]), + ('21', [CNS02] * [allocation_pct]), + ('22', [CNS03] * [allocation_pct]), + ('23', [CNS04] * [allocation_pct]), + ('31-33',[CNS05] * [allocation_pct]), + ('42', [CNS06] * [allocation_pct]), + ('44-45',[CNS07] * [allocation_pct]), + ('48-49',[CNS08] * [allocation_pct]), + ('51', [CNS09] * [allocation_pct]), + ('52', [CNS10] * [allocation_pct]), + ('53', [CNS11] * [allocation_pct]), + ('54', [CNS12] * [allocation_pct]), + ('55', [CNS13] * [allocation_pct]), + ('56', [CNS14] * [allocation_pct]), + ('61', [CNS15] * [allocation_pct]), + ('62', [CNS16] * [allocation_pct]), + ('71', [CNS17] * [allocation_pct]), + ('72', [CNS18] * [allocation_pct]), + ('81', [CNS19] * [allocation_pct]), + ('92', [CNS20] * [allocation_pct]) + ) AS u([industry_code], [value]) + WHERE [SEG] = 'S000' + AND [TYPE] = 'JT00' + AND [version] = 2 + AND [YEAR] = @year + GROUP BY [YEAR], [MGRA], [industry_code] + ) + + + -- Final result: left join the aggregated jobs to the full combinations + SELECT + @year AS [YEAR], + [AllCombinations].[MGRA], + [AllCombinations].[industry_code], + COALESCE([AggregatedJobs].[jobs], 0) AS [jobs] + FROM [AllCombinations] + LEFT JOIN [AggregatedJobs] + ON [AggregatedJobs].[MGRA] = [AllCombinations].[MGRA] + AND [AggregatedJobs].[industry_code] = [AllCombinations].[industry_code] + ORDER BY [AllCombinations].[MGRA], 
[AllCombinations].[industry_code] +END + diff --git a/sql/employment/QCEW_control.sql b/sql/employment/QCEW_control.sql new file mode 100644 index 0000000..3e74b42 --- /dev/null +++ b/sql/employment/QCEW_control.sql @@ -0,0 +1,11 @@ +SELECT + [year], + [industry_code], + SUM([annual_avg_emplvl]) AS jobs +FROM [socioec_data].[bls].[qcew_by_area_annual] +INNER JOIN [socioec_data].[bls].[industry_code] + ON [qcew_by_area_annual].[naics_id] = [industry_code].[naics_id] +WHERE [area_fips] = '06073' + AND [year] = :year + AND [industry_code] IN ('11', '21', '22', '23', '31-33', '42', '44-45', '48-49', '51', '52', '53', '54', '55', '56', '61', '62', '71', '72', '81', '92') +GROUP BY [year], [industry_code] From c66e9b832aa1bf8605592550d5bb0bcca1f3f7aa Mon Sep 17 00:00:00 2001 From: GregorSchroeder Date: Thu, 29 Jan 2026 17:18:58 -0800 Subject: [PATCH 02/24] #189 - block level naics72 split query --- sql/employment/get_naics72_split.sql | 177 +++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 sql/employment/get_naics72_split.sql diff --git a/sql/employment/get_naics72_split.sql b/sql/employment/get_naics72_split.sql new file mode 100644 index 0000000..2025db1 --- /dev/null +++ b/sql/employment/get_naics72_split.sql @@ -0,0 +1,177 @@ +/* +This query provides a split of 2-digit NAICS 72 into 3-digit NAICS codes. +Point-level data is gotten from the confidential EDD dataset, assigned to +2020 Census Blocks and the percentage split of 2-digit NAICS 72 into the +3-digit NAICS codes of 721 and 722 is calculated within each block. + +Notes: + 1) This query assumes the connection is to the GIS server. + 2) Data prior to year 2017 is not present in the EDD view and must be + queried directly from the source database table. + 3) If no split is present for a block, the regional percentage split is + substituted. All 2020 Census blocks are represented. 
+*/ + +SET NOCOUNT ON; +-- Initialize parameters and return table ------------------------------------ +DECLARE @year INTEGER = :year; +DECLARE @msg nvarchar(45) = 'EDD point-level data does not exist'; + +DROP TABLE IF EXISTS [#edd]; +CREATE TABLE [#edd] ( + [id] INTEGER IDENTITY(1,1) NOT NULL, + [industry_code] NVARCHAR(3) NOT NULL, + [jobs] FLOAT NOT NULL, + [Shape] GEOMETRY NOT NULL, + CONSTRAINT [pk_tt_edd] PRIMARY KEY ([id]) +) + +-- Create spatial index for later spatial join +-- Bounding box coordinates from SANDAG GIS team +-- Identical to spatial index on LUDU point layers in GIS database +CREATE SPATIAL INDEX [sidx_tt_edd] ON [#edd] +([Shape]) USING GEOMETRY_AUTO_GRID +WITH (BOUNDING_BOX = ( + 6151635.98006938, + 1775442.36347014, + 6613401.66775663, + 2129306.52024172), + CELLS_PER_OBJECT = 8 +) + + +-- Get SANDAG GIS team EDD dataset ------------------------------------------- +DECLARE @qry NVARCHAR(max) +IF @year >= 2017 +BEGIN + INSERT INTO [#edd] + SELECT + [industry_code], + 1.0 * [emp_total]/[emp_valid] AS [jobs], + [SHAPE] + FROM ( + SELECT + CASE WHEN LEFT([code], 3) = '721' THEN '721' + WHEN LEFT([code], 3) = '722' THEN '722' + ELSE NULL END AS [industry_code], + CASE WHEN [emp_m1] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m2] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m3] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m4] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m5] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m6] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m7] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m8] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m9] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m10] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m11] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 END + AS [emp_valid], + COALESCE([emp_m1], 0) + COALESCE([emp_m2], 0) + COALESCE([emp_m3], 0) + + COALESCE([emp_m4], 0) + COALESCE([emp_m5], 0) + COALESCE([emp_m6], 
0) + + COALESCE([emp_m7], 0) + COALESCE([emp_m8], 0) + COALESCE([emp_m9], 0) + + COALESCE([emp_m10], 0) + COALESCE([emp_m11], 0) + COALESCE([emp_m12], 0) + AS [emp_total], + [SHAPE] + FROM [EMPCORE].[ca_edd].[vi_ca_edd_employment] + INNER JOIN [EMPCORE].[ca_edd].[naics] + ON [vi_ca_edd_employment].[naics_id] = [naics].[naics_id] + WHERE + [year] = @year + AND LEFT([code], 3) IN ('721','722') + ) AS [tt] + WHERE + [emp_valid] > 0 + AND [emp_total] > 0 +END +ELSE IF @year = 2016 OR @year BETWEEN 2010 AND 2014 +BEGIN + INSERT INTO [#edd] + SELECT + CASE WHEN LEFT([code], 3) = '721' THEN '721' + WHEN LEFT([code], 3) = '722' THEN '722' + ELSE NULL END AS [industry_code], + [employment] * ISNULL([headquarters].[share], 1) AS [jobs], + ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] + FROM [EMPCORE].[ca_edd].[businesses] + INNER JOIN [EMPCORE].[ca_edd].[naics] + ON [businesses].[naics_id] = [naics].[naics_id] + LEFT JOIN [EMPCORE].[ca_edd].[headquarters] + ON [businesses].[year] = [headquarters].[year] + AND [businesses].[emp_id] = [headquarters].[emp_id] + INNER JOIN ( + SELECT [year], [emp_id], [employment] + FROM [EMPCORE].[ca_edd].[employment] + WHERE + [month_id] = 14 -- adjusted employment + AND [employment] > 0 + AND [year] = @year + ) AS [employment] + ON [businesses].[year] = [employment].[year] + AND [businesses].[emp_id] = [employment].[emp_id] + WHERE LEFT([code], 3) IN ('721','722') +END +ELSE IF @year = 2015 +BEGIN + INSERT INTO [#edd] + SELECT + CASE WHEN LEFT([code], 3) = '721' THEN '721' + WHEN LEFT([code], 3) = '722' THEN '722' + ELSE NULL END AS [industry_code], + [employment] * ISNULL([headquarters].[share], 1) AS [jobs], + ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] + FROM [EMPCORE].[ca_edd].[businesses] + INNER JOIN [EMPCORE].[ca_edd].[naics] + ON [businesses].[naics_id] = [naics].[naics_id] + LEFT JOIN [EMPCORE].[ca_edd].[headquarters] + ON [businesses].[year] = [headquarters].[year] + AND [businesses].[emp_id] = 
[headquarters].[emp_id] + INNER JOIN ( + SELECT + [year], + [emp_id], + 1.0 * COALESCE([15], 0) + COALESCE([16], 0) + COALESCE([17], 0) / + CASE WHEN [15] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [16] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END + AS [employment] + FROM [EMPCORE].[ca_edd].[employment] + PIVOT(SUM([employment]) FOR [month_id] IN ([15], [16], [17])) AS [pivot] + WHERE + [year] = @year + AND ([15] IS NOT NULL OR [16] IS NOT NULL OR [17] IS NOT NULL) + ) AS [employment] + ON [businesses].[year] = [employment].[year] + AND [businesses].[emp_id] = [employment].[emp_id] + WHERE LEFT([code], 3) IN ('721','722') +END + + +-- Send error message if no data exists -------------------------------------- +IF NOT EXISTS ( + SELECT TOP (1) * + FROM [#edd] +) +SELECT @msg AS [msg] +ELSE +BEGIN + -- Calculate % split of NAICS 72 into 721 and 722 for 2020 Census Blocks - + SELECT + [GEOID20] AS [block], + CASE WHEN [72] = 0 THEN SUM([721]) OVER() / SUM([72]) OVER() + ELSE [721] / [72] END AS [pct_721], + CASE WHEN [72] = 0 THEN SUM([722]) OVER() / SUM([72]) OVER() + ELSE [722] / [72] END AS [pct_722] + FROM ( + SELECT + [GEOID20], + SUM(CASE WHEN [industry_code] = '721' THEN [jobs] ELSE 0 END) AS [721], + SUM(CASE WHEN [industry_code] = '722' THEN [jobs] ELSE 0 END) AS [722], + COALESCE(SUM([jobs]), 0) AS [72] + FROM [#edd] + RIGHT OUTER JOIN [GeoDepot].[sde].[CENSUSBLOCKS] + ON [#edd].[Shape].STIntersects([CENSUSBLOCKS].[Shape]) = 1 + GROUP BY [GEOID20] + ) [tt] + ORDER BY [GEOID20] +END \ No newline at end of file From 2c28142ec1af54946d66a690aac55f6ed3950fe7 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Fri, 30 Jan 2026 16:20:50 -0800 Subject: [PATCH 03/24] #185 separate lodes data pull from join to mgra --- sql/employment/get_lodes_data.sql | 52 +++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 sql/employment/get_lodes_data.sql diff --git a/sql/employment/get_lodes_data.sql 
b/sql/employment/get_lodes_data.sql new file mode 100644 index 0000000..d6a4952 --- /dev/null +++ b/sql/employment/get_lodes_data.sql @@ -0,0 +1,52 @@ +-- Initialize parameters ----------------------------------------------------- +DECLARE @year integer = :year; + +-- Send error message if no data exists -------------------------------------- +IF NOT EXISTS ( + SELECT TOP (1) * + FROM [socioec_data].[lehd].[lodes_8_wac] + WHERE [SEG] = 'S000' + AND [TYPE] = 'JT00' + AND [version] = 2 + AND [YEAR] = @year +) +SELECT 'LODES data does not exist' AS [msg] +ELSE +BEGIN + + + SELECT + [YEAR] AS [year], + [w_geocode] AS [block], + [industry_code], + SUM([value]) AS [jobs] + FROM [socioec_data].[lehd].[lodes_8_wac] + CROSS APPLY ( + VALUES + ('11', [CNS01]), + ('21', [CNS02]), + ('22', [CNS03]), + ('23', [CNS04]), + ('31-33',[CNS05]), + ('42', [CNS06]), + ('44-45',[CNS07]), + ('48-49',[CNS08]), + ('51', [CNS09]), + ('52', [CNS10]), + ('53', [CNS11]), + ('54', [CNS12]), + ('55', [CNS13]), + ('56', [CNS14]), + ('61', [CNS15]), + ('62', [CNS16]), + ('71', [CNS17]), + ('72', [CNS18]), + ('81', [CNS19]), + ('92', [CNS20]) + ) AS u([industry_code], [value]) + WHERE [SEG] = 'S000' + AND [TYPE] = 'JT00' + AND [version] = 2 + AND [YEAR] = @year + GROUP BY [YEAR], [w_geocode], [industry_code] +END From 86019c719dc4f107e697bebde90a71c53eb19ffc Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Tue, 3 Feb 2026 14:29:02 -0800 Subject: [PATCH 04/24] #185 #188 #189 Update logic for employment estimates This update separates out some of the functionality. Then addressed the ability to lookback multiple years to grab most recently available data. 
Integrated the ability to split NAICS 72 into NIACS 721 and 722 --- python/employment.py | 159 +++++++++++++++++++++++--- python/utils.py | 86 ++++++++++---- sql/employment/LODES_to_MGRA.sql | 141 ----------------------- sql/employment/QCEW_control.sql | 7 +- sql/employment/get_mgra.sql | 7 ++ sql/employment/get_naics72_split.sql | 7 +- sql/employment/xref_block_to_mgra.sql | 12 ++ 7 files changed, 233 insertions(+), 186 deletions(-) delete mode 100644 sql/employment/LODES_to_MGRA.sql create mode 100644 sql/employment/get_mgra.sql create mode 100644 sql/employment/xref_block_to_mgra.sql diff --git a/python/employment.py b/python/employment.py index 2377d91..86c44b7 100644 --- a/python/employment.py +++ b/python/employment.py @@ -28,13 +28,15 @@ def run_employment(year): f"Current MGRA_VERSION is '{utils.MGRA_VERSION}'." ) - # Load data from SQL - original_data, control_totals = run_employment_sql(year) + LODES_data = get_LODES_data(year) + xref = get_xref_block_to_mgra(utils.MGRA_VERSION) + + lehd_jobs = aggregate_lodes_to_mgra(LODES_data, xref, utils.RUN_ID, year) + + control_totals = get_control_totals(year) # Apply controls - controlled_data = apply_employment_controls( - original_data, control_totals, generator - ) + controlled_data = apply_employment_controls(lehd_jobs, control_totals, generator) # Export results output_filepath = utils.OUTPUT_FOLDER / f"controlled_data_{year}.csv" @@ -43,27 +45,150 @@ def run_employment(year): return controlled_data -def run_employment_sql(year): - """Load employment data from SQL queries. +def get_LODES_data(year) -> pd.DataFrame: + """Retrieve LEHD LODES data for a specified year. Args: - year (int): The year for which to load employment data. + year (int): The year for which to retrieve LEHD LODES data. 
+ """ + + with utils.LEHD_ENGINE.connect() as con: + with open(utils.SQL_FOLDER / "employment/get_lodes_data.sql") as file: + lodes_data = utils.read_sql_query_acs( + sql=sql.text(file.read()), + con=con, + params={"year": year}, + ) + with utils.GIS_ENGINE.connect() as con: + with open(utils.SQL_FOLDER / "employment/get_naics72_split.sql") as file: + split_naics_72 = utils.read_sql_query_acs( + sql=sql.text(file.read()), + con=con, + params={"year": year}, + ) + + # Separate industry_code 72 from other industries + lodes_72 = lodes_data[lodes_data["industry_code"] == "72"].copy() + lodes_other = lodes_data[lodes_data["industry_code"] != "72"].copy() + + # Join industry_code 72 data with split percentages + lodes_72_split = lodes_72.merge(split_naics_72, on="block", how="left") + + # Create rows for industry_code 721 + lodes_721 = lodes_72_split[["year", "block"]].copy() + lodes_721["industry_code"] = "721" + lodes_721["jobs"] = lodes_72_split["jobs"] * lodes_72_split["pct_721"] + + # Create rows for industry_code 722 + lodes_722 = lodes_72_split[["year", "block"]].copy() + lodes_722["industry_code"] = "722" + lodes_722["jobs"] = lodes_72_split["jobs"] * lodes_72_split["pct_722"] + + # Combine all data + combined_data = pd.concat([lodes_other, lodes_721, lodes_722], ignore_index=True) + combined_data = combined_data[["year", "block", "industry_code", "jobs"]] + + return combined_data + + +def get_xref_block_to_mgra(mgra_version) -> pd.DataFrame: + """Retrieve crosswalk from Census blocks to MGRAs. + Args: + mgra_version (str): The MGRA version to use for the crosswalk. Returns: - tuple: A tuple containing (original_data, control_totals) as DataFrames. + pd.DataFrame: A DataFrame containing the crosswalk from blocks to MGRAs. 
""" + with utils.LEHD_ENGINE.connect() as con: - # Get LEHD LODES data at MGRA level - with open(utils.SQL_FOLDER / "employment/LODES_to_MGRA.sql") as file: - original_data = pd.read_sql( + with open(utils.SQL_FOLDER / "employment/xref_block_to_mgra.sql") as file: + xref = utils.read_sql_query_acs( sql=sql.text(file.read()), con=con, - params={ - "year": year, - "mgra_version": utils.MGRA_VERSION, - }, + params={"mgra_version": mgra_version}, ) + return xref + + +def aggregate_lodes_to_mgra(combined_data, xref, run_id, year) -> pd.DataFrame: + """Aggregate LODES data to MGRA level using allocation percentages. + + Args: + combined_data (pd.DataFrame): LODES data with columns: year, block, industry_code, jobs + xref (pd.DataFrame): Crosswalk with columns: block, mgra, allocation_pct + run_id (int): The run ID for tracking + year (int): The year for which to aggregate data + + Returns: + pd.DataFrame: Aggregated data at MGRA level with columns: year, mgra, industry_code, jobs + """ + # Get MGRA data from SQL + with utils.LEHD_ENGINE.connect() as con: + with open(utils.SQL_FOLDER / "employment/get_mgra.sql") as file: + mgra_data = utils.read_sql_query_acs( + sql=sql.text(file.read()), + con=con, + params={"run_id": run_id}, + ) + + # Get unique industry codes and cross join with MGRA data + unique_industries = combined_data["industry_code"].unique() + jobs_frame = ( + mgra_data.assign(key=1) + .merge( + pd.DataFrame({"industry_code": unique_industries, "key": 1}), + on="key", + ) + .drop("key", axis=1) + ) + jobs_frame["year"] = year + jobs_frame = jobs_frame[["year", "mgra", "industry_code"]] + + # Join combined_data to xref and calculate allocated jobs + lehd_to_mgra = combined_data.merge(xref, on="block", how="inner") + lehd_to_mgra["jobs_alloc"] = lehd_to_mgra["jobs"] * lehd_to_mgra["allocation_pct"] + lehd_to_mgra = lehd_to_mgra[ + [ + "year", + "block", + "mgra", + "industry_code", + "jobs", + "allocation_pct", + "jobs_alloc", + ] + ] + + # Sum allocated jobs by 
year, mgra, and industry_code + lehd_to_mgra_summed = lehd_to_mgra.groupby( + ["year", "mgra", "industry_code"], as_index=False + )["jobs_alloc"].sum() + lehd_to_mgra_summed = lehd_to_mgra_summed.rename(columns={"jobs_alloc": "jobs"}) + + # Join summed data to jobs_frame, keeping all MGRAs and industry codes + final_lehd_to_mgra = jobs_frame.merge( + lehd_to_mgra_summed, + on=["year", "mgra", "industry_code"], + how="left", + ) + final_lehd_to_mgra["jobs"] = final_lehd_to_mgra["jobs"].fillna(0) + final_lehd_to_mgra = final_lehd_to_mgra[["year", "mgra", "industry_code", "jobs"]] + + return final_lehd_to_mgra + + +def get_control_totals(year) -> pd.DataFrame: + """Load employment data from SQL queries. + + Args: + year (int): The year for which to load employment data. + + Returns: + pd.DataFrame: Employment control totals from QCEW. + """ + with utils.LEHD_ENGINE.connect() as con: + # Get employment control totals from QCEW with open(utils.SQL_FOLDER / "employment/QCEW_control.sql") as file: control_totals = pd.read_sql( @@ -73,7 +198,7 @@ def run_employment_sql(year): "year": year, }, ) - return original_data, control_totals + return control_totals def apply_employment_controls(original_data, control_totals, generator): diff --git a/python/utils.py b/python/utils.py index c30a862..52e5643 100644 --- a/python/utils.py +++ b/python/utils.py @@ -690,38 +690,74 @@ def read_sql_query_acs(**kwargs: dict) -> pd.DataFrame: for dynamic adjustment of the 'year' parameter in the query. If the query returns a message indicating that the ACS 5-Year Table does not exist for a given year, it will automatically decrement the year by one and re-run - the query. Note this function is specific to ACS 5-Year Tables and - requires the SQL query file to return a DataFrame with a single column - called 'msg' with the text 'ACS 5-Year Table does not exist' when no data - is found for the specified year. + the query. This process will continue for up to 5 years back. 
Note this + function is specific to ACS 5-Year Tables and requires the SQL query file + to return a DataFrame with a single column called 'msg' with the text + 'ACS 5-Year Table does not exist' when no data is found for the specified + year. Args: kwargs (dict): Keyword arguments for pd.read_sql_query Returns: pd.DataFrame: Result of the SQL query + + Raises: + ValueError: If data is not found after 5 year lookback or if an + unexpected message is returned """ - df = pd.read_sql_query(**kwargs) # type: ignore - - # Check if returned DataFrame contains SQL message - if df.columns.tolist() == ["msg"]: - msg = df["msg"].values[0] - if msg == "ACS 5-Year Table does not exist" and "year" in kwargs["params"]: - # If the table does not exist run query for prior year - kwargs["params"]["year"] -= 1 - - logger.warning( - "Re-running ACS SQL query with 'year' set to: " - + str(kwargs["params"]["year"]) - ) + # Store original year for potential relabeling + original_year = kwargs.get("params", {}).get("year") + max_lookback = 5 + + # Messages that trigger year lookback + lookback_messages = [ + "ACS 5-Year Table does not exist", + "LODES data does not exist", + "EDD point-level data does not exist", + ] - df = pd.read_sql_query(**kwargs) # type: ignore + # Try up to max_lookback + 1 times (original year + 5 lookbacks) + for attempt in range(max_lookback + 1): + df = pd.read_sql_query(**kwargs) # type: ignore + + # Check if returned DataFrame contains SQL message + if df.columns.tolist() == ["msg"]: + msg = df["msg"].values[0] + + # Check if message is in the lookback list and year parameter exists + if msg in lookback_messages and "year" in kwargs["params"]: + # If we've exhausted all lookback attempts, raise error + if attempt >= max_lookback: + raise ValueError( + f"Data not found after {max_lookback} year lookback. 
" + f"Original year: {original_year}, " + f"Final attempted year: {kwargs['params']['year']}" + ) - # If the year column exists, set it to the original year - if "year" in df.columns: - df["year"] = kwargs["params"]["year"] + 1 - else: - # Raise error if the message is not expected - raise ValueError(f"SQL query returned a message: {msg}.") + # Decrement year and try again + kwargs["params"]["year"] -= 1 + + logger.warning( + f"Re-running ACS SQL query with 'year' set to: " + f"{kwargs['params']['year']} (attempt {attempt + 2}/{max_lookback + 1})" + ) + + continue # Continue to next iteration + else: + # Raise error if the message is not expected + raise ValueError(f"SQL query returned an unexpected message: {msg}") + + # If we got valid data, relabel year column if it exists and year was adjusted + if "year" in df.columns and original_year is not None: + if kwargs["params"]["year"] != original_year: + logger.info( + f"Relabeling 'year' column from {kwargs['params']['year']} " + f"to {original_year}" + ) + df["year"] = original_year + + return df - return df + # If we exit the loop without returning, raise an error + raise ValueError("Failed to retrieve data after maximum lookback attempts.") diff --git a/sql/employment/LODES_to_MGRA.sql b/sql/employment/LODES_to_MGRA.sql deleted file mode 100644 index 97ee1e6..0000000 --- a/sql/employment/LODES_to_MGRA.sql +++ /dev/null @@ -1,141 +0,0 @@ --- Initialize parameters ----------------------------------------------------- -DECLARE @year integer = :year; -DECLARE @mgra_version nvarchar(10) = :mgra_version; - - --- Send error message if no data exists -------------------------------------- -IF NOT EXISTS ( - SELECT TOP (1) * - FROM [socioec_data].[lehd].[lodes_8_wac] - WHERE [SEG] = 'S000' - AND [TYPE] = 'JT00' - AND [version] = 2 - AND [YEAR] = @year -) -SELECT 'Data does not exist' AS [msg] -ELSE -BEGIN - - - -- All MGRAs - WITH [AllMGRAs] AS ( - SELECT DISTINCT CAST([zone] AS INT) AS [MGRA] - FROM 
[GeoAnalyst].[geography].[zone] - WHERE [geography_id] = - CASE - WHEN @mgra_version = 'mgra15' THEN 4 -- which mgra list to grab from table, here is where to add more mgra versions to grab - ELSE NULL - END - ), - - -- All industry codes (2 - Digit NAICS) - [AllIndustries] AS ( - SELECT [industry_code] - FROM (VALUES - ('11'), - ('21'), - ('22'), - ('23'), - ('31-33'), - ('42'), - ('44-45'), - ('48-49'), - ('51'), - ('52'), - ('53'), - ('54'), - ('55'), - ('56'), - ('61'), - ('62'), - ('71'), - ('72'), - ('81'), - ('92') - ) AS i(industry_code) - ), - - -- Create the cross product = every MGRA × every industry_code - [AllCombinations] AS ( - SELECT - [MGRA], - [industry_code] - FROM [AllMGRAs] - CROSS JOIN [AllIndustries] - ), - - -- aggregated Jobs data from LEHD LODES into MGRAs that exist in xwalk from Census Block to MGRA - [AggregatedJobs] AS ( - SELECT - [YEAR], - [MGRA], - [industry_code], - SUM([value]) AS [jobs] - FROM [socioec_data].[lehd].[lodes_8_wac] - INNER JOIN ( - SELECT - [xref_id], - [from_zone_id], - [block].[zone] AS [CTblock], - [to_zone_id], - CAST([MGRA15].[zone] AS INT) AS [MGRA], - [allocation_pct] - FROM [GeoAnalyst].[geography].[xref_zone] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [block] - ON [xref_zone].[from_zone_id] = [block].[zone_id] - INNER JOIN [GeoAnalyst].[geography].[zone] AS [MGRA15] - ON [xref_zone].[to_zone_id] = [MGRA15].[zone_id] - WHERE - -- This is where to specify which xref to use based on MGRA version - -- Curretly only works using mgra15 but this is where to add if additional mgra versions to be used - [xref_id] = - CASE - WHEN @mgra_version = 'mgra15' THEN 18 - ELSE NULL - END - ) AS [XREF] - ON [lodes_8_wac].[w_geocode] = [XREF].[CTblock] - CROSS APPLY ( - VALUES - ('11', [CNS01] * [allocation_pct]), - ('21', [CNS02] * [allocation_pct]), - ('22', [CNS03] * [allocation_pct]), - ('23', [CNS04] * [allocation_pct]), - ('31-33',[CNS05] * [allocation_pct]), - ('42', [CNS06] * [allocation_pct]), - ('44-45',[CNS07] 
* [allocation_pct]), - ('48-49',[CNS08] * [allocation_pct]), - ('51', [CNS09] * [allocation_pct]), - ('52', [CNS10] * [allocation_pct]), - ('53', [CNS11] * [allocation_pct]), - ('54', [CNS12] * [allocation_pct]), - ('55', [CNS13] * [allocation_pct]), - ('56', [CNS14] * [allocation_pct]), - ('61', [CNS15] * [allocation_pct]), - ('62', [CNS16] * [allocation_pct]), - ('71', [CNS17] * [allocation_pct]), - ('72', [CNS18] * [allocation_pct]), - ('81', [CNS19] * [allocation_pct]), - ('92', [CNS20] * [allocation_pct]) - ) AS u([industry_code], [value]) - WHERE [SEG] = 'S000' - AND [TYPE] = 'JT00' - AND [version] = 2 - AND [YEAR] = @year - GROUP BY [YEAR], [MGRA], [industry_code] - ) - - - -- Final result: left join the aggregated jobs to the full combinations - SELECT - @year AS [YEAR], - [AllCombinations].[MGRA], - [AllCombinations].[industry_code], - COALESCE([AggregatedJobs].[jobs], 0) AS [jobs] - FROM [AllCombinations] - LEFT JOIN [AggregatedJobs] - ON [AggregatedJobs].[MGRA] = [AllCombinations].[MGRA] - AND [AggregatedJobs].[industry_code] = [AllCombinations].[industry_code] - ORDER BY [AllCombinations].[MGRA], [AllCombinations].[industry_code] -END - diff --git a/sql/employment/QCEW_control.sql b/sql/employment/QCEW_control.sql index 3e74b42..f90305f 100644 --- a/sql/employment/QCEW_control.sql +++ b/sql/employment/QCEW_control.sql @@ -1,3 +1,6 @@ +-- Initialize parameters ----------------------------------------------------- +DECLARE @year integer = 2023--:year; + SELECT [year], [industry_code], @@ -6,6 +9,6 @@ FROM [socioec_data].[bls].[qcew_by_area_annual] INNER JOIN [socioec_data].[bls].[industry_code] ON [qcew_by_area_annual].[naics_id] = [industry_code].[naics_id] WHERE [area_fips] = '06073' - AND [year] = :year - AND [industry_code] IN ('11', '21', '22', '23', '31-33', '42', '44-45', '48-49', '51', '52', '53', '54', '55', '56', '61', '62', '71', '72', '81', '92') + AND [year] = @year + AND [industry_code] IN ('11', '21', '22', '23', '31-33', '42', '44-45', 
'48-49', '51', '52', '53', '54', '55', '56', '61', '62', '71', '721', '722', '81', '92') GROUP BY [year], [industry_code] diff --git a/sql/employment/get_mgra.sql b/sql/employment/get_mgra.sql new file mode 100644 index 0000000..10c7dc3 --- /dev/null +++ b/sql/employment/get_mgra.sql @@ -0,0 +1,7 @@ +-- Initialize parameters ----------------------------------------------------- +DECLARE @run_id integer = :run_id; + +SELECT DISTINCT [mgra] +FROM [EstimatesProgram].[inputs].[mgra] +WHERE run_id = @run_id +ORDER BY [mgra] diff --git a/sql/employment/get_naics72_split.sql b/sql/employment/get_naics72_split.sql index 2025db1..9eedce6 100644 --- a/sql/employment/get_naics72_split.sql +++ b/sql/employment/get_naics72_split.sql @@ -17,6 +17,8 @@ SET NOCOUNT ON; DECLARE @year INTEGER = :year; DECLARE @msg nvarchar(45) = 'EDD point-level data does not exist'; + +--Drop Temp table and ok and spatial index if they exist DROP TABLE IF EXISTS [#edd]; CREATE TABLE [#edd] ( [id] INTEGER IDENTITY(1,1) NOT NULL, @@ -174,4 +176,7 @@ BEGIN GROUP BY [GEOID20] ) [tt] ORDER BY [GEOID20] -END \ No newline at end of file +END + +--Drop Temp table +DROP TABLE IF EXISTS [#edd]; \ No newline at end of file diff --git a/sql/employment/xref_block_to_mgra.sql b/sql/employment/xref_block_to_mgra.sql new file mode 100644 index 0000000..d0ae9ac --- /dev/null +++ b/sql/employment/xref_block_to_mgra.sql @@ -0,0 +1,12 @@ +-- Initialize parameters ----------------------------------------------------- +DECLARE @mgra_version nvarchar(10) = :mgra_version; + + +SELECT + [from_zone] AS [block], + CAST([to_zone] AS INT) AS [mgra], + [allocation_pct] + FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 18 ELSE NULL END + ) + ORDER BY [block], [mgra] \ No newline at end of file From dec3eaf915e298d8f26d8008627fda39cf96fa67 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Tue, 3 Feb 2026 15:36:43 -0800 Subject: [PATCH 05/24] #185 change "jobs" in output to "value" and fix 
year left in SQL query --- python/employment.py | 13 ++++++++----- sql/employment/QCEW_control.sql | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/python/employment.py b/python/employment.py index 86c44b7..eb309c0 100644 --- a/python/employment.py +++ b/python/employment.py @@ -164,7 +164,7 @@ def aggregate_lodes_to_mgra(combined_data, xref, run_id, year) -> pd.DataFrame: lehd_to_mgra_summed = lehd_to_mgra.groupby( ["year", "mgra", "industry_code"], as_index=False )["jobs_alloc"].sum() - lehd_to_mgra_summed = lehd_to_mgra_summed.rename(columns={"jobs_alloc": "jobs"}) + lehd_to_mgra_summed = lehd_to_mgra_summed.rename(columns={"jobs_alloc": "value"}) # Join summed data to jobs_frame, keeping all MGRAs and industry codes final_lehd_to_mgra = jobs_frame.merge( @@ -172,8 +172,11 @@ def aggregate_lodes_to_mgra(combined_data, xref, run_id, year) -> pd.DataFrame: on=["year", "mgra", "industry_code"], how="left", ) - final_lehd_to_mgra["jobs"] = final_lehd_to_mgra["jobs"].fillna(0) - final_lehd_to_mgra = final_lehd_to_mgra[["year", "mgra", "industry_code", "jobs"]] + final_lehd_to_mgra["value"] = final_lehd_to_mgra["value"].fillna(0) + final_lehd_to_mgra["run_id"] = run_id # Add run_id column + final_lehd_to_mgra = final_lehd_to_mgra[ + ["run_id", "year", "mgra", "industry_code", "value"] + ] return final_lehd_to_mgra @@ -229,8 +232,8 @@ def apply_employment_controls(original_data, control_totals, generator): ]["jobs"].iloc[0] # Apply integerize_1d and update controlled_data - controlled_data.loc[industry_mask, "jobs"] = utils.integerize_1d( - data=original_data.loc[industry_mask, "jobs"], + controlled_data.loc[industry_mask, "value"] = utils.integerize_1d( + data=original_data.loc[industry_mask, "value"], control=control_value, methodology="weighted_random", generator=generator, diff --git a/sql/employment/QCEW_control.sql b/sql/employment/QCEW_control.sql index f90305f..7457113 100644 --- a/sql/employment/QCEW_control.sql +++ 
b/sql/employment/QCEW_control.sql @@ -1,5 +1,5 @@ -- Initialize parameters ----------------------------------------------------- -DECLARE @year integer = 2023--:year; +DECLARE @year integer = :year; SELECT [year], From 3243c07d657a6d2e15d141991c49caad2cb3f42f Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Tue, 3 Feb 2026 16:49:36 -0800 Subject: [PATCH 06/24] #185 #186 Create output/input table in SQL and add ability to output to SQL --- python/employment.py | 31 +++++++++++++++++++++++++++---- sql/create_objects.sql | 25 +++++++++++++++++++++++++ sql/employment/QCEW_control.sql | 2 +- 3 files changed, 53 insertions(+), 5 deletions(-) diff --git a/python/employment.py b/python/employment.py index eb309c0..f564649 100644 --- a/python/employment.py +++ b/python/employment.py @@ -39,10 +39,11 @@ def run_employment(year): controlled_data = apply_employment_controls(lehd_jobs, control_totals, generator) # Export results - output_filepath = utils.OUTPUT_FOLDER / f"controlled_data_{year}.csv" - controlled_data.to_csv(output_filepath, index=False) + # output_filepath = utils.OUTPUT_FOLDER / f"controlled_data_{year}.csv" + # controlled_data.to_csv(output_filepath, index=False) - return controlled_data + _insert_jobs(control_totals, controlled_data) + # return controlled_data def get_LODES_data(year) -> pd.DataFrame: @@ -201,6 +202,9 @@ def get_control_totals(year) -> pd.DataFrame: "year": year, }, ) + + control_totals["run_id"] = utils.RUN_ID # Add run_id column + return control_totals @@ -229,7 +233,7 @@ def apply_employment_controls(original_data, control_totals, generator): # Get control value for this industry control_value = control_totals[ control_totals["industry_code"] == industry_code - ]["jobs"].iloc[0] + ]["value"].iloc[0] # Apply integerize_1d and update controlled_data controlled_data.loc[industry_mask, "value"] = utils.integerize_1d( @@ -240,3 +244,22 @@ def apply_employment_controls(original_data, control_totals, generator): ) return controlled_data + + 
+def _insert_jobs(jobs_inputs, jobs_outputs) -> None: + """Insert input and output data related to household population""" + + # Insert input and output data to database + with utils.ESTIMATES_ENGINE.connect() as con: + + jobs_inputs.to_sql( + name="controls_jobs", + con=con, + schema="inputs", + if_exists="append", + index=False, + ) + + jobs_outputs.to_sql( + name="jobs", con=con, schema="outputs", if_exists="append", index=False + ) diff --git a/sql/create_objects.sql b/sql/create_objects.sql index dc3e6ca..6e856ac 100644 --- a/sql/create_objects.sql +++ b/sql/create_objects.sql @@ -110,6 +110,17 @@ INSERT INTO [inputs].[special_mgras] VALUES transferred to the Las Colinas Detention Facility.') GO +CREATE TABLE [inputs].[controls_jobs] ( + [run_id] INT NOT NULL, + [year] INT NOT NULL, + [industry_code] NVARCHAR(5) NOT NULL, + [value] INT NOT NULL, + INDEX [ccsi_inputs_controls_jobs] CLUSTERED COLUMNSTORE, + CONSTRAINT [ixuq_inputs_controls_jobs] UNIQUE ([run_id], [year], [industry_code]) WITH (DATA_COMPRESSION = PAGE), + CONSTRAINT [fk_inputs_controls_ase_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]), + CONSTRAINT [chk_non_negative_inputs_controls_ase] CHECK ([value] >= 0) +) +GO CREATE SCHEMA [outputs] @@ -196,3 +207,17 @@ CREATE TABLE [outputs].[hhp] ( CONSTRAINT [chk_non_negative_outputs_hhp] CHECK ([value] >= 0) ) GO + +CREATE TABLE [outputs].[jobs] ( + [run_id] INT NOT NULL, + [year] INT NOT NULL, + [mgra] INT NOT NULL, + [industry_code] NVARCHAR(5) NOT NULL, + [value] INT NOT NULL, + INDEX [ccsi_outputs_jobs] CLUSTERED COLUMNSTORE, + CONSTRAINT [ixuq_outputs_jobs] UNIQUE ([run_id], [year], [mgra], [industry_code]) WITH (DATA_COMPRESSION = PAGE), + CONSTRAINT [fk_outputs_jobs_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]), + CONSTRAINT [fk_outputs_jobs_mgra] FOREIGN KEY ([run_id], [mgra]) REFERENCES [inputs].[mgra] ([run_id], [mgra]), + CONSTRAINT [chk_non_negative_outputs_jobs] CHECK ([value] >= 0) +) +GO diff 
--git a/sql/employment/QCEW_control.sql b/sql/employment/QCEW_control.sql index 7457113..cb1a29e 100644 --- a/sql/employment/QCEW_control.sql +++ b/sql/employment/QCEW_control.sql @@ -4,7 +4,7 @@ DECLARE @year integer = :year; SELECT [year], [industry_code], - SUM([annual_avg_emplvl]) AS jobs + SUM([annual_avg_emplvl]) AS [value] FROM [socioec_data].[bls].[qcew_by_area_annual] INNER JOIN [socioec_data].[bls].[industry_code] ON [qcew_by_area_annual].[naics_id] = [industry_code].[naics_id] From b7e8062527f46fb9ad56a269251f633856460069 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Wed, 4 Feb 2026 10:21:51 -0800 Subject: [PATCH 07/24] #188 change function read_sql_query_acs to read_sql_query_custom change name of function and changed all the spots where read_sql_query_acs was being used --- python/ase.py | 8 ++-- python/employment.py | 82 +++++++++++++++++------------------- python/hh_characteristics.py | 4 +- python/hs_hh.py | 2 +- python/pop_type.py | 2 +- python/utils.py | 21 +++++---- wiki/Utility.md | 2 +- 7 files changed, 57 insertions(+), 64 deletions(-) diff --git a/python/ase.py b/python/ase.py index ed56105..c29436e 100644 --- a/python/ase.py +++ b/python/ase.py @@ -98,7 +98,7 @@ def _get_controls_inputs(year: int) -> dict[str, pd.DataFrame]: # Get regional age/sex/ethnicity group quarters distributions with open(utils.SQL_FOLDER / "ase/get_region_gq_ase_dist.sql") as file: - region_gq_ase_dist = utils.read_sql_query_acs( + region_gq_ase_dist = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, params={ @@ -262,7 +262,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]: with utils.ESTIMATES_ENGINE.connect() as con: # Get the Age/Sex B010001 table data with open(utils.SQL_FOLDER / "ase/get_B01001.sql") as file: - b01001 = utils.read_sql_query_acs( + b01001 = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, params={ @@ -272,7 +272,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]: # Get the Ethnicity 
B03002 table data with open(utils.SQL_FOLDER / "ase/get_B03002.sql") as file: - b03002 = utils.read_sql_query_acs( + b03002 = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, params={ @@ -282,7 +282,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]: # Get Age/Sex/Ethnicity data from B01001(B-I) table data with open(utils.SQL_FOLDER / "ase/get_B01001(B-I).sql") as file: - b01001_b_i = utils.read_sql_query_acs( + b01001_b_i = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, params={ diff --git a/python/employment.py b/python/employment.py index f564649..476f5fa 100644 --- a/python/employment.py +++ b/python/employment.py @@ -6,12 +6,8 @@ generator = np.random.default_rng(utils.RANDOM_SEED) -# Load configuration from YAML file -# with open("config.yaml", "r") as f: -# config = yaml.safe_load(f) - -def run_employment(year): +def run_employment(year: int): """Run the Employment module for a specified year. This function processes employment data by applying control totals to @@ -30,23 +26,18 @@ def run_employment(year): LODES_data = get_LODES_data(year) - xref = get_xref_block_to_mgra(utils.MGRA_VERSION) + xref = get_xref_block_to_mgra() - lehd_jobs = aggregate_lodes_to_mgra(LODES_data, xref, utils.RUN_ID, year) + lehd_jobs = aggregate_lodes_to_mgra(LODES_data, xref, year) control_totals = get_control_totals(year) - # Apply controls - controlled_data = apply_employment_controls(lehd_jobs, control_totals, generator) - # Export results - # output_filepath = utils.OUTPUT_FOLDER / f"controlled_data_{year}.csv" - # controlled_data.to_csv(output_filepath, index=False) + controlled_data = apply_employment_controls(lehd_jobs, control_totals, generator) _insert_jobs(control_totals, controlled_data) - # return controlled_data -def get_LODES_data(year) -> pd.DataFrame: +def get_LODES_data(year: int) -> pd.DataFrame: """Retrieve LEHD LODES data for a specified year. 
Args: @@ -55,7 +46,7 @@ def get_LODES_data(year) -> pd.DataFrame: with utils.LEHD_ENGINE.connect() as con: with open(utils.SQL_FOLDER / "employment/get_lodes_data.sql") as file: - lodes_data = utils.read_sql_query_acs( + lodes_data = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, params={"year": year}, @@ -63,7 +54,7 @@ def get_LODES_data(year) -> pd.DataFrame: with utils.GIS_ENGINE.connect() as con: with open(utils.SQL_FOLDER / "employment/get_naics72_split.sql") as file: - split_naics_72 = utils.read_sql_query_acs( + split_naics_72 = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, params={"year": year}, @@ -93,32 +84,32 @@ def get_LODES_data(year) -> pd.DataFrame: return combined_data -def get_xref_block_to_mgra(mgra_version) -> pd.DataFrame: +def get_xref_block_to_mgra() -> pd.DataFrame: """Retrieve crosswalk from Census blocks to MGRAs. - Args: - mgra_version (str): The MGRA version to use for the crosswalk. + Returns: pd.DataFrame: A DataFrame containing the crosswalk from blocks to MGRAs. """ with utils.LEHD_ENGINE.connect() as con: with open(utils.SQL_FOLDER / "employment/xref_block_to_mgra.sql") as file: - xref = utils.read_sql_query_acs( + xref = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, - params={"mgra_version": mgra_version}, + params={"mgra_version": utils.MGRA_VERSION}, ) return xref -def aggregate_lodes_to_mgra(combined_data, xref, run_id, year) -> pd.DataFrame: +def aggregate_lodes_to_mgra( + combined_data: pd.DataFrame, xref: pd.DataFrame, year: int +) -> pd.DataFrame: """Aggregate LODES data to MGRA level using allocation percentages. 
Args: combined_data (pd.DataFrame): LODES data with columns: year, block, industry_code, jobs xref (pd.DataFrame): Crosswalk with columns: block, mgra, allocation_pct - run_id (int): The run ID for tracking year (int): The year for which to aggregate data Returns: @@ -127,10 +118,10 @@ def aggregate_lodes_to_mgra(combined_data, xref, run_id, year) -> pd.DataFrame: # Get MGRA data from SQL with utils.LEHD_ENGINE.connect() as con: with open(utils.SQL_FOLDER / "employment/get_mgra.sql") as file: - mgra_data = utils.read_sql_query_acs( + mgra_data = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, - params={"run_id": run_id}, + params={"run_id": utils.RUN_ID}, ) # Get unique industry codes and cross join with MGRA data @@ -148,24 +139,23 @@ def aggregate_lodes_to_mgra(combined_data, xref, run_id, year) -> pd.DataFrame: # Join combined_data to xref and calculate allocated jobs lehd_to_mgra = combined_data.merge(xref, on="block", how="inner") - lehd_to_mgra["jobs_alloc"] = lehd_to_mgra["jobs"] * lehd_to_mgra["allocation_pct"] - lehd_to_mgra = lehd_to_mgra[ - [ - "year", - "block", - "mgra", - "industry_code", - "jobs", - "allocation_pct", - "jobs_alloc", - ] - ] + lehd_to_mgra["value"] = lehd_to_mgra["jobs"] * lehd_to_mgra["allocation_pct"] + # lehd_to_mgra = lehd_to_mgra[ + # [ + # "year", + # "block", + # "mgra", + # "industry_code", + # "jobs", + # "allocation_pct", + # "value", + # ] + # ] # Sum allocated jobs by year, mgra, and industry_code lehd_to_mgra_summed = lehd_to_mgra.groupby( ["year", "mgra", "industry_code"], as_index=False - )["jobs_alloc"].sum() - lehd_to_mgra_summed = lehd_to_mgra_summed.rename(columns={"jobs_alloc": "value"}) + )["value"].sum() # Join summed data to jobs_frame, keeping all MGRAs and industry codes final_lehd_to_mgra = jobs_frame.merge( @@ -174,7 +164,7 @@ def aggregate_lodes_to_mgra(combined_data, xref, run_id, year) -> pd.DataFrame: how="left", ) final_lehd_to_mgra["value"] = final_lehd_to_mgra["value"].fillna(0) - 
final_lehd_to_mgra["run_id"] = run_id # Add run_id column + final_lehd_to_mgra["run_id"] = utils.RUN_ID # Add run_id column final_lehd_to_mgra = final_lehd_to_mgra[ ["run_id", "year", "mgra", "industry_code", "value"] ] @@ -182,7 +172,7 @@ def aggregate_lodes_to_mgra(combined_data, xref, run_id, year) -> pd.DataFrame: return final_lehd_to_mgra -def get_control_totals(year) -> pd.DataFrame: +def get_control_totals(year: int) -> pd.DataFrame: """Load employment data from SQL queries. Args: @@ -208,13 +198,17 @@ def get_control_totals(year) -> pd.DataFrame: return control_totals -def apply_employment_controls(original_data, control_totals, generator): +def apply_employment_controls( + original_data: pd.DataFrame, + control_totals: pd.DataFrame, + generator: np.random.Generator, +) -> pd.DataFrame: """Apply control totals to employment data using integerization. Args: original_data (pd.DataFrame): LEHD LODES data at MGRA level. control_totals (pd.DataFrame): Employment control totals from QCEW. - generator: NumPy random number generator. + generator (np.random.Generator): NumPy random number generator. Returns: pd.DataFrame: Controlled employment data. 
@@ -246,7 +240,7 @@ def apply_employment_controls(original_data, control_totals, generator): return controlled_data -def _insert_jobs(jobs_inputs, jobs_outputs) -> None: +def _insert_jobs(jobs_inputs: pd.DataFrame, jobs_outputs: pd.DataFrame) -> None: """Insert input and output data related to household population""" # Insert input and output data to database diff --git a/python/hh_characteristics.py b/python/hh_characteristics.py index 3b28e60..ea78a93 100644 --- a/python/hh_characteristics.py +++ b/python/hh_characteristics.py @@ -86,7 +86,7 @@ def _get_hh_income_inputs(year: int) -> dict[str, pd.DataFrame]: with open( utils.SQL_FOLDER / "hh_characteristics" / "get_tract_controls_hh_income.sql" ) as file: - hh_income_inputs["hh_income_tract_controls"] = utils.read_sql_query_acs( + hh_income_inputs["hh_income_tract_controls"] = utils.read_sql_query_custom( sql=sql.text(file.read()), # type: ignore con=con, # type: ignore params={"run_id": utils.RUN_ID, "year": year}, @@ -120,7 +120,7 @@ def _get_hh_size_inputs(year: int) -> dict[str, pd.DataFrame]: / "hh_characteristics" / "get_tract_controls_hh_by_size.sql" ) as file: - hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_acs( + hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_custom( sql=sql.text(file.read()), # type: ignore con=con, # type: ignore params={"run_id": utils.RUN_ID, "year": year}, diff --git a/python/hs_hh.py b/python/hs_hh.py index 7a435a5..55fbfa8 100644 --- a/python/hs_hh.py +++ b/python/hs_hh.py @@ -105,7 +105,7 @@ def _get_hs_hh_inputs(year: int) -> dict[str, pd.DataFrame]: # Get tract occupancy controls with open(utils.SQL_FOLDER / "hs_hh/get_tract_controls_hh.sql") as file: - hs_hh_inputs["tract_controls"] = utils.read_sql_query_acs( + hs_hh_inputs["tract_controls"] = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, params={ diff --git a/python/pop_type.py b/python/pop_type.py index a116974..4b07dbf 100644 --- a/python/pop_type.py +++ b/python/pop_type.py @@ 
-204,7 +204,7 @@ def _get_hhp_inputs(year: int) -> dict[str, pd.DataFrame]: # Get tract level household size controls with open(utils.SQL_FOLDER / "pop_type/get_tract_controls_hhs.sql") as file: - tract_controls = utils.read_sql_query_acs( + tract_controls = utils.read_sql_query_custom( sql=sql.text(file.read()), con=con, params={ diff --git a/python/utils.py b/python/utils.py index 52e5643..aa5805c 100644 --- a/python/utils.py +++ b/python/utils.py @@ -683,18 +683,20 @@ def integerize_2d( return array_2d -def read_sql_query_acs(**kwargs: dict) -> pd.DataFrame: +def read_sql_query_custom(**kwargs: dict) -> pd.DataFrame: """Read SQL query allowing for dynamic year adjustment. This function executes a SQL query using pandas read_sql_query allowing for dynamic adjustment of the 'year' parameter in the query. If the query - returns a message indicating that the ACS 5-Year Table does not exist for - a given year, it will automatically decrement the year by one and re-run + returns a message indicating that the year does not exist in Table being + accessed, it will automatically decrement the year by one and re-run the query. This process will continue for up to 5 years back. Note this - function is specific to ACS 5-Year Tables and requires the SQL query file - to return a DataFrame with a single column called 'msg' with the text - 'ACS 5-Year Table does not exist' when no data is found for the specified - year. + function is specific to querying ACS 5-Year Tables, LEHD LODES and EDD + point-level data. This function requires the SQL query file to return a + DataFrame with a single column called 'msg' with one of 3 text strings: + 'ACS 5-Year Table does not exist', 'LODES data does not exist', or + 'EDD point-level data does not exist' when no data is found for the + specified year. 
Args: kwargs (dict): Keyword arguments for pd.read_sql_query @@ -739,7 +741,7 @@ def read_sql_query_acs(**kwargs: dict) -> pd.DataFrame: kwargs["params"]["year"] -= 1 logger.warning( - f"Re-running ACS SQL query with 'year' set to: " + f"Re-running SQL query with 'year' set to: " f"{kwargs['params']['year']} (attempt {attempt + 2}/{max_lookback + 1})" ) @@ -758,6 +760,3 @@ def read_sql_query_acs(**kwargs: dict) -> pd.DataFrame: df["year"] = original_year return df - - # If we exit the loop without returning, raise an error - raise ValueError("Failed to retrieve data after maximum lookback attempts.") diff --git a/wiki/Utility.md b/wiki/Utility.md index f767c7e..6811116 100644 --- a/wiki/Utility.md +++ b/wiki/Utility.md @@ -71,7 +71,7 @@ For rows with negative deviations, is not always possible to find non-zero colum Both the "Nearest Neighbors" and abandonment of the non-zero requirement can lead to values being increased in columns that are implausible for the row. For example, in the "Population by Age Sex Ethnicity" module, a row defined as a MGRA containing male-only adult persons may have female or juvenile values increased if the function cannot reallocate to non-zero columns. This necessitates the use of an additional "balancer" in the "Population by Age Sex Ethnicity" module for special MGRAs. -## Dealing with un-released ACS 5-year Detailed Tables (`read_sql_query_acs()`) +## Dealing with un-released ACS 5-year Detailed Tables (`read_sql_query_custom()`) This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables data that are currently not released. Essentially, all SQL scripts dealing with ACS data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. This function detects the presence of the error message and re-runs the query using the previous year instead. 
From 841096340a897d4defdd5ff76d8eb807b150fb4d Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Wed, 4 Feb 2026 10:35:37 -0800 Subject: [PATCH 08/24] #188 update wiki for change in read_sql_query_acs to read_sql_query_custom --- wiki/Utility.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wiki/Utility.md b/wiki/Utility.md index 6811116..23354f6 100644 --- a/wiki/Utility.md +++ b/wiki/Utility.md @@ -73,7 +73,7 @@ Both the "Nearest Neighbors" and abandonment of the non-zero requirement can lea ## Dealing with un-released ACS 5-year Detailed Tables (`read_sql_query_custom()`) -This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables data that are currently not released. Essentially, all SQL scripts dealing with ACS data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. This function detects the presence of the error message and re-runs the query using the previous year instead. +This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables, LEHD LODES, and EDD point-level data that are currently not released. Essentially, all SQL scripts dealing with ACS, LODES, and EDD data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. This function detects the presence of the error message and re-runs the query using the previous year instead; if that fails again, it checks up to a total of 5 years back. 
### A Note on ACS 5-year Detailed Tables From 3be99f5312c8acb2b4b747f5c5f716e9f858124a Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Wed, 4 Feb 2026 10:58:31 -0800 Subject: [PATCH 09/24] #185 reset config --- config.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/config.yml b/config.yml index cd09993..47e4561 100644 --- a/config.yml +++ b/config.yml @@ -5,7 +5,7 @@ # The `run` section contains configuration for running every module of the Estimates # Program for a specified set of years run: - enabled: False + enabled: True mgra: mgra15 start_year: 2020 end_year: 2024 @@ -17,16 +17,16 @@ run: # `run_id` and `comments`. If `run_id` is `null`, then a new `run_id` will be # automatically created, similar to `run` mode debug: - enabled: True + enabled: False run_id: null start_year: 2022 - end_year: 2024 + end_year: 2023 version: 1.1.1-dev - comments: Test run employment 1st time - startup: False + comments: null + startup: True housing_and_households: False population: False population_by_ase: False household_characteristics: False - employment: True + employment: False staging: False From a2b6275adb3470785e40499f0d12038702ba2a61 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Wed, 4 Feb 2026 10:59:36 -0800 Subject: [PATCH 10/24] #185 reset config --- config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yml b/config.yml index 47e4561..1441abd 100644 --- a/config.yml +++ b/config.yml @@ -23,7 +23,7 @@ debug: end_year: 2023 version: 1.1.1-dev comments: null - startup: True + startup: False housing_and_households: False population: False population_by_ase: False From 378883458ec3000942939765a5909db9b3e06e23 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Wed, 4 Feb 2026 11:15:44 -0800 Subject: [PATCH 11/24] #185 cleanup in a few spots --- python/employment.py | 38 ++++++++++++++------------- sql/create_objects.sql | 4 +-- sql/employment/xref_block_to_mgra.sql | 13 +++++---- 3 files changed, 28 
insertions(+), 27 deletions(-) diff --git a/python/employment.py b/python/employment.py index 476f5fa..56cd722 100644 --- a/python/employment.py +++ b/python/employment.py @@ -8,13 +8,26 @@ def run_employment(year: int): - """Run the Employment module for a specified year. + """Control function to create jobs data by industry_code (NAICS) at the MGRA level. + + Get the LEHD LODES data, aggregate to the MGRA level using the block to MGRA + crosswalk, then apply control totals from QCEW using integerization. + + Functionality is split apart for code encapsulation (function inputs not included): + get_LODES_data - Get LEHD LODES data for a specified year, including + special handling for industry_code 72 (Accommodation and Food Services) + xref_block_to_mgra - Get crosswalk from Census blocks to MGRAs + aggregate_lodes_to_mgra - Aggregate LODES data to MGRA level using allocation + percentages from the block to MGRA crosswalk + get_control_totals - Load QCEW employment data as county total controls + apply_employment_controls - Apply control totals to employment data using + utils.integerize_1d() + _insert_jobs - Store both the control totals and controlled employment + inputs/outputs to the production database - This function processes employment data by applying control totals to - LEHD LODES data at the MGRA level using integerization. Args: - year (int): The year for which to run the Employment module. 
+ year (int): estimates year """ # Check MGRA version and raise error if not 'mgra15' @@ -140,17 +153,6 @@ def aggregate_lodes_to_mgra( # Join combined_data to xref and calculate allocated jobs lehd_to_mgra = combined_data.merge(xref, on="block", how="inner") lehd_to_mgra["value"] = lehd_to_mgra["jobs"] * lehd_to_mgra["allocation_pct"] - # lehd_to_mgra = lehd_to_mgra[ - # [ - # "year", - # "block", - # "mgra", - # "industry_code", - # "jobs", - # "allocation_pct", - # "value", - # ] - # ] # Sum allocated jobs by year, mgra, and industry_code lehd_to_mgra_summed = lehd_to_mgra.groupby( @@ -173,7 +175,7 @@ def aggregate_lodes_to_mgra( def get_control_totals(year: int) -> pd.DataFrame: - """Load employment data from SQL queries. + """Load QCEW employment data as control totals. Args: year (int): The year for which to load employment data. @@ -203,7 +205,7 @@ def apply_employment_controls( control_totals: pd.DataFrame, generator: np.random.Generator, ) -> pd.DataFrame: - """Apply control totals to employment data using integerization. + """Apply control totals to employment data using utils.integerize_1d(). Args: original_data (pd.DataFrame): LEHD LODES data at MGRA level. 
@@ -241,7 +243,7 @@ def apply_employment_controls( def _insert_jobs(jobs_inputs: pd.DataFrame, jobs_outputs: pd.DataFrame) -> None: - """Insert input and output data related to household population""" + """Insert input and output data related to jobs to the database.""" # Insert input and output data to database with utils.ESTIMATES_ENGINE.connect() as con: diff --git a/sql/create_objects.sql b/sql/create_objects.sql index 6e856ac..fb88330 100644 --- a/sql/create_objects.sql +++ b/sql/create_objects.sql @@ -117,8 +117,8 @@ CREATE TABLE [inputs].[controls_jobs] ( [value] INT NOT NULL, INDEX [ccsi_inputs_controls_jobs] CLUSTERED COLUMNSTORE, CONSTRAINT [ixuq_inputs_controls_jobs] UNIQUE ([run_id], [year], [industry_code]) WITH (DATA_COMPRESSION = PAGE), - CONSTRAINT [fk_inputs_controls_ase_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]), - CONSTRAINT [chk_non_negative_inputs_controls_ase] CHECK ([value] >= 0) + CONSTRAINT [fk_inputs_controls_jobs_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]), + CONSTRAINT [chk_non_negative_inputs_controls_jobs] CHECK ([value] >= 0) ) GO diff --git a/sql/employment/xref_block_to_mgra.sql b/sql/employment/xref_block_to_mgra.sql index d0ae9ac..c9ebbaf 100644 --- a/sql/employment/xref_block_to_mgra.sql +++ b/sql/employment/xref_block_to_mgra.sql @@ -3,10 +3,9 @@ DECLARE @mgra_version nvarchar(10) = :mgra_version; SELECT - [from_zone] AS [block], - CAST([to_zone] AS INT) AS [mgra], - [allocation_pct] - FROM [GeoAnalyst].[geography].[fn_xref_zones]( - CASE WHEN @mgra_version = 'mgra15' THEN 18 ELSE NULL END - ) - ORDER BY [block], [mgra] \ No newline at end of file + [from_zone] AS [block], + CAST([to_zone] AS INT) AS [mgra], + [allocation_pct] +FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 18 ELSE NULL END + ) \ No newline at end of file From 6015a5aa6519b4e6237b34c8974cc93dac3a2d5e Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Wed, 4 Feb 2026 
11:26:45 -0800 Subject: [PATCH 12/24] #185 remove output folder used during testing --- python/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/utils.py b/python/utils.py index aa5805c..841c92f 100644 --- a/python/utils.py +++ b/python/utils.py @@ -17,7 +17,6 @@ # Store project root folder ROOT_FOLDER = pathlib.Path(__file__).parent.resolve().parent SQL_FOLDER = ROOT_FOLDER / "sql" -OUTPUT_FOLDER = ROOT_FOLDER / "output" ########### # LOGGING # From 0cc89b928742fd84ba3059b069ebbc3b790ebde2 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Wed, 4 Feb 2026 11:30:24 -0800 Subject: [PATCH 13/24] #185 Change connection when grabbing mgras --- python/employment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/employment.py b/python/employment.py index 56cd722..93ea80a 100644 --- a/python/employment.py +++ b/python/employment.py @@ -129,7 +129,7 @@ def aggregate_lodes_to_mgra( pd.DataFrame: Aggregated data at MGRA level with columns: year, mgra, industry_code, jobs """ # Get MGRA data from SQL - with utils.LEHD_ENGINE.connect() as con: + with utils.ESTIMATES_ENGINE.connect() as con: with open(utils.SQL_FOLDER / "employment/get_mgra.sql") as file: mgra_data = utils.read_sql_query_custom( sql=sql.text(file.read()), From 827d066014316344aa2e8c46f20fa12e7267272d Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Wed, 4 Feb 2026 16:19:32 -0800 Subject: [PATCH 14/24] #185 #188 addressed first 2 comments from @Eric-Liu-SANDAG in pull request --- python/ase.py | 8 ++++---- python/employment.py | 12 +++++++----- python/hh_characteristics.py | 12 +++++++----- python/hs_hh.py | 2 +- python/pop_type.py | 2 +- python/utils.py | 10 +++++++--- sql/employment/QCEW_control.sql | 13 +++++++++++++ sql/employment/get_lodes_data.sql | 3 ++- wiki/Utility.md | 2 +- 9 files changed, 43 insertions(+), 21 deletions(-) diff --git a/python/ase.py b/python/ase.py index c29436e..bf4cfaf 100644 --- a/python/ase.py +++ b/python/ase.py @@ -98,7 +98,7 @@ def 
_get_controls_inputs(year: int) -> dict[str, pd.DataFrame]: # Get regional age/sex/ethnicity group quarters distributions with open(utils.SQL_FOLDER / "ase/get_region_gq_ase_dist.sql") as file: - region_gq_ase_dist = utils.read_sql_query_custom( + region_gq_ase_dist = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ @@ -262,7 +262,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]: with utils.ESTIMATES_ENGINE.connect() as con: # Get the Age/Sex B010001 table data with open(utils.SQL_FOLDER / "ase/get_B01001.sql") as file: - b01001 = utils.read_sql_query_custom( + b01001 = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ @@ -272,7 +272,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]: # Get the Ethnicity B03002 table data with open(utils.SQL_FOLDER / "ase/get_B03002.sql") as file: - b03002 = utils.read_sql_query_custom( + b03002 = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ @@ -282,7 +282,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]: # Get Age/Sex/Ethnicity data from B01001(B-I) table data with open(utils.SQL_FOLDER / "ase/get_B01001(B-I).sql") as file: - b01001_b_i = utils.read_sql_query_custom( + b01001_b_i = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ diff --git a/python/employment.py b/python/employment.py index 93ea80a..ed52fe1 100644 --- a/python/employment.py +++ b/python/employment.py @@ -59,7 +59,8 @@ def get_LODES_data(year: int) -> pd.DataFrame: with utils.LEHD_ENGINE.connect() as con: with open(utils.SQL_FOLDER / "employment/get_lodes_data.sql") as file: - lodes_data = utils.read_sql_query_custom( + lodes_data = utils.read_sql_query_fallback( + max_lookback=2, sql=sql.text(file.read()), con=con, params={"year": year}, @@ -67,7 +68,8 @@ def get_LODES_data(year: int) -> pd.DataFrame: with utils.GIS_ENGINE.connect() as con: with open(utils.SQL_FOLDER / "employment/get_naics72_split.sql") as 
file: - split_naics_72 = utils.read_sql_query_custom( + split_naics_72 = utils.read_sql_query_fallback( + max_lookback=3, sql=sql.text(file.read()), con=con, params={"year": year}, @@ -106,7 +108,7 @@ def get_xref_block_to_mgra() -> pd.DataFrame: with utils.LEHD_ENGINE.connect() as con: with open(utils.SQL_FOLDER / "employment/xref_block_to_mgra.sql") as file: - xref = utils.read_sql_query_custom( + xref = pd.read_sql_query( sql=sql.text(file.read()), con=con, params={"mgra_version": utils.MGRA_VERSION}, @@ -131,7 +133,7 @@ def aggregate_lodes_to_mgra( # Get MGRA data from SQL with utils.ESTIMATES_ENGINE.connect() as con: with open(utils.SQL_FOLDER / "employment/get_mgra.sql") as file: - mgra_data = utils.read_sql_query_custom( + mgra_data = pd.read_sql_query( sql=sql.text(file.read()), con=con, params={"run_id": utils.RUN_ID}, @@ -187,7 +189,7 @@ def get_control_totals(year: int) -> pd.DataFrame: # Get employment control totals from QCEW with open(utils.SQL_FOLDER / "employment/QCEW_control.sql") as file: - control_totals = pd.read_sql( + control_totals = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ diff --git a/python/hh_characteristics.py b/python/hh_characteristics.py index ea78a93..05834a2 100644 --- a/python/hh_characteristics.py +++ b/python/hh_characteristics.py @@ -86,10 +86,12 @@ def _get_hh_income_inputs(year: int) -> dict[str, pd.DataFrame]: with open( utils.SQL_FOLDER / "hh_characteristics" / "get_tract_controls_hh_income.sql" ) as file: - hh_income_inputs["hh_income_tract_controls"] = utils.read_sql_query_custom( - sql=sql.text(file.read()), # type: ignore - con=con, # type: ignore - params={"run_id": utils.RUN_ID, "year": year}, + hh_income_inputs["hh_income_tract_controls"] = ( + utils.read_sql_query_fallback( + sql=sql.text(file.read()), # type: ignore + con=con, # type: ignore + params={"run_id": utils.RUN_ID, "year": year}, + ) ) return hh_income_inputs @@ -120,7 +122,7 @@ def _get_hh_size_inputs(year: int) -> 
dict[str, pd.DataFrame]: / "hh_characteristics" / "get_tract_controls_hh_by_size.sql" ) as file: - hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_custom( + hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_fallback( sql=sql.text(file.read()), # type: ignore con=con, # type: ignore params={"run_id": utils.RUN_ID, "year": year}, diff --git a/python/hs_hh.py b/python/hs_hh.py index 55fbfa8..10e0a43 100644 --- a/python/hs_hh.py +++ b/python/hs_hh.py @@ -105,7 +105,7 @@ def _get_hs_hh_inputs(year: int) -> dict[str, pd.DataFrame]: # Get tract occupancy controls with open(utils.SQL_FOLDER / "hs_hh/get_tract_controls_hh.sql") as file: - hs_hh_inputs["tract_controls"] = utils.read_sql_query_custom( + hs_hh_inputs["tract_controls"] = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ diff --git a/python/pop_type.py b/python/pop_type.py index 4b07dbf..08ea6f6 100644 --- a/python/pop_type.py +++ b/python/pop_type.py @@ -204,7 +204,7 @@ def _get_hhp_inputs(year: int) -> dict[str, pd.DataFrame]: # Get tract level household size controls with open(utils.SQL_FOLDER / "pop_type/get_tract_controls_hhs.sql") as file: - tract_controls = utils.read_sql_query_custom( + tract_controls = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ diff --git a/python/utils.py b/python/utils.py index 841c92f..7c1e06d 100644 --- a/python/utils.py +++ b/python/utils.py @@ -682,7 +682,7 @@ def integerize_2d( return array_2d -def read_sql_query_custom(**kwargs: dict) -> pd.DataFrame: +def read_sql_query_fallback(max_lookback: int = 1, **kwargs: dict) -> pd.DataFrame: """Read SQL query allowing for dynamic year adjustment. This function executes a SQL query using pandas read_sql_query allowing @@ -698,6 +698,7 @@ def read_sql_query_custom(**kwargs: dict) -> pd.DataFrame: specified year. 
Args: + max_lookback (int): Maximum number of years to look back if data is not found kwargs (dict): Keyword arguments for pd.read_sql_query Returns: @@ -708,14 +709,17 @@ def read_sql_query_custom(**kwargs: dict) -> pd.DataFrame: unexpected message is returned """ # Store original year for potential relabeling - original_year = kwargs.get("params", {}).get("year") - max_lookback = 5 + original_year = kwargs["params"]["year"] + + # Get max_lookback from kwargs and REMOVE it, default to 5 if not provided + # max_lookback = kwargs.pop("max_lookback", 1) # Messages that trigger year lookback lookback_messages = [ "ACS 5-Year Table does not exist", "LODES data does not exist", "EDD point-level data does not exist", + "QCEW data does not exist", ] # Try up to max_lookback + 1 times (original year + 5 lookbacks) diff --git a/sql/employment/QCEW_control.sql b/sql/employment/QCEW_control.sql index cb1a29e..66906e4 100644 --- a/sql/employment/QCEW_control.sql +++ b/sql/employment/QCEW_control.sql @@ -1,5 +1,16 @@ -- Initialize parameters ----------------------------------------------------- DECLARE @year integer = :year; +DECLARE @msg nvarchar(25) = 'QCEW data does not exist'; + +-- Send error message if no data exists -------------------------------------- +IF NOT EXISTS ( + SELECT TOP (1) * + FROM [socioec_data].[bls].[qcew_by_area_annual] + WHERE [year] = @year +) +SELECT @msg AS [msg] +ELSE +BEGIN SELECT [year], @@ -12,3 +23,5 @@ WHERE [area_fips] = '06073' AND [year] = @year AND [industry_code] IN ('11', '21', '22', '23', '31-33', '42', '44-45', '48-49', '51', '52', '53', '54', '55', '56', '61', '62', '71', '721', '722', '81', '92') GROUP BY [year], [industry_code] + +END \ No newline at end of file diff --git a/sql/employment/get_lodes_data.sql b/sql/employment/get_lodes_data.sql index d6a4952..b720264 100644 --- a/sql/employment/get_lodes_data.sql +++ b/sql/employment/get_lodes_data.sql @@ -1,5 +1,6 @@ -- Initialize parameters 
----------------------------------------------------- DECLARE @year integer = :year; +DECLARE @msg nvarchar(25) = 'LODES data does not exist'; -- Send error message if no data exists -------------------------------------- IF NOT EXISTS ( @@ -10,7 +11,7 @@ IF NOT EXISTS ( AND [version] = 2 AND [YEAR] = @year ) -SELECT 'LODES data does not exist' AS [msg] +SELECT @msg AS [msg] ELSE BEGIN diff --git a/wiki/Utility.md b/wiki/Utility.md index 23354f6..091d5e5 100644 --- a/wiki/Utility.md +++ b/wiki/Utility.md @@ -71,7 +71,7 @@ For rows with negative deviations, is not always possible to find non-zero colum Both the "Nearest Neighbors" and abandonment of the non-zero requirement can lead to values being increased in columns that are implausible for the row. For example, in the "Population by Age Sex Ethnicity" module, a row defined as a MGRA containing male-only adult persons may have female or juvenile values increased if the function cannot reallocate to non-zero columns. This necessitates the use of an additional "balancer" in the "Population by Age Sex Ethnicity" module for special MGRAs. -## Dealing with un-released ACS 5-year Detailed Tables (`read_sql_query_custom()`) +## Dealing with un-released ACS 5-year Detailed Tables (`read_sql_query_fallback()`) This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables, LEHD LODES, and EDD point-level data data that are currently not released. Essentially, all SQL scripts dealing with ACS, LODES, EDD data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. This function detects the presence of the error message and re-runs the query using the previous year instead, if fails again, checks up to a total of 5 years back. 
From b8dfdb60df9e26495aec2eff11dd78c006a5286f Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Feb 2026 09:52:50 -0800 Subject: [PATCH 15/24] #185 address pull request feedback for get_lodes_data.sql --- sql/employment/get_lodes_data.sql | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/sql/employment/get_lodes_data.sql b/sql/employment/get_lodes_data.sql index b720264..bafde8e 100644 --- a/sql/employment/get_lodes_data.sql +++ b/sql/employment/get_lodes_data.sql @@ -1,3 +1,8 @@ +-- Documentation +-- The mapping below used for [CNS01] to [CNS20] to [industry_code] (2-digit NAICS) in WAC section of the document linked below. +-- [industry_code] is used to represent NAICS as the QCEW data brought in a later step uses [industry_code] +-- https://lehd.ces.census.gov/doc/help/onthemap/LODESTechDoc.pdf + -- Initialize parameters ----------------------------------------------------- DECLARE @year integer = :year; DECLARE @msg nvarchar(25) = 'LODES data does not exist'; @@ -6,16 +11,18 @@ DECLARE @msg nvarchar(25) = 'LODES data does not exist'; IF NOT EXISTS ( SELECT TOP (1) * FROM [socioec_data].[lehd].[lodes_8_wac] - WHERE [SEG] = 'S000' - AND [TYPE] = 'JT00' - AND [version] = 2 - AND [YEAR] = @year + WHERE [SEG] = 'S000' -- 'S000' represents segment 'Total number of jobs' as seen in 'OD' section from document linked at top of file + AND [TYPE] = 'JT00' -- 'JT00' is for 'All Jobs' as seen in 'OD' section from document linked at top of file + AND [version] = 2 -- latest version loaded into database + AND [YEAR] = @year ) -SELECT @msg AS [msg] +BEGIN + SELECT @msg AS [msg] +END ELSE BEGIN - - + + -- Build the return table of QCEW control Totals by industry_code (NAICS) SELECT [YEAR] AS [year], [w_geocode] AS [block], @@ -45,9 +52,9 @@ BEGIN ('81', [CNS19]), ('92', [CNS20]) ) AS u([industry_code], [value]) - WHERE [SEG] = 'S000' - AND [TYPE] = 'JT00' - AND [version] = 2 + WHERE [SEG] = 'S000' -- 'S000' represents 
segment 'Total number of jobs' as seen in 'OD' section from document linked at top of file + AND [TYPE] = 'JT00' -- 'JT00' is for 'All Jobs' as seen in 'OD' section from document linked at top of file + AND [version] = 2 -- latest version loaded into database AND [YEAR] = @year GROUP BY [YEAR], [w_geocode], [industry_code] -END +END \ No newline at end of file From f20e4c64962a781e5bdeefceefff1274decd3fa7 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Feb 2026 09:53:38 -0800 Subject: [PATCH 16/24] #185 Update utils.py and utility.md for update to read_sql_query_fallback() --- python/utils.py | 8 ++++---- wiki/Utility.md | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/utils.py b/python/utils.py index 7c1e06d..a96f213 100644 --- a/python/utils.py +++ b/python/utils.py @@ -692,10 +692,10 @@ def read_sql_query_fallback(max_lookback: int = 1, **kwargs: dict) -> pd.DataFra the query. This process will continue for up to 5 years back. Note this function is specific to querying ACS 5-Year Tables, LEHD LODES and EDD point-level data. This function requires the SQL query file to return a - DataFrame with a single column called 'msg' with one of 3 text strings: - 'ACS 5-Year Table does not exist', 'LODES data does not exist', or - 'EDD point-level data does not exist' when no data is found for the - specified year. + DataFrame with a single column called 'msg' with one of 4 text strings: + 'ACS 5-Year Table does not exist', 'LODES data does not exist', + 'EDD point-level data does not exist', or 'QCEW data does not exist' + when no data is found for the specified year. 
Args: max_lookback (int): Maximum number of years to look back if data is not found diff --git a/wiki/Utility.md b/wiki/Utility.md index 091d5e5..be9a147 100644 --- a/wiki/Utility.md +++ b/wiki/Utility.md @@ -73,7 +73,7 @@ Both the "Nearest Neighbors" and abandonment of the non-zero requirement can lea ## Dealing with un-released ACS 5-year Detailed Tables (`read_sql_query_fallback()`) -This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables, LEHD LODES, and EDD point-level data data that are currently not released. Essentially, all SQL scripts dealing with ACS, LODES, EDD data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. This function detects the presence of the error message and re-runs the query using the previous year instead, if fails again, checks up to a total of 5 years back. +This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables, LEHD LODES, QCEW, and EDD point-level data data that are currently not released. Essentially, all SQL scripts dealing with ACS, LODES, QCEW, and EDD data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. This function detects the presence of the error message and re-runs the query using the previous year instead or up to number of years back specified with parameter `max_lookback`, which default value is 1. 
### A Note on ACS 5-year Detailed Tables From 26a00d6176d77fee983016613ec90aced7020976 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Feb 2026 11:11:15 -0800 Subject: [PATCH 17/24] #185 #189 addressed pull request feedback and added parenthesis for correct math in 2015 year section --- sql/employment/get_naics72_split.sql | 94 +++++++++++++++------------- 1 file changed, 52 insertions(+), 42 deletions(-) diff --git a/sql/employment/get_naics72_split.sql b/sql/employment/get_naics72_split.sql index 9eedce6..3a2bbf6 100644 --- a/sql/employment/get_naics72_split.sql +++ b/sql/employment/get_naics72_split.sql @@ -18,12 +18,12 @@ DECLARE @year INTEGER = :year; DECLARE @msg nvarchar(45) = 'EDD point-level data does not exist'; ---Drop Temp table and ok and spatial index if they exist +--Drop temp table if exist and then create temp table DROP TABLE IF EXISTS [#edd]; CREATE TABLE [#edd] ( [id] INTEGER IDENTITY(1,1) NOT NULL, [industry_code] NVARCHAR(3) NOT NULL, - [jobs] FLOAT NOT NULL, + [average_monthly_jobs] FLOAT NOT NULL, [Shape] GEOMETRY NOT NULL, CONSTRAINT [pk_tt_edd] PRIMARY KEY ([id]) ) @@ -43,43 +43,44 @@ WITH (BOUNDING_BOX = ( -- Get SANDAG GIS team EDD dataset ------------------------------------------- -DECLARE @qry NVARCHAR(max) IF @year >= 2017 BEGIN INSERT INTO [#edd] SELECT [industry_code], - 1.0 * [emp_total]/[emp_valid] AS [jobs], + 1.0 * [emp_total]/[emp_valid] AS [average_monthly_jobs], [SHAPE] FROM ( SELECT - CASE WHEN LEFT([code], 3) = '721' THEN '721' - WHEN LEFT([code], 3) = '722' THEN '722' - ELSE NULL END AS [industry_code], - CASE WHEN [emp_m1] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m2] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m3] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m4] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m5] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m6] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m7] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m8] IS NOT NULL THEN 1 ELSE 0 
END + - CASE WHEN [emp_m9] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m10] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m11] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 END + CASE + WHEN LEFT([code], 3) = '721' THEN '721' + WHEN LEFT([code], 3) = '722' THEN '722' + ELSE NULL + END AS [industry_code], + CASE WHEN [emp_m1] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m2] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m3] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m4] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m5] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m6] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m7] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m8] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m9] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m10] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m11] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 END AS [emp_valid], - COALESCE([emp_m1], 0) + COALESCE([emp_m2], 0) + COALESCE([emp_m3], 0) + - COALESCE([emp_m4], 0) + COALESCE([emp_m5], 0) + COALESCE([emp_m6], 0) + - COALESCE([emp_m7], 0) + COALESCE([emp_m8], 0) + COALESCE([emp_m9], 0) + - COALESCE([emp_m10], 0) + COALESCE([emp_m11], 0) + COALESCE([emp_m12], 0) + ISNULL([emp_m1], 0) + ISNULL([emp_m2], 0) + ISNULL([emp_m3], 0) + + ISNULL([emp_m4], 0) + ISNULL([emp_m5], 0) + ISNULL([emp_m6], 0) + + ISNULL([emp_m7], 0) + ISNULL([emp_m8], 0) + ISNULL([emp_m9], 0) + + ISNULL([emp_m10], 0) + ISNULL([emp_m11], 0) + ISNULL([emp_m12], 0) AS [emp_total], [SHAPE] FROM [EMPCORE].[ca_edd].[vi_ca_edd_employment] INNER JOIN [EMPCORE].[ca_edd].[naics] ON [vi_ca_edd_employment].[naics_id] = [naics].[naics_id] WHERE - [year] = @year + [year] = 2017--@year AND LEFT([code], 3) IN ('721','722') ) AS [tt] WHERE @@ -90,10 +91,12 @@ ELSE IF @year = 2016 OR @year BETWEEN 2010 AND 2014 BEGIN INSERT INTO [#edd] SELECT - CASE WHEN LEFT([code], 3) = '721' THEN '721' - WHEN 
LEFT([code], 3) = '722' THEN '722' - ELSE NULL END AS [industry_code], - [employment] * ISNULL([headquarters].[share], 1) AS [jobs], + CASE + WHEN LEFT([code], 3) = '721' THEN '721' + WHEN LEFT([code], 3) = '722' THEN '722' + ELSE NULL + END AS [industry_code], + [employment] * ISNULL([headquarters].[share], 1) AS [average_monthly_jobs], ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] FROM [EMPCORE].[ca_edd].[businesses] INNER JOIN [EMPCORE].[ca_edd].[naics] @@ -117,10 +120,12 @@ ELSE IF @year = 2015 BEGIN INSERT INTO [#edd] SELECT - CASE WHEN LEFT([code], 3) = '721' THEN '721' - WHEN LEFT([code], 3) = '722' THEN '722' - ELSE NULL END AS [industry_code], - [employment] * ISNULL([headquarters].[share], 1) AS [jobs], + CASE + WHEN LEFT([code], 3) = '721' THEN '721' + WHEN LEFT([code], 3) = '722' THEN '722' + ELSE NULL + END AS [industry_code], + [employment] * ISNULL([headquarters].[share], 1) AS [average_monthly_jobs], ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] FROM [EMPCORE].[ca_edd].[businesses] INNER JOIN [EMPCORE].[ca_edd].[naics] @@ -132,10 +137,11 @@ BEGIN SELECT [year], [emp_id], - 1.0 * COALESCE([15], 0) + COALESCE([16], 0) + COALESCE([17], 0) / - CASE WHEN [15] IS NOT NULL THEN 1 ELSE 0 END + + 1.0 * ((ISNULL([15], 0) + ISNULL([16], 0) + ISNULL([17], 0)) + / + (CASE WHEN [15] IS NOT NULL THEN 1 ELSE 0 END + CASE WHEN [16] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END + CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END)) AS [employment] FROM [EMPCORE].[ca_edd].[employment] PIVOT(SUM([employment]) FOR [month_id] IN ([15], [16], [17])) AS [pivot] @@ -160,16 +166,20 @@ BEGIN -- Calculate % split of NAICS 72 into 721 and 722 for 2020 Census Blocks - SELECT [GEOID20] AS [block], - CASE WHEN [72] = 0 THEN SUM([721]) OVER() / SUM([72]) OVER() - ELSE [721] / [72] END AS [pct_721], - CASE WHEN [72] = 0 THEN SUM([722]) OVER() / SUM([72]) OVER() - ELSE [722] / [72] END AS [pct_722] + CASE + WHEN [72] = 0 
THEN SUM([721]) OVER() / SUM([72]) OVER() + ELSE [721] / [72] + END AS [pct_721], + CASE + WHEN [72] = 0 THEN SUM([722]) OVER() / SUM([72]) OVER() + ELSE [722] / [72] + END AS [pct_722] FROM ( SELECT [GEOID20], - SUM(CASE WHEN [industry_code] = '721' THEN [jobs] ELSE 0 END) AS [721], - SUM(CASE WHEN [industry_code] = '722' THEN [jobs] ELSE 0 END) AS [722], - COALESCE(SUM([jobs]), 0) AS [72] + SUM(CASE WHEN [industry_code] = '721' THEN [average_monthly_jobs] ELSE 0 END) AS [721], + SUM(CASE WHEN [industry_code] = '722' THEN [average_monthly_jobs] ELSE 0 END) AS [722], + ISNULL(SUM([average_monthly_jobs]), 0) AS [72] FROM [#edd] RIGHT OUTER JOIN [GeoDepot].[sde].[CENSUSBLOCKS] ON [#edd].[Shape].STIntersects([CENSUSBLOCKS].[Shape]) = 1 From 5f376d5fca18ccb4175c0587d367257590dac811 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Feb 2026 11:29:44 -0800 Subject: [PATCH 18/24] #185 remove get_mgra.sql and use as string directly --- python/employment.py | 18 ++++++++++++------ sql/employment/get_mgra.sql | 7 ------- 2 files changed, 12 insertions(+), 13 deletions(-) delete mode 100644 sql/employment/get_mgra.sql diff --git a/python/employment.py b/python/employment.py index ed52fe1..0cb83f5 100644 --- a/python/employment.py +++ b/python/employment.py @@ -132,12 +132,18 @@ def aggregate_lodes_to_mgra( """ # Get MGRA data from SQL with utils.ESTIMATES_ENGINE.connect() as con: - with open(utils.SQL_FOLDER / "employment/get_mgra.sql") as file: - mgra_data = pd.read_sql_query( - sql=sql.text(file.read()), - con=con, - params={"run_id": utils.RUN_ID}, - ) + mgra_data = pd.read_sql_query( + sql=sql.text( + """ + SELECT DISTINCT [mgra] + FROM [EstimatesProgram].[inputs].[mgra] + WHERE run_id = :run_id + ORDER BY [mgra] + """ + ), + con=con, + params={"run_id": utils.RUN_ID}, + ) # Get unique industry codes and cross join with MGRA data unique_industries = combined_data["industry_code"].unique() diff --git a/sql/employment/get_mgra.sql b/sql/employment/get_mgra.sql 
deleted file mode 100644 index 10c7dc3..0000000 --- a/sql/employment/get_mgra.sql +++ /dev/null @@ -1,7 +0,0 @@ --- Initialize parameters ----------------------------------------------------- -DECLARE @run_id integer = :run_id; - -SELECT DISTINCT [mgra] -FROM [EstimatesProgram].[inputs].[mgra] -WHERE run_id = @run_id -ORDER BY [mgra] From 824b8efdf67361545ec7a5c73112141bfb79cee2 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Feb 2026 11:42:20 -0800 Subject: [PATCH 19/24] #185 fix using only spaces vs tabs in sql files --- sql/employment/QCEW_control.sql | 4 ++-- sql/employment/get_lodes_data.sql | 3 ++- sql/employment/get_naics72_split.sql | 28 ++++++++++++++-------------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/sql/employment/QCEW_control.sql b/sql/employment/QCEW_control.sql index 66906e4..f4b230f 100644 --- a/sql/employment/QCEW_control.sql +++ b/sql/employment/QCEW_control.sql @@ -1,11 +1,11 @@ -- Initialize parameters ----------------------------------------------------- -DECLARE @year integer = :year; +DECLARE @year integer = 2024--:year; DECLARE @msg nvarchar(25) = 'QCEW data does not exist'; -- Send error message if no data exists -------------------------------------- IF NOT EXISTS ( SELECT TOP (1) * - FROM [socioec_data].[bls].[qcew_by_area_annual] + FROM [socioec_data].[bls].[qcew_by_area_annual] WHERE [year] = @year ) SELECT @msg AS [msg] diff --git a/sql/employment/get_lodes_data.sql b/sql/employment/get_lodes_data.sql index bafde8e..ecd845a 100644 --- a/sql/employment/get_lodes_data.sql +++ b/sql/employment/get_lodes_data.sql @@ -3,6 +3,7 @@ -- [industry_code] is used to represent NAICS as the QCEW data brought in a later step uses [industry_code] -- https://lehd.ces.census.gov/doc/help/onthemap/LODESTechDoc.pdf + -- Initialize parameters ----------------------------------------------------- DECLARE @year integer = :year; DECLARE @msg nvarchar(25) = 'LODES data does not exist'; @@ -10,7 +11,7 @@ DECLARE @msg 
nvarchar(25) = 'LODES data does not exist'; -- Send error message if no data exists -------------------------------------- IF NOT EXISTS ( SELECT TOP (1) * - FROM [socioec_data].[lehd].[lodes_8_wac] + FROM [socioec_data].[lehd].[lodes_8_wac] WHERE [SEG] = 'S000' -- 'S000' represents segment 'Total number of jobs' as seen in 'OD' section from document linked at top of file AND [TYPE] = 'JT00' -- 'JT00' is for 'All Jobs' as seen in 'OD' section from document linked at top of file AND [version] = 2 -- latest version loaded into database diff --git a/sql/employment/get_naics72_split.sql b/sql/employment/get_naics72_split.sql index 3a2bbf6..050661e 100644 --- a/sql/employment/get_naics72_split.sql +++ b/sql/employment/get_naics72_split.sql @@ -134,20 +134,20 @@ BEGIN ON [businesses].[year] = [headquarters].[year] AND [businesses].[emp_id] = [headquarters].[emp_id] INNER JOIN ( - SELECT - [year], - [emp_id], - 1.0 * ((ISNULL([15], 0) + ISNULL([16], 0) + ISNULL([17], 0)) + SELECT + [year], + [emp_id], + 1.0 * ((ISNULL([15], 0) + ISNULL([16], 0) + ISNULL([17], 0)) / - (CASE WHEN [15] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [16] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END)) - AS [employment] - FROM [EMPCORE].[ca_edd].[employment] - PIVOT(SUM([employment]) FOR [month_id] IN ([15], [16], [17])) AS [pivot] - WHERE - [year] = @year - AND ([15] IS NOT NULL OR [16] IS NOT NULL OR [17] IS NOT NULL) + (CASE WHEN [15] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [16] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END)) + AS [employment] + FROM [EMPCORE].[ca_edd].[employment] + PIVOT(SUM([employment]) FOR [month_id] IN ([15], [16], [17])) AS [pivot] + WHERE + [year] = @year + AND ([15] IS NOT NULL OR [16] IS NOT NULL OR [17] IS NOT NULL) ) AS [employment] ON [businesses].[year] = [employment].[year] AND [businesses].[emp_id] = [employment].[emp_id] @@ -158,7 +158,7 @@ END -- Send error message if no data exists 
-------------------------------------- IF NOT EXISTS ( SELECT TOP (1) * - FROM [#edd] + FROM [#edd] ) SELECT @msg AS [msg] ELSE From 223fcc46dd723e456710bdc1587a6aa11673f19f Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Feb 2026 11:45:24 -0800 Subject: [PATCH 20/24] #185 remove year left in query from testing --- sql/employment/QCEW_control.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/employment/QCEW_control.sql b/sql/employment/QCEW_control.sql index f4b230f..1e048f4 100644 --- a/sql/employment/QCEW_control.sql +++ b/sql/employment/QCEW_control.sql @@ -1,5 +1,5 @@ -- Initialize parameters ----------------------------------------------------- -DECLARE @year integer = 2024--:year; +DECLARE @year integer = :year; DECLARE @msg nvarchar(25) = 'QCEW data does not exist'; -- Send error message if no data exists -------------------------------------- From 46cda0e21e888d4d0d1e0df062d586fbfb2858bc Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Feb 2026 13:03:57 -0800 Subject: [PATCH 21/24] #185 fix table being called for [inputs].[mgra] --- python/employment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/employment.py b/python/employment.py index 0cb83f5..c9d60d8 100644 --- a/python/employment.py +++ b/python/employment.py @@ -136,7 +136,7 @@ def aggregate_lodes_to_mgra( sql=sql.text( """ SELECT DISTINCT [mgra] - FROM [EstimatesProgram].[inputs].[mgra] + FROM [inputs].[mgra] WHERE run_id = :run_id ORDER BY [mgra] """ From f7a109b34f26391acd95f367b60bfc894de75bae Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Feb 2026 14:56:31 -0800 Subject: [PATCH 22/24] #185 #188 update utility.md and utils.py --- python/utils.py | 2 +- wiki/Utility.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/utils.py b/python/utils.py index a96f213..7a26da2 100644 --- a/python/utils.py +++ b/python/utils.py @@ -698,7 +698,7 @@ def read_sql_query_fallback(max_lookback: int = 1, 
**kwargs: dict) -> pd.DataFra when no data is found for the specified year. Args: - max_lookback (int): Maximum number of years to look back if data is not found + max_lookback (int = 1): Maximum number of years to look back if data is not found, defaults to 1 kwargs (dict): Keyword arguments for pd.read_sql_query Returns: diff --git a/wiki/Utility.md b/wiki/Utility.md index be9a147..dfd2c9a 100644 --- a/wiki/Utility.md +++ b/wiki/Utility.md @@ -73,7 +73,7 @@ Both the "Nearest Neighbors" and abandonment of the non-zero requirement can lea ## Dealing with un-released ACS 5-year Detailed Tables (`read_sql_query_fallback()`) -This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables, LEHD LODES, QCEW, and EDD point-level data data that are currently not released. Essentially, all SQL scripts dealing with ACS, LODES, QCEW, and EDD data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. This function detects the presence of the error message and re-runs the query using the previous year instead or up to number of years back specified with parameter `max_lookback`, which default value is 1. +This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables, LEHD LODES, QCEW, and EDD point-level data that are currently not released. Essentially, all SQL scripts dealing with ACS, LODES, QCEW, and EDD data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. 
This function detects the presence of the error message and re-runs the query using the previous year or will iteratively try to retrieve data to a defined number of years back, specified with parameter `max_lookback`, whose default value is 1. ### A Note on ACS 5-year Detailed Tables From b8bb79c851629bc26f8ffd960804d73f498b1191 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Feb 2026 15:38:59 -0800 Subject: [PATCH 23/24] #185 #189 Better format based on feedback and fix year set when was debugging --- sql/employment/get_naics72_split.sql | 55 +++++++++++++++++----------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/sql/employment/get_naics72_split.sql b/sql/employment/get_naics72_split.sql index 050661e..abe8c61 100644 --- a/sql/employment/get_naics72_split.sql +++ b/sql/employment/get_naics72_split.sql @@ -58,29 +58,37 @@ BEGIN ELSE NULL END AS [industry_code], CASE WHEN [emp_m1] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m2] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m3] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m4] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m5] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m6] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m7] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m8] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m9] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m10] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m11] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 END - AS [emp_valid], - ISNULL([emp_m1], 0) + ISNULL([emp_m2], 0) + ISNULL([emp_m3], 0) - + ISNULL([emp_m4], 0) + ISNULL([emp_m5], 0) + ISNULL([emp_m6], 0) - + ISNULL([emp_m7], 0) + ISNULL([emp_m8], 0) + ISNULL([emp_m9], 0) - + ISNULL([emp_m10], 0) + ISNULL([emp_m11], 0) + ISNULL([emp_m12], 0) - AS [emp_total], + + CASE WHEN [emp_m2] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m3] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m4] IS NOT NULL THEN 1 
ELSE 0 END + + CASE WHEN [emp_m5] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m6] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m7] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m8] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m9] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m10] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m11] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 + END AS [emp_valid], + ISNULL([emp_m1], 0) + + ISNULL([emp_m2], 0) + + ISNULL([emp_m3], 0) + + ISNULL([emp_m4], 0) + + ISNULL([emp_m5], 0) + + ISNULL([emp_m6], 0) + + ISNULL([emp_m7], 0) + + ISNULL([emp_m8], 0) + + ISNULL([emp_m9], 0) + + ISNULL([emp_m10], 0) + + ISNULL([emp_m11], 0) + + ISNULL([emp_m12], 0) + AS [emp_total], [SHAPE] FROM [EMPCORE].[ca_edd].[vi_ca_edd_employment] INNER JOIN [EMPCORE].[ca_edd].[naics] ON [vi_ca_edd_employment].[naics_id] = [naics].[naics_id] WHERE - [year] = 2017--@year + [year] = @year AND LEFT([code], 3) IN ('721','722') ) AS [tt] WHERE @@ -139,9 +147,10 @@ BEGIN [emp_id], 1.0 * ((ISNULL([15], 0) + ISNULL([16], 0) + ISNULL([17], 0)) / - (CASE WHEN [15] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [16] IS NOT NULL THEN 1 ELSE 0 END + - CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END)) + (CASE WHEN [15] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [16] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END + )) AS [employment] FROM [EMPCORE].[ca_edd].[employment] PIVOT(SUM([employment]) FOR [month_id] IN ([15], [16], [17])) AS [pivot] @@ -160,7 +169,9 @@ IF NOT EXISTS ( SELECT TOP (1) * FROM [#edd] ) -SELECT @msg AS [msg] +BEGIN + SELECT @msg AS [msg] +END ELSE BEGIN -- Calculate % split of NAICS 72 into 721 and 722 for 2020 Census Blocks - From 73bca206510d58c5993d566f47e54c39db54ff9e Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Fri, 6 Feb 2026 16:53:27 -0800 Subject: [PATCH 24/24] #185 better formatting to match rest of estimates program --- python/employment.py | 133 
+++++++++++++++++++------------------------ 1 file changed, 58 insertions(+), 75 deletions(-) diff --git a/python/employment.py b/python/employment.py index c9d60d8..4fbcf43 100644 --- a/python/employment.py +++ b/python/employment.py @@ -37,17 +37,13 @@ def run_employment(year: int): f"Current MGRA_VERSION is '{utils.MGRA_VERSION}'." ) - LODES_data = get_LODES_data(year) + jobs_inputs = _get_jobs_inputs(year) + # TODO _validate_jobs_inputs here before proceeding - xref = get_xref_block_to_mgra() + jobs_outputs = _create_jobs_output(jobs_inputs) + # TODO _validate_jobs_outputs here before proceeding - lehd_jobs = aggregate_lodes_to_mgra(LODES_data, xref, year) - - control_totals = get_control_totals(year) - - controlled_data = apply_employment_controls(lehd_jobs, control_totals, generator) - - _insert_jobs(control_totals, controlled_data) + _insert_jobs(jobs_inputs, jobs_outputs) def get_LODES_data(year: int) -> pd.DataFrame: @@ -99,24 +95,6 @@ def get_LODES_data(year: int) -> pd.DataFrame: return combined_data -def get_xref_block_to_mgra() -> pd.DataFrame: - """Retrieve crosswalk from Census blocks to MGRAs. - - Returns: - pd.DataFrame: A DataFrame containing the crosswalk from blocks to MGRAs. 
- """ - - with utils.LEHD_ENGINE.connect() as con: - with open(utils.SQL_FOLDER / "employment/xref_block_to_mgra.sql") as file: - xref = pd.read_sql_query( - sql=sql.text(file.read()), - con=con, - params={"mgra_version": utils.MGRA_VERSION}, - ) - - return xref - - def aggregate_lodes_to_mgra( combined_data: pd.DataFrame, xref: pd.DataFrame, year: int ) -> pd.DataFrame: @@ -147,72 +125,74 @@ def aggregate_lodes_to_mgra( # Get unique industry codes and cross join with MGRA data unique_industries = combined_data["industry_code"].unique() - jobs_frame = ( - mgra_data.assign(key=1) - .merge( - pd.DataFrame({"industry_code": unique_industries, "key": 1}), - on="key", - ) - .drop("key", axis=1) + jobs = mgra_data.merge( + pd.DataFrame({"industry_code": unique_industries}), how="cross" ) - jobs_frame["year"] = year - jobs_frame = jobs_frame[["year", "mgra", "industry_code"]] + jobs["year"] = year + jobs = jobs[["year", "mgra", "industry_code"]] # Join combined_data to xref and calculate allocated jobs lehd_to_mgra = combined_data.merge(xref, on="block", how="inner") lehd_to_mgra["value"] = lehd_to_mgra["jobs"] * lehd_to_mgra["allocation_pct"] - # Sum allocated jobs by year, mgra, and industry_code - lehd_to_mgra_summed = lehd_to_mgra.groupby( - ["year", "mgra", "industry_code"], as_index=False - )["value"].sum() - - # Join summed data to jobs_frame, keeping all MGRAs and industry codes - final_lehd_to_mgra = jobs_frame.merge( - lehd_to_mgra_summed, + # Join summed data to jobs, keeping all MGRAs and industry codes + jobs = jobs.merge( + lehd_to_mgra.groupby(["year", "mgra", "industry_code"], as_index=False)[ + "value" + ].sum(), on=["year", "mgra", "industry_code"], how="left", ) - final_lehd_to_mgra["value"] = final_lehd_to_mgra["value"].fillna(0) - final_lehd_to_mgra["run_id"] = utils.RUN_ID # Add run_id column - final_lehd_to_mgra = final_lehd_to_mgra[ - ["run_id", "year", "mgra", "industry_code", "value"] - ] + jobs["value"] = jobs["value"].fillna(0) + jobs["run_id"] 
= utils.RUN_ID + jobs = jobs[["run_id", "year", "mgra", "industry_code", "value"]] - return final_lehd_to_mgra + return jobs -def get_control_totals(year: int) -> pd.DataFrame: - """Load QCEW employment data as control totals. +def _get_jobs_inputs(year: int) -> dict[str, pd.DataFrame]: + """Get input data related to jobs for a specified year. Args: - year (int): The year for which to load employment data. - + year (int): The year for which to retrieve input data. Returns: - pd.DataFrame: Employment control totals from QCEW. + dict[str, pd.DataFrame]: A dictionary containing input DataFrames related to jobs. """ + # Store results here + jobs_inputs = {} + + jobs_inputs["LODES_data"] = get_LODES_data(year) + with utils.LEHD_ENGINE.connect() as con: + # get crosswalk from Census blocks to MGRAs + with open(utils.SQL_FOLDER / "employment/xref_block_to_mgra.sql") as file: + jobs_inputs["xref_block_to_mgra"] = pd.read_sql_query( + sql=sql.text(file.read()), + con=con, + params={"mgra_version": utils.MGRA_VERSION}, + ) - # Get employment control totals from QCEW + # get regional employment control totals from QCEW with open(utils.SQL_FOLDER / "employment/QCEW_control.sql") as file: - control_totals = utils.read_sql_query_fallback( + jobs_inputs["control_totals"] = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ "year": year, }, ) + jobs_inputs["control_totals"]["run_id"] = utils.RUN_ID - control_totals["run_id"] = utils.RUN_ID # Add run_id column + jobs_inputs["lehd_jobs"] = aggregate_lodes_to_mgra( + jobs_inputs["LODES_data"], jobs_inputs["xref_block_to_mgra"], year + ) - return control_totals + return jobs_inputs -def apply_employment_controls( - original_data: pd.DataFrame, - control_totals: pd.DataFrame, - generator: np.random.Generator, -) -> pd.DataFrame: +def _create_jobs_output( + jobs_inputs: dict[str, pd.DataFrame], +) -> dict[str, pd.DataFrame]: """Apply control totals to employment data using utils.integerize_1d(). 
Args: @@ -223,40 +203,43 @@ def apply_employment_controls( Returns: pd.DataFrame: Controlled employment data. """ + jobs_outputs = {} # Create a copy of original_data for controlled results - controlled_data = original_data.copy() + jobs_outputs["results"] = jobs_inputs["lehd_jobs"].copy() # Get unique industry codes - industry_codes = original_data["industry_code"].unique() + industry_codes = jobs_inputs["lehd_jobs"]["industry_code"].unique() # Apply integerize_1d to each industry code for industry_code in industry_codes: # Filter original data for this industry - industry_mask = original_data["industry_code"] == industry_code + industry_mask = jobs_inputs["lehd_jobs"]["industry_code"] == industry_code # Get control value for this industry - control_value = control_totals[ - control_totals["industry_code"] == industry_code + control_value = jobs_inputs["control_totals"][ + jobs_inputs["control_totals"]["industry_code"] == industry_code ]["value"].iloc[0] # Apply integerize_1d and update controlled_data - controlled_data.loc[industry_mask, "value"] = utils.integerize_1d( - data=original_data.loc[industry_mask, "value"], + jobs_outputs["results"].loc[industry_mask, "value"] = utils.integerize_1d( + data=jobs_inputs["lehd_jobs"].loc[industry_mask, "value"], control=control_value, methodology="weighted_random", generator=generator, ) - return controlled_data + return jobs_outputs -def _insert_jobs(jobs_inputs: pd.DataFrame, jobs_outputs: pd.DataFrame) -> None: +def _insert_jobs( + jobs_inputs: dict[str, pd.DataFrame], jobs_outputs: dict[str, pd.DataFrame] +) -> None: """Insert input and output data related to jobs to the database.""" # Insert input and output data to database with utils.ESTIMATES_ENGINE.connect() as con: - jobs_inputs.to_sql( + jobs_inputs["control_totals"].to_sql( name="controls_jobs", con=con, schema="inputs", @@ -264,6 +247,6 @@ def _insert_jobs(jobs_inputs: pd.DataFrame, jobs_outputs: pd.DataFrame) -> None: index=False, ) - jobs_outputs.to_sql( + 
jobs_outputs["results"].to_sql( name="jobs", con=con, schema="outputs", if_exists="append", index=False )