From 9ea8ba319d9dc489e8ab3a2feb2fe905f7f3191d Mon Sep 17 00:00:00 2001 From: Jusii Date: Wed, 13 May 2026 15:57:35 +0300 Subject: [PATCH] Pipeline post-download work off the cycle's critical path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The sync worker used to run GPS extract + dashcam delete + mark_done inline on the same executor thread as the download. With Wi-Fi already saturated at N=1, every second spent on the post-download tail was a second of idle Wi-Fi between files. Add an opt-out PIPELINE_POST_DOWNLOAD setting (default on). When on, the tail runs on a dedicated single-thread executor so the worker can pick up the next pending file and start its download immediately. Validated on an A329: 4-7s of tail wall-clock per file moves off the critical path, saving ~6s × N files per cycle. Per-stage timing columns on download_queue (download_started_at, download_finished_at, tail_submitted_at, tail_finished_at) record ms timestamps so A/B benchmarking can attribute cycle time. DEBUG-level per-step logs cover the same ground for ad-hoc inspection. Test coverage: 4 new tests in test_sync_worker_pipeline.py pinning the tail-executor thread name, the inline fallback for A/B, end-of-cycle tail draining, and timing column population. --- tests/test_sync_worker_pipeline.py | 380 +++++++++++++++++++++++++++++ web/db.py | 12 + web/services/sync_worker.py | 304 +++++++++++++++++------ web/settings.py | 2 + web/settings_schema.py | 4 +- 5 files changed, 631 insertions(+), 71 deletions(-) create mode 100644 tests/test_sync_worker_pipeline.py diff --git a/tests/test_sync_worker_pipeline.py b/tests/test_sync_worker_pipeline.py new file mode 100644 index 0000000..d0e6798 --- /dev/null +++ b/tests/test_sync_worker_pipeline.py @@ -0,0 +1,380 @@ +"""Tests for the post-download pipeline refactor. + +Background: v1 of ``_download_one`` ran the post-download tail +(GPS extract → dashcam delete → mark_done) inline on the same +executor thread as the download. With Wi-Fi already saturated at +N=1, the only way to compress cycle wall-clock is to hand the +tail to a separate executor and let the worker immediately start +the next file's download. + +These tests pin: + +* ``pipeline_post_download=True`` actually dispatches the tail to + the named ``viofo-tail`` thread rather than running it inline. +* ``pipeline_post_download=False`` preserves legacy inline behaviour + (used for A/B benchmarking against the camera). +* The cycle awaits all in-flight tails before returning so the + post-cycle ``scanner.scan`` sees every sidecar on disk. +* Per-stage timing columns get populated for both paths. +""" +from __future__ import annotations + +import datetime as _dt +import threading +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from web.db import Database +from web.services import sync_worker as sw +from web.services.hub import Hub +from web.services.sync_worker import SyncWorker + +# ---------------------------------------------------------------- helpers + + +class _Rec: + """Minimal Recording stand-in for listing reconcile.""" + + def __init__( + self, filename: str, *, + filepath: str = "/DCIM/Movie", + size: int = 1000, + ) -> None: + self.filename = filename + self.filepath = filepath + self.size = size + self.datetime = _dt.datetime(2026, 5, 8, 12, 0, 0) + + +def _make_snap( + *, + pipeline_post_download: bool = True, + gps_extract: bool = False, + delete_after_download: bool = False, + recordings: str = "/tmp", +): + """Build a Snapshot stand-in covering only the fields the + sync worker actually reads.""" + snap = MagicMock() + snap.address = "192.168.1.230" + snap.use_html_listing = True + snap.grouping = "daily" + snap.recordings = recordings + snap.sync_ro_only = False + snap.pipeline_post_download = pipeline_post_download + snap.gps_extract = gps_extract + snap.delete_after_download = delete_after_download + snap.download_attempts = 1 + snap.max_attempts = 1 + snap.timeout = 5.0 + return snap + + +@pytest.fixture +def db(tmp_path: Path) -> Database: + return Database(str(tmp_path / ".viofosync.db")) + + +def _build_worker( + db: Database, + snap, + *, + recordings_dir: Path, + create_tail_executor: bool = True, +) -> SyncWorker: + """Spin up a SyncWorker wired to a real DB but with the + provider, hub broadcasts, and tail executor managed + in-test (we don't want to run the asyncio loop).""" + provider = MagicMock() + provider.get.return_value = snap + hub = Hub() + worker = SyncWorker(db, provider, hub) + snap.recordings = str(recordings_dir) + if create_tail_executor: + worker._tail_executor = ThreadPoolExecutor( + max_workers=1, thread_name_prefix="viofo-tail", + ) + return worker + + +def _shutdown(worker: SyncWorker) -> None: + if worker._tail_executor is not None: + worker._tail_executor.shutdown(wait=True, cancel_futures=False) + worker._tail_executor = None + + +def _queue_rows(db: Database): + with db.conn() as c: + return [ + dict(r) for r in c.execute( + "SELECT * FROM download_queue ORDER BY filename" + ).fetchall() + ] + + +def _make_fakes( + *, + listings: list[list[_Rec]], + download_payload: bytes = b"x" * 1024, + gps_calls: list | None = None, + delete_calls: list | None = None, +): + """Build the four fakes the worker calls into during a cycle. + The caller patches them onto ``vfs`` / ``SyncWorker`` itself.""" + listings_iter = iter(listings) + + def fake_fetch_listing(self): # noqa: ARG001 — bound-method shape + try: + return next(listings_iter) + except StopIteration: + return [] + + def fake_download_file_with( + base, rec, dest_root, group_name, + *, + progress_sink=None, # noqa: ARG001 + cancel_check=None, # noqa: ARG001 + max_attempts=None, # noqa: ARG001 + socket_timeout=None, # noqa: ARG001 + ): + from viofosync_lib import get_filepath + + dest = Path(get_filepath(dest_root, group_name, rec.filename)) + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(download_payload) + return True, "1.0 MB/s" + + def fake_extract_gps_data(path): + thread_name = threading.current_thread().name + if gps_calls is not None: + gps_calls.append({"path": path, "thread": thread_name}) + + def fake_delete_dashcam_file(base_url, source_dir, filename, **kwargs): # noqa: ARG001 + if delete_calls is not None: + delete_calls.append(filename) + return True + + return ( + fake_fetch_listing, + fake_download_file_with, + fake_extract_gps_data, + fake_delete_dashcam_file, + ) + + +# ---------------------------------------------------------------- tests + + +async def test_pipeline_on_runs_tail_on_dedicated_thread( + db: Database, tmp_path: Path, +) -> None: + """With pipelining enabled, GPS extract + dashcam delete + + mark_done execute on the ``viofo-tail`` thread, not on the + asyncio event loop's default executor thread.""" + snap = _make_snap( + pipeline_post_download=True, + gps_extract=True, + delete_after_download=True, + ) + worker = _build_worker(db, snap, recordings_dir=tmp_path) + + gps_calls: list = [] + delete_calls: list = [] + ( + fake_fetch, fake_download, fake_gps, fake_delete, + ) = _make_fakes( + listings=[[_Rec("A.MP4")]], + gps_calls=gps_calls, + delete_calls=delete_calls, + ) + + try: + with patch.object(SyncWorker, "_probe", return_value=True), \ + patch.object( + SyncWorker, "_fetch_listing", fake_fetch + ), \ + patch.object( + SyncWorker, "_present_filenames", return_value=[] + ), \ + patch.object(sw.vfs, "download_file_with", fake_download), \ + patch.object(sw.vfs, "extract_gps_data", fake_gps), \ + patch.object(sw.vfs, "delete_dashcam_file", fake_delete), \ + patch.object(sw.scanner, "scan", return_value=None), \ + patch.object( + sw.scanner, "sweep_missing_thumbs", return_value=None + ), \ + patch("web.services.retention.sweep", return_value=None): + did_any = await worker._cycle() + finally: + _shutdown(worker) + + assert did_any is True + rows = _queue_rows(db) + assert len(rows) == 1 + assert rows[0]["state"] == "done" + # Tail work landed on the dedicated executor thread. + assert len(gps_calls) == 1 + assert gps_calls[0]["thread"].startswith("viofo-tail"), \ + f"expected viofo-tail thread, got {gps_calls[0]['thread']!r}" + assert delete_calls == ["A.MP4"] + + +async def test_pipeline_off_runs_tail_inline( + db: Database, tmp_path: Path, +) -> None: + """With pipelining off, the tail runs on the same thread as + the download (the asyncio default executor) — used by the + A/B benchmark to compare against the legacy timing.""" + snap = _make_snap( + pipeline_post_download=False, + gps_extract=True, + ) + worker = _build_worker(db, snap, recordings_dir=tmp_path) + + gps_calls: list = [] + ( + fake_fetch, fake_download, fake_gps, fake_delete, + ) = _make_fakes( + listings=[[_Rec("B.MP4")]], + gps_calls=gps_calls, + ) + + try: + with patch.object(SyncWorker, "_probe", return_value=True), \ + patch.object(SyncWorker, "_fetch_listing", fake_fetch), \ + patch.object( + SyncWorker, "_present_filenames", return_value=[] + ), \ + patch.object(sw.vfs, "download_file_with", fake_download), \ + patch.object(sw.vfs, "extract_gps_data", fake_gps), \ + patch.object(sw.vfs, "delete_dashcam_file", fake_delete), \ + patch.object(sw.scanner, "scan", return_value=None), \ + patch.object( + sw.scanner, "sweep_missing_thumbs", return_value=None + ), \ + patch("web.services.retention.sweep", return_value=None): + await worker._cycle() + finally: + _shutdown(worker) + + rows = _queue_rows(db) + assert rows[0]["state"] == "done" + assert len(gps_calls) == 1 + # Inline tail runs on whatever thread the asyncio default + # executor used for the download — explicitly NOT on + # viofo-tail. + assert not gps_calls[0]["thread"].startswith("viofo-tail") + + +async def test_cycle_awaits_tail_before_returning( + db: Database, tmp_path: Path, +) -> None: + """The post-cycle ``scanner.scan`` relies on every sidecar + landing on disk before it runs. If the cycle returned while + a tail was still extracting GPS, ``clip_index.has_gpx`` + could miss the freshly-downloaded file. The cycle awaits.""" + snap = _make_snap( + pipeline_post_download=True, + gps_extract=True, + ) + worker = _build_worker(db, snap, recordings_dir=tmp_path) + + gate = threading.Event() + + def slow_gps(path): + # Block until released by the test, then mark done. + gate.wait(timeout=5.0) + + ( + fake_fetch, fake_download, _, fake_delete, + ) = _make_fakes( + listings=[[_Rec("C.MP4")]], + ) + + async def run_cycle(): + with patch.object(SyncWorker, "_probe", return_value=True), \ + patch.object(SyncWorker, "_fetch_listing", fake_fetch), \ + patch.object( + SyncWorker, "_present_filenames", return_value=[] + ), \ + patch.object(sw.vfs, "download_file_with", fake_download), \ + patch.object(sw.vfs, "extract_gps_data", slow_gps), \ + patch.object(sw.vfs, "delete_dashcam_file", fake_delete), \ + patch.object(sw.scanner, "scan", return_value=None), \ + patch.object( + sw.scanner, "sweep_missing_thumbs", return_value=None + ), \ + patch("web.services.retention.sweep", return_value=None): + return await worker._cycle() + + import asyncio + + try: + cycle_task = asyncio.create_task(run_cycle()) + # The download finishes near-instantly, but the tail is + # blocked on ``gate``. Give the worker a moment to submit + # to the tail executor, then assert the cycle hasn't + # returned yet. + await asyncio.sleep(0.1) + assert not cycle_task.done(), \ + "cycle returned before tail had a chance to start" + # Release the tail and let the cycle finish. + gate.set() + await asyncio.wait_for(cycle_task, timeout=5.0) + # mark_done must have happened — it lives at the end of + # the tail, which only fires after the gate is released. + assert _queue_rows(db)[0]["state"] == "done" + finally: + gate.set() + _shutdown(worker) + + +async def test_timing_columns_populated( + db: Database, tmp_path: Path, +) -> None: + """The A/B benchmark reads four per-stage timestamps from the + queue row. Verify each one gets a non-null value on the happy + path.""" + snap = _make_snap(pipeline_post_download=True) + worker = _build_worker(db, snap, recordings_dir=tmp_path) + + ( + fake_fetch, fake_download, fake_gps, fake_delete, + ) = _make_fakes( + listings=[[_Rec("D.MP4")]], + ) + + try: + with patch.object(SyncWorker, "_probe", return_value=True), \ + patch.object(SyncWorker, "_fetch_listing", fake_fetch), \ + patch.object( + SyncWorker, "_present_filenames", return_value=[] + ), \ + patch.object(sw.vfs, "download_file_with", fake_download), \ + patch.object(sw.vfs, "extract_gps_data", fake_gps), \ + patch.object(sw.vfs, "delete_dashcam_file", fake_delete), \ + patch.object(sw.scanner, "scan", return_value=None), \ + patch.object( + sw.scanner, "sweep_missing_thumbs", return_value=None + ), \ + patch("web.services.retention.sweep", return_value=None): + await worker._cycle() + finally: + _shutdown(worker) + + row = _queue_rows(db)[0] + for col in ( + "download_started_at", + "download_finished_at", + "tail_submitted_at", + "tail_finished_at", + ): + assert row[col] is not None, f"{col} should be set" + assert row["download_started_at"] <= row["download_finished_at"] + assert row["download_finished_at"] <= row["tail_submitted_at"] + assert row["tail_submitted_at"] <= row["tail_finished_at"] + + diff --git a/web/db.py b/web/db.py index 31915ac..d4a69c1 100644 --- a/web/db.py +++ b/web/db.py @@ -141,6 +141,18 @@ def _add_column(table: str, col: str, ddl: str) -> None: "WHERE has_gpx = 1 AND gps_examined = 0" ) + # Per-stage timings used to attribute cycle wall-clock to + # the download (Wi-Fi bound) vs. the post-download tail + # (GPS extract + dashcam delete + DB writes). Recorded in + # ms since the epoch; nullable so legacy rows survive. + for col in ( + "download_started_at", + "download_finished_at", + "tail_submitted_at", + "tail_finished_at", + ): + _add_column("download_queue", col, "INTEGER") + @contextmanager def conn(self) -> Iterator[sqlite3.Connection]: """Yield a connection with row-factory set. diff --git a/web/services/sync_worker.py b/web/services/sync_worker.py index 939798f..1a8d7c9 100644 --- a/web/services/sync_worker.py +++ b/web/services/sync_worker.py @@ -22,6 +22,7 @@ from __future__ import annotations import asyncio +import concurrent.futures import logging import os import socket @@ -29,6 +30,7 @@ import time import urllib.error import urllib.request +from concurrent.futures import Future, ThreadPoolExecutor from typing import Optional import viofosync_lib as vfs @@ -44,6 +46,12 @@ BACKOFF_STEPS = [10, 30, 120, 600] # seconds +def _now_ms() -> int: + """Current time in milliseconds since the epoch. Used for + the per-stage timing columns the A/B benchmarking reads.""" + return int(time.time() * 1000) + + def _filter_ro_only(listing): """Yield only Recordings whose dashcam source path lies under /RO/. Used when the user has 'Sync read-only files only' on.""" @@ -278,6 +286,12 @@ def __init__( self._loop: Optional[asyncio.AbstractEventLoop] = None self._running_cycle = False self._current_filename: Optional[str] = None + # Single-thread executor for post-download work (GPS + # extract, dashcam delete, mark_done). Strict FIFO keeps + # write ordering simple and avoids disk contention from + # parallel MP4 atom parses. Lazily created in start(). + self._tail_executor: Optional[ThreadPoolExecutor] = None + self._tail_futures: set[Future] = set() # ---- lifecycle ---- @@ -299,6 +313,11 @@ def start(self) -> None: "call bind_loop() during app startup" ) self._stop.clear() + if self._tail_executor is None: + self._tail_executor = ThreadPoolExecutor( + max_workers=1, + thread_name_prefix="viofo-tail", + ) # Schedule the coroutine onto the captured loop — works # both from the loop thread and from threadpool handlers. self._task = asyncio.run_coroutine_threadsafe( @@ -319,7 +338,6 @@ async def stop(self) -> None: if self._task is not None: # self._task may be an asyncio.Task or a # concurrent.futures.Future — wrap uniformly. - import concurrent.futures try: if isinstance(self._task, concurrent.futures.Future): await asyncio.wrap_future(self._task) @@ -330,6 +348,17 @@ async def stop(self) -> None: self._task.cancel() except Exception: pass + # Drain the tail executor: let in-flight GPS extracts / + # dashcam deletes finish so we don't leave half-written + # .gpx sidecars on disk. The executor is short-lived + # work; this should complete in <2 s typically. + if self._tail_executor is not None: + ex = self._tail_executor + self._tail_executor = None + try: + ex.shutdown(wait=True, cancel_futures=False) + except Exception: # pragma: no cover + log.exception("tail executor shutdown failed") def kick(self) -> None: """Trigger an immediate cycle (e.g. user clicked Start @@ -478,6 +507,7 @@ async def _refresh_listing_and_reconcile(self) -> bool: return True async def _cycle(self) -> bool: + cycle_start = time.monotonic() reachable = await self._probe() await self.hub.broadcast({ "type": "dashcam_online" if reachable else "dashcam_offline", @@ -499,6 +529,7 @@ async def _cycle(self) -> bool: # loop re-checks ``next_pending`` so a priority update # mid-cycle takes effect immediately. did_any = False + drained = 0 while not self._stop.is_set(): if self._paused.is_set(): break @@ -524,6 +555,7 @@ async def _cycle(self) -> bool: # Transient failure. Loop continues with next # pending item, which may well succeed. continue + drained += 1 # Refresh listing between downloads so clips the # dashcam recorded during this transfer show up in # the queue before we pick the next pending one. @@ -531,6 +563,12 @@ async def _cycle(self) -> bool: # leaves the existing queue intact. await self._refresh_listing_and_reconcile() + # Let the tail executor finish before the post-cycle scan: + # ``scanner.scan`` reads has_gpx off disk to set clip_index + # flags, and the dashcam-delete sink events shouldn't lag + # past sync_done. + await self._await_tails() + # Re-index + sweep thumbs so new clips appear in the UI. # Both calls are idempotent; the did_any gate is just to # skip the directory walk when nothing changed. @@ -562,6 +600,12 @@ async def _cycle(self) -> bool: "ok": True, "queue": q.list_all(self.db, limit=200), }) + cycle_duration = time.monotonic() - cycle_start + log.info( + "cycle done: drained=%d duration=%.1fs pipeline=%s", + drained, cycle_duration, + self._provider.get().pipeline_post_download, + ) return did_any def _fetch_listing(self): @@ -583,35 +627,49 @@ def _present_filenames(self): # ---- single item download ---- async def _download_one(self, item: q.QueueItem) -> bool: + """Download one queued file and hand its post-download + tail (GPS extract → dashcam delete → mark_done) to the + tail executor when ``pipeline_post_download`` is on. + + The download itself stays N=1 because the dashcam's Wi-Fi + is the wall — but the tail used to block the worker from + starting the next download. Moving it off the critical + path means file N+1's bytes start flowing while file N's + sidecar is still being parsed. + + When ``pipeline_post_download`` is off, the tail runs + inline on the same executor thread, restoring legacy + behaviour for A/B benchmarking. + """ snap = self._provider.get() q.mark_downloading(self.db, item.id) self._cancel_current.clear() loop = asyncio.get_running_loop() sink = WebSink(self.hub, loop) + base = f"http://{snap.address}" - def _blocking(): - """Runs on an executor thread. Synthesises the - Recording tuple ``download_file`` expects.""" - import datetime as _dt - # get_group_name wants a datetime; if the queue row - # didn't capture one, now() is a safe fallback. - recorded = ( - _dt.datetime.fromtimestamp(item.recorded_at) - if item.recorded_at - else _dt.datetime.now() - ) - group_name = vfs.get_group_name( - recorded, snap.grouping - ) - rec = vfs.Recording( - filename=item.filename, - filepath=item.source_dir, - size=item.remote_size, - timecode=None, - datetime=recorded, - attr=None, - ) - base = f"http://{snap.address}" + import datetime as _dt + # get_group_name wants a datetime; if the queue row + # didn't capture one, now() is a safe fallback. + recorded = ( + _dt.datetime.fromtimestamp(item.recorded_at) + if item.recorded_at + else _dt.datetime.now() + ) + group_name = vfs.get_group_name(recorded, snap.grouping) + rec = vfs.Recording( + filename=item.filename, + filepath=item.source_dir, + size=item.remote_size, + timecode=None, + datetime=recorded, + attr=None, + ) + + def _blocking_download(): + """Run on the default executor thread. Returns + (ok, dest_path, err). ``dest_path`` is only valid + when ``ok`` is True.""" try: ok, _ = vfs.download_file_with( base, rec, snap.recordings, @@ -621,55 +679,161 @@ def _blocking(): max_attempts=snap.download_attempts, socket_timeout=snap.timeout, ) - # download_file_with() doesn't pull GPX, so the worker - # has to do it here when the setting is on. - if ok: - dest_path = vfs.get_filepath( - snap.recordings, group_name, item.filename, - ) - # Adopt the actual byte count as the queue's - # remote_size (the HTML listing rounds to MB). - _refresh_queue_size(self.db, item, dest_path) - if snap.gps_extract: - try: - vfs.extract_gps_data(dest_path) - except Exception as e: - # Clips recorded without GPS lock have no - # track to extract; not a download failure. - log.info( - "gpx extract failed for %s: %s", - item.filename, e, - ) - _maybe_delete_from_dashcam( - item=item, - dest_path=dest_path, - delete_enabled=snap.delete_after_download, - base_url=base, - sink=sink, - ) - return ok, None + if not ok: + return False, None, None + dest_path = vfs.get_filepath( + snap.recordings, group_name, item.filename, + ) + return True, dest_path, None except Exception as e: - return False, str(e) + return False, None, str(e) + + self._set_timing(item.id, download_started_at=_now_ms()) + ok, dest_path, err = await loop.run_in_executor( + None, _blocking_download + ) + self._set_timing(item.id, download_finished_at=_now_ms()) + + if not ok: + new_state = q.mark_transient_failure( + self.db, + item.id, + err or "unknown", + snap.max_attempts, + ) + await self.hub.broadcast({ + "type": "item_state_change", + "filename": item.filename, + "state": new_state, + "error": err, + }) + return False + + # Download succeeded. Run the post-download tail either + # on a dedicated executor (pipelined: worker returns now + # and starts the next download immediately) or inline + # (legacy timing for the A/B comparison). + if (snap.pipeline_post_download + and self._tail_executor is not None): + self._set_timing(item.id, tail_submitted_at=_now_ms()) + fut = self._tail_executor.submit( + self._run_tail, item, dest_path, snap, sink, + ) + self._tail_futures.add(fut) + fut.add_done_callback(self._tail_futures.discard) + else: + self._set_timing(item.id, tail_submitted_at=_now_ms()) + self._run_tail(item, dest_path, snap, sink) + return True - ok, err = await loop.run_in_executor(None, _blocking) + # ---- tail stage ---- - if ok: + def _run_tail( + self, + item: q.QueueItem, + dest_path: str, + snap, + sink: "WebSink", + ) -> None: + """Post-download work for one file. Runs either on the + tail executor or inline; either way, the download itself + has already succeeded by the time we get here, so a + failure in any step here logs but never re-queues the + download (it's on disk; a re-download would waste Wi-Fi). + """ + t_start = time.perf_counter() + log.debug("tail begin: %s", item.filename) + try: + try: + _refresh_queue_size(self.db, item, dest_path) + except Exception: # pragma: no cover — DB hiccup + log.exception( + "refresh_queue_size failed for %s", + item.filename, + ) + t_rqs = time.perf_counter() + log.debug( + "tail: refresh_queue_size done in %.2fs (%s)", + t_rqs - t_start, item.filename, + ) + if snap.gps_extract: + try: + vfs.extract_gps_data(dest_path) + except Exception as e: + # Clips recorded without GPS lock have no + # track to extract; not a download failure. + log.info( + "gpx extract failed for %s: %s", + item.filename, e, + ) + t_gps = time.perf_counter() + log.debug( + "tail: gps done in %.2fs (%s)", + t_gps - t_rqs, item.filename, + ) + _maybe_delete_from_dashcam( + item=item, + dest_path=dest_path, + delete_enabled=snap.delete_after_download, + base_url=f"http://{snap.address}", + sink=sink, + ) + t_del = time.perf_counter() + log.debug( + "tail: dashcam_delete done in %.2fs (%s)", + t_del - t_gps, item.filename, + ) q.mark_done(self.db, item.id) - return True + log.debug( + "tail: mark_done done in %.2fs (%s)", + time.perf_counter() - t_del, item.filename, + ) + except Exception: + log.exception( + "post-download tail unexpected failure for %s", + item.filename, + ) + finally: + try: + self._set_timing( + item.id, + tail_finished_at=_now_ms(), + ) + except Exception: # pragma: no cover + pass + log.debug( + "tail end: %s total=%.2fs", + item.filename, time.perf_counter() - t_start, + ) - new_state = q.mark_transient_failure( - self.db, - item.id, - err or "unknown", - snap.max_attempts, + async def _await_tails(self) -> None: + """Block until every tail submitted during this cycle has + completed. Called at end-of-cycle so post-cycle scans see + every sidecar and queue row updated.""" + if not self._tail_futures: + return + pending = list(self._tail_futures) + await asyncio.gather( + *(asyncio.wrap_future(f) for f in pending), + return_exceptions=True, ) - await self.hub.broadcast({ - "type": "item_state_change", - "filename": item.filename, - "state": new_state, - "error": err, - }) - # If we gave up permanently, keep going; otherwise, - # yield to let the reachability re-probe decide - # whether to continue this cycle. - return False + + def _set_timing(self, item_id: int, **fields: int) -> None: + """Update timing columns on a download_queue row. + + ``fields`` keys must match nullable INTEGER columns added + by the db migration (``download_started_at`` etc.); values + are ms-since-epoch. Used by the A/B benchmarking — failure + here must never escape into the pipeline.""" + if not fields: + return + try: + cols = ", ".join(f"{k}=?" for k in fields) + values = list(fields.values()) + [item_id] + with self.db.write() as c: + c.execute( + f"UPDATE download_queue SET {cols} WHERE id=?", + values, + ) + except Exception: # pragma: no cover — DB hiccup + log.exception("_set_timing failed for %s", item_id) diff --git a/web/settings.py b/web/settings.py index da52a2a..2a15ba1 100644 --- a/web/settings.py +++ b/web/settings.py @@ -52,6 +52,7 @@ class Snapshot: sync_interval_seconds: int enable_scheduled_sync: bool sync_ro_only: bool + pipeline_post_download: bool retention_max_days: int retention_disk_pct: int retention_protect_ro: bool @@ -220,6 +221,7 @@ def _make_snapshot(self, data: dict) -> Snapshot: sync_interval_seconds=m.SYNC_INTERVAL, enable_scheduled_sync=m.ENABLE_SCHEDULED_SYNC, sync_ro_only=m.SYNC_RO_ONLY, + pipeline_post_download=m.PIPELINE_POST_DOWNLOAD, retention_max_days=m.RETENTION_MAX_DAYS, retention_disk_pct=m.RETENTION_DISK_PCT, retention_protect_ro=m.RETENTION_PROTECT_RO, diff --git a/web/settings_schema.py b/web/settings_schema.py index 4893538..e75dc52 100644 --- a/web/settings_schema.py +++ b/web/settings_schema.py @@ -40,6 +40,7 @@ class SettingsModel(BaseModel): SYNC_INTERVAL: int = Field(default=600, ge=60, le=86400) ENABLE_SCHEDULED_SYNC: bool = True SYNC_RO_ONLY: bool = False + PIPELINE_POST_DOWNLOAD: bool = True RETENTION_MAX_DAYS: int = Field(default=0, ge=0, le=3650) RETENTION_DISK_PCT: int = Field(default=0, ge=0, le=99) RETENTION_PROTECT_RO: bool = True @@ -83,7 +84,8 @@ def _validate_email(cls, v: str) -> str: "TIMEOUT", "DOWNLOAD_ATTEMPTS", "MAX_DOWNLOAD_ATTEMPTS", "SYNC_INTERVAL", "ENABLE_SCHEDULED_SYNC", "WEB_HOST", "WEB_PORT", "EXPORT_ENCODER", "NOMINATIM_EMAIL", "GEOCODE_ENABLED", - "SYNC_RO_ONLY", "RETENTION_MAX_DAYS", "RETENTION_DISK_PCT", + "SYNC_RO_ONLY", "PIPELINE_POST_DOWNLOAD", + "RETENTION_MAX_DAYS", "RETENTION_DISK_PCT", "RETENTION_PROTECT_RO", "DISTANCE_UNITS", "PIP_POSITION",