diff --git a/packages/Ecotone/src/Messaging/Channel/Collector/CollectorPauseInterceptor.php b/packages/Ecotone/src/Messaging/Channel/Collector/CollectorPauseInterceptor.php new file mode 100644 index 000000000..87d80c207 --- /dev/null +++ b/packages/Ecotone/src/Messaging/Channel/Collector/CollectorPauseInterceptor.php @@ -0,0 +1,23 @@ +proceed(); + } finally { + CollectorStorage::resume(); + } + } +} diff --git a/packages/Ecotone/src/Messaging/Channel/Collector/CollectorStorage.php b/packages/Ecotone/src/Messaging/Channel/Collector/CollectorStorage.php index a6e454ac7..17861c750 100644 --- a/packages/Ecotone/src/Messaging/Channel/Collector/CollectorStorage.php +++ b/packages/Ecotone/src/Messaging/Channel/Collector/CollectorStorage.php @@ -17,6 +17,8 @@ */ final class CollectorStorage { + private static int $pauseDepth = 0; + /** * @param CollectedMessage[] $collectedMessages */ @@ -26,6 +28,21 @@ public function __construct( ) { } + public static function pause(): void + { + self::$pauseDepth++; + } + + public static function resume(): void + { + self::$pauseDepth--; + } + + public static function isPaused(): bool + { + return self::$pauseDepth > 0; + } + public function enable(): void { $this->enabled = true; diff --git a/packages/Ecotone/src/Messaging/Channel/Collector/Config/CollectorModule.php b/packages/Ecotone/src/Messaging/Channel/Collector/Config/CollectorModule.php index d18ef2b37..57065e98e 100644 --- a/packages/Ecotone/src/Messaging/Channel/Collector/Config/CollectorModule.php +++ b/packages/Ecotone/src/Messaging/Channel/Collector/Config/CollectorModule.php @@ -7,6 +7,7 @@ use Ecotone\AnnotationFinder\AnnotationFinder; use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint; use Ecotone\Messaging\Attribute\ModuleAnnotation; +use Ecotone\Messaging\Channel\Collector\CollectorPauseInterceptor; use Ecotone\Messaging\Channel\Collector\CollectorSenderInterceptor; use Ecotone\Messaging\Channel\Collector\CollectorStorage; use Ecotone\Messaging\Channel\DynamicChannel\DynamicMessageChannelBuilder; @@ -26,6 +27,7 @@ use Ecotone\Messaging\Handler\Processor\MethodInvoker\AroundInterceptorBuilder; use Ecotone\Messaging\Precedence; use Ecotone\Modelling\CommandBus; +use Ecotone\Projecting\Attribute\ProjectionFlush; #[ModuleAnnotation] /** @@ -51,6 +53,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO } + $hasCollectorEnabled = false; foreach ($pollableMessageChannels as $pollableMessageChannel) { $channelConfiguration = $globalPollableChannelConfiguration; @@ -64,6 +67,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO continue; } + $hasCollectorEnabled = true; $collectorReference = Reference::to('polling.'.$pollableMessageChannel->getMessageChannelName().'.collector_storage'); $messagingConfiguration->registerServiceDefinition($collectorReference, new Definition(CollectorStorage::class)); $messagingConfiguration->registerChannelInterceptor( @@ -91,6 +95,21 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ) ); } + + if ($hasCollectorEnabled) { + $messagingConfiguration->registerServiceDefinition( + CollectorPauseInterceptor::class, + new Definition(CollectorPauseInterceptor::class) + ); + $messagingConfiguration->registerAroundMethodInterceptor( + AroundInterceptorBuilder::create( + CollectorPauseInterceptor::class, + $interfaceToCallRegistry->getFor(CollectorPauseInterceptor::class, 'pauseCollecting'), + Precedence::COLLECTOR_SENDER_PRECEDENCE, + ProjectionFlush::class + ) + ); + } } public static function create(AnnotationFinder $annotationRegistrationService, InterfaceToCallRegistry $interfaceToCallRegistry): static diff --git a/packages/Ecotone/src/Messaging/Channel/Collector/MessageCollectorChannelInterceptor.php b/packages/Ecotone/src/Messaging/Channel/Collector/MessageCollectorChannelInterceptor.php index 3f646cb0f..d57336d32 100644 --- a/packages/Ecotone/src/Messaging/Channel/Collector/MessageCollectorChannelInterceptor.php +++ b/packages/Ecotone/src/Messaging/Channel/Collector/MessageCollectorChannelInterceptor.php @@ -23,7 +23,7 @@ public function __construct( public function preSend(Message $message, MessageChannel $messageChannel): ?Message { - if ($this->collectorStorage->isEnabled()) { + if ($this->collectorStorage->isEnabled() && ! CollectorStorage::isPaused()) { $this->collectorStorage->collect($message, $this->logger); $message = null; diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 75fae5c9e..f6e8b06bb 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -58,6 +58,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I return new self(); } + private const WITHOUT_DBAL_TRANSACTION_CLASS = 'Ecotone\Dbal\DbalTransaction\WithoutDbalTransaction'; + public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void { $serviceConfiguration = ExtensionObjectResolver::resolveUnique(ServiceConfiguration::class, $extensionObjects, ServiceConfiguration::createWithDefaults()); @@ -97,25 +99,27 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ); $projectionRegistryMap[$projectionName] = new Reference($projectingManagerReference); - $messagingConfiguration->registerMessageHandler( - MessageProcessorActivatorBuilder::create() - ->chainInterceptedProcessor( - MethodInvokerBuilder::create( - $projectingManagerReference, - InterfaceToCallReference::create(ProjectingManager::class, 'execute'), - [ - $projectionBuilder->partitionHeader() - ? HeaderBuilder::create('partitionKeyValue', $projectionBuilder->partitionHeader()) - : ($projectionBuilder->isPartitioned() - ? new PartitionHeaderBuilder('partitionKeyValue') - : ValueBuilder::create('partitionKeyValue', null)), - HeaderBuilder::createOptional('manualInitialization', ProjectingHeaders::MANUAL_INITIALIZATION), - ], - ) + $executeHandlerBuilder = MessageProcessorActivatorBuilder::create() + ->chainInterceptedProcessor( + MethodInvokerBuilder::create( + $projectingManagerReference, + InterfaceToCallReference::create(ProjectingManager::class, 'execute'), + [ + $projectionBuilder->partitionHeader() + ? HeaderBuilder::create('partitionKeyValue', $projectionBuilder->partitionHeader()) + : ($projectionBuilder->isPartitioned() + ? new PartitionHeaderBuilder('partitionKeyValue') + : ValueBuilder::create('partitionKeyValue', null)), + HeaderBuilder::createOptional('manualInitialization', ProjectingHeaders::MANUAL_INITIALIZATION), + ], ) - ->withEndpointId(self::endpointIdForProjection($projectionName)) - ->withInputChannelName(self::inputChannelForProjectingManager($projectionName)) - ); + ) + ->withEndpointId(self::endpointIdForProjection($projectionName)) + ->withInputChannelName(self::inputChannelForProjectingManager($projectionName)); + if (class_exists(self::WITHOUT_DBAL_TRANSACTION_CLASS)) { + $executeHandlerBuilder = $executeHandlerBuilder->withEndpointAnnotations([new AttributeDefinition(self::WITHOUT_DBAL_TRANSACTION_CLASS)]); + } + $messagingConfiguration->registerMessageHandler($executeHandlerBuilder); $messagingConfiguration->registerMessageHandler( MessageProcessorActivatorBuilder::create() @@ -160,25 +164,27 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ]) ); - $messagingConfiguration->registerMessageHandler( - MessageProcessorActivatorBuilder::create() - ->chainInterceptedProcessor( - MethodInvokerBuilder::create( - BackfillExecutorHandler::class, - InterfaceToCallReference::create(BackfillExecutorHandler::class, 'executeBackfillBatch'), - [ - PayloadBuilder::create('projectionName'), - HeaderBuilder::createOptional('limit', 'backfill.limit'), - HeaderBuilder::createOptional('offset', 'backfill.offset'), - HeaderBuilder::createOptional('streamName', 'backfill.streamName'), - HeaderBuilder::createOptional('aggregateType', 'backfill.aggregateType'), - HeaderBuilder::createOptional('eventStoreReferenceName', 'backfill.eventStoreReferenceName'), - ], - ) + $backfillHandlerBuilder = MessageProcessorActivatorBuilder::create() + ->chainInterceptedProcessor( + MethodInvokerBuilder::create( + BackfillExecutorHandler::class, + InterfaceToCallReference::create(BackfillExecutorHandler::class, 'executeBackfillBatch'), + [ + PayloadBuilder::create('projectionName'), + HeaderBuilder::createOptional('limit', 'backfill.limit'), + HeaderBuilder::createOptional('offset', 'backfill.offset'), + HeaderBuilder::createOptional('streamName', 'backfill.streamName'), + HeaderBuilder::createOptional('aggregateType', 'backfill.aggregateType'), + HeaderBuilder::createOptional('eventStoreReferenceName', 'backfill.eventStoreReferenceName'), + ], ) - ->withEndpointId('backfill_executor_handler') - ->withInputChannelName(BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL) - ); + ) + ->withEndpointId('backfill_executor_handler') + ->withInputChannelName(BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL); + if (class_exists(self::WITHOUT_DBAL_TRANSACTION_CLASS)) { + $backfillHandlerBuilder = $backfillHandlerBuilder->withEndpointAnnotations([new AttributeDefinition(self::WITHOUT_DBAL_TRANSACTION_CLASS)]); + } + $messagingConfiguration->registerMessageHandler($backfillHandlerBuilder); // Register console commands $messagingConfiguration->registerServiceDefinition(ProjectingConsoleCommands::class, new Definition(ProjectingConsoleCommands::class, [new Reference(ProjectionRegistry::class)])); diff --git a/packages/Ecotone/tests/Messaging/Unit/Channel/Collector/CollectorStoragePauseTest.php b/packages/Ecotone/tests/Messaging/Unit/Channel/Collector/CollectorStoragePauseTest.php new file mode 100644 index 000000000..18078947f --- /dev/null +++ b/packages/Ecotone/tests/Messaging/Unit/Channel/Collector/CollectorStoragePauseTest.php @@ -0,0 +1,48 @@ +wasUsed, 'Userland state storage should be prioritized and used'); } + + public function test_collector_is_paused_during_projection_flush(): void + { + $projection = new #[ProjectionV2('collector_projection'), FromStream('test_stream')] class () { + public array $processedEvents = []; + public int $flushCount = 0; + public ?EventBus $eventBus = null; + + #[EventHandler('*')] + public function handle(array $event, #[\Ecotone\Messaging\Attribute\Parameter\Reference] EventBus $eventBus): void + { + $this->eventBus = $eventBus; + $this->processedEvents[] = $event; + } + + #[ProjectionFlush] + public function flush(): void + { + $this->flushCount++; + if ($this->eventBus && count($this->processedEvents) > 0) { + $this->eventBus->publish(new stdClass()); + } + } + }; + + $notificationHandler = new class () { + public int $receivedCount = 0; + + #[Asynchronous('notifications')] + #[EventHandler(endpointId: 'notification_handler')] + public function handle(stdClass $event): void + { + $this->receivedCount++; + } + }; + + $ecotone = EcotoneLite::bootstrapFlowTesting( + [$projection::class, $notificationHandler::class], + [$projection, $notificationHandler], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE])) + ->withLicenceKey(LicenceTesting::VALID_LICENCE) + ->withExtensionObjects([ + PollableChannelConfiguration::neverRetry('notifications')->withCollector(true), + ]), + enableAsynchronousProcessing: [ + SimpleMessageChannelBuilder::createQueueChannel('notifications'), + ], + ); + + $ecotone->withEvents([ + Event::createWithType('test-event', ['name' => 'Test']), + ]); + + $ecotone->triggerProjection('collector_projection'); + + self::assertCount(1, $projection->processedEvents); + self::assertGreaterThanOrEqual(1, $projection->flushCount); + $message = $ecotone->getMessageChannel('notifications')->receive(); + self::assertNotNull($message, 'Message sent during flush should bypass collector and go directly to channel'); + } }