Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,42 @@ human_comparison.py

Used to compare model forecasts against human forecasts.

## Historical-replay mode (benchmarking against human forecasters)

When benchmarking the pipeline against human forecasters on past questions,
the model must not be allowed to see sources that did not exist (or that
contained different content) at the time the human forecast was made.
Historical-replay mode enforces this by reading a single per-question field,
`ForecastQuestion.as_of_date`:

- When `as_of_date` is `None` (default), the pipeline behaves exactly as in
live mode. No code paths change.
- When `as_of_date` is set, the search backend receives `end_date=as_of_date`,
the cache key incorporates the cutoff, and post-retrieval filtering drops any
result dated after the cutoff (as well as any undated result whose date cannot
be cheaply recovered). Dashboard URLs are rewritten to the closest Wayback
snapshot at or before the cutoff (or suppressed if none exists), and the
extraction stage fetches from Wayback. If no snapshot exists or the Wayback
fetch fails, extraction falls back to a live fetch. That fallback is never
silent: it is logged at INFO and recorded in `Document.fetch_strategy` as
`wayback_fallback_to_live`.

The LLM "historical roleplay" prompt is *not* automatically enabled by
`as_of_date`; it lives behind a separate `historical_roleplay=True` flag on
`SearchStagePipeline` because its effect on query quality is harder to
predict. Turn it on for the benchmark and off for production.

What this mode does NOT fix: the LLMs themselves were trained on data that
postdates many of our benchmark questions, so retrieval fairness ≠ model
fairness. The `retrieval_free_baseline_forecast` metric in
`bioscancast/stages/eval_stage/contamination.py` reports how well the LLM
forecasts with no evidence at all. A small gap between that baseline and the
full pipeline is itself evidence of training-data leakage, so the gap must be
reported alongside the headline Brier/log scores.

`filter_caught_contamination_rate` is also exposed by the same module. It
is a **lower bound** on contamination — it only counts post-cutoff results
whose `published_date` is known. Undated results and results whose content
changed post-cutoff are invisible to it. Reports MUST surface this caveat;
the metric's docstring repeats it for the same reason.

---

# Datasets
Expand Down
68 changes: 61 additions & 7 deletions bioscancast/extraction/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from curl_cffi import requests as curl_requests

from bioscancast.stages.search_stage.wayback import closest_snapshot_before

from .config import ExtractionConfig

logger = logging.getLogger(__name__)
Expand All @@ -25,6 +27,8 @@ class FetchResult:
content_bytes: Optional[bytes]
fetched_at: datetime
error: Optional[str]
fetch_strategy: str = "live"
snapshot_timestamp: Optional[datetime] = None


def _sniff_content_type(content: bytes) -> Optional[str]:
def fetch(
    url: str,
    *,
    config: ExtractionConfig | None = None,
    as_of_date: Optional[datetime] = None,
) -> FetchResult:
    """Fetch ``url`` live or, in historical-replay mode, from Wayback.

    The HTTP request itself is delegated to ``_fetch_via_curl``, which
    uses curl_cffi with the impersonation profile from
    ``ExtractionConfig.impersonate``.

    Args:
        url: The publisher URL. It is always passed as ``reported_url``,
            even when the bytes come from an archive snapshot.
        config: Extraction settings, forwarded unchanged to
            ``_fetch_via_curl``.
        as_of_date: Historical-replay cutoff. ``None`` (the default)
            means a plain live fetch.

    Returns:
        A ``FetchResult``. In live mode it is returned exactly as
        ``_fetch_via_curl`` produced it. In replay mode it is tagged:

        * ``fetch_strategy="wayback"`` with ``snapshot_timestamp`` set to
          the capture time, when ``closest_snapshot_before`` found a
          snapshot and fetching it yielded bytes without an error.
        * ``fetch_strategy="wayback_fallback_to_live"`` when no snapshot
          was found or the snapshot fetch failed. The fallback is logged
          at INFO so audit reports can see the leak; it is never silent.
    """
    # Live mode: no cutoff, so hit the publisher directly.
    if as_of_date is None:
        return _fetch_via_curl(target_url=url, reported_url=url, config=config)

    capture = closest_snapshot_before(url, as_of_date)
    if capture is None:
        logger.info(
            "No Wayback snapshot for %s at-or-before %s; falling back to live",
            url, as_of_date.isoformat(),
        )
    else:
        captured_at, archive_url = capture
        archived = _fetch_via_curl(
            target_url=archive_url,
            reported_url=url,
            config=config,
        )
        if archived.error is None and archived.content_bytes is not None:
            archived.fetch_strategy = "wayback"
            archived.snapshot_timestamp = captured_at
            return archived
        logger.info(
            "Wayback fetch failed for %s (snapshot %s, error=%s); "
            "falling back to live",
            url, captured_at.isoformat(), archived.error,
        )

    # Both failure paths end here: fetch live, but tag the result so the
    # post-cutoff exposure stays visible downstream.
    fallback = _fetch_via_curl(target_url=url, reported_url=url, config=config)
    fallback.fetch_strategy = "wayback_fallback_to_live"
    return fallback


def _fetch_via_curl(
*,
target_url: str,
reported_url: str,
config: ExtractionConfig | None,
) -> FetchResult:
"""Issue the actual HTTP GET. ``target_url`` is what we hit (may be a
Wayback ``id_`` URL); ``reported_url`` is what we record in
``FetchResult.url`` so downstream consumers see the original publisher
URL, not archive.org."""
cfg = config or ExtractionConfig()
fetched_at = datetime.now(timezone.utc)

try:
# curl_cffi's streaming Response is not a context manager in the
# installed version, so we close it explicitly in a finally block.
response = curl_requests.get(
url,
target_url,
stream=True,
timeout=cfg.fetch_timeout_seconds,
impersonate=cfg.impersonate,
Expand All @@ -76,7 +130,7 @@ def fetch(
content_length = response.headers.get("content-length")
if content_length and int(content_length) > cfg.fetch_max_bytes:
return FetchResult(
url=url,
url=reported_url,
final_url=str(response.url),
status_code=response.status_code,
content_type=_normalize_content_type(
Expand All @@ -95,7 +149,7 @@ def fetch(
total += len(chunk)
if total > cfg.fetch_max_bytes:
return FetchResult(
url=url,
url=reported_url,
final_url=str(response.url),
status_code=response.status_code,
content_type=_normalize_content_type(
Expand All @@ -118,7 +172,7 @@ def fetch(
raw_ct = _sniff_content_type(content_bytes) or raw_ct

return FetchResult(
url=url,
url=reported_url,
final_url=str(response.url),
status_code=response.status_code,
content_type=raw_ct,
Expand All @@ -130,10 +184,10 @@ def fetch(
response.close()

except Exception as exc:
logger.warning("Fetch failed for %s: %s", url, exc)
logger.warning("Fetch failed for %s: %s", target_url, exc)
return FetchResult(
url=url,
final_url=url,
url=reported_url,
final_url=reported_url,
status_code=None,
content_type=None,
content_bytes=None,
Expand Down
28 changes: 25 additions & 3 deletions bioscancast/extraction/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@


class ExtractionPipeline:
"""Orchestrates document fetching, parsing, and chunk normalization."""
"""Orchestrates document fetching, parsing, and chunk normalization.

def __init__(self, *, config: ExtractionConfig | None = None) -> None:
``as_of_date`` opts the fetcher into Wayback-rewrite mode. See
``bioscancast.extraction.fetcher.fetch`` for the strategy semantics
(live / wayback / wayback_fallback_to_live). The resulting strategy
and snapshot timestamp are copied onto each Document for audit.
"""

def __init__(
self,
*,
config: ExtractionConfig | None = None,
as_of_date: Optional[datetime] = None,
) -> None:
self._config = config or ExtractionConfig()
self._as_of_date = as_of_date
self._parsers = get_parsers(pdf_max_pages=self._config.pdf_max_pages)
# Lazily constructed on first PDF that reaches the refiner step.
self._docling_refiner = None
Expand Down Expand Up @@ -54,7 +66,11 @@ def extract_one(self, filtered_doc: FilteredDocument) -> Document:
doc_id = f"doc-{filtered_doc.result_id}"

# Step 1: Fetch
fetch_result = fetch(filtered_doc.url, config=self._config)
fetch_result = fetch(
filtered_doc.url,
config=self._config,
as_of_date=self._as_of_date,
)

if fetch_result.error or fetch_result.content_bytes is None:
return self._make_failed_document(
Expand Down Expand Up @@ -169,6 +185,9 @@ def extract_one(self, filtered_doc: FilteredDocument) -> Document:
chunks=chunks,
extracted_tables=extracted_tables,
extracted_dates=extracted_dates,
fetch_strategy=fetch_result.fetch_strategy,
snapshot_timestamp=fetch_result.snapshot_timestamp,
cutoff_applied=self._as_of_date,
)

def _get_docling_refiner(self):
Expand Down Expand Up @@ -212,6 +231,9 @@ def _make_failed_document(
error_message=error,
http_status=fetch_result.status_code if fetch_result else None,
content_type=fetch_result.content_type if fetch_result else None,
fetch_strategy=fetch_result.fetch_strategy if fetch_result else "live",
snapshot_timestamp=fetch_result.snapshot_timestamp if fetch_result else None,
cutoff_applied=self._as_of_date,
)

def _build_chunks(
Expand Down
16 changes: 16 additions & 0 deletions bioscancast/filtering/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ class ForecastQuestion:
pathogen: Optional[str] = None
event_type: Optional[str] = None
resolution_criteria: Optional[str] = None
# Historical-replay cutoff. When None (default), the pipeline runs in live
# mode and uses datetime.now() everywhere. When set, every cutoff-sensitive
# module (freshness scoring, search backend date filter, cache key,
# post-retrieval filter, dashboard Wayback rewrite, extraction Wayback
# rewrite, optional decomposition roleplay) treats this as "now" so the
# model sees only what a human forecaster could have seen at this moment.
as_of_date: Optional[datetime] = None


@dataclass
Expand Down Expand Up @@ -43,6 +50,15 @@ class SearchResult:
retrieval_reason: Optional[str] = None
contains_aggregator_forecast: bool = False
search_stage_score: float = 0.0
# Provenance for the date used to evaluate the historical-mode cutoff.
# One of: "backend" (Tavily/Google returned a date), "url_slug",
# "last_modified", "wayback_first_seen", "wayback_snapshot" (for dashboards
# rewritten to Wayback), or None (live mode, or date came from the backend
# in a way that didn't go through the recovery chain).
published_date_source: Optional[str] = None
# The as_of_date that was applied when this result was produced, copied
# off the ForecastQuestion. None in live mode. Useful for post-hoc audits.
cutoff_applied: Optional[datetime] = None


@dataclass
Expand Down
10 changes: 10 additions & 0 deletions bioscancast/schemas/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,13 @@ class Document:

extracted_dates: List[str] = field(default_factory=list)
"""Date strings found anywhere in the document, preserved as-is."""

# ---- historical-replay provenance ----
fetch_strategy: str = "live"
"""How the bytes were obtained: 'live', 'wayback', or 'wayback_fallback_to_live'."""

snapshot_timestamp: Optional[datetime] = None
"""Wayback capture timestamp when fetch_strategy == 'wayback'. None otherwise."""

cutoff_applied: Optional[datetime] = None
"""The as_of_date that was active when this document was fetched. None in live mode."""
Loading