Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions app/routers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,12 @@ async def mark_read_endpoint(
`expired`).
"""
msg = await mark_read(db, user, msg_id)
# delivered_at surfaced post state-machine invariant fix
# (cross-port from private cueapi PR #923): mark_read fills it
# when null, so the just-set value is visible without follow-up GET.
return StateTransitionResponse(
delivery_state=msg.delivery_state,
delivered_at=msg.delivered_at,
read_at=msg.read_at,
)

Expand All @@ -153,7 +157,13 @@ async def mark_acked_endpoint(
):
"""Acknowledge. Terminal — no further state transitions allowed."""
msg = await mark_acked(db, user, msg_id)
# delivered_at + read_at surfaced post state-machine invariant fix
# (cross-port from private cueapi PR #923): mark_acked's dual-null-
# check fills both when null, so all 3 setter timestamps are
# visible without follow-up GET.
return StateTransitionResponse(
delivery_state=msg.delivery_state,
delivered_at=msg.delivered_at,
read_at=msg.read_at,
acked_at=msg.acked_at,
)
12 changes: 11 additions & 1 deletion app/schemas/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,18 @@ class CountResponse(BaseModel):


class StateTransitionResponse(BaseModel):
"""Returned by /read and /ack — minimal state-only response."""
"""Returned by /read and /ack — state + all setter timestamps.

After the state-machine invariant fix (cross-port from private
cueapi PR #923), mark_read fills delivered_at when null and
mark_acked fills both delivered_at and read_at when null. The
narrow response now surfaces delivered_at so consumers can
observe the just-set value without a follow-up GET
/v1/messages/{id} round-trip. Cross-port from private cueapi
PR #927 (cmpg8jokl).
"""

delivery_state: str
delivered_at: Optional[datetime] = None
read_at: Optional[datetime] = None
acked_at: Optional[datetime] = None
27 changes: 25 additions & 2 deletions app/services/message_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,19 @@ async def mark_read(
)

if msg.delivery_state != "read":
now = datetime.now(timezone.utc)
msg.delivery_state = "read"
msg.read_at = datetime.now(timezone.utc)
msg.read_at = now
# State-machine invariant fix: any "read" message must have
# been "delivered" at some point. Fast-readers (recipient calls
# /read before the inbox-list-poll's atomic queued→delivered
# UPDATE fires) bypass the canonical step and would otherwise
# leave delivered_at NULL — producing an impossible
# `delivery_state=read + delivered_at=NULL` fingerprint that
# confuses post-hoc debugging. Set delivered_at=now if NULL so
# the invariant holds. Cross-port from private cueapi PR #923.
if msg.delivered_at is None:
msg.delivered_at = now
await db.commit()
await db.refresh(msg)
return msg
Expand All @@ -556,8 +567,20 @@ async def mark_acked(
f"cannot ack on terminal state '{msg.delivery_state}'",
)

now = datetime.now(timezone.utc)
msg.delivery_state = "acked"
msg.acked_at = datetime.now(timezone.utc)
msg.acked_at = now
# State-machine invariant fix (symmetric to mark_read): any
# "acked" message must have been "delivered" AND "read" at some
# point. Fast-ackers (recipient calls /ack before deliver→read
# fires) bypass canonical steps and would otherwise leave
# delivered_at + read_at NULL. Set both to `now` if NULL so the
# invariant holds across all bypass paths. Cross-port from
# private cueapi PR #923.
if msg.delivered_at is None:
msg.delivered_at = now
if msg.read_at is None:
msg.read_at = now
await db.commit()
await db.refresh(msg)
return msg
Expand Down
172 changes: 172 additions & 0 deletions tests/test_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,175 @@ async def test_mark_acked_terminal(client, auth_headers):
r3 = await client.post(f"/v1/messages/{msg_id}/read", headers=auth_headers)
assert r3.status_code == 409
assert r3.json()["error"]["code"] == "invalid_state_transition"


# ───────────────────────────────────────────────────────────────────────
# State-machine invariant + narrow-response visibility regression guards
#
# Cross-port from private cueapi PR #923 (Finding A: state-machine
# invariant — any "read"/"acked" message must have non-null delivered_at)
# + PR #927 (cmpg8jokl: narrow response surfaces delivered_at).
#
# Pre-fix anti-pattern: mark_read on queued msg → delivery_state="read"
# but delivered_at=NULL (impossible per canonical state machine
# queued → delivered → read → acked).
# Post-fix: mark_read fills delivered_at=now when null; mark_acked
# fills both delivered_at + read_at when null; /read and /ack narrow
# responses surface all setter timestamps.
# ───────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_mark_read_fills_delivered_at_when_null(client, auth_headers):
"""Finding A invariant: fast-read on queued msg populates delivered_at.

Pre-fix: delivery_state="read" with delivered_at=NULL (impossible
state). Post-fix: delivered_at == read_at, atomically set.
"""
sender = await _make_agent(client, auth_headers, slug="fa-rd-s")
recipient = await _make_agent(client, auth_headers, slug="fa-rd-r")
sent = await client.post(
"/v1/messages",
json={"to": recipient["id"], "body": "fast read test"},
headers={**auth_headers, **_from_header(sender)},
)
msg_id = sent.json()["id"]

# Verify pre-state: queued + delivered_at NULL.
pre = await client.get(f"/v1/messages/{msg_id}", headers=auth_headers)
assert pre.json()["delivery_state"] == "queued"
assert pre.json()["delivered_at"] is None

# Mark read directly (fast-read; bypasses inbox-list poll).
r = await client.post(f"/v1/messages/{msg_id}/read", headers=auth_headers)
assert r.status_code == 200
body = r.json()

# Narrow response (cmpg8jokl): surfaces both timestamps.
assert body["delivery_state"] == "read"
assert body["read_at"] is not None
assert body["delivered_at"] is not None, (
"Finding A invariant: mark_read must fill delivered_at when null. "
"The narrow response (cmpg8jokl) must surface it."
)
# Atomic fill: both set to the same timestamp.
assert body["delivered_at"] == body["read_at"]


@pytest.mark.asyncio
async def test_mark_read_preserves_existing_delivered_at(client, auth_headers):
"""Finding A regression guard: if delivered_at was already set
(e.g. by a prior inbox-list poll), mark_read MUST preserve it.
The narrow response surfaces the preserved value, not a fresh now().
"""
sender = await _make_agent(client, auth_headers, slug="fa-rd-p-s")
recipient = await _make_agent(client, auth_headers, slug="fa-rd-p-r")
sent = await client.post(
"/v1/messages",
json={"to": recipient["id"], "body": "preserve test"},
headers={**auth_headers, **_from_header(sender)},
)
msg_id = sent.json()["id"]

# Trigger queued→delivered via inbox poll (atomic UPDATE in agents.py).
inbox = await client.get(
f"/v1/agents/{recipient['id']}/inbox?limit=10",
headers=auth_headers,
)
assert inbox.status_code == 200

pre = await client.get(f"/v1/messages/{msg_id}", headers=auth_headers)
delivered_at_pre = pre.json()["delivered_at"]
assert delivered_at_pre is not None # poll set it

r = await client.post(f"/v1/messages/{msg_id}/read", headers=auth_headers)
assert r.status_code == 200
body = r.json()

assert body["delivery_state"] == "read"
assert body["delivered_at"] == delivered_at_pre, (
"existing delivered_at must be preserved by mark_read's if-None guard"
)
# read_at should be later than the preserved delivered_at.
assert body["read_at"] > body["delivered_at"]


@pytest.mark.asyncio
async def test_mark_acked_fills_delivered_at_and_read_at_when_null(client, auth_headers):
"""Finding A invariant (symmetric): fast-ack on queued msg populates
all 3 setter timestamps via mark_acked's dual-null-check.

Pre-fix: delivery_state="acked" with delivered_at=NULL + read_at=NULL
(terminal state with impossible history). Post-fix: all 3
timestamps populated atomically + identical.
"""
sender = await _make_agent(client, auth_headers, slug="fa-ak-s")
recipient = await _make_agent(client, auth_headers, slug="fa-ak-r")
sent = await client.post(
"/v1/messages",
json={"to": recipient["id"], "body": "fast ack test"},
headers={**auth_headers, **_from_header(sender)},
)
msg_id = sent.json()["id"]

# Fast-ack from queued — skip deliver + read.
r = await client.post(f"/v1/messages/{msg_id}/ack", headers=auth_headers)
assert r.status_code == 200
body = r.json()

# Narrow response (cmpg8jokl): surfaces all 3 setter timestamps.
assert body["delivery_state"] == "acked"
assert body["delivered_at"] is not None, (
"Finding A invariant: mark_acked must fill delivered_at when null"
)
assert body["read_at"] is not None, (
"Finding A invariant: mark_acked must fill read_at when null"
)
assert body["acked_at"] is not None
# All 3 identical confirms dual-null-check filled atomically in
# the same transaction.
assert body["delivered_at"] == body["read_at"] == body["acked_at"]


@pytest.mark.asyncio
async def test_mark_acked_preserves_existing_timestamps(client, auth_headers):
"""Finding A regression guard (symmetric): if delivered_at +
read_at were already set (poll→read→ack chain), mark_acked MUST
preserve them. acked_at is the only newly-set timestamp.
"""
sender = await _make_agent(client, auth_headers, slug="fa-ak-p-s")
recipient = await _make_agent(client, auth_headers, slug="fa-ak-p-r")
sent = await client.post(
"/v1/messages",
json={"to": recipient["id"], "body": "ack preserve test"},
headers={**auth_headers, **_from_header(sender)},
)
msg_id = sent.json()["id"]

# Poll triggers queued→delivered.
inbox = await client.get(
f"/v1/agents/{recipient['id']}/inbox?limit=10",
headers=auth_headers,
)
assert inbox.status_code == 200
delivered_at_t1 = (
await client.get(f"/v1/messages/{msg_id}", headers=auth_headers)
).json()["delivered_at"]
assert delivered_at_t1 is not None

# Then /read sets read_at.
r_read = await client.post(f"/v1/messages/{msg_id}/read", headers=auth_headers)
read_at_t2 = r_read.json()["read_at"]
assert read_at_t2 is not None

# Then /ack — both pre-existing timestamps preserved; acked_at set.
r_ack = await client.post(f"/v1/messages/{msg_id}/ack", headers=auth_headers)
assert r_ack.status_code == 200
body = r_ack.json()

assert body["delivery_state"] == "acked"
assert body["delivered_at"] == delivered_at_t1
assert body["read_at"] == read_at_t2
assert body["acked_at"] is not None
# Strict ordering preserved.
assert body["acked_at"] > body["read_at"] > body["delivered_at"]