diff --git a/.claude/commands/create-pr.md b/.claude/commands/create-pr.md index 07bdf0fe9..486ae212e 100644 --- a/.claude/commands/create-pr.md +++ b/.claude/commands/create-pr.md @@ -73,9 +73,9 @@ Keep the title under 70 characters, concise and descriptive. **"Pull Request Contribution Terms"** section: - Ask the user using AskUserQuestion: "Do you agree to the contribution terms outlined in CONTRIBUTING.md?" with options: - "Yes, I agree" — Mark the checkbox with `[X]` - - "No" — Leave the checkbox empty `[ ]` + - "No" — Leave the checkbox empty `[]` - If agreed: `- [X] I have read and agree to the contribution terms outlined in [CONTRIBUTING](https://github.com/ecotoneframework/ecotone-dev/blob/main/CONTRIBUTING.md)` -- If not agreed: `- [ ] I have read and agree to the contribution terms outlined in [CONTRIBUTING](https://github.com/ecotoneframework/ecotone-dev/blob/main/CONTRIBUTING.md)` +- If not agreed: `- [] I have read and agree to the contribution terms outlined in [CONTRIBUTING](https://github.com/ecotoneframework/ecotone-dev/blob/main/CONTRIBUTING.md)` ### Step 4: Review with User diff --git a/packages/Ecotone/src/Projecting/AggregatePartitionKey.php b/packages/Ecotone/src/Projecting/AggregatePartitionKey.php new file mode 100644 index 000000000..8393ee8d5 --- /dev/null +++ b/packages/Ecotone/src/Projecting/AggregatePartitionKey.php @@ -0,0 +1,33 @@ + $parts[0], + 'aggregateType' => $parts[1], + 'aggregateId' => $parts[2], + ]; + } +} diff --git a/packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php b/packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php new file mode 100644 index 000000000..1f1a462f9 --- /dev/null +++ b/packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php @@ -0,0 +1,25 @@ +partitionBatchSize < 1) { + throw new InvalidArgumentException('Rebuild partition batch size must be at least 1'); + } + } +} diff --git a/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php b/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php deleted file mode 100644 index 50186219c..000000000 --- a/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php +++ /dev/null @@ -1,55 +0,0 @@ -projectionRegistry->get($projectionName); - $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); - - foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { - $projectingManager->execute($partition, true); - if ($this->terminationListener->shouldTerminate()) { - break; - } - } - } -} diff --git a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php index 0ca04eeaf..2c7f1f982 100644 --- a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php @@ -27,6 +27,7 @@ class EcotoneProjectionExecutorBuilder implements ProjectionExecutorBuilder { private const DEFAULT_EVENT_LOADING_BATCH_SIZE = 1_000; private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100; + private const DEFAULT_REBUILD_PARTITION_BATCH_SIZE = 100; /** * @param AnnotatedDefinition[] $projectionEventHandlers @@ -46,6 +47,9 @@ public function __construct( private ?int $backfillPartitionBatchSize = null, private ?string $backfillAsyncChannelName = null, private bool $partitioned = false, + private ?string $resetChannel = null, + private ?int $rebuildPartitionBatchSize = null, + private ?string $rebuildAsyncChannelName = null, ) { if ($this->partitionHeader && ! $this->automaticInitialization) { throw new ConfigurationException("Cannot set partition header for projection {$this->projectionName} with automatic initialization disabled"); @@ -92,6 +96,11 @@ public function setFlushChannel(string $inputChannel): void $this->flushChannel = $inputChannel; } + public function setResetChannel(string $inputChannel): void + { + $this->resetChannel = $inputChannel; + } + public function setAsyncChannel(string $asynchronousChannelName): void { $this->asyncChannelName = $asynchronousChannelName; @@ -117,6 +126,16 @@ public function backfillAsyncChannelName(): ?string return $this->backfillAsyncChannelName; } + public function rebuildPartitionBatchSize(): int + { + return $this->rebuildPartitionBatchSize ?? self::DEFAULT_REBUILD_PARTITION_BATCH_SIZE; + } + + public function rebuildAsyncChannelName(): ?string + { + return $this->rebuildAsyncChannelName; + } + public function compile(MessagingContainerBuilder $builder): Definition|Reference { $routerProcessor = $this->buildExecutionRouter($builder); @@ -129,6 +148,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc $this->deleteChannel, $this->flushChannel, $this->isLive, + $this->resetChannel, ]); } diff --git a/packages/Ecotone/src/Projecting/Config/PartitionHeaderConverter.php b/packages/Ecotone/src/Projecting/Config/PartitionHeaderConverter.php index 566c089bb..7cc5bd8f8 100644 --- a/packages/Ecotone/src/Projecting/Config/PartitionHeaderConverter.php +++ b/packages/Ecotone/src/Projecting/Config/PartitionHeaderConverter.php @@ -10,15 +10,16 @@ use Ecotone\Messaging\Handler\ParameterConverter; use Ecotone\Messaging\Message; use Ecotone\Messaging\MessageHeaders; +use Ecotone\Projecting\AggregatePartitionKey; class PartitionHeaderConverter implements ParameterConverter { public function getArgumentFrom(Message $message): string { - $streamName = $message->getHeaders()->get(MessageHeaders::EVENT_STREAM_NAME); - $aggregateType = $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_TYPE); - $aggregateId = $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_ID); - - return "{$streamName}:{$aggregateType}:{$aggregateId}"; + return AggregatePartitionKey::compose( + $message->getHeaders()->get(MessageHeaders::EVENT_STREAM_NAME), + $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_TYPE), + $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_ID), + ); } } diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php index 8134c8c9b..5a542cf4e 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php @@ -12,6 +12,7 @@ use Ecotone\AnnotationFinder\AnnotationFinder; use Ecotone\EventSourcing\Attribute\ProjectionDelete; use Ecotone\EventSourcing\Attribute\ProjectionInitialization; +use Ecotone\EventSourcing\Attribute\ProjectionReset; use Ecotone\Messaging\Attribute\Asynchronous; use Ecotone\Messaging\Attribute\ModuleAnnotation; use Ecotone\Messaging\Config\Annotation\AnnotatedDefinitionReference; @@ -36,6 +37,7 @@ use Ecotone\Projecting\Attribute\Polling; use Ecotone\Projecting\Attribute\ProjectionBackfill; use Ecotone\Projecting\Attribute\ProjectionDeployment; +use Ecotone\Projecting\Attribute\ProjectionRebuild; use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; @@ -80,6 +82,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $projectionAttribute = $annotationRegistrationService->getAttributeForClass($projectionClassName, ProjectionV2::class); $batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionExecution::class); $backfillAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBackfill::class); + $rebuildAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionRebuild::class); $pollingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Polling::class); $streamingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Streaming::class); $projectionDeployment = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionDeployment::class); @@ -100,6 +103,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I backfillPartitionBatchSize: $backfillAttribute?->backfillPartitionBatchSize, backfillAsyncChannelName: $backfillAttribute?->asyncChannelName, partitioned: $partitionAttribute !== null, + rebuildPartitionBatchSize: $rebuildAttribute?->partitionBatchSize, + rebuildAsyncChannelName: $rebuildAttribute?->asyncChannelName, ); $asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName); @@ -140,6 +145,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionInitialization::class), $annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionDelete::class), $annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionFlush::class), + $annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionReset::class), ); foreach ($lifecycleAnnotations as $lifecycleAnnotation) { /** @var ProjectionV2 $projectionAttribute */ @@ -153,6 +159,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $projectionBuilder->setDeleteChannel($inputChannel); } elseif ($lifecycleAnnotation->getAnnotationForMethod() instanceof ProjectionFlush) { $projectionBuilder->setFlushChannel($inputChannel); + } elseif ($lifecycleAnnotation->getAnnotationForMethod() instanceof ProjectionReset) { + $projectionBuilder->setResetChannel($inputChannel); } diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php index a90ad9951..1dd04dc00 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php @@ -45,6 +45,15 @@ public function backfillProjection(string $name): void $this->registry->get($name)->prepareBackfill(); } + #[ConsoleCommand('ecotone:projection:rebuild')] + public function rebuildProjection(string $name): void + { + if (! $this->registry->has($name)) { + throw new InvalidArgumentException("There is no projection with name {$name}"); + } + $this->registry->get($name)->prepareRebuild(); + } + #[ConsoleCommand('ecotone:projection:delete')] public function deleteProjection(string $name): void { diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 75fae5c9e..ccb007010 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -29,7 +29,7 @@ use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvokerBuilder; use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder; use Ecotone\Projecting\Attribute\ProjectionFlush; -use Ecotone\Projecting\BackfillExecutorHandler; +use Ecotone\Projecting\PartitionBatchExecutorHandler; use Ecotone\Projecting\InMemory\InMemoryProjectionRegistry; use Ecotone\Projecting\PartitionProviderRegistry; use Ecotone\Projecting\ProjectingHeaders; @@ -93,6 +93,8 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $projectionBuilder->automaticInitialization(), $projectionBuilder->backfillPartitionBatchSize(), $projectionBuilder->backfillAsyncChannelName(), + $projectionBuilder->rebuildPartitionBatchSize(), + $projectionBuilder->rebuildAsyncChannelName(), ]) ); $projectionRegistryMap[$projectionName] = new Reference($projectingManagerReference); @@ -122,10 +124,11 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ->chainInterceptedProcessor( MethodInvokerBuilder::create( $projectingManagerReference, - InterfaceToCallReference::create(ProjectingManager::class, 'executeSingleBatch'), + InterfaceToCallReference::create(ProjectingManager::class, 'executePartitionBatch'), [ HeaderBuilder::createOptional('partitionKeyValue', ProjectingHeaders::PROJECTION_PARTITION_KEY), HeaderBuilder::create('canInitialize', ProjectingHeaders::PROJECTION_CAN_INITIALIZE), + HeaderBuilder::createOptional('shouldReset', 'projection.shouldReset'), ], ) ) @@ -151,10 +154,10 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO new Definition(InMemoryProjectionRegistry::class, [$projectionRegistryMap]) ); - // Register BackfillExecutorHandler and its message handler + // Register PartitionBatchExecutorHandler and its message handler $messagingConfiguration->registerServiceDefinition( - BackfillExecutorHandler::class, - new Definition(BackfillExecutorHandler::class, [ + PartitionBatchExecutorHandler::class, + new Definition(PartitionBatchExecutorHandler::class, [ new Reference(ProjectionRegistry::class), new Reference(TerminationListener::class), ]) @@ -164,20 +167,21 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO MessageProcessorActivatorBuilder::create() ->chainInterceptedProcessor( MethodInvokerBuilder::create( - BackfillExecutorHandler::class, - InterfaceToCallReference::create(BackfillExecutorHandler::class, 'executeBackfillBatch'), + PartitionBatchExecutorHandler::class, + InterfaceToCallReference::create(PartitionBatchExecutorHandler::class, 'executeBatch'), [ PayloadBuilder::create('projectionName'), - HeaderBuilder::createOptional('limit', 'backfill.limit'), - HeaderBuilder::createOptional('offset', 'backfill.offset'), - HeaderBuilder::createOptional('streamName', 'backfill.streamName'), - HeaderBuilder::createOptional('aggregateType', 'backfill.aggregateType'), - HeaderBuilder::createOptional('eventStoreReferenceName', 'backfill.eventStoreReferenceName'), + HeaderBuilder::createOptional('limit', 'partitionBatch.limit'), + HeaderBuilder::createOptional('offset', 'partitionBatch.offset'), + HeaderBuilder::createOptional('streamName', 'partitionBatch.streamName'), + HeaderBuilder::createOptional('aggregateType', 'partitionBatch.aggregateType'), + HeaderBuilder::createOptional('eventStoreReferenceName', 'partitionBatch.eventStoreReferenceName'), + HeaderBuilder::createOptional('shouldReset', 'partitionBatch.shouldReset'), ], ) ) - ->withEndpointId('backfill_executor_handler') - ->withInputChannelName(BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL) + ->withEndpointId('partition_batch_executor_handler') + ->withInputChannelName(PartitionBatchExecutorHandler::PARTITION_BATCH_EXECUTOR_CHANNEL) ); // Register console commands diff --git a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php index c309d20c0..08e7792af 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php @@ -19,4 +19,6 @@ public function automaticInitialization(): bool; public function eventLoadingBatchSize(): int; public function backfillPartitionBatchSize(): int; public function backfillAsyncChannelName(): ?string; + public function rebuildPartitionBatchSize(): int; + public function rebuildAsyncChannelName(): ?string; } diff --git a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php index 5a6decfc9..215b17035 100644 --- a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php @@ -28,6 +28,7 @@ public function __construct( private ?string $deleteChannel = null, private ?string $flushChannel = null, private bool $isLive = true, + private ?string $resetChannel = null, ) { } @@ -88,4 +89,25 @@ public function flush(mixed $userState = null): void ], $this->flushChannel); } } + + public function reset(?string $partitionKey = null): void + { + if ($this->resetChannel) { + $headers = [ + ProjectingHeaders::PROJECTION_NAME => $this->projectionName, + ]; + + if ($partitionKey !== null) { + $headers[ProjectingHeaders::REBUILD_PARTITION_KEY] = $partitionKey; + + $decomposed = AggregatePartitionKey::decompose($partitionKey); + if ($decomposed !== null) { + $headers[MessageHeaders::EVENT_AGGREGATE_TYPE] = $decomposed['aggregateType']; + $headers[MessageHeaders::EVENT_AGGREGATE_ID] = $decomposed['aggregateId']; + } + } + + $this->messagingEntrypoint->sendWithHeaders([], $headers, $this->resetChannel); + } + } } diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php index 74188e895..6821fee45 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php @@ -77,4 +77,9 @@ public function flush(mixed $userState = null): void { // No flushing needed } + + public function reset(?string $partitionKey = null): void + { + // No reset needed + } } diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php index 4dabc0a72..392fae963 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php @@ -65,6 +65,16 @@ public function backfillAsyncChannelName(): ?string return null; } + public function rebuildPartitionBatchSize(): int + { + return 100; + } + + public function rebuildAsyncChannelName(): ?string + { + return null; + } + public function compile(MessagingContainerBuilder $builder): Definition|Reference { return new Definition( diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php index 90baf9366..17c3599ef 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php @@ -43,4 +43,9 @@ public function delete(): void public function flush(mixed $userState = null): void { } + + public function reset(?string $partitionKey = null): void + { + $this->projectedEvents = []; + } } diff --git a/packages/Ecotone/src/Projecting/PartitionBatchExecutorHandler.php b/packages/Ecotone/src/Projecting/PartitionBatchExecutorHandler.php new file mode 100644 index 000000000..49da5c550 --- /dev/null +++ b/packages/Ecotone/src/Projecting/PartitionBatchExecutorHandler.php @@ -0,0 +1,45 @@ +projectionRegistry->get($projectionName); + $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); + + foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { + if ($shouldReset) { + $projectingManager->executeWithReset($partition); + } else { + $projectingManager->execute($partition, true); + } + if ($this->terminationListener->shouldTerminate()) { + break; + } + } + } +} diff --git a/packages/Ecotone/src/Projecting/ProjectingHeaders.php b/packages/Ecotone/src/Projecting/ProjectingHeaders.php index 2f5a57b91..a15871f91 100644 --- a/packages/Ecotone/src/Projecting/ProjectingHeaders.php +++ b/packages/Ecotone/src/Projecting/ProjectingHeaders.php @@ -20,4 +20,5 @@ class ProjectingHeaders public const MANUAL_INITIALIZATION = 'projection.manual_initialization'; public const PROJECTION_PARTITION_KEY = 'projection.partitionKey'; public const PROJECTION_CAN_INITIALIZE = 'projection.canInitialize'; + public const REBUILD_PARTITION_KEY = 'projection.rebuild.partitionKey'; } diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index a92d24083..d2e87397f 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -15,6 +15,7 @@ class ProjectingManager { private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100; + private const DEFAULT_REBUILD_PARTITION_BATCH_SIZE = 100; private ?ProjectionStateStorage $projectionStateStorage = null; @@ -31,6 +32,8 @@ public function __construct( private bool $automaticInitialization = true, private int $backfillPartitionBatchSize = self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE, private ?string $backfillAsyncChannelName = null, + private int $rebuildPartitionBatchSize = self::DEFAULT_REBUILD_PARTITION_BATCH_SIZE, + private ?string $rebuildAsyncChannelName = null, ) { if ($eventLoadingBatchSize < 1) { throw new InvalidArgumentException('Event loading batch size must be at least 1'); @@ -38,6 +41,9 @@ public function __construct( if ($backfillPartitionBatchSize < 1) { throw new InvalidArgumentException('Backfill partition batch size must be at least 1'); } + if ($rebuildPartitionBatchSize < 1) { + throw new InvalidArgumentException('Rebuild partition batch size must be at least 1'); + } } private function getProjectionStateStorage(): ProjectionStateStorage @@ -62,7 +68,7 @@ public function execute(?string $partitionKeyValue = null, bool $manualInitializ } while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true); } - public function executeSingleBatch(?string $partitionKeyValue = null, bool $canInitialize = false): int + public function executePartitionBatch(?string $partitionKeyValue = null, bool $canInitialize = false, bool $shouldReset = false): int { $transaction = $this->getProjectionStateStorage()->beginTransaction(); try { @@ -72,31 +78,46 @@ public function executeSingleBatch(?string $partitionKeyValue = null, bool $canI return 0; } - $streamSource = $this->streamSourceRegistry->getFor($this->projectionName); - $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); - - $userState = $projectionState->userState; - $processedEvents = 0; - foreach ($streamPage->events as $event) { - $userState = $this->projectorExecutor->project($event, $userState); - $processedEvents++; - } - if ($processedEvents > 0) { - $this->projectorExecutor->flush($userState); + if ($shouldReset) { + $this->projectorExecutor->reset($partitionKeyValue); + $projectionState = new ProjectionPartitionState( + $projectionState->projectionName, + $projectionState->partitionKey, + null, + null, + $projectionState->status, + ); } - $projectionState = $projectionState - ->withLastPosition($streamPage->lastPosition) - ->withUserState($userState); + $streamSource = $this->streamSourceRegistry->getFor($this->projectionName); + $totalProcessedEvents = 0; + $userState = $projectionState->userState; - if ($processedEvents === 0 && $canInitialize) { - // If we are forcing execution and there are no new events, we still want to enable the projection if it was uninitialized + do { + $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); + + $batchProcessedEvents = 0; + foreach ($streamPage->events as $event) { + $userState = $this->projectorExecutor->project($event, $userState); + $batchProcessedEvents++; + } + if ($batchProcessedEvents > 0) { + $this->projectorExecutor->flush($userState); + } + + $totalProcessedEvents += $batchProcessedEvents; + $projectionState = $projectionState + ->withLastPosition($streamPage->lastPosition) + ->withUserState($userState); + } while ($shouldReset && $batchProcessedEvents >= $this->eventLoadingBatchSize); + + if ($totalProcessedEvents === 0 && $canInitialize) { $projectionState = $projectionState->withStatus(ProjectionInitializationStatus::INITIALIZED); } $this->getProjectionStateStorage()->savePartition($projectionState); $transaction->commit(); - return $processedEvents; + return $totalProcessedEvents; } catch (Throwable $e) { $transaction->rollBack(); throw $e; @@ -137,71 +158,85 @@ public function delete(): void $this->projectorExecutor->delete(); } + public function prepareBackfill(): void + { + $this->preparePartitionBatches($this->backfillPartitionBatchSize, $this->backfillAsyncChannelName, false); + } + /** - * Prepares backfill by calculating batches and sending messages to BackfillExecutorHandler. - * Each batch message contains a limit and offset for processing a subset of partitions. - * This enables the backfill to be executed synchronously or asynchronously depending on configuration. + * @deprecated Use prepareBackfill() instead. This method is kept for backward compatibility. */ - public function prepareBackfill(): void + public function backfill(): void { - $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); + $this->prepareBackfill(); + } - foreach ($streamFilters as $streamFilter) { - $this->prepareBackfillForFilter($streamFilter); - } + public function executeWithReset(?string $partitionKeyValue = null): void + { + $this->messagingEntrypoint->sendWithHeaders( + [], + [ + ProjectingHeaders::PROJECTION_PARTITION_KEY => $partitionKeyValue, + ProjectingHeaders::PROJECTION_CAN_INITIALIZE => true, + 'projection.shouldReset' => true, + ], + self::batchChannelFor($this->projectionName) + ); } - private function prepareBackfillForFilter(StreamFilter $streamFilter): void + public function prepareRebuild(): void { - $totalPartitions = $this->getPartitionProvider()->count($streamFilter); + $this->preparePartitionBatches($this->rebuildPartitionBatchSize, $this->rebuildAsyncChannelName, true); + } - if ($totalPartitions === 0) { - return; - } + private function preparePartitionBatches(int $partitionBatchSize, ?string $asyncChannelName, bool $shouldReset): void + { + $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); + + foreach ($streamFilters as $streamFilter) { + $totalPartitions = $this->getPartitionProvider()->count($streamFilter); + + if ($totalPartitions === 0) { + continue; + } - $numberOfBatches = (int) ceil($totalPartitions / $this->backfillPartitionBatchSize); + $numberOfBatches = (int) ceil($totalPartitions / $partitionBatchSize); - for ($batch = 0; $batch < $numberOfBatches; $batch++) { - $offset = $batch * $this->backfillPartitionBatchSize; + for ($batch = 0; $batch < $numberOfBatches; $batch++) { + $offset = $batch * $partitionBatchSize; - $headers = [ - 'backfill.limit' => $this->backfillPartitionBatchSize, - 'backfill.offset' => $offset, - 'backfill.streamName' => $streamFilter->streamName, - 'backfill.aggregateType' => $streamFilter->aggregateType, - 'backfill.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, - ]; + $headers = [ + 'partitionBatch.limit' => $partitionBatchSize, + 'partitionBatch.offset' => $offset, + 'partitionBatch.streamName' => $streamFilter->streamName, + 'partitionBatch.aggregateType' => $streamFilter->aggregateType, + 'partitionBatch.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, + 'partitionBatch.shouldReset' => $shouldReset, + ]; - $this->sendBackfillMessage($headers); + $this->sendPartitionBatchMessage($headers, $asyncChannelName); + } } } - private function sendBackfillMessage(array $headers): void + private function sendPartitionBatchMessage(array $headers, ?string $asyncChannelName): void { - if ($this->backfillAsyncChannelName !== null) { + if ($asyncChannelName !== null) { $this->messagingEntrypoint->sendWithHeaders( $this->projectionName, $headers, - $this->backfillAsyncChannelName, - BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL + $asyncChannelName, + PartitionBatchExecutorHandler::PARTITION_BATCH_EXECUTOR_CHANNEL ); } else { $this->messagingEntrypoint->sendWithHeaders( $this->projectionName, $headers, - BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL + PartitionBatchExecutorHandler::PARTITION_BATCH_EXECUTOR_CHANNEL ); } } - /** - * @deprecated Use prepareBackfill() instead. This method is kept for backward compatibility. - */ - public function backfill(): void - { - $this->prepareBackfill(); - } - private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState { $storage = $this->getProjectionStateStorage(); diff --git a/packages/Ecotone/src/Projecting/ProjectorExecutor.php b/packages/Ecotone/src/Projecting/ProjectorExecutor.php index 261dffa67..ebaa11419 100644 --- a/packages/Ecotone/src/Projecting/ProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/ProjectorExecutor.php @@ -19,4 +19,5 @@ public function project(Event $event, mixed $userState = null): mixed; public function init(): void; public function delete(): void; public function flush(mixed $userState = null): void; + public function reset(?string $partitionKey = null): void; } diff --git a/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php new file mode 100644 index 000000000..3379a915e --- /dev/null +++ b/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php @@ -0,0 +1,438 @@ +connection->executeStatement( + "INSERT INTO {$this->tableName()} VALUES (?,?)", + [$event->getTicketId(), $event->getTicketType()] + ); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement( + "CREATE TABLE IF NOT EXISTS {$this->tableName()} (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))" + ); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement("DROP TABLE IF EXISTS {$this->tableName()}"); + } + + #[ProjectionReset] + public function reset(): void + { + $this->connection->executeStatement("DELETE FROM {$this->tableName()}"); + } + + public function getTickets(): array + { + return $this->connection->executeQuery("SELECT * FROM {$this->tableName()} ORDER BY ticket_id ASC")->fetchAllAssociative(); + } +} + +abstract class AbstractRebuildPartitionedProjection +{ + public function __construct(protected Connection $connection) + { + } + + abstract protected function tableName(): string; + + #[EventHandler] + public function addTicket(TicketWasRegistered $event): void + { + $this->connection->executeStatement( + "INSERT INTO {$this->tableName()} VALUES (?,?)", + [$event->getTicketId(), $event->getTicketType()] + ); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement( + "CREATE TABLE IF NOT EXISTS {$this->tableName()} (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))" + ); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement("DROP TABLE IF EXISTS {$this->tableName()}"); + } + + #[ProjectionReset] + public function reset( + #[PartitionAggregateId] string $aggregateId, + ): void { + $this->connection->executeStatement("DELETE FROM {$this->tableName()} WHERE ticket_id = ?", [$aggregateId]); + } + + public function getTickets(): array + { + return $this->connection->executeQuery("SELECT * FROM {$this->tableName()} ORDER BY ticket_id ASC")->fetchAllAssociative(); + } +} + +#[ProjectionV2('rebuild_rollback')] +#[Partitioned] +#[ProjectionRebuild] +#[ProjectionExecution(eventLoadingBatchSize: 3)] +#[FromStream(stream: Ticket::class, aggregateType: Ticket::class)] +class RebuildRollbackProjection +{ + public bool $shouldFailOnProjection = false; + public int $projectedEventsCount = 0; + + public function __construct(private Connection $connection) + { + } + + #[EventHandler] + public function onTicketRegistered(TicketWasRegistered $event): void + { + $this->handleEvent($event->getTicketId()); + if ($this->connection->getDatabasePlatform() instanceof MySQLPlatform) { + $this->connection->executeStatement('INSERT IGNORE INTO rebuild_rollback_tickets VALUES (?,?)', [$event->getTicketId(), $event->getTicketType()]); + } else { + $this->connection->executeStatement('INSERT INTO rebuild_rollback_tickets VALUES (?,?) ON CONFLICT(ticket_id) DO NOTHING', [$event->getTicketId(), $event->getTicketType()]); + } + } + + #[EventHandler] + public function onAssignedPersonChanged(AssignedPersonWasChanged $event): void + { + $this->handleEvent($event->getTicketId()); + } + + #[EventHandler] + public function onTicketClosed(TicketWasClosed $event): void + { + $this->handleEvent($event->getTicketId()); + } + + private function handleEvent(string $ticketId): void + { + if ($this->shouldFailOnProjection) { + $this->projectedEventsCount++; + if ($this->projectedEventsCount >= 4) { + throw new RuntimeException('Projection failed on 4th event during rebuild'); + } + } + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement( + 'CREATE TABLE IF NOT EXISTS rebuild_rollback_tickets (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))' + ); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement('DROP TABLE IF EXISTS rebuild_rollback_tickets'); + } + + #[ProjectionReset] + public function reset( + #[PartitionAggregateId] string $aggregateId, + ): void { + $this->connection->executeStatement('DELETE FROM rebuild_rollback_tickets WHERE ticket_id = ?', [$aggregateId]); + } + + #[QueryHandler('getRebuildRollbackTickets')] + public function query(): array + { + return $this->connection->executeQuery('SELECT * FROM rebuild_rollback_tickets ORDER BY ticket_id ASC')->fetchAllAssociative(); + } +} + +/** + * licence Enterprise + * @internal + */ +final class RebuildProjectionTest extends ProjectingTestCase +{ + public function test_throws_exception_when_rebuild_batch_size_is_less_than_one(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Rebuild partition batch size must be at least 1'); + + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_batch0_projection'), Partitioned, ProjectionRebuild(partitionBatchSize: 0), FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { + protected function tableName(): string + { + return 'rebuild_batch0_tickets'; + } + }; + + $this->bootstrapEcotone([$projection::class], [$projection], true); + } + + public function test_partitioned_projection_async_rebuild_with_batch_of_2(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_batch2_async'), Partitioned, ProjectionRebuild(partitionBatchSize: 2, asyncChannelName: 'rebuild_channel'), FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { + #[QueryHandler('getRebuildBatch2Tickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_batch2_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('rebuild_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('rebuild_channel') + ); + + $this->createPartitions($ecotone, 5); + self::assertCount(5, $ecotone->sendQueryWithRouting('getRebuildBatch2Tickets')); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_batch2_async']); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('rebuild_channel'); + self::assertCount(3, $messages); + + $ecotone->run('rebuild_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + $ecotone->run('rebuild_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + $ecotone->run('rebuild_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(5, $ecotone->sendQueryWithRouting('getRebuildBatch2Tickets')); + } + + public function test_partitioned_projection_sync_rebuild(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_sync_partitioned'), Partitioned, ProjectionRebuild(partitionBatchSize: 2), FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { + #[QueryHandler('getRebuildSyncPartitionedTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_sync_partitioned_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $this->createPartitions($ecotone, 5); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_sync_partitioned']); + + self::assertCount(5, $ecotone->sendQueryWithRouting('getRebuildSyncPartitionedTickets')); + } + + public function test_global_projection_async_rebuild(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_global_async'), ProjectionRebuild(asyncChannelName: 'rebuild_global_channel'), FromStream(Ticket::class) ] class ($connection) extends AbstractRebuildGlobalProjection { + #[QueryHandler('getRebuildGlobalAsyncTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_global_async_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('rebuild_global_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('rebuild_global_channel') + ); + + $this->createTickets($ecotone, 3); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_global_async']); + + self::assertCount(3, $ecotone->sendQueryWithRouting('getRebuildGlobalAsyncTickets')); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('rebuild_global_channel'); + self::assertCount(1, $messages); + + $ecotone->run('rebuild_global_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(3, $ecotone->sendQueryWithRouting('getRebuildGlobalAsyncTickets')); + } + + public function test_global_projection_sync_rebuild(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_global_sync'), ProjectionRebuild, FromStream(Ticket::class) ] class ($connection) extends AbstractRebuildGlobalProjection { + #[QueryHandler('getRebuildGlobalSyncTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_global_sync_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $this->createTickets($ecotone, 3); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_global_sync']); + + self::assertCount(3, $ecotone->sendQueryWithRouting('getRebuildGlobalSyncTickets')); + } + + public function test_rebuild_resets_existing_data(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_resets_data'), Partitioned, ProjectionRebuild, FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { + #[QueryHandler('getRebuildResetsDataTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_resets_data_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $ecotone->sendCommand(new RegisterTicket('1', 'User1', 'alert')); + $ecotone->sendCommand(new RegisterTicket('2', 'User2', 'info')); + + self::assertCount(2, $ecotone->sendQueryWithRouting('getRebuildResetsDataTickets')); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_resets_data']); + + $tickets = $ecotone->sendQueryWithRouting('getRebuildResetsDataTickets'); + self::assertCount(2, $tickets); + self::assertSame('1', $tickets[0]['ticket_id']); + self::assertSame('2', $tickets[1]['ticket_id']); + } + + public function test_rebuild_rolls_back_on_exception_during_reprojection(): void + { + $connection = $this->getConnection(); + $projection = new RebuildRollbackProjection($connection); + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $ecotone->sendCommand(new RegisterTicket('1', 'User1', 'alert')); + $ecotone->sendCommand(new ChangeAssignedPerson('1', 'User2')); + $ecotone->sendCommand(new ChangeAssignedPerson('1', 'User3')); + $ecotone->sendCommand(new ChangeAssignedPerson('1', 'User4')); + $ecotone->sendCommand(new CloseTicket('1')); + + self::assertCount(1, $ecotone->sendQueryWithRouting('getRebuildRollbackTickets')); + self::assertTrue($ecotone->sendQueryWithRouting('ticket.isClosed', metadata: ['aggregate.id' => '1'])); + + $projection->shouldFailOnProjection = true; + + $thrownException = false; + try { + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_rollback']); + } catch (RuntimeException $e) { + $thrownException = true; + } + + self::assertTrue($thrownException, 'Expected RuntimeException to be thrown during rebuild'); + self::assertCount(1, $ecotone->sendQueryWithRouting('getRebuildRollbackTickets')); + self::assertTrue($ecotone->sendQueryWithRouting('ticket.isClosed', metadata: ['aggregate.id' => '1'])); + } + + private function createPartitions(FlowTestSupport $ecotone, int $count): void + { + for ($i = 1; $i <= $count; $i++) { + $ecotone->sendCommand(new RegisterTicket((string) $i, "User{$i}", "type{$i}")); + } + } + + private function createTickets(FlowTestSupport $ecotone, int $count): void + { + for ($i = 1; $i <= $count; $i++) { + $ecotone->sendCommand(new RegisterTicket((string) $i, "User{$i}", 'alert')); + } + } + + private function bootstrapEcotone(array $classesToResolve, array $services, bool|array $channels, ?TestConfiguration $testConfiguration = null): FlowTestSupport + { + return EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [...$classesToResolve, Ticket::class, TicketEventConverter::class], + containerOrAvailableServices: [...$services, new TicketEventConverter(), self::getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + enableAsynchronousProcessing: $channels, + licenceKey: LicenceTesting::VALID_LICENCE, + testConfiguration: $testConfiguration, + ); + } +}