feat: disable global DbalTransaction and pause Collector during projection batch execution#647
Open
feat: disable global DbalTransaction and pause Collector during projection batch execution#647
Conversation
…ction batch execution ProjectionV2's executeSingleBatch() manages its own per-batch transaction, but the global DbalTransactionInterceptor and CollectorSenderInterceptor were wrapping the outer handler in async/backfill scenarios, causing all batches to run in one big transaction and collector to release messages outside per-batch transaction scope.
The WithoutDbalTransaction class lives in the Dbal package. When running core-only tests without Dbal installed, AttributeDefinition::instance() fails with "class not found". The annotation is only needed when the DbalTransactionInterceptor is active, so skip it when the class is absent.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why is this change proposed?
When ProjectionV2 runs asynchronously or during backfill, the global
DbalTransactionInterceptorwraps the entire execution in one big transaction — overriding the projection's own per-batch transaction management. This means all batches commit or rollback together instead of independently. Additionally, theCollectorSenderInterceptorcollects messages sent duringflush()and releases them only after all batches complete, outside the per-batch transaction scope.Description of Changes
Disable global DbalTransaction for projection execute and backfill handlers — Added
WithoutDbalTransactionendpoint annotation to both theexecute()andBackfillExecutorHandlermessage handlers inProjectingModule. In async mode, the projection's ownexecuteSingleBatch()now manages real per-batch transactions (sinceisTransactionActive()returnsfalse). In synchronous mode (within CommandBus), the existing transaction is still detected and reused viaNoOpTransaction.Pause Collector during ProjectionFlush — Added a static
pause()/resume()mechanism toCollectorStorageand a newCollectorPauseInterceptor(around interceptor onProjectionFlushpointcut). When a projection batch executes, collector is paused so messages sent duringflush()go directly to the channel within the batch transaction, rather than being collected and released later.Updated
MessageCollectorChannelInterceptor—preSend()now checksCollectorStorage::isPaused()and lets messages through when paused.Tests — Unit tests for
CollectorStoragepause/resume mechanism, integration test verifying messages sent during projection flush bypass the collector.Flow: Async Projection with Collector (after fix)
sequenceDiagram participant AsyncConsumer participant ProjectingManager participant executeSingleBatch participant Flush participant Channel AsyncConsumer->>ProjectingManager: execute() [no global DbalTransaction] loop Per batch ProjectingManager->>executeSingleBatch: [ProjectionFlush pointcut] Note over executeSingleBatch: beginTransaction() → real DB transaction Note over executeSingleBatch: CollectorStorage::pause() executeSingleBatch->>Flush: flush() Flush->>Channel: send message (bypasses collector) Note over executeSingleBatch: commit transaction Note over executeSingleBatch: CollectorStorage::resume() endPull Request Contribution Terms