diff --git a/config.yml b/config.yml index 659a083..1441abd 100644 --- a/config.yml +++ b/config.yml @@ -28,4 +28,5 @@ debug: population: False population_by_ase: False household_characteristics: False + employment: False staging: False diff --git a/main.py b/main.py index 29f35e2..be83321 100644 --- a/main.py +++ b/main.py @@ -13,6 +13,7 @@ import python.pop_type as pop import python.ase as ase import python.hh_characteristics as hh_characteristics +import python.employment as employment import python.staging as staging import python.utils as utils @@ -57,6 +58,11 @@ logger.info("Running Household Characteristics module...") hh_characteristics.run_hh_characteristics(year) + # Employment module + if utils.RUN_INSTRUCTIONS["employment"]: + logger.info("Running Employment module...") + employment.run_employment(year) + # Diagnostic print for this year logger.info(f"Finished running {year}\n") diff --git a/python/ase.py b/python/ase.py index ed56105..bf4cfaf 100644 --- a/python/ase.py +++ b/python/ase.py @@ -98,7 +98,7 @@ def _get_controls_inputs(year: int) -> dict[str, pd.DataFrame]: # Get regional age/sex/ethnicity group quarters distributions with open(utils.SQL_FOLDER / "ase/get_region_gq_ase_dist.sql") as file: - region_gq_ase_dist = utils.read_sql_query_acs( + region_gq_ase_dist = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ @@ -262,7 +262,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]: with utils.ESTIMATES_ENGINE.connect() as con: # Get the Age/Sex B010001 table data with open(utils.SQL_FOLDER / "ase/get_B01001.sql") as file: - b01001 = utils.read_sql_query_acs( + b01001 = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ @@ -272,7 +272,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]: # Get the Ethnicity B03002 table data with open(utils.SQL_FOLDER / "ase/get_B03002.sql") as file: - b03002 = utils.read_sql_query_acs( + b03002 = utils.read_sql_query_fallback( 
sql=sql.text(file.read()), con=con, params={ @@ -282,7 +282,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]: # Get Age/Sex/Ethnicity data from B01001(B-I) table data with open(utils.SQL_FOLDER / "ase/get_B01001(B-I).sql") as file: - b01001_b_i = utils.read_sql_query_acs( + b01001_b_i = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ diff --git a/python/employment.py b/python/employment.py new file mode 100644 index 0000000..4fbcf43 --- /dev/null +++ b/python/employment.py @@ -0,0 +1,252 @@ +import numpy as np +import pandas as pd +import sqlalchemy as sql + +import python.utils as utils + +generator = np.random.default_rng(utils.RANDOM_SEED) + + +def run_employment(year: int): + """Control function to create jobs data by industry_code (NAICS) at the MGRA level. + + Get the LEHD LODES data, aggregate to the MGRA level using the block to MGRA + crosswalk, then apply control totals from QCEW using integerization. + + Functionality is split apart for code encapsulation (function inputs not included): + get_LODES_data - Get LEHD LODES data for a specified year, including + special handling for industry_code 72 (Accommodation and Food Services) + xref_block_to_mgra - Get crosswalk from Census blocks to MGRAs + aggregate_lodes_to_mgra - Aggregate LODES data to MGRA level using allocation + percentages from the block to MGRA crosswalk + get_control_totals - Load QCEW employment data as county total controls + apply_employment_controls - Apply control totals to employment data using + utils.integerize_1d() + _insert_jobs - Store both the control totals and controlled employment + inputs/outputs to the production database + + + Args: + year (int): estimates year + """ + + # Check MGRA version and raise error if not 'mgra15' + if utils.MGRA_VERSION != "mgra15": + raise ValueError( + f"Employment module only works with MGRA_VERSION = 'mgra15'. " + f"Current MGRA_VERSION is '{utils.MGRA_VERSION}'." 
+ ) + + jobs_inputs = _get_jobs_inputs(year) + # TODO _validate_jobs_inputs here before proceeding + + jobs_outputs = _create_jobs_output(jobs_inputs) + # TODO _validate_jobs_outputs here before proceeding + + _insert_jobs(jobs_inputs, jobs_outputs) + + +def get_LODES_data(year: int) -> pd.DataFrame: + """Retrieve LEHD LODES data for a specified year. + + Args: + year (int): The year for which to retrieve LEHD LODES data. + """ + + with utils.LEHD_ENGINE.connect() as con: + with open(utils.SQL_FOLDER / "employment/get_lodes_data.sql") as file: + lodes_data = utils.read_sql_query_fallback( + max_lookback=2, + sql=sql.text(file.read()), + con=con, + params={"year": year}, + ) + + with utils.GIS_ENGINE.connect() as con: + with open(utils.SQL_FOLDER / "employment/get_naics72_split.sql") as file: + split_naics_72 = utils.read_sql_query_fallback( + max_lookback=3, + sql=sql.text(file.read()), + con=con, + params={"year": year}, + ) + + # Separate industry_code 72 from other industries + lodes_72 = lodes_data[lodes_data["industry_code"] == "72"].copy() + lodes_other = lodes_data[lodes_data["industry_code"] != "72"].copy() + + # Join industry_code 72 data with split percentages + lodes_72_split = lodes_72.merge(split_naics_72, on="block", how="left") + + # Create rows for industry_code 721 + lodes_721 = lodes_72_split[["year", "block"]].copy() + lodes_721["industry_code"] = "721" + lodes_721["jobs"] = lodes_72_split["jobs"] * lodes_72_split["pct_721"] + + # Create rows for industry_code 722 + lodes_722 = lodes_72_split[["year", "block"]].copy() + lodes_722["industry_code"] = "722" + lodes_722["jobs"] = lodes_72_split["jobs"] * lodes_72_split["pct_722"] + + # Combine all data + combined_data = pd.concat([lodes_other, lodes_721, lodes_722], ignore_index=True) + combined_data = combined_data[["year", "block", "industry_code", "jobs"]] + + return combined_data + + +def aggregate_lodes_to_mgra( + combined_data: pd.DataFrame, xref: pd.DataFrame, year: int +) -> pd.DataFrame: + 
"""Aggregate LODES data to MGRA level using allocation percentages. + + Args: + combined_data (pd.DataFrame): LODES data with columns: year, block, industry_code, jobs + xref (pd.DataFrame): Crosswalk with columns: block, mgra, allocation_pct + year (int): The year for which to aggregate data + + Returns: + pd.DataFrame: Aggregated data at MGRA level with columns: year, mgra, industry_code, jobs + """ + # Get MGRA data from SQL + with utils.ESTIMATES_ENGINE.connect() as con: + mgra_data = pd.read_sql_query( + sql=sql.text( + """ + SELECT DISTINCT [mgra] + FROM [inputs].[mgra] + WHERE run_id = :run_id + ORDER BY [mgra] + """ + ), + con=con, + params={"run_id": utils.RUN_ID}, + ) + + # Get unique industry codes and cross join with MGRA data + unique_industries = combined_data["industry_code"].unique() + jobs = mgra_data.merge( + pd.DataFrame({"industry_code": unique_industries}), how="cross" + ) + jobs["year"] = year + jobs = jobs[["year", "mgra", "industry_code"]] + + # Join combined_data to xref and calculate allocated jobs + lehd_to_mgra = combined_data.merge(xref, on="block", how="inner") + lehd_to_mgra["value"] = lehd_to_mgra["jobs"] * lehd_to_mgra["allocation_pct"] + + # Join summed data to jobs, keeping all MGRAs and industry codes + jobs = jobs.merge( + lehd_to_mgra.groupby(["year", "mgra", "industry_code"], as_index=False)[ + "value" + ].sum(), + on=["year", "mgra", "industry_code"], + how="left", + ) + jobs["value"] = jobs["value"].fillna(0) + jobs["run_id"] = utils.RUN_ID + jobs = jobs[["run_id", "year", "mgra", "industry_code", "value"]] + + return jobs + + +def _get_jobs_inputs(year: int) -> dict[str, pd.DataFrame]: + """Get input data related to jobs for a specified year. + + Args: + year (int): The year for which to retrieve input data. + Returns: + dict[str, pd.DataFrame]: A dictionary containing input DataFrames related to jobs. 
+ """ + # Store results here + jobs_inputs = {} + + jobs_inputs["LODES_data"] = get_LODES_data(year) + + with utils.LEHD_ENGINE.connect() as con: + # get crosswalk from Census blocks to MGRAs + with open(utils.SQL_FOLDER / "employment/xref_block_to_mgra.sql") as file: + jobs_inputs["xref_block_to_mgra"] = pd.read_sql_query( + sql=sql.text(file.read()), + con=con, + params={"mgra_version": utils.MGRA_VERSION}, + ) + + # get regional employment control totals from QCEW + with open(utils.SQL_FOLDER / "employment/QCEW_control.sql") as file: + jobs_inputs["control_totals"] = utils.read_sql_query_fallback( + sql=sql.text(file.read()), + con=con, + params={ + "year": year, + }, + ) + jobs_inputs["control_totals"]["run_id"] = utils.RUN_ID + + jobs_inputs["lehd_jobs"] = aggregate_lodes_to_mgra( + jobs_inputs["LODES_data"], jobs_inputs["xref_block_to_mgra"], year + ) + + return jobs_inputs + + +def _create_jobs_output( + jobs_inputs: dict[str, pd.DataFrame], +) -> dict[str, pd.DataFrame]: + """Apply control totals to employment data using utils.integerize_1d(). + + Args: + original_data (pd.DataFrame): LEHD LODES data at MGRA level. + control_totals (pd.DataFrame): Employment control totals from QCEW. + generator (np.random.Generator): NumPy random number generator. + + Returns: + pd.DataFrame: Controlled employment data. 
+ """ + jobs_outputs = {} + # Create a copy of original_data for controlled results + jobs_outputs["results"] = jobs_inputs["lehd_jobs"].copy() + + # Get unique industry codes + industry_codes = jobs_inputs["lehd_jobs"]["industry_code"].unique() + + # Apply integerize_1d to each industry code + for industry_code in industry_codes: + # Filter original data for this industry + industry_mask = jobs_inputs["lehd_jobs"]["industry_code"] == industry_code + + # Get control value for this industry + control_value = jobs_inputs["control_totals"][ + jobs_inputs["control_totals"]["industry_code"] == industry_code + ]["value"].iloc[0] + + # Apply integerize_1d and update controlled_data + jobs_outputs["results"].loc[industry_mask, "value"] = utils.integerize_1d( + data=jobs_inputs["lehd_jobs"].loc[industry_mask, "value"], + control=control_value, + methodology="weighted_random", + generator=generator, + ) + + return jobs_outputs + + +def _insert_jobs( + jobs_inputs: dict[str, pd.DataFrame], jobs_outputs: dict[str, pd.DataFrame] +) -> None: + """Insert input and output data related to jobs to the database.""" + + # Insert input and output data to database + with utils.ESTIMATES_ENGINE.connect() as con: + + jobs_inputs["control_totals"].to_sql( + name="controls_jobs", + con=con, + schema="inputs", + if_exists="append", + index=False, + ) + + jobs_outputs["results"].to_sql( + name="jobs", con=con, schema="outputs", if_exists="append", index=False + ) diff --git a/python/hh_characteristics.py b/python/hh_characteristics.py index 3b28e60..05834a2 100644 --- a/python/hh_characteristics.py +++ b/python/hh_characteristics.py @@ -86,10 +86,12 @@ def _get_hh_income_inputs(year: int) -> dict[str, pd.DataFrame]: with open( utils.SQL_FOLDER / "hh_characteristics" / "get_tract_controls_hh_income.sql" ) as file: - hh_income_inputs["hh_income_tract_controls"] = utils.read_sql_query_acs( - sql=sql.text(file.read()), # type: ignore - con=con, # type: ignore - params={"run_id": utils.RUN_ID, 
"year": year}, + hh_income_inputs["hh_income_tract_controls"] = ( + utils.read_sql_query_fallback( + sql=sql.text(file.read()), # type: ignore + con=con, # type: ignore + params={"run_id": utils.RUN_ID, "year": year}, + ) ) return hh_income_inputs @@ -120,7 +122,7 @@ def _get_hh_size_inputs(year: int) -> dict[str, pd.DataFrame]: / "hh_characteristics" / "get_tract_controls_hh_by_size.sql" ) as file: - hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_acs( + hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_fallback( sql=sql.text(file.read()), # type: ignore con=con, # type: ignore params={"run_id": utils.RUN_ID, "year": year}, diff --git a/python/hs_hh.py b/python/hs_hh.py index 7a435a5..10e0a43 100644 --- a/python/hs_hh.py +++ b/python/hs_hh.py @@ -105,7 +105,7 @@ def _get_hs_hh_inputs(year: int) -> dict[str, pd.DataFrame]: # Get tract occupancy controls with open(utils.SQL_FOLDER / "hs_hh/get_tract_controls_hh.sql") as file: - hs_hh_inputs["tract_controls"] = utils.read_sql_query_acs( + hs_hh_inputs["tract_controls"] = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ diff --git a/python/parsers.py b/python/parsers.py index 3cf2043..9a2a7c4 100644 --- a/python/parsers.py +++ b/python/parsers.py @@ -95,6 +95,7 @@ def parse_config(self) -> None: "population", "population_by_ase", "household_characteristics", + "employment", "staging", ]: self.run_instructions[key] = self._config["debug"][key] @@ -161,6 +162,7 @@ def _validate_config(self) -> None: "population": {"type": "boolean"}, "population_by_ase": {"type": "boolean"}, "household_characteristics": {"type": "boolean"}, + "employment": {"type": "boolean"}, "staging": {"type": "boolean"}, }, }, diff --git a/python/pop_type.py b/python/pop_type.py index a116974..08ea6f6 100644 --- a/python/pop_type.py +++ b/python/pop_type.py @@ -204,7 +204,7 @@ def _get_hhp_inputs(year: int) -> dict[str, pd.DataFrame]: # Get tract level household size controls with 
open(utils.SQL_FOLDER / "pop_type/get_tract_controls_hhs.sql") as file: - tract_controls = utils.read_sql_query_acs( + tract_controls = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, params={ diff --git a/python/utils.py b/python/utils.py index c2436fe..7a26da2 100644 --- a/python/utils.py +++ b/python/utils.py @@ -18,7 +18,6 @@ ROOT_FOLDER = pathlib.Path(__file__).parent.resolve().parent SQL_FOLDER = ROOT_FOLDER / "sql" - ########### # LOGGING # ########### @@ -78,6 +77,16 @@ fast_executemany=True, ) +LEHD_ENGINE = sql.create_engine( + "mssql+pyodbc://@" + + _secrets["sql"]["socioec"]["server"] + + "/" + + _secrets["sql"]["socioec"]["database"] + + "?trusted_connection=yes&driver=" + + "ODBC Driver 17 for SQL Server", + fast_executemany=True, +) + # Other SQL configuration GIS_SERVER = _secrets["sql"]["gis"]["server"] @@ -673,45 +682,84 @@ def integerize_2d( return array_2d -def read_sql_query_acs(**kwargs: dict) -> pd.DataFrame: +def read_sql_query_fallback(max_lookback: int = 1, **kwargs: dict) -> pd.DataFrame: """Read SQL query allowing for dynamic year adjustment. This function executes a SQL query using pandas read_sql_query allowing for dynamic adjustment of the 'year' parameter in the query. If the query - returns a message indicating that the ACS 5-Year Table does not exist for - a given year, it will automatically decrement the year by one and re-run - the query. Note this function is specific to ACS 5-Year Tables and - requires the SQL query file to return a DataFrame with a single column - called 'msg' with the text 'ACS 5-Year Table does not exist' when no data - is found for the specified year. + returns a message indicating that the year does not exist in Table being + accessed, it will automatically decrement the year by one and re-run + the query. This process will continue for up to 5 years back. Note this + function is specific to querying ACS 5-Year Tables, LEHD LODES and EDD + point-level data. 
This function requires the SQL query file to return a + DataFrame with a single column called 'msg' with one of 4 text strings: + 'ACS 5-Year Table does not exist', 'LODES data does not exist', + 'EDD point-level data does not exist', or 'QCEW data does not exist' + when no data is found for the specified year. Args: + max_lookback (int = 1): Maximum number of years to look back if data is not found, defaults to 1 kwargs (dict): Keyword arguments for pd.read_sql_query Returns: pd.DataFrame: Result of the SQL query + + Raises: + ValueError: If data is not found after max_lookback years of lookback or if an + unexpected message is returned """ - df = pd.read_sql_query(**kwargs) # type: ignore - - # Check if returned DataFrame contains SQL message - if df.columns.tolist() == ["msg"]: - msg = df["msg"].values[0] - if msg == "ACS 5-Year Table does not exist" and "year" in kwargs["params"]: - # If the table does not exist run query for prior year - kwargs["params"]["year"] -= 1 - - logger.warning( - "Re-running ACS SQL query with 'year' set to: " - + str(kwargs["params"]["year"]) - ) + # Store original year for potential relabeling + original_year = kwargs["params"]["year"] + + # NOTE: max_lookback is a named keyword argument (default 1) and is + # intentionally not forwarded to pd.read_sql_query via kwargs + + # Messages that trigger year lookback + lookback_messages = [ + "ACS 5-Year Table does not exist", + "LODES data does not exist", + "EDD point-level data does not exist", + "QCEW data does not exist", + ] - df = pd.read_sql_query(**kwargs) # type: ignore + # Try up to max_lookback + 1 times (original year + max_lookback lookbacks) + for attempt in range(max_lookback + 1): + df = pd.read_sql_query(**kwargs) # type: ignore + + # Check if returned DataFrame contains SQL message + if df.columns.tolist() == ["msg"]: + msg = df["msg"].values[0] + + # Check if message is in the lookback list and year parameter exists + if msg in lookback_messages and "year" in kwargs["params"]: + # If we've exhausted all
lookback attempts, raise error + if attempt >= max_lookback: + raise ValueError( + f"Data not found after {max_lookback} year lookback. " + f"Original year: {original_year}, " + f"Final attempted year: {kwargs['params']['year']}" + ) - # If the year column exists, set it to the original year - if "year" in df.columns: - df["year"] = kwargs["params"]["year"] + 1 - else: - # Raise error if the message is not expected - raise ValueError(f"SQL query returned a message: {msg}.") + # Decrement year and try again + kwargs["params"]["year"] -= 1 + + logger.warning( + f"Re-running SQL query with 'year' set to: " + f"{kwargs['params']['year']} (attempt {attempt + 2}/{max_lookback + 1})" + ) + + continue # Continue to next iteration + else: + # Raise error if the message is not expected + raise ValueError(f"SQL query returned an unexpected message: {msg}") + + # If we got valid data, relabel year column if it exists and year was adjusted + if "year" in df.columns and original_year is not None: + if kwargs["params"]["year"] != original_year: + logger.info( + f"Relabeling 'year' column from {kwargs['params']['year']} " + f"to {original_year}" + ) + df["year"] = original_year - return df + return df diff --git a/sql/create_objects.sql b/sql/create_objects.sql index dc3e6ca..fb88330 100644 --- a/sql/create_objects.sql +++ b/sql/create_objects.sql @@ -110,6 +110,17 @@ INSERT INTO [inputs].[special_mgras] VALUES transferred to the Las Colinas Detention Facility.') GO +CREATE TABLE [inputs].[controls_jobs] ( + [run_id] INT NOT NULL, + [year] INT NOT NULL, + [industry_code] NVARCHAR(5) NOT NULL, + [value] INT NOT NULL, + INDEX [ccsi_inputs_controls_jobs] CLUSTERED COLUMNSTORE, + CONSTRAINT [ixuq_inputs_controls_jobs] UNIQUE ([run_id], [year], [industry_code]) WITH (DATA_COMPRESSION = PAGE), + CONSTRAINT [fk_inputs_controls_jobs_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]), + CONSTRAINT [chk_non_negative_inputs_controls_jobs] CHECK ([value] >= 0) +) +GO CREATE 
SCHEMA [outputs] @@ -196,3 +207,17 @@ CREATE TABLE [outputs].[hhp] ( CONSTRAINT [chk_non_negative_outputs_hhp] CHECK ([value] >= 0) ) GO + +CREATE TABLE [outputs].[jobs] ( + [run_id] INT NOT NULL, + [year] INT NOT NULL, + [mgra] INT NOT NULL, + [industry_code] NVARCHAR(5) NOT NULL, + [value] INT NOT NULL, + INDEX [ccsi_outputs_jobs] CLUSTERED COLUMNSTORE, + CONSTRAINT [ixuq_outputs_jobs] UNIQUE ([run_id], [year], [mgra], [industry_code]) WITH (DATA_COMPRESSION = PAGE), + CONSTRAINT [fk_outputs_jobs_run_id] FOREIGN KEY ([run_id]) REFERENCES [metadata].[run] ([run_id]), + CONSTRAINT [fk_outputs_jobs_mgra] FOREIGN KEY ([run_id], [mgra]) REFERENCES [inputs].[mgra] ([run_id], [mgra]), + CONSTRAINT [chk_non_negative_outputs_jobs] CHECK ([value] >= 0) +) +GO diff --git a/sql/employment/QCEW_control.sql b/sql/employment/QCEW_control.sql new file mode 100644 index 0000000..1e048f4 --- /dev/null +++ b/sql/employment/QCEW_control.sql @@ -0,0 +1,27 @@ +-- Initialize parameters ----------------------------------------------------- +DECLARE @year integer = :year; +DECLARE @msg nvarchar(25) = 'QCEW data does not exist'; + +-- Send error message if no data exists -------------------------------------- +IF NOT EXISTS ( + SELECT TOP (1) * + FROM [socioec_data].[bls].[qcew_by_area_annual] + WHERE [year] = @year +) +SELECT @msg AS [msg] +ELSE +BEGIN + +SELECT + [year], + [industry_code], + SUM([annual_avg_emplvl]) AS [value] +FROM [socioec_data].[bls].[qcew_by_area_annual] +INNER JOIN [socioec_data].[bls].[industry_code] + ON [qcew_by_area_annual].[naics_id] = [industry_code].[naics_id] +WHERE [area_fips] = '06073' + AND [year] = @year + AND [industry_code] IN ('11', '21', '22', '23', '31-33', '42', '44-45', '48-49', '51', '52', '53', '54', '55', '56', '61', '62', '71', '721', '722', '81', '92') +GROUP BY [year], [industry_code] + +END \ No newline at end of file diff --git a/sql/employment/get_lodes_data.sql b/sql/employment/get_lodes_data.sql new file mode 100644 index 
0000000..ecd845a --- /dev/null +++ b/sql/employment/get_lodes_data.sql @@ -0,0 +1,61 @@ +-- Documentation +-- The mapping below used for [CNS01] to [CNS20] to [industry_code] (2-digit NAICS) in WAC section of the document linked below. +-- [industry_code] is used to represent NAICS as the QCEW data brought in a later step uses [industry_code] +-- https://lehd.ces.census.gov/doc/help/onthemap/LODESTechDoc.pdf + + +-- Initialize parameters ----------------------------------------------------- +DECLARE @year integer = :year; +DECLARE @msg nvarchar(25) = 'LODES data does not exist'; + +-- Send error message if no data exists -------------------------------------- +IF NOT EXISTS ( + SELECT TOP (1) * + FROM [socioec_data].[lehd].[lodes_8_wac] + WHERE [SEG] = 'S000' -- 'S000' represents segment 'Total number of jobs' as seen in 'OD' section from document linked at top of file + AND [TYPE] = 'JT00' -- 'JT00' is for 'All Jobs' as seen in 'OD' section from document linked at top of file + AND [version] = 2 -- latest version loaded into database + AND [YEAR] = @year +) +BEGIN + SELECT @msg AS [msg] +END +ELSE +BEGIN + + -- Build the return table of QCEW control Totals by industry_code (NAICS) + SELECT + [YEAR] AS [year], + [w_geocode] AS [block], + [industry_code], + SUM([value]) AS [jobs] + FROM [socioec_data].[lehd].[lodes_8_wac] + CROSS APPLY ( + VALUES + ('11', [CNS01]), + ('21', [CNS02]), + ('22', [CNS03]), + ('23', [CNS04]), + ('31-33',[CNS05]), + ('42', [CNS06]), + ('44-45',[CNS07]), + ('48-49',[CNS08]), + ('51', [CNS09]), + ('52', [CNS10]), + ('53', [CNS11]), + ('54', [CNS12]), + ('55', [CNS13]), + ('56', [CNS14]), + ('61', [CNS15]), + ('62', [CNS16]), + ('71', [CNS17]), + ('72', [CNS18]), + ('81', [CNS19]), + ('92', [CNS20]) + ) AS u([industry_code], [value]) + WHERE [SEG] = 'S000' -- 'S000' represents segment 'Total number of jobs' as seen in 'OD' section from document linked at top of file + AND [TYPE] = 'JT00' -- 'JT00' is for 'All Jobs' as seen in 'OD' section 
from document linked at top of file + AND [version] = 2 -- latest version loaded into database + AND [YEAR] = @year + GROUP BY [YEAR], [w_geocode], [industry_code] +END \ No newline at end of file diff --git a/sql/employment/get_naics72_split.sql b/sql/employment/get_naics72_split.sql new file mode 100644 index 0000000..abe8c61 --- /dev/null +++ b/sql/employment/get_naics72_split.sql @@ -0,0 +1,203 @@ +/* +This query provides a split of 2-digit NAICS 72 into 3-digit NAICS codes. +Point-level data is gotten from the confidential EDD dataset, assigned to +2020 Census Blocks and the percentage split of 2-digit NAICS 72 into the +3-digit NAICS codes of 721 and 722 is calculated within each block. + +Notes: + 1) This query assumes the connection is to the GIS server. + 2) Data prior to year 2017 is not present in the EDD view and must be + queried directly from the source database table. + 3) If no split is present for a block, the regional percentage split is + substituted. All 2020 Census blocks are represented. 
+*/ + +SET NOCOUNT ON; +-- Initialize parameters and return table ------------------------------------ +DECLARE @year INTEGER = :year; +DECLARE @msg nvarchar(45) = 'EDD point-level data does not exist'; + + +--Drop temp table if exist and then create temp table +DROP TABLE IF EXISTS [#edd]; +CREATE TABLE [#edd] ( + [id] INTEGER IDENTITY(1,1) NOT NULL, + [industry_code] NVARCHAR(3) NOT NULL, + [average_monthly_jobs] FLOAT NOT NULL, + [Shape] GEOMETRY NOT NULL, + CONSTRAINT [pk_tt_edd] PRIMARY KEY ([id]) +) + +-- Create spatial index for later spatial join +-- Bounding box coordinates from SANDAG GIS team +-- Identical to spatial index on LUDU point layers in GIS database +CREATE SPATIAL INDEX [sidx_tt_edd] ON [#edd] +([Shape]) USING GEOMETRY_AUTO_GRID +WITH (BOUNDING_BOX = ( + 6151635.98006938, + 1775442.36347014, + 6613401.66775663, + 2129306.52024172), + CELLS_PER_OBJECT = 8 +) + + +-- Get SANDAG GIS team EDD dataset ------------------------------------------- +IF @year >= 2017 +BEGIN + INSERT INTO [#edd] + SELECT + [industry_code], + 1.0 * [emp_total]/[emp_valid] AS [average_monthly_jobs], + [SHAPE] + FROM ( + SELECT + CASE + WHEN LEFT([code], 3) = '721' THEN '721' + WHEN LEFT([code], 3) = '722' THEN '722' + ELSE NULL + END AS [industry_code], + CASE WHEN [emp_m1] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m2] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m3] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m4] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m5] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m6] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m7] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m8] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m9] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m10] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m11] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 + END AS [emp_valid], + ISNULL([emp_m1], 0) + + ISNULL([emp_m2], 0) + + ISNULL([emp_m3], 0) + + 
ISNULL([emp_m4], 0) + + ISNULL([emp_m5], 0) + + ISNULL([emp_m6], 0) + + ISNULL([emp_m7], 0) + + ISNULL([emp_m8], 0) + + ISNULL([emp_m9], 0) + + ISNULL([emp_m10], 0) + + ISNULL([emp_m11], 0) + + ISNULL([emp_m12], 0) + AS [emp_total], + [SHAPE] + FROM [EMPCORE].[ca_edd].[vi_ca_edd_employment] + INNER JOIN [EMPCORE].[ca_edd].[naics] + ON [vi_ca_edd_employment].[naics_id] = [naics].[naics_id] + WHERE + [year] = @year + AND LEFT([code], 3) IN ('721','722') + ) AS [tt] + WHERE + [emp_valid] > 0 + AND [emp_total] > 0 +END +ELSE IF @year = 2016 OR @year BETWEEN 2010 AND 2014 +BEGIN + INSERT INTO [#edd] + SELECT + CASE + WHEN LEFT([code], 3) = '721' THEN '721' + WHEN LEFT([code], 3) = '722' THEN '722' + ELSE NULL + END AS [industry_code], + [employment] * ISNULL([headquarters].[share], 1) AS [average_monthly_jobs], + ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] + FROM [EMPCORE].[ca_edd].[businesses] + INNER JOIN [EMPCORE].[ca_edd].[naics] + ON [businesses].[naics_id] = [naics].[naics_id] + LEFT JOIN [EMPCORE].[ca_edd].[headquarters] + ON [businesses].[year] = [headquarters].[year] + AND [businesses].[emp_id] = [headquarters].[emp_id] + INNER JOIN ( + SELECT [year], [emp_id], [employment] + FROM [EMPCORE].[ca_edd].[employment] + WHERE + [month_id] = 14 -- adjusted employment + AND [employment] > 0 + AND [year] = @year + ) AS [employment] + ON [businesses].[year] = [employment].[year] + AND [businesses].[emp_id] = [employment].[emp_id] + WHERE LEFT([code], 3) IN ('721','722') +END +ELSE IF @year = 2015 +BEGIN + INSERT INTO [#edd] + SELECT + CASE + WHEN LEFT([code], 3) = '721' THEN '721' + WHEN LEFT([code], 3) = '722' THEN '722' + ELSE NULL + END AS [industry_code], + [employment] * ISNULL([headquarters].[share], 1) AS [average_monthly_jobs], + ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] + FROM [EMPCORE].[ca_edd].[businesses] + INNER JOIN [EMPCORE].[ca_edd].[naics] + ON [businesses].[naics_id] = [naics].[naics_id] + LEFT JOIN 
[EMPCORE].[ca_edd].[headquarters] + ON [businesses].[year] = [headquarters].[year] + AND [businesses].[emp_id] = [headquarters].[emp_id] + INNER JOIN ( + SELECT + [year], + [emp_id], + 1.0 * ((ISNULL([15], 0) + ISNULL([16], 0) + ISNULL([17], 0)) + / + (CASE WHEN [15] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [16] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END + )) + AS [employment] + FROM [EMPCORE].[ca_edd].[employment] + PIVOT(SUM([employment]) FOR [month_id] IN ([15], [16], [17])) AS [pivot] + WHERE + [year] = @year + AND ([15] IS NOT NULL OR [16] IS NOT NULL OR [17] IS NOT NULL) + ) AS [employment] + ON [businesses].[year] = [employment].[year] + AND [businesses].[emp_id] = [employment].[emp_id] + WHERE LEFT([code], 3) IN ('721','722') +END + + +-- Send error message if no data exists -------------------------------------- +IF NOT EXISTS ( + SELECT TOP (1) * + FROM [#edd] +) +BEGIN + SELECT @msg AS [msg] +END +ELSE +BEGIN + -- Calculate % split of NAICS 72 into 721 and 722 for 2020 Census Blocks - + SELECT + [GEOID20] AS [block], + CASE + WHEN [72] = 0 THEN SUM([721]) OVER() / SUM([72]) OVER() + ELSE [721] / [72] + END AS [pct_721], + CASE + WHEN [72] = 0 THEN SUM([722]) OVER() / SUM([72]) OVER() + ELSE [722] / [72] + END AS [pct_722] + FROM ( + SELECT + [GEOID20], + SUM(CASE WHEN [industry_code] = '721' THEN [average_monthly_jobs] ELSE 0 END) AS [721], + SUM(CASE WHEN [industry_code] = '722' THEN [average_monthly_jobs] ELSE 0 END) AS [722], + ISNULL(SUM([average_monthly_jobs]), 0) AS [72] + FROM [#edd] + RIGHT OUTER JOIN [GeoDepot].[sde].[CENSUSBLOCKS] + ON [#edd].[Shape].STIntersects([CENSUSBLOCKS].[Shape]) = 1 + GROUP BY [GEOID20] + ) [tt] + ORDER BY [GEOID20] +END + +--Drop Temp table +DROP TABLE IF EXISTS [#edd]; \ No newline at end of file diff --git a/sql/employment/xref_block_to_mgra.sql b/sql/employment/xref_block_to_mgra.sql new file mode 100644 index 0000000..c9ebbaf --- /dev/null +++ 
b/sql/employment/xref_block_to_mgra.sql @@ -0,0 +1,11 @@ +-- Initialize parameters ----------------------------------------------------- +DECLARE @mgra_version nvarchar(10) = :mgra_version; + + +SELECT + [from_zone] AS [block], + CAST([to_zone] AS INT) AS [mgra], + [allocation_pct] +FROM [GeoAnalyst].[geography].[fn_xref_zones]( + CASE WHEN @mgra_version = 'mgra15' THEN 18 ELSE NULL END + ) \ No newline at end of file diff --git a/wiki/Utility.md b/wiki/Utility.md index f767c7e..dfd2c9a 100644 --- a/wiki/Utility.md +++ b/wiki/Utility.md @@ -71,9 +71,9 @@ For rows with negative deviations, is not always possible to find non-zero colum Both the "Nearest Neighbors" and abandonment of the non-zero requirement can lead to values being increased in columns that are implausible for the row. For example, in the "Population by Age Sex Ethnicity" module, a row defined as a MGRA containing male-only adult persons may have female or juvenile values increased if the function cannot reallocate to non-zero columns. This necessitates the use of an additional "balancer" in the "Population by Age Sex Ethnicity" module for special MGRAs. -## Dealing with un-released ACS 5-year Detailed Tables (`read_sql_query_acs()`) +## Dealing with un-released ACS 5-year Detailed Tables (`read_sql_query_fallback()`) -This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables data that are currently not released. Essentially, all SQL scripts dealing with ACS data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. This function detects the presence of the error message and re-runs the query using the previous year instead. 
+This function is a wrapper for `pd.read_sql_query` with an extension built in that handles requests for ACS 5-year Detailed Tables, LEHD LODES, QCEW, and EDD point-level data that are currently not released. Essentially, all SQL scripts dealing with ACS, LODES, QCEW, and EDD data have an `IF/ELSE` statement at the top which checks for the existence of data. If the data for the specified table/year could not be found, then an error message is returned. This function detects the presence of the error message and re-runs the query using the previous year, iteratively retrying up to a defined number of years back, specified with the parameter `max_lookback`, whose default value is 1. ### A Note on ACS 5-year Detailed Tables