19 changes: 15 additions & 4 deletions scripts/census_county_business_patterns/README.md
@@ -21,10 +21,21 @@ No preprocessing required.
fully auto refresh

# Script Execution Details
First, run `main.py` to download the data.
Flags: `data_start_year`: the start year for downloads.
`data_end_year`: defaults to the current year.
`output_dir`: the output directory for processed output from the `main.py` script.
The `main.py` script downloads and processes the source data.

### Download and Processing Logic
The script iterates through the years from `data_start_year` up to and including `data_end_year - 2`.
- **Historical Data:** For years prior to the latest available data, the script expects the data to be present at the source. If a download fails for these years, it is considered a critical failure and the script will stop.
- **Latest Year Data:** The script attempts to download data for the latest year (calculated as `data_end_year - 2`). If this data is not yet published (returning a 404 error), the script will log a warning and skip it, allowing the import to proceed with available historical data.
- **File Extraction:** Source files are downloaded as ZIP archives, and only relevant `.txt` files are extracted for processing.
- **In-Memory Processing:** Extracted `.txt` files are loaded into memory and processed into CSV format suitable for sharding.
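The year-range and failure-handling rules above can be sketched as small helpers (illustrative names, not taken from `main.py`):

```python
def years_to_fetch(data_start_year, data_end_year):
    """The script iterates from data_start_year up to data_end_year - 2, inclusive."""
    return list(range(data_start_year, data_end_year - 1))


def is_tolerable_failure(year, data_end_year, status_code):
    """Only a 404 for the latest attempted year (data_end_year - 2) is skippable;
    any failure for a historical year is treated as critical."""
    return year == data_end_year - 2 and status_code == 404
```

For example, with `data_end_year` of 2026 the loop covers 2016 through 2024; a 404 for 2024 is logged and skipped, while a 404 for any earlier year aborts the run.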

### Flags
- `data_start_year`: The year to start downloading data from (default is 2016).
- `data_end_year`: The year to process data up to (default is the current year). The script attempts to download data up to `data_end_year - 2`.
- `output_dir`: The directory where processed CSV files will be saved.
- `test`: A boolean flag to run the script in test mode (default is False).
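For reference, the flag surface can be sketched with `argparse` (the actual script uses `FLAGS`-style flags, and the `output_dir` default below is an assumption for illustration):

```python
import argparse
import datetime


def build_parser():
    # Mirrors the flags documented above; defaults follow the README,
    # except output_dir, whose default here is illustrative.
    parser = argparse.ArgumentParser(description='Download Census CBP data.')
    parser.add_argument('--data_start_year', type=int, default=2016,
                        help='First year to download data for.')
    parser.add_argument('--data_end_year', type=int,
                        default=datetime.date.today().year,
                        help='Upper bound; downloads run up to data_end_year - 2.')
    parser.add_argument('--output_dir', default='output',
                        help='Directory for processed CSV files.')
    parser.add_argument('--test', action='store_true',
                        help='Run in test mode.')
    return parser
```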

The `shard_input_csv.sh` script performs the following preprocessing steps:
1. Creates directories for shards, debug outputs, and final processed outputs.
2. Splits the large input CSV files into smaller shards of 500,000 rows each.
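The sharding steps above can be sketched in shell (directory and file names here are illustrative, not taken from `shard_input_csv.sh`; the sample input is tiny, so a single shard is produced):

```shell
#!/bin/sh
set -e

# 1. Create directories for shards, debug outputs, and final outputs.
mkdir -p shards debug processed

# 2. Build a small self-contained sample input (header + 3 data rows).
printf 'geo_id,naics,emp\n' > sample_input.csv
for i in 1 2 3; do printf '%s,%s,%s\n' "$i" "11" "100" >> sample_input.csv; done

# 3. Split the data rows into shards of at most 500,000 rows each.
tail -n +2 sample_input.csv | split -l 500000 - shards/sample_shard_
ls shards/
```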
64 changes: 43 additions & 21 deletions scripts/census_county_business_patterns/main.py
@@ -98,6 +98,8 @@ def retry_method(url, headers=None):
def download_files():
start_year = FLAGS.data_start_year
end_year = FLAGS.data_end_year
# The latest year being attempted is end_year - 2 (e.g., 2026 - 2 = 2024)
latest_year = end_year - 2

for year in range(start_year, end_year - 1):
last_two_digits_formatted = f"{year % 100:02d}"
@@ -107,29 +109,49 @@ def download_files():
filename = name_template.format(last_two_digits_formatted)
url = url_template.format(year, last_two_digits_formatted)
logging.info(f"downloading url: {url}")
response = retry_method(url)
zip_content_stream = io.BytesIO(response.content)
with zipfile.ZipFile(zip_content_stream, 'r') as zip_ref:
for member in zip_ref.namelist():
if not member.endswith('/') and member.lower().endswith(
'.txt'):
extract_path = os.path.join(_LOCAL_OUTPUT_PATH,
os.path.basename(member))
abs_extract_path = os.path.abspath(extract_path)
abs_target_dir = os.path.abspath(_LOCAL_OUTPUT_PATH)
if not abs_extract_path.startswith(abs_target_dir):
logging.info(
f" WARNING: Path traversal attempt detected for '{member}'. Skipping."
)
continue # Skip this member to prevent security risk
try:
response = retry_method(url)
zip_content_stream = io.BytesIO(response.content)
with zipfile.ZipFile(zip_content_stream, 'r') as zip_ref:
for member in zip_ref.namelist():
if not member.endswith('/') and member.lower().endswith(
'.txt'):
extract_path = os.path.join(
_LOCAL_OUTPUT_PATH,
os.path.basename(member))
abs_extract_path = os.path.abspath(extract_path)
abs_target_dir = os.path.abspath(_LOCAL_OUTPUT_PATH)

if not abs_extract_path.startswith(abs_target_dir):
logging.info(
f" WARNING: Path traversal attempt detected for '{member}'. Skipping."
)
continue

# Read the file content from the in-memory zip and write it to disk
with open(extract_path, 'wb') as outfile:
outfile.write(zip_ref.read(member))
extracted_any_txt = True
else:
logging.info(
f"Skipping non-txt file/folder in zip: '{member}'")
with open(extract_path, 'wb') as outfile:
outfile.write(zip_ref.read(member))
else:
logging.info(
f" Skipping non-txt file/folder in zip: '{member}'"
)
except (requests.exceptions.RequestException,
zipfile.BadZipFile) as e:
# Check if this is the latest year which might not be published yet (404)
is_404 = (isinstance(e, requests.exceptions.HTTPError) and
e.response.status_code == 404)
if year == latest_year and is_404:
logging.warning(
f"Latest year {year} not yet available at {url}. Skipping."
)
continue
else:
# For historical years or non-404 errors, we want the script to fail
logging.error(
f"Critical failure: Could not download historical data for {year} at {url}."
)
raise e


def main(argv):