From ecd30cbd3937d3d71ca1d7f67dc7fa901ea7f282 Mon Sep 17 00:00:00 2001 From: mikemolinet Date: Fri, 22 May 2026 07:34:16 -0700 Subject: [PATCH] feat(messaging): state-machine invariant + StateTransitionResponse surfaces delivered_at (parity port of PR #923 + #927) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Batched OSS port of two private cueapi fixes: **Finding A** (private cueapi PR #923 mergeCommit 7a29082): state-machine invariant fix. Pre-fix: mark_read on queued msg → delivery_state="read" with delivered_at=NULL (impossible per canonical queued → delivered → read → acked path). Same fingerprint anomaly on mark_acked from queued state. Caused diagnostic confusion in private-side investigations. Fix: mark_read fills delivered_at=now when null; mark_acked dual-null-check fills both delivered_at + read_at when null. Pure-SELECT bypass paths now produce equivalent timestamps; invariant holds across all entry points. Note: cueapi-core has no `bulk_mark_read` (hosted-only per parity-manifest); third setter site from private #923 is omitted from this port. Only mark_read + mark_acked apply on OSS. **cmpg8jokl** (private cueapi PR #927 mergeCommit 34e3506): narrow-response observability follow-on. After Finding A fills delivered_at on /read + /ack, the narrow StateTransitionResponse omitted the field. Consumers couldn't observe the just-set value without a follow-up GET /v1/messages/{id}. 
Fix (additive only per API Contract Rule): - StateTransitionResponse adds `delivered_at: Optional[datetime] = None` - /read endpoint populates delivered_at - /ack endpoint populates all 3 setter timestamps (delivered_at + read_at + acked_at) symmetric with mark_acked's dual-null-check Tests: - 4 new regression guards in tests/test_messages.py: * test_mark_read_fills_delivered_at_when_null * test_mark_read_preserves_existing_delivered_at * test_mark_acked_fills_delivered_at_and_read_at_when_null * test_mark_acked_preserves_existing_timestamps - 22/22 message tests pass (existing 18 + 4 new) Both private fixes empirically prod-verified before this port: - Finding A T1+T3+T5 via path-d-test-recipient fixture - cmpg8jokl T1+T5 via path-d-test-recipient fixture Targets tag bump to messaging-v1.1.6 (supersedes v1.1.5-hotfix). Downstream consumers (cue.dock.svc et al) should bump pin once tag cuts. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/routers/messages.py | 10 ++ app/schemas/message.py | 12 ++- app/services/message_service.py | 27 ++++- tests/test_messages.py | 172 ++++++++++++++++++++++++++++++++ 4 files changed, 218 insertions(+), 3 deletions(-) diff --git a/app/routers/messages.py b/app/routers/messages.py index 6c6e3af..35563ef 100644 --- a/app/routers/messages.py +++ b/app/routers/messages.py @@ -139,8 +139,12 @@ async def mark_read_endpoint( `expired`). """ msg = await mark_read(db, user, msg_id) + # delivered_at surfaced post state-machine invariant fix + # (cross-port from private cueapi PR #923): mark_read fills it + # when null, so the just-set value is visible without follow-up GET. return StateTransitionResponse( delivery_state=msg.delivery_state, + delivered_at=msg.delivered_at, read_at=msg.read_at, ) @@ -153,7 +157,13 @@ async def mark_acked_endpoint( ): """Acknowledge. 
Terminal — no further state transitions allowed.""" msg = await mark_acked(db, user, msg_id) + # delivered_at + read_at surfaced post state-machine invariant fix + # (cross-port from private cueapi PR #923): mark_acked's dual-null- + # check fills both when null, so all 3 setter timestamps are + # visible without follow-up GET. return StateTransitionResponse( delivery_state=msg.delivery_state, + delivered_at=msg.delivered_at, + read_at=msg.read_at, acked_at=msg.acked_at, ) diff --git a/app/schemas/message.py b/app/schemas/message.py index 7df74cc..96d0288 100644 --- a/app/schemas/message.py +++ b/app/schemas/message.py @@ -127,8 +127,18 @@ class CountResponse(BaseModel): class StateTransitionResponse(BaseModel): - """Returned by /read and /ack — minimal state-only response.""" + """Returned by /read and /ack — state + all setter timestamps. + + After the state-machine invariant fix (cross-port from private + cueapi PR #923), mark_read fills delivered_at when null and + mark_acked fills both delivered_at and read_at when null. The + narrow response now surfaces delivered_at so consumers can + observe the just-set value without a follow-up GET + /v1/messages/{id} round-trip. Cross-port from private cueapi + PR #927 (cmpg8jokl). + """ delivery_state: str + delivered_at: Optional[datetime] = None read_at: Optional[datetime] = None acked_at: Optional[datetime] = None diff --git a/app/services/message_service.py b/app/services/message_service.py index 3f3fa38..8a1a5ff 100644 --- a/app/services/message_service.py +++ b/app/services/message_service.py @@ -532,8 +532,19 @@ async def mark_read( ) if msg.delivery_state != "read": + now = datetime.now(timezone.utc) msg.delivery_state = "read" - msg.read_at = datetime.now(timezone.utc) + msg.read_at = now + # State-machine invariant fix: any "read" message must have + # been "delivered" at some point. 
Fast-readers (recipient calls + # /read before the inbox-list-poll's atomic queued→delivered + # UPDATE fires) bypass the canonical step and would otherwise + # leave delivered_at NULL — producing an impossible + # `delivery_state=read + delivered_at=NULL` fingerprint that + # confuses post-hoc debugging. Set delivered_at=now if NULL so + # the invariant holds. Cross-port from private cueapi PR #923. + if msg.delivered_at is None: + msg.delivered_at = now await db.commit() await db.refresh(msg) return msg @@ -556,8 +567,20 @@ async def mark_acked( f"cannot ack on terminal state '{msg.delivery_state}'", ) + now = datetime.now(timezone.utc) msg.delivery_state = "acked" - msg.acked_at = datetime.now(timezone.utc) + msg.acked_at = now + # State-machine invariant fix (symmetric to mark_read): any + # "acked" message must have been "delivered" AND "read" at some + # point. Fast-ackers (recipient calls /ack before deliver→read + # fires) bypass canonical steps and would otherwise leave + # delivered_at + read_at NULL. Set both to `now` if NULL so the + # invariant holds across all bypass paths. Cross-port from + # private cueapi PR #923. 
+ if msg.delivered_at is None: + msg.delivered_at = now + if msg.read_at is None: + msg.read_at = now await db.commit() await db.refresh(msg) return msg diff --git a/tests/test_messages.py b/tests/test_messages.py index 043ef33..043f2cc 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -363,3 +363,175 @@ async def test_mark_acked_terminal(client, auth_headers): r3 = await client.post(f"/v1/messages/{msg_id}/read", headers=auth_headers) assert r3.status_code == 409 assert r3.json()["error"]["code"] == "invalid_state_transition" + + +# ─────────────────────────────────────────────────────────────────────── +# State-machine invariant + narrow-response visibility regression guards +# +# Cross-port from private cueapi PR #923 (Finding A: state-machine +# invariant — any "read"/"acked" message must have non-null delivered_at) +# + PR #927 (cmpg8jokl: narrow response surfaces delivered_at). +# +# Pre-fix anti-pattern: mark_read on queued msg → delivery_state="read" +# but delivered_at=NULL (impossible per canonical state machine +# queued → delivered → read → acked). +# Post-fix: mark_read fills delivered_at=now when null; mark_acked +# fills both delivered_at + read_at when null; /read and /ack narrow +# responses surface all setter timestamps. +# ─────────────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_mark_read_fills_delivered_at_when_null(client, auth_headers): + """Finding A invariant: fast-read on queued msg populates delivered_at. + + Pre-fix: delivery_state="read" with delivered_at=NULL (impossible + state). Post-fix: delivered_at == read_at, atomically set. 
+ """ + sender = await _make_agent(client, auth_headers, slug="fa-rd-s") + recipient = await _make_agent(client, auth_headers, slug="fa-rd-r") + sent = await client.post( + "/v1/messages", + json={"to": recipient["id"], "body": "fast read test"}, + headers={**auth_headers, **_from_header(sender)}, + ) + msg_id = sent.json()["id"] + + # Verify pre-state: queued + delivered_at NULL. + pre = await client.get(f"/v1/messages/{msg_id}", headers=auth_headers) + assert pre.json()["delivery_state"] == "queued" + assert pre.json()["delivered_at"] is None + + # Mark read directly (fast-read; bypasses inbox-list poll). + r = await client.post(f"/v1/messages/{msg_id}/read", headers=auth_headers) + assert r.status_code == 200 + body = r.json() + + # Narrow response (cmpg8jokl): surfaces both timestamps. + assert body["delivery_state"] == "read" + assert body["read_at"] is not None + assert body["delivered_at"] is not None, ( + "Finding A invariant: mark_read must fill delivered_at when null. " + "The narrow response (cmpg8jokl) must surface it." + ) + # Atomic fill: both set to the same timestamp. + assert body["delivered_at"] == body["read_at"] + + +@pytest.mark.asyncio +async def test_mark_read_preserves_existing_delivered_at(client, auth_headers): + """Finding A regression guard: if delivered_at was already set + (e.g. by a prior inbox-list poll), mark_read MUST preserve it. + The narrow response surfaces the preserved value, not a fresh now(). + """ + sender = await _make_agent(client, auth_headers, slug="fa-rd-p-s") + recipient = await _make_agent(client, auth_headers, slug="fa-rd-p-r") + sent = await client.post( + "/v1/messages", + json={"to": recipient["id"], "body": "preserve test"}, + headers={**auth_headers, **_from_header(sender)}, + ) + msg_id = sent.json()["id"] + + # Trigger queued→delivered via inbox poll (atomic UPDATE in agents.py). 
+ inbox = await client.get( + f"/v1/agents/{recipient['id']}/inbox?limit=10", + headers=auth_headers, + ) + assert inbox.status_code == 200 + + pre = await client.get(f"/v1/messages/{msg_id}", headers=auth_headers) + delivered_at_pre = pre.json()["delivered_at"] + assert delivered_at_pre is not None # poll set it + + r = await client.post(f"/v1/messages/{msg_id}/read", headers=auth_headers) + assert r.status_code == 200 + body = r.json() + + assert body["delivery_state"] == "read" + assert body["delivered_at"] == delivered_at_pre, ( + "existing delivered_at must be preserved by mark_read's if-None guard" + ) + # read_at should be later than the preserved delivered_at. + assert body["read_at"] > body["delivered_at"] + + +@pytest.mark.asyncio +async def test_mark_acked_fills_delivered_at_and_read_at_when_null(client, auth_headers): + """Finding A invariant (symmetric): fast-ack on queued msg populates + all 3 setter timestamps via mark_acked's dual-null-check. + + Pre-fix: delivery_state="acked" with delivered_at=NULL + read_at=NULL + (terminal state with impossible history). Post-fix: all 3 + timestamps populated atomically + identical. + """ + sender = await _make_agent(client, auth_headers, slug="fa-ak-s") + recipient = await _make_agent(client, auth_headers, slug="fa-ak-r") + sent = await client.post( + "/v1/messages", + json={"to": recipient["id"], "body": "fast ack test"}, + headers={**auth_headers, **_from_header(sender)}, + ) + msg_id = sent.json()["id"] + + # Fast-ack from queued — skip deliver + read. + r = await client.post(f"/v1/messages/{msg_id}/ack", headers=auth_headers) + assert r.status_code == 200 + body = r.json() + + # Narrow response (cmpg8jokl): surfaces all 3 setter timestamps. 
+ assert body["delivery_state"] == "acked" + assert body["delivered_at"] is not None, ( + "Finding A invariant: mark_acked must fill delivered_at when null" + ) + assert body["read_at"] is not None, ( + "Finding A invariant: mark_acked must fill read_at when null" + ) + assert body["acked_at"] is not None + # All 3 identical confirms dual-null-check filled atomically in + # the same transaction. + assert body["delivered_at"] == body["read_at"] == body["acked_at"] + + +@pytest.mark.asyncio +async def test_mark_acked_preserves_existing_timestamps(client, auth_headers): + """Finding A regression guard (symmetric): if delivered_at + + read_at were already set (poll→read→ack chain), mark_acked MUST + preserve them. acked_at is the only newly-set timestamp. + """ + sender = await _make_agent(client, auth_headers, slug="fa-ak-p-s") + recipient = await _make_agent(client, auth_headers, slug="fa-ak-p-r") + sent = await client.post( + "/v1/messages", + json={"to": recipient["id"], "body": "ack preserve test"}, + headers={**auth_headers, **_from_header(sender)}, + ) + msg_id = sent.json()["id"] + + # Poll triggers queued→delivered. + inbox = await client.get( + f"/v1/agents/{recipient['id']}/inbox?limit=10", + headers=auth_headers, + ) + assert inbox.status_code == 200 + delivered_at_t1 = ( + await client.get(f"/v1/messages/{msg_id}", headers=auth_headers) + ).json()["delivered_at"] + assert delivered_at_t1 is not None + + # Then /read sets read_at. + r_read = await client.post(f"/v1/messages/{msg_id}/read", headers=auth_headers) + read_at_t2 = r_read.json()["read_at"] + assert read_at_t2 is not None + + # Then /ack — both pre-existing timestamps preserved; acked_at set. 
+ r_ack = await client.post(f"/v1/messages/{msg_id}/ack", headers=auth_headers) + assert r_ack.status_code == 200 + body = r_ack.json() + + assert body["delivery_state"] == "acked" + assert body["delivered_at"] == delivered_at_t1 + assert body["read_at"] == read_at_t2 + assert body["acked_at"] is not None + # Strict ordering preserved. + assert body["acked_at"] > body["read_at"] > body["delivered_at"]