-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path refresh.py
More file actions
158 lines (140 loc) · 6.06 KB
/
refresh.py
File metadata and controls
158 lines (140 loc) · 6.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from glob import glob
from os.path import basename, exists, getsize
import yaml
from click import option
from utz import err, check, run
from path_data.cli.base import path_data, commit_opt
from path_data.paths import hourly_pdf, monthly_pdf
from path_data.utils import last_month, git_has_staged_changes, pdf_pages, verify_no_staged_changes
# Root URL under which the Port Authority publishes PATH ridership-statistics PDFs.
BASE_URL = 'https://www.panynj.gov/content/dam/path/about/statistics'
# Root URL for Bridge & Tunnel PDFs (E-ZPass traffic reports).
BT_BASE_URL = 'https://www.panynj.gov/content/dam/bridges-tunnels/pdfs'
def update_pdf(name: str, base_url: str = BASE_URL, data_dir: str = 'data') -> bool:
    """Update or import a PDF via DVX, return True if content changed.

    If an import stub (``{data_dir}/{name}.dvc``) already exists, run
    ``dvx update`` on it and stage the results; otherwise attempt a fresh
    ``dvx import-url`` (a failed import is treated as "not published yet",
    e.g. an upstream 404).

    Args:
        name: PDF basename, e.g. ``PATH-Ridership-Report-2024.pdf``.
        base_url: URL prefix the PDF is fetched from.
        data_dir: Local directory containing the PDF and its ``.dvc`` stub.

    Returns:
        True if local content changed (a new import, or an update that staged
        actual differences); False otherwise.
    """
    dvc_path = f'{data_dir}/{name}.dvc'
    out_path = f'{data_dir}/{name}'
    url = f'{base_url}/{name}'
    err(f'\tchecking {name}')
    if exists(dvc_path):
        # Existing import — check upstream for updates.
        run('dvx', 'update', dvc_path)
        run('git', 'add', out_path, dvc_path)
        # Bug fix: the original returned True unconditionally here, violating
        # the documented "True if content changed" contract. `git diff --cached
        # --quiet -- <paths>` exits 0 iff the staged paths are unchanged vs
        # HEAD, so a non-zero exit (check(...) == False) means "changed".
        return not check('git', 'diff', '--cached', '--quiet', '--', out_path, dvc_path)
    else:
        # New PDF — try to import (failure ⇒ doesn't exist upstream yet).
        if check('dvx', 'import-url', '-G', url, '-o', out_path):
            err('\t imported (new)')
            run('git', 'add', out_path, dvc_path)
            return True
        else:
            err('\t not found')
            return False
def ensure_year_pipeline(year: int):
    """Create computation .dvc stubs for a new year, and add the year to all.pqt deps."""
    staged = []

    # Per-year computation stubs: (stub path, output path, command to record).
    stubs = [
        (f'data/{year}.pqt.dvc', f'{year}.pqt', f'path-data monthly -y {year}'),
        (f'data/{year}-day-types.pqt.dvc', f'{year}-day-types.pqt', f'path-data monthly -y {year}'),
    ]
    if year >= 2017:
        # Hourly PDFs are only published from 2017 onward.
        stubs += [
            (f'data/{year}-hourly.pqt.dvc', f'{year}-hourly.pqt', f'path-data parse-hourly -y {year}'),
            (f'data/{year}-hourly-total.pqt.dvc', f'{year}-hourly-total.pqt', f'path-data parse-hourly -y {year}'),
            (f'data/{year}-hourly-system.pqt.dvc', f'{year}-hourly-system.pqt', f'path-data parse-hourly -y {year}'),
        ]
    for stub_path, out, cmd in stubs:
        if exists(stub_path):
            continue
        err(f'\tcreating computation stub: {stub_path}')
        body = {
            'outs': [{'path': out}],
            'meta': {
                'computation': {
                    'cmd': cmd,
                }
            },
        }
        with open(stub_path, 'w') as f:
            yaml.dump(body, f, default_flow_style=False, sort_keys=False)
        staged.append(stub_path)

    # Add new year to deps in all .dvc files that depend on per-year parquets.
    # Monthly .dvc files get `data/{year}.pqt`; hourly gets `data/{year}-hourly.pqt`.
    for agg_path in ['data/all.pqt.dvc'] + sorted(glob('www/public/*.dvc')):
        with open(agg_path) as f:
            agg = yaml.safe_load(f)
        deps = agg.get('meta', {}).get('computation', {}).get('deps', {})
        if deps is None:
            continue
        # Which per-year parquet flavor(s) does this file consume?
        wanted = []
        if any(k.endswith('.pqt') and '-hourly' not in k for k in deps):
            wanted.append(f'data/{year}.pqt')
        if year >= 2017 and any('-hourly.pqt' in k for k in deps):
            wanted.append(f'data/{year}-hourly.pqt')
        missing = [dep_key for dep_key in wanted if dep_key not in deps]
        for dep_key in missing:
            err(f'\tadding {dep_key} to {agg_path} deps')
            deps[dep_key] = None
        if missing:
            with open(agg_path, 'w') as f:
                yaml.dump(agg, f, default_flow_style=False, sort_keys=False)
            staged.append(agg_path)

    if staged:
        run('git', 'add', *staged)
@path_data.command
@commit_opt
@option('-y', '--year', type=int, help='Year to update PATH data PDFs for')
def refresh(commit: int, year: int | None):
    """Refresh local copies of PATH ridership data PDFs.

    Checks the Port Authority site for new monthly/hourly ridership PDFs (plus
    the Bridge & Tunnel E-ZPass PDFs), stages any changes via ``git add``,
    creates pipeline stubs for newly-seen years, and — depending on
    ``commit`` — commits (``commit > 0``) and pushes (``commit > 1``).
    If ``year`` is omitted, the year of the latest local data and the
    following year are both checked.
    """
    # Refuse to run with a dirty index so the commit below contains only PDF updates.
    verify_no_staged_changes()
    last_ym = last_month()
    if year is not None:
        years = [year]
    else:
        # Check both current year (may have new months) and next year (may have started)
        next_ym = last_ym + 1
        years = sorted({last_ym.y, next_ym.y})
    err(f"Most recent local data: {last_ym}, checking year(s): {', '.join(map(str, years))}")
    new_years = []
    for year in years:
        monthly_name = basename(monthly_pdf(year))
        # "New year" = no .dvc import stub exists yet for this year's monthly PDF.
        is_new = not exists(f'data/{monthly_name}.dvc')
        update_pdf(monthly_name)
        if year >= 2017:
            # Hourly PDFs are only published from 2017 onward.
            hourly_name = basename(hourly_pdf(year))
            update_pdf(hourly_name)
        # Only count the year as new if the monthly import actually produced a file.
        if is_new and exists(f'data/{monthly_name}'):
            new_years.append(year)
    # Bridge & Tunnel PDFs (2011–present)
    err('=== Bridge & Tunnel PDFs ===')
    for bt_year in years:
        bt_name = f'traffic-e-zpass-usage-{bt_year}.pdf'
        update_pdf(bt_name, base_url=BT_BASE_URL)
    # Create pipeline stages for any newly imported years
    for y in new_years:
        ensure_year_pipeline(y)
    if git_has_staged_changes():
        # Determine the latest month from the most recent monthly PDF
        last_pdf_year = max(years)
        monthly_pdf_path = monthly_pdf(last_pdf_year)
        if exists(monthly_pdf_path) and getsize(monthly_pdf_path) > 0:
            n_pages = pdf_pages(monthly_pdf_path)
            # Assumes one page per month plus one non-month page, so the
            # latest month is (page count - 1) — TODO confirm PDF layout.
            updated_month = n_pages - 1
            if updated_month > 0:
                # e.g. "202403" — YYYYMM with zero-padded month.
                ym_str = f'{last_pdf_year}{updated_month:02d}'
            else:
                ym_str = f'{last_pdf_year}'
        else:
            # PDF missing or empty — fall back to the bare year in the commit message.
            ym_str = f'{last_pdf_year}'
        if commit > 0:
            run('git', 'commit', '-m', f'Update PATH data PDFs ({ym_str})')
        if commit > 1:
            run('git', 'push')
    else:
        err("No updated PDFs found")