What
Create mellea/stdlib/streaming.py with a stream_with_chunking() function and a StreamChunkingResult return type.
async def stream_with_chunking(
    action,
    backend,
    ctx,
    *,
    quick_check_requirements: list[Requirement] | None = None,
    chunking: str | ChunkingStrategy = "sentence",
    quick_check_backend: Backend | None = None,
) -> StreamChunkingResult:
Why
This is the core orchestration primitive for streaming validation. It bridges the MOT's raw token stream, the ChunkingStrategy, and per-chunk stream_validate() calls on requirements. All higher-level streaming APIs build on this function.
Implementation
Internals:
- Clone each requirement (copy(req)) before the attempt starts — streaming state accumulates on the clone; the original is never mutated
- Start a single background asyncio.Task consuming mot.astream() — the MOT enforces a single-consumer constraint (documented at mellea/core/base.py:434-436), so the orchestrator is the only caller; fanning out to requirements happens via a result queue, not by sharing the MOT
- Accumulate token deltas, apply ChunkingStrategy.split() to produce complete chunks
- For each complete chunk: run stream_validate() across all requirements in parallel via asyncio.gather
- On any "fail" result: call mot.cancel_generation() to cancel the background task, drain the queue, and end any open telemetry span; set completed = False. Without this, asyncio.Queue(maxsize=20) blocks the producer indefinitely once the consumer stops
- After the stream ends (mot._computed is True): call validate() on every requirement that did not return "fail" — both "unknown" and "pass" trigger final validation; "pass" mid-stream is informational only in Phase 1
- Route validation calls to quick_check_backend if provided
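A minimal sketch of the core loop described above, using stand-in types: SentenceChunker, FakeRequirement, and the plain token list are placeholders for the real ChunkingStrategy, Requirement, and MOT stream, not mellea APIs.

```python
# Illustrative sketch only: the chunker, requirement, and token list below
# are stand-ins for ChunkingStrategy, Requirement, and the MOT stream.
import asyncio

class SentenceChunker:
    """Splits buffered text into complete sentences; keeps the remainder."""
    def split(self, buffer: str) -> tuple[list[str], str]:
        chunks, rest = [], buffer
        while "." in rest:
            head, rest = rest.split(".", 1)
            chunks.append(head + ".")
        return chunks, rest

class FakeRequirement:
    """Fails any chunk containing a banned substring; otherwise defers."""
    def __init__(self, banned: str):
        self.banned = banned
    async def stream_validate(self, chunk: str) -> str:
        return "fail" if self.banned in chunk else "unknown"

async def orchestrate(tokens, requirements, chunker):
    buffer, emitted = "", []
    for tok in tokens:                    # stands in for the MOT token stream
        buffer += tok
        chunks, buffer = chunker.split(buffer)
        for chunk in chunks:
            # Validate each complete chunk across all requirements in parallel.
            results = await asyncio.gather(
                *(r.stream_validate(chunk) for r in requirements))
            if "fail" in results:         # cancel on the first failing chunk
                return emitted, False
            emitted.append(chunk)
    return emitted, True

chunks, ok = asyncio.run(orchestrate(
    ["Hello", " world. ", "Bad", " word."],
    [FakeRequirement("Bad")], SentenceChunker()))
```

The second sentence trips the requirement, so only the first validated chunk is emitted and the completed flag comes back False, mirroring the early-exit path above.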
StreamChunkingResult (also defined in this module):
- astream() -> AsyncIterator[str] — yields validated text chunks as they complete
- acomplete() — awaits full completion
- completed: bool — False if the stream exited early due to a "fail" result
- full_text: str — complete generated text (available after acomplete())
- as_thunk — wraps the output as a ModelOutputThunk for interop with existing mellea code
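A possible shape for the result type, simplified for illustration: the field names follow the spec above, but the queue-based plumbing and end-of-stream sentinel are assumptions, not the real implementation.

```python
# Hypothetical StreamChunkingResult sketch; field names follow the spec,
# the asyncio.Queue plumbing and None sentinel are illustrative assumptions.
import asyncio
from typing import AsyncIterator

class StreamChunkingResult:
    def __init__(self, queue: asyncio.Queue):
        self._queue = queue
        self.completed = True   # orchestrator flips this to False on "fail"
        self.full_text = ""

    async def astream(self) -> AsyncIterator[str]:
        # Yield validated chunks until the producer signals the end with None.
        while (chunk := await self._queue.get()) is not None:
            self.full_text += chunk
            yield chunk

    async def acomplete(self) -> str:
        # Drain the stream to completion and return the accumulated text.
        async for _ in self.astream():
            pass
        return self.full_text

async def demo():
    q = asyncio.Queue()
    for item in ["Hello ", "world.", None]:
        q.put_nowait(item)
    return await StreamChunkingResult(q).acomplete()

text = asyncio.run(demo())
```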
Test infrastructure: A StreamingMockBackend test helper needs to be created (does not currently exist in the codebase). It should accept a fixed response string and feed it into a MOT's queue token by token. It can be defined in test/stdlib/test_streaming.py or a shared fixture. This is the primary vehicle for testing the orchestration logic without a live backend.
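Since the helper does not exist yet, here is one way it could look: a sketch that feeds a fixed response into an asyncio.Queue token by token, with the queue and sentinel standing in for the MOT's internal stream.

```python
# Sketch of the StreamingMockBackend test helper: feeds a fixed response
# string token by token into an asyncio.Queue standing in for the MOT's queue.
import asyncio

class StreamingMockBackend:
    def __init__(self, response: str, token_size: int = 4):
        # Slice the canned response into fixed-size pseudo-tokens.
        self._tokens = [response[i:i + token_size]
                        for i in range(0, len(response), token_size)]

    async def generate(self, queue: asyncio.Queue) -> None:
        for tok in self._tokens:
            await queue.put(tok)
            await asyncio.sleep(0)   # yield control, as a real backend would
        await queue.put(None)        # end-of-stream sentinel

async def demo():
    queue = asyncio.Queue(maxsize=20)
    backend = StreamingMockBackend("Hello, streaming world!")
    producer = asyncio.create_task(backend.generate(queue))
    out = ""
    while (tok := await queue.get()) is not None:
        out += tok
    await producer
    return out

text = asyncio.run(demo())
```

Reassembling the tokens must reproduce the original response exactly, which is what makes the helper a deterministic substitute for a live backend.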
Example: Add docs/examples/streaming/stream_with_chunking.py demonstrating how to write a stream_validate() override and use it with this function. Marker: # pytest: ollama, integration.
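The docs example might center on a chunk-level check along these lines; MaxWordsRequirement and its verdict strings are hypothetical, and the real override would subclass the mellea Requirement class.

```python
# Hypothetical stream_validate() override; MaxWordsRequirement is a stand-in,
# not a real mellea class. Verdicts follow the "fail"/"unknown" convention above.
class MaxWordsRequirement:
    """Fails fast once the accumulated stream exceeds a word budget."""
    def __init__(self, limit: int):
        self.limit = limit
        self._seen = 0        # streaming state lives on the (cloned) instance

    def stream_validate(self, chunk: str) -> str:
        self._seen += len(chunk.split())
        if self._seen > self.limit:
            return "fail"     # orchestrator cancels generation early
        return "unknown"      # defer the final verdict to validate() at stream end

req = MaxWordsRequirement(limit=5)
verdicts = [req.stream_validate(c)
            for c in ["one two three.", "four five six."]]
```

Because state accumulates on the instance, cloning the requirement per attempt (as the Internals section requires) keeps reruns independent.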
Acceptance criteria
- mellea/stdlib/streaming.py created with stream_with_chunking() and StreamChunkingResult
- Requirements cloned (copy(req)) before each attempt; the original requirement instance is never mutated
- A single background asyncio.Task consumes mot.astream() — no other code calls astream() on the same MOT
- stream_validate() called via asyncio.gather (parallel, not sequential)
- A "fail" cancels the remaining stream; StreamChunkingResult.completed is False
- Non-"fail" requirements (both "pass" and "unknown") get validate() called at stream end
- quick_check_backend routes validation calls to the alternate backend
- StreamChunkingResult.as_thunk wraps the output as a ModelOutputThunk
- StreamingMockBackend test helper created
- test/stdlib/test_streaming.py using StreamingMockBackend covering: "unknown" results (validate() called at stream end), "fail" results, and quick_check_backend routing
- cancel_generation() added to ModelOutputThunk in mellea/core/base.py; cancels the _generate task, drains the queue, ends any open telemetry span; the orchestrator calls it on early "fail" exit
- Uses mot.generation.* telemetry paths throughout (requires "refactor!: partition ModelOutputThunk execution metadata into Generat…" #908 merged; mot.generation.usage will be None on early exit since post_processing() does not run — StreamChunkingResult exposes this)
- Sampling events (SAMPLING_LOOP_START, SAMPLING_REPAIR, etc.) do not fire — the "feat(stdlib): add standard streaming event types" #902 event types are the observability substitute
- Tests marked @pytest.mark.integration and @pytest.mark.ollama
- docs/examples/streaming/ example added with the # pytest: ollama, integration marker
- Documentation for stream_with_chunking() and StreamChunkingResult

Blocked by #900, #899
Depends on #908 merging (uses mot.generation.* API throughout)
Part of #891