From 7488761f72ff59d6681efa856333ce6a0b441f3b Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Fri, 24 Apr 2026 13:09:28 +0100 Subject: [PATCH 1/8] feat: add stream_validate() hook to Requirement (#900) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an async `stream_validate(chunk, backend, ctx)` method to the base `Requirement` class. The default implementation returns `PartialValidationResult("unknown")`; subclasses override to inspect the accumulated chunk and return `"pass"` or `"fail"` early. Per the Phase 1 design: `"pass"` is informational and does not short-circuit the final `validate()` call. The method must not mutate `self` — state isolation is the orchestrator's responsibility. Signed-off-by: Nigel Jones Assisted-by: Claude Code Signed-off-by: Nigel Jones --- mellea/core/requirement.py | 26 ++++++++++++ test/core/test_stream_validate.py | 68 +++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 test/core/test_stream_validate.py diff --git a/mellea/core/requirement.py b/mellea/core/requirement.py index 0d3381f16..80fad71bd 100644 --- a/mellea/core/requirement.py +++ b/mellea/core/requirement.py @@ -283,6 +283,32 @@ async def validate( context=val_ctx, ) + async def stream_validate( + self, chunk: str, backend: Backend, ctx: Context + ) -> PartialValidationResult: + """Hook for per-chunk streaming validation. + + The default implementation returns ``PartialValidationResult("unknown")`` + — meaning insufficient data to decide yet. Subclasses override this method + to inspect the accumulated chunk and return ``"pass"`` or ``"fail"`` early. + + This method must not mutate ``self``. The orchestrator is responsible for + cloning the requirement before each attempt; any state needed across chunks + must be managed externally. + + Args: + chunk: The accumulated model output so far (not just the latest token). + backend: The inference backend, available for backend-assisted checks. 
+ ctx: The current generation context. + + Returns: + PartialValidationResult: ``"unknown"`` by default. Subclasses may return + ``"pass"`` (constraint satisfied so far) or ``"fail"`` (constraint violated, + streaming should be aborted). In Phase 1, ``"pass"`` is informational and + does not short-circuit the final ``validate()`` call. + """ + return PartialValidationResult("unknown") + def parts(self) -> list[Component | CBlock]: """Returns all of the constituent parts of a Requirement. diff --git a/test/core/test_stream_validate.py b/test/core/test_stream_validate.py new file mode 100644 index 000000000..ddf2e78a6 --- /dev/null +++ b/test/core/test_stream_validate.py @@ -0,0 +1,68 @@ +"""Unit tests for Requirement.stream_validate() hook.""" + +import inspect + +import pytest + +from mellea.core import PartialValidationResult, Requirement + + +@pytest.mark.asyncio +async def test_default_returns_unknown(): + req = Requirement(description="some requirement") + result = await req.stream_validate("some chunk", backend=None, ctx=None) # type: ignore[arg-type] + assert result.success == "unknown" + + +@pytest.mark.asyncio +async def test_default_returns_partial_validation_result_instance(): + req = Requirement() + result = await req.stream_validate("chunk", backend=None, ctx=None) # type: ignore[arg-type] + assert isinstance(result, PartialValidationResult) + + +def test_stream_validate_is_coroutine(): + req = Requirement() + assert inspect.iscoroutinefunction(req.stream_validate) + + +@pytest.mark.asyncio +async def test_subclass_can_return_pass(): + class PassRequirement(Requirement): + async def stream_validate(self, chunk, backend, ctx) -> PartialValidationResult: + return PartialValidationResult("pass") + + req = PassRequirement(description="always passes") + result = await req.stream_validate("any chunk", backend=None, ctx=None) # type: ignore[arg-type] + assert result.success == "pass" + + +@pytest.mark.asyncio +async def test_subclass_can_return_fail(): + class 
FailRequirement(Requirement): + async def stream_validate(self, chunk, backend, ctx) -> PartialValidationResult: + if "bad" in chunk: + return PartialValidationResult("fail", reason="bad word detected") + return PartialValidationResult("unknown") + + req = FailRequirement(description="no bad words") + result = await req.stream_validate("this is bad content", backend=None, ctx=None) # type: ignore[arg-type] + assert result.success == "fail" + assert result.reason == "bad word detected" + + result_unknown = await req.stream_validate("good content", backend=None, ctx=None) # type: ignore[arg-type] + assert result_unknown.success == "unknown" + + +@pytest.mark.asyncio +async def test_does_not_mutate_requirement(): + req = Requirement(description="original description") + original_description = req.description + original_output = req._output + original_validation_fn = req.validation_fn + + await req.stream_validate("some chunk", backend=None, ctx=None) # type: ignore[arg-type] + + assert req.description == original_description + assert req._output == original_output + assert req.validation_fn == original_validation_fn From 3054f6fc68f78ef7f8c336198e64e7fee814faa6 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Fri, 24 Apr 2026 13:25:16 +0100 Subject: [PATCH 2/8] fix(core): address PR #925 review feedback on stream_validate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove "In Phase 1" temporal qualifier from docstring — reworded to timeless statement about orchestrator responsibility - Add type annotations (str, Backend, Context) to test subclass overrides - Add idempotency test: multiple calls on the same Requirement instance leave state unchanged Assisted-by: Claude Code Signed-off-by: Nigel Jones --- mellea/core/requirement.py | 4 ++-- test/core/test_stream_validate.py | 20 ++++++++++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/mellea/core/requirement.py b/mellea/core/requirement.py index 
80fad71bd..5cc8d97d7 100644 --- a/mellea/core/requirement.py +++ b/mellea/core/requirement.py @@ -304,8 +304,8 @@ async def stream_validate( Returns: PartialValidationResult: ``"unknown"`` by default. Subclasses may return ``"pass"`` (constraint satisfied so far) or ``"fail"`` (constraint violated, - streaming should be aborted). In Phase 1, ``"pass"`` is informational and - does not short-circuit the final ``validate()`` call. + streaming should be aborted). ``"pass"`` does not short-circuit the final + ``validate()`` call; the orchestrator decides whether to skip it. """ return PartialValidationResult("unknown") diff --git a/test/core/test_stream_validate.py b/test/core/test_stream_validate.py index ddf2e78a6..59046b544 100644 --- a/test/core/test_stream_validate.py +++ b/test/core/test_stream_validate.py @@ -5,6 +5,8 @@ import pytest from mellea.core import PartialValidationResult, Requirement +from mellea.core.backend import Backend +from mellea.core.base import Context @pytest.mark.asyncio @@ -29,7 +31,9 @@ def test_stream_validate_is_coroutine(): @pytest.mark.asyncio async def test_subclass_can_return_pass(): class PassRequirement(Requirement): - async def stream_validate(self, chunk, backend, ctx) -> PartialValidationResult: + async def stream_validate( + self, chunk: str, backend: Backend, ctx: Context + ) -> PartialValidationResult: return PartialValidationResult("pass") req = PassRequirement(description="always passes") @@ -40,7 +44,9 @@ async def stream_validate(self, chunk, backend, ctx) -> PartialValidationResult: @pytest.mark.asyncio async def test_subclass_can_return_fail(): class FailRequirement(Requirement): - async def stream_validate(self, chunk, backend, ctx) -> PartialValidationResult: + async def stream_validate( + self, chunk: str, backend: Backend, ctx: Context + ) -> PartialValidationResult: if "bad" in chunk: return PartialValidationResult("fail", reason="bad word detected") return PartialValidationResult("unknown") @@ -66,3 +72,13 @@ 
async def test_does_not_mutate_requirement(): assert req.description == original_description assert req._output == original_output assert req.validation_fn == original_validation_fn + + +@pytest.mark.asyncio +async def test_stream_validate_idempotent(): + req = Requirement(description="repeated calls") + result1 = await req.stream_validate("chunk one", backend=None, ctx=None) # type: ignore[arg-type] + result2 = await req.stream_validate("chunk two", backend=None, ctx=None) # type: ignore[arg-type] + assert result1.success == "unknown" + assert result2.success == "unknown" + assert req._output is None From 358e4d1462a0701c3277a59821eb05b3c061916c Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Fri, 24 Apr 2026 14:47:15 +0100 Subject: [PATCH 3/8] fix(core): make stream_validate backend/ctx keyword-only Prevents positional confusion and makes future parameter additions to the signature non-breaking for existing subclass overrides. Assisted-by: Claude Code --- mellea/core/requirement.py | 2 +- test/core/test_stream_validate.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mellea/core/requirement.py b/mellea/core/requirement.py index 5cc8d97d7..25092326d 100644 --- a/mellea/core/requirement.py +++ b/mellea/core/requirement.py @@ -284,7 +284,7 @@ async def validate( ) async def stream_validate( - self, chunk: str, backend: Backend, ctx: Context + self, chunk: str, *, backend: Backend, ctx: Context ) -> PartialValidationResult: """Hook for per-chunk streaming validation. 
diff --git a/test/core/test_stream_validate.py b/test/core/test_stream_validate.py index 59046b544..002ff7313 100644 --- a/test/core/test_stream_validate.py +++ b/test/core/test_stream_validate.py @@ -32,7 +32,7 @@ def test_stream_validate_is_coroutine(): async def test_subclass_can_return_pass(): class PassRequirement(Requirement): async def stream_validate( - self, chunk: str, backend: Backend, ctx: Context + self, chunk: str, *, backend: Backend, ctx: Context ) -> PartialValidationResult: return PartialValidationResult("pass") @@ -45,7 +45,7 @@ async def stream_validate( async def test_subclass_can_return_fail(): class FailRequirement(Requirement): async def stream_validate( - self, chunk: str, backend: Backend, ctx: Context + self, chunk: str, *, backend: Backend, ctx: Context ) -> PartialValidationResult: if "bad" in chunk: return PartialValidationResult("fail", reason="bad word detected") From efded18f9187e8b48dfe33524736fc362f801b07 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Mon, 27 Apr 2026 14:19:14 +0100 Subject: [PATCH 4/8] fix(core): fix stream_validate docstring and add missing stateful tests The docstring incorrectly stated that implementations must not mutate self. Issue #900 spec explicitly allows stateful accumulation and requires the shallow-copy caveat to be documented. Fix the docstring to match the spec. 
Add two tests required by the issue acceptance criteria: - test_stateful_subclass_accumulates_state: verifies a subclass can accumulate state (bullet counter) across stream_validate calls - test_stateful_subclass_clone_isolation: verifies copy() gives an independent clone, confirming the orchestrator clone pattern Assisted-by: Claude Code Signed-off-by: Nigel Jones --- mellea/core/requirement.py | 12 ++++-- test/core/test_stream_validate.py | 68 +++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/mellea/core/requirement.py b/mellea/core/requirement.py index 25092326d..24ff6672f 100644 --- a/mellea/core/requirement.py +++ b/mellea/core/requirement.py @@ -292,9 +292,15 @@ async def stream_validate( — meaning insufficient data to decide yet. Subclasses override this method to inspect the accumulated chunk and return ``"pass"`` or ``"fail"`` early. - This method must not mutate ``self``. The orchestrator is responsible for - cloning the requirement before each attempt; any state needed across chunks - must be managed externally. + Implementations may accumulate state on ``self`` across calls within a + single attempt. The orchestrator clones the requirement (``copy(req)``) + before each attempt, so state does not bleed across retries. + + Shallow-copy caveat: mutable container fields (e.g. ``self._buffer = []``) + are shared by reference under ``copy()``. Reassign rather than mutate in + place (``self._buffer = self._buffer + [chunk]``, not + ``self._buffer.append(chunk)``), or override ``__copy__`` for proper + isolation. Args: chunk: The accumulated model output so far (not just the latest token). 
diff --git a/test/core/test_stream_validate.py b/test/core/test_stream_validate.py index 002ff7313..2567c9923 100644 --- a/test/core/test_stream_validate.py +++ b/test/core/test_stream_validate.py @@ -1,6 +1,7 @@ """Unit tests for Requirement.stream_validate() hook.""" import inspect +from copy import copy import pytest @@ -82,3 +83,70 @@ async def test_stream_validate_idempotent(): assert result1.success == "unknown" assert result2.success == "unknown" assert req._output is None + + +@pytest.mark.asyncio +async def test_stateful_subclass_accumulates_state(): + """Stateful subclass correctly accumulates state across stream_validate calls.""" + + class BulletCounter(Requirement): + def __init__(self) -> None: + super().__init__(description="no more than 3 bullets") + self._bullet_count = 0 + + async def stream_validate( + self, chunk: str, *, backend: Backend, ctx: Context + ) -> PartialValidationResult: + self._bullet_count = chunk.count("\n-") + if self._bullet_count > 3: + return PartialValidationResult( + "fail", reason=f"{self._bullet_count} bullets exceeds limit" + ) + return PartialValidationResult("unknown") + + req = BulletCounter() + assert req._bullet_count == 0 + + await req.stream_validate("intro text", backend=None, ctx=None) # type: ignore[arg-type] + assert req._bullet_count == 0 + + await req.stream_validate("intro\n- one\n- two", backend=None, ctx=None) # type: ignore[arg-type] + assert req._bullet_count == 2 + + result = await req.stream_validate( + "intro\n- one\n- two\n- three\n- four", + backend=None, + ctx=None, # type: ignore[arg-type] + ) + assert req._bullet_count == 4 + assert result.success == "fail" + assert result.reason is not None and "4" in result.reason + + +@pytest.mark.asyncio +async def test_stateful_subclass_clone_isolation(): + """copy() of a stateful requirement gives an independent clone — orchestrator pattern.""" + + class CallCounter(Requirement): + def __init__(self) -> None: + super().__init__(description="call counter") 
+ self._calls = 0 + + async def stream_validate( + self, chunk: str, *, backend: Backend, ctx: Context + ) -> PartialValidationResult: + self._calls += 1 + return PartialValidationResult("unknown") + + req = CallCounter() + await req.stream_validate("a", backend=None, ctx=None) # type: ignore[arg-type] + await req.stream_validate("b", backend=None, ctx=None) # type: ignore[arg-type] + assert req._calls == 2 + + # Simulate orchestrator cloning before a new attempt + cloned = copy(req) + assert cloned._calls == 2 # clone inherits state at clone time + + await cloned.stream_validate("c", backend=None, ctx=None) # type: ignore[arg-type] + assert cloned._calls == 3 # clone advances independently + assert req._calls == 2 # original is unchanged From 315a98c70257cca337d9965cc5de4e4b0cb17715 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Mon, 27 Apr 2026 15:02:01 +0100 Subject: [PATCH 5/8] test(core): make BulletCounter genuinely stateful via delta extraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous implementation overwrote _bullet_count from the full accumulated chunk on each call — equivalent to a pure function with no real dependency on prior state. Use _seen_len to extract only the new portion of each accumulated chunk, accumulating the count additively. This genuinely requires prior-call state to know where to slice, making the test name "accumulates_state" accurate. 
Assisted-by: Claude Code Signed-off-by: Nigel Jones --- test/core/test_stream_validate.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/test/core/test_stream_validate.py b/test/core/test_stream_validate.py index 2567c9923..4c191995d 100644 --- a/test/core/test_stream_validate.py +++ b/test/core/test_stream_validate.py @@ -87,17 +87,24 @@ async def test_stream_validate_idempotent(): @pytest.mark.asyncio async def test_stateful_subclass_accumulates_state(): - """Stateful subclass correctly accumulates state across stream_validate calls.""" + """Stateful subclass correctly accumulates state across stream_validate calls. + + Uses delta extraction (via _seen_len) to count only new bullet points per call — + a pattern that genuinely requires state from prior calls. + """ class BulletCounter(Requirement): def __init__(self) -> None: super().__init__(description="no more than 3 bullets") + self._seen_len = 0 self._bullet_count = 0 async def stream_validate( self, chunk: str, *, backend: Backend, ctx: Context ) -> PartialValidationResult: - self._bullet_count = chunk.count("\n-") + delta = chunk[self._seen_len :] + self._seen_len = len(chunk) + self._bullet_count += delta.count("\n-") if self._bullet_count > 3: return PartialValidationResult( "fail", reason=f"{self._bullet_count} bullets exceeds limit" @@ -110,15 +117,15 @@ async def stream_validate( await req.stream_validate("intro text", backend=None, ctx=None) # type: ignore[arg-type] assert req._bullet_count == 0 - await req.stream_validate("intro\n- one\n- two", backend=None, ctx=None) # type: ignore[arg-type] - assert req._bullet_count == 2 + await req.stream_validate("intro text\n- one\n- two", backend=None, ctx=None) # type: ignore[arg-type] + assert req._bullet_count == 2 # delta added 2 new bullets result = await req.stream_validate( - "intro\n- one\n- two\n- three\n- four", + "intro text\n- one\n- two\n- three\n- four", backend=None, ctx=None, # type: ignore[arg-type] ) - 
assert req._bullet_count == 4 + assert req._bullet_count == 4 # delta added 2 more assert result.success == "fail" assert result.reason is not None and "4" in result.reason From 2f3c42357cefbfe82e87d3fb8c39f027eb2e2717 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Mon, 27 Apr 2026 15:03:30 +0100 Subject: [PATCH 6/8] test(core): fix missing type: ignore on backend=None in multi-line call In multi-line calls, # type: ignore only suppresses errors on its own line. The backend=None argument was uncovered; add the ignore there too. Assisted-by: Claude Code Signed-off-by: Nigel Jones --- test/core/test_stream_validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/core/test_stream_validate.py b/test/core/test_stream_validate.py index 4c191995d..86ef82e03 100644 --- a/test/core/test_stream_validate.py +++ b/test/core/test_stream_validate.py @@ -122,7 +122,7 @@ async def stream_validate( result = await req.stream_validate( "intro text\n- one\n- two\n- three\n- four", - backend=None, + backend=None, # type: ignore[arg-type] ctx=None, # type: ignore[arg-type] ) assert req._bullet_count == 4 # delta added 2 more From 09ef1a99ed32f069b49bcf0596265d1d7a019af6 Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Mon, 27 Apr 2026 15:06:03 +0100 Subject: [PATCH 7/8] test(core): fix import path and clone test orchestrator pattern Use the public API for imports: Backend and Context both appear in mellea.core.__all__, so import from mellea.core rather than the internal submodules. Rewrite test_stateful_subclass_clone_isolation to simulate the correct orchestrator pattern: the original requirement is never called directly; each attempt clones from the fresh original, giving _calls == 0 at the start of every attempt. The previous test cloned mid-stream, which tested shallow-copy isolation but demonstrated the wrong usage pattern. 
Assisted-by: Claude Code Signed-off-by: Nigel Jones --- test/core/test_stream_validate.py | 35 ++++++++++++++++++------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/test/core/test_stream_validate.py b/test/core/test_stream_validate.py index 86ef82e03..47baabac9 100644 --- a/test/core/test_stream_validate.py +++ b/test/core/test_stream_validate.py @@ -5,9 +5,7 @@ import pytest -from mellea.core import PartialValidationResult, Requirement -from mellea.core.backend import Backend -from mellea.core.base import Context +from mellea.core import Backend, Context, PartialValidationResult, Requirement @pytest.mark.asyncio @@ -132,7 +130,12 @@ async def stream_validate( @pytest.mark.asyncio async def test_stateful_subclass_clone_isolation(): - """copy() of a stateful requirement gives an independent clone — orchestrator pattern.""" + """Orchestrator clone pattern: copy() before each attempt gives a fresh independent clone. + + The orchestrator holds the original requirement and never calls stream_validate on it + directly. Before each attempt it clones the original; each clone starts from the + original's (zero) state and advances independently. 
+ """ class CallCounter(Requirement): def __init__(self) -> None: @@ -145,15 +148,19 @@ async def stream_validate( self._calls += 1 return PartialValidationResult("unknown") - req = CallCounter() - await req.stream_validate("a", backend=None, ctx=None) # type: ignore[arg-type] - await req.stream_validate("b", backend=None, ctx=None) # type: ignore[arg-type] - assert req._calls == 2 + req = CallCounter() # original — never used directly by the orchestrator + + # Attempt 1 + attempt1 = copy(req) + assert attempt1._calls == 0 + await attempt1.stream_validate("a", backend=None, ctx=None) # type: ignore[arg-type] + await attempt1.stream_validate("b", backend=None, ctx=None) # type: ignore[arg-type] + assert attempt1._calls == 2 - # Simulate orchestrator cloning before a new attempt - cloned = copy(req) - assert cloned._calls == 2 # clone inherits state at clone time + # Attempt 2 (retry) — fresh clone from the same original + attempt2 = copy(req) + assert attempt2._calls == 0 # starts clean, not carrying attempt1's state + await attempt2.stream_validate("c", backend=None, ctx=None) # type: ignore[arg-type] + assert attempt2._calls == 1 - await cloned.stream_validate("c", backend=None, ctx=None) # type: ignore[arg-type] - assert cloned._calls == 3 # clone advances independently - assert req._calls == 2 # original is unchanged + assert req._calls == 0 # original never mutated From 82bdd3a5d29fed016d5e5047ba503bbde98be1ab Mon Sep 17 00:00:00 2001 From: Nigel Jones Date: Tue, 28 Apr 2026 16:59:27 +0100 Subject: [PATCH 8/8] fix(core): stream_validate receives a single chunk, not accumulated text MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Restores the chunk-at-a-time semantics set out in the #891 epic and #900 spec: stream_validate is called once per complete chunk produced by the chunking strategy, and receives that single chunk. Requirements that need history accumulate it on self. 
Earlier commits in this series drifted from this: the docstring has described chunk as "The accumulated model output so far" since stream_validate was added in 7488761f, and commit 315a98c7 then rewrote the BulletCounter test to recover deltas from accumulated text via self._seen_len. Neither reflected a design decision — it was drift, and the test fix buries a confusing workaround in what should be a straightforward stateful override. Changes: - requirement.py: rewrite chunk Args description to name the chunking-strategy-produced delta, clarify that ctx does not contain the generated output during streaming, and note the MOT single-consumer constraint - test_stream_validate.py: rewrite BulletCounter to accumulate its own running count (no self._seen_len); calls pass delta chunks ("\n- one\n- two") rather than re-sending accumulated text The corresponding orchestrator fix in stream_with_chunking() -- pass the chunk, iterate per chunk -- is in the stacked Wave 3 branch. Assisted-by: Claude Code --- mellea/core/requirement.py | 19 ++++++++++++++++--- test/core/test_stream_validate.py | 18 ++++++++---------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/mellea/core/requirement.py b/mellea/core/requirement.py index 24ff6672f..e8c55564b 100644 --- a/mellea/core/requirement.py +++ b/mellea/core/requirement.py @@ -290,7 +290,7 @@ async def stream_validate( The default implementation returns ``PartialValidationResult("unknown")`` — meaning insufficient data to decide yet. Subclasses override this method - to inspect the accumulated chunk and return ``"pass"`` or ``"fail"`` early. + to inspect the current chunk and return ``"pass"`` or ``"fail"`` early. Implementations may accumulate state on ``self`` across calls within a single attempt. The orchestrator clones the requirement (``copy(req)``) @@ -302,10 +302,23 @@ async def stream_validate( ``self._buffer.append(chunk)``), or override ``__copy__`` for proper isolation. 
+ Implementations must not call ``mot.astream()`` or otherwise read the + underlying stream; the orchestrator is the single consumer of the MOT + stream (see ``ModelOutputThunk.astream``). Requirements that need access + to the text seen so far should accumulate it themselves from the + ``chunk`` values they receive. + Args: - chunk: The accumulated model output so far (not just the latest token). + chunk: A single complete semantic chunk produced by the chunking + strategy (e.g. one sentence for ``SentenceChunker``). This is + the delta since the previous ``stream_validate`` call for this + attempt, not the accumulated output. Requirements that need + earlier context should retain it on ``self`` across calls. backend: The inference backend, available for backend-assisted checks. - ctx: The current generation context. + ctx: The current generation context. During streaming the MOT is + not yet computed, so ``ctx`` does not contain the generated + output; use ``chunk`` (and any state accumulated on ``self``) + instead. Returns: PartialValidationResult: ``"unknown"`` by default. Subclasses may return diff --git a/test/core/test_stream_validate.py b/test/core/test_stream_validate.py index 47baabac9..4973608c8 100644 --- a/test/core/test_stream_validate.py +++ b/test/core/test_stream_validate.py @@ -87,22 +87,20 @@ async def test_stream_validate_idempotent(): async def test_stateful_subclass_accumulates_state(): """Stateful subclass correctly accumulates state across stream_validate calls. - Uses delta extraction (via _seen_len) to count only new bullet points per call — - a pattern that genuinely requires state from prior calls. + Each call receives a single chunk (the delta produced by the chunking + strategy). Requirements maintain their own running state across calls + rather than re-scanning accumulated text. 
""" class BulletCounter(Requirement): def __init__(self) -> None: super().__init__(description="no more than 3 bullets") - self._seen_len = 0 self._bullet_count = 0 async def stream_validate( self, chunk: str, *, backend: Backend, ctx: Context ) -> PartialValidationResult: - delta = chunk[self._seen_len :] - self._seen_len = len(chunk) - self._bullet_count += delta.count("\n-") + self._bullet_count += chunk.count("\n-") if self._bullet_count > 3: return PartialValidationResult( "fail", reason=f"{self._bullet_count} bullets exceeds limit" @@ -115,15 +113,15 @@ async def stream_validate( await req.stream_validate("intro text", backend=None, ctx=None) # type: ignore[arg-type] assert req._bullet_count == 0 - await req.stream_validate("intro text\n- one\n- two", backend=None, ctx=None) # type: ignore[arg-type] - assert req._bullet_count == 2 # delta added 2 new bullets + await req.stream_validate("\n- one\n- two", backend=None, ctx=None) # type: ignore[arg-type] + assert req._bullet_count == 2 result = await req.stream_validate( - "intro text\n- one\n- two\n- three\n- four", + "\n- three\n- four", backend=None, # type: ignore[arg-type] ctx=None, # type: ignore[arg-type] ) - assert req._bullet_count == 4 # delta added 2 more + assert req._bullet_count == 4 assert result.success == "fail" assert result.reason is not None and "4" in result.reason