Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3555.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stabilized deployed integration tests for slow simulation polling and staging cache reuse.
4 changes: 2 additions & 2 deletions tests/data/calculate_us_1_data.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"period": "2023",
"min": 0,
"max": 200000,
"count": 401
"count": 101
}
]
]
Expand All @@ -24,4 +24,4 @@
"2023-01-01.2028-12-31": "101"
}
}
}
}
4 changes: 2 additions & 2 deletions tests/data/calculate_us_2_data.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"period": "2023",
"min": 0,
"max": 200000,
"count": 401
"count": 101
}
]
]
Expand All @@ -24,4 +24,4 @@
"2023-01-01.2028-12-31": "100"
}
}
}
}
57 changes: 56 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import os
import time
import uuid

import httpx
import pytest

INTEGRATION_TIMEOUT_SECONDS = float(
os.environ.get("STAGING_API_TEST_TIMEOUT_SECONDS", "900")
)
INTEGRATION_POLL_INTERVAL_SECONDS = float(
os.environ.get("STAGING_API_TEST_POLL_INTERVAL_SECONDS", "5")
)
TRANSIENT_POLL_STATUS_CODES = {500, 502, 503, 504}


@pytest.fixture
def api_base_url() -> str:
Expand All @@ -22,6 +32,51 @@ def api_client(api_base_url: str):
yield client


def _response_summary(response: httpx.Response) -> str:
return f"HTTP {response.status_code}: {response.text[:500]}"


def _poll_live_endpoint(
api_client: httpx.Client,
path: str,
params: dict,
*,
route_name: str,
) -> dict:
deadline = time.monotonic() + INTEGRATION_TIMEOUT_SECONDS
last_response = None

while True:
try:
response = api_client.get(path, params=params)
except httpx.RequestError as error:
last_response = f"{type(error).__name__}: {error}"
else:
if response.status_code in TRANSIENT_POLL_STATUS_CODES:
last_response = _response_summary(response)
else:
response.raise_for_status()
payload = response.json()

if payload["status"] != "computing":
return payload

last_response = payload

if time.monotonic() >= deadline:
raise AssertionError(
f"Timed out polling the {route_name} route; "
f"last response was {last_response}"
)
time.sleep(INTEGRATION_POLL_INTERVAL_SECONDS)


@pytest.fixture
def poll_live_endpoint():
return _poll_live_endpoint


@pytest.fixture(scope="session")
def integration_probe_id() -> str:
return os.environ.get("STAGING_API_TEST_PROBE_ID", "local-probe")
base_probe_id = os.environ.get("STAGING_API_TEST_PROBE_ID", "local-probe")
return f"{base_probe_id}-{uuid.uuid4().hex[:8]}"
6 changes: 3 additions & 3 deletions tests/integration/test_budget_window_in_flight_dedupe.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import os
from unittest.mock import MagicMock

from flask import Flask

os.environ.setdefault("POLICYENGINE_DB_PASSWORD", "test")


class FakeRedis:
def __init__(self):
Expand Down Expand Up @@ -32,6 +29,9 @@ def _create_client(economy_bp):
def test_budget_window_in_flight_dedupe_uses_existing_batch_without_live_db(
monkeypatch,
):
monkeypatch.setenv("POLICYENGINE_DB_PASSWORD", "test")
monkeypatch.setenv("FLASK_DEBUG", "1")

from policyengine_api.libs.simulation_api_modal import (
ModalBudgetWindowBatchExecution,
)
Expand Down
95 changes: 33 additions & 62 deletions tests/integration/test_live_budget_window_cache.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
import json
import os
import time
from pathlib import Path


INTEGRATION_TIMEOUT_SECONDS = float(
os.environ.get("STAGING_API_TEST_TIMEOUT_SECONDS", "900")
)
INTEGRATION_POLL_INTERVAL_SECONDS = float(
os.environ.get("STAGING_API_TEST_POLL_INTERVAL_SECONDS", "5")
)


def _load_reform_payload(filename: str) -> dict:
return json.loads(
(Path(__file__).resolve().parents[1] / "data" / filename).read_text(
Expand All @@ -20,23 +10,6 @@ def _load_reform_payload(filename: str) -> dict:
)


def _poll_budget_window(api_client, path: str, params: dict) -> dict:
deadline = time.monotonic() + INTEGRATION_TIMEOUT_SECONDS

while True:
response = api_client.get(path, params=params)
response.raise_for_status()
payload = response.json()

if payload["status"] != "computing":
return payload

assert time.monotonic() < deadline, (
f"Timed out polling the budget-window route; last response was {payload}"
)
time.sleep(INTEGRATION_POLL_INTERVAL_SECONDS)


def _get_current_law_id(api_client) -> int:
metadata_response = api_client.get("/us/metadata")
metadata_response.raise_for_status()
Expand All @@ -52,7 +25,11 @@ def _create_utah_reform_policy(api_client) -> int:
return policy_response.json()["result"]["policy_id"]


def test_live_budget_window_completed_result_cache(api_client, integration_probe_id):
def test_live_budget_window_completed_result_cache(
api_client,
integration_probe_id,
poll_live_endpoint,
):
current_law_id = _get_current_law_id(api_client)
policy_id = _create_utah_reform_policy(api_client)

Expand All @@ -64,7 +41,12 @@ def test_live_budget_window_completed_result_cache(api_client, integration_probe
"staging_probe": f"{integration_probe_id}-budget-window-cache",
}

first_payload = _poll_budget_window(api_client, path, params)
first_payload = poll_live_endpoint(
api_client,
path,
params,
route_name="budget-window",
)

assert first_payload["status"] == "ok", first_payload
assert first_payload["progress"] == 100, first_payload
Expand All @@ -82,7 +64,11 @@ def test_live_budget_window_completed_result_cache(api_client, integration_probe
assert second_response.headers["X-PolicyEngine-Budget-Window-Cache"] == "result-hit"


def test_live_budget_window_multi_year_run(api_client, integration_probe_id):
def test_live_budget_window_multi_year_run(
api_client,
integration_probe_id,
poll_live_endpoint,
):
current_law_id = _get_current_law_id(api_client)
policy_id = _create_utah_reform_policy(api_client)

Expand All @@ -94,7 +80,12 @@ def test_live_budget_window_multi_year_run(api_client, integration_probe_id):
"staging_probe": f"{integration_probe_id}-budget-window-multi-year",
}

payload = _poll_budget_window(api_client, path, params)
payload = poll_live_endpoint(
api_client,
path,
params,
route_name="budget-window",
)

assert payload["status"] == "ok", payload
assert payload["progress"] == 100, payload
Expand All @@ -111,7 +102,11 @@ def test_live_budget_window_multi_year_run(api_client, integration_probe_id):
assert result["totals"]["year"] == "Total", payload


def test_live_budget_window_failed_batch_mapping(api_client, integration_probe_id):
def test_live_budget_window_failed_batch_mapping(
api_client,
integration_probe_id,
poll_live_endpoint,
):
current_law_id = _get_current_law_id(api_client)
policy_id = _create_utah_reform_policy(api_client)

Expand All @@ -124,40 +119,16 @@ def test_live_budget_window_failed_batch_mapping(api_client, integration_probe_i
"staging_probe": f"{integration_probe_id}-budget-window-failure",
}

payload = _poll_budget_window(api_client, path, params)
payload = poll_live_endpoint(
api_client,
path,
params,
route_name="budget-window",
)

assert payload["status"] == "error", payload
assert payload["result"] is None, payload
assert payload["error"], payload
assert isinstance(payload["completed_years"], list), payload
assert isinstance(payload["computing_years"], list), payload
assert isinstance(payload["queued_years"], list), payload


def test_live_budget_window_in_flight_dedupe(api_client, integration_probe_id):
current_law_id = _get_current_law_id(api_client)
policy_id = _create_utah_reform_policy(api_client)

path = f"/us/economy/{policy_id}/over/{current_law_id}/budget-window"
params = {
"region": "ut",
"start_year": "2026",
"window_size": 2,
"staging_probe": f"{integration_probe_id}-budget-window-in-flight",
}

first_response = api_client.get(path, params=params)
first_response.raise_for_status()
first_payload = first_response.json()

assert first_payload["status"] == "computing", first_payload
assert first_response.headers["X-PolicyEngine-Budget-Window-Cache"] == "miss"

second_response = api_client.get(path, params=params)
second_response.raise_for_status()
second_payload = second_response.json()

assert second_response.headers["X-PolicyEngine-Budget-Window-Cache"] == (
"batch-id-hit"
)
assert second_payload["status"] in ("computing", "ok"), second_payload
Loading
Loading