From 8302ad2c23b6bc1bb560f19d6699c8e5fd637549 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Sun, 1 Mar 2026 15:59:27 +0100 Subject: [PATCH 1/8] feat: projection rebuild --- plan.md | 272 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 plan.md diff --git a/plan.md b/plan.md new file mode 100644 index 000000000..4a6532552 --- /dev/null +++ b/plan.md @@ -0,0 +1,272 @@ +# Plan: Add `rebuild` Command for ProjectionV2 + +## Context + +ProjectionV2 has `backfill` but lacks a `rebuild` command that resets tracking and re-processes from scratch. For partitioned projections, rebuild must work per-partition: reset that partition's tracking, call the rebuild handler with partition context (aggregate ID, aggregate type, stream name), then re-process all events — all within the same transaction. For global projections, it resets the single global tracking and re-processes. Batching follows `ProjectionBackfill` attribute config. + +## Changes + +### 1. Create `ProjectionRebuild` method attribute +**New file:** `packages/Ecotone/src/Projecting/Attribute/ProjectionRebuild.php` + +```php +#[Attribute(Attribute::TARGET_METHOD)] +class ProjectionRebuild {} +``` + +Follow the pattern of `ProjectionFlush`, `ProjectionDelete` in the same directory. + +### 2. Create parameter attributes for rebuild context +**New files in** `packages/Ecotone/src/Projecting/Attribute/`: + +Each extends `Header` (like `ProjectionName` and `AggregateIdentifier`), returning a specific header name: + +- **`PartitionAggregateId.php`** — `extends Header`, returns `ProjectingHeaders::PARTITION_AGGREGATE_ID` +- **`PartitionAggregateType.php`** — `extends Header`, returns `ProjectingHeaders::PARTITION_AGGREGATE_TYPE` +- **`PartitionStreamName.php`** — `extends Header`, returns `ProjectingHeaders::PARTITION_STREAM_NAME` + +User's rebuild handler uses them as parameter annotations: +```php +#[ProjectionRebuild] +public function rebuild( + #[PartitionAggregateId] string $aggregateId, + #[PartitionAggregateType] string $aggregateType, + #[PartitionStreamName] string $streamName, +): void { + $this->connection->executeStatement("DELETE FROM tickets WHERE ticket_id = ?", [$aggregateId]); +} +``` + +No `HeaderBuilder` converters needed — resolution happens automatically via `Header` attribute. + +### 3. Add partition header constants to `ProjectingHeaders` +**File:** `packages/Ecotone/src/Projecting/ProjectingHeaders.php` + +```php +public const PARTITION_AGGREGATE_ID = 'partition.aggregateId'; +public const PARTITION_AGGREGATE_TYPE = 'partition.aggregateType'; +public const PARTITION_STREAM_NAME = 'partition.streamName'; +``` + +### 4. Add `rebuild()` to `ProjectorExecutor` interface +**File:** `packages/Ecotone/src/Projecting/ProjectorExecutor.php` + +```php +public function rebuild(?string $aggregateId = null, ?string $aggregateType = null, ?string $streamName = null): void; +``` + +### 5. Implement `rebuild()` in all `ProjectorExecutor` implementations + +**`EcotoneProjectorExecutor.php`** — Add `?string $rebuildChannel = null` as the **last** constructor parameter. Implement `rebuild()` dispatching to `$rebuildChannel` with headers: +- `ProjectingHeaders::PROJECTION_NAME` +- `ProjectingHeaders::PARTITION_AGGREGATE_ID` → `$aggregateId` +- `ProjectingHeaders::PARTITION_AGGREGATE_TYPE` → `$aggregateType` +- `ProjectingHeaders::PARTITION_STREAM_NAME` → `$streamName` + +**`InMemoryProjector.php`** — Add `rebuild(): void { $this->projectedEvents = []; }` + +**`EventStoreChannelAdapterProjection.php`** — Add no-op `rebuild(): void {}` + +### 6. Wire `rebuildChannel` through the builder +**File:** `packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php` + +- Add `?string $rebuildChannel = null` property +- Add `setRebuildChannel(?string $rebuildChannel): void` method +- Update `compile()` to pass `$this->rebuildChannel` as last arg to `EcotoneProjectorExecutor` + +### 7. Scan `#[ProjectionRebuild]` in `ProjectingAttributeModule` +**File:** `packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php` + +- Import `Ecotone\Projecting\Attribute\ProjectionRebuild` +- Add to `$lifecycleAnnotations` merge: `findCombined(ProjectionV2::class, ProjectionRebuild::class)` +- Add branch: `instanceof ProjectionRebuild` → `$projectionBuilder->setRebuildChannel($inputChannel)` +- No special parameter converters needed — the `#[PartitionAggregateId]` etc. attributes on user's method parameters handle resolution automatically + +### 8. Add `executeRebuild()` and `prepareRebuild()` to `ProjectingManager` +**File:** `packages/Ecotone/src/Projecting/ProjectingManager.php` + +**`executeRebuild()`** — per-partition, all in one transaction: +```php +public function executeRebuild(?string $partitionKeyValue = null, ?string $aggregateId = null, ?string $aggregateType = null, ?string $streamName = null): void +{ + $transaction = $this->getProjectionStateStorage()->beginTransaction(); + try { + $storage = $this->getProjectionStateStorage(); + + // Lock existing partition or init new one + $projectionState = $storage->loadPartition($this->projectionName, $partitionKeyValue, lock: true); + if ($projectionState === null) { + $storage->initPartition($this->projectionName, $partitionKeyValue); + } + + // Reset partition to initial state (lastPosition=null, userState=null) + $projectionState = new ProjectionPartitionState( + $this->projectionName, + $partitionKeyValue, + null, + null, + ProjectionInitializationStatus::UNINITIALIZED, + ); + $storage->savePartition($projectionState); + + // Call rebuild handler with partition context + $this->projectorExecutor->rebuild($aggregateId, $aggregateType, $streamName); + + // Re-process all events from scratch + $streamSource = $this->streamSourceRegistry->getFor($this->projectionName); + do { + $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); + $userState = $projectionState->userState; + $processedEvents = 0; + foreach ($streamPage->events as $event) { + $userState = $this->projectorExecutor->project($event, $userState); + $processedEvents++; + } + if ($processedEvents > 0) { + $this->projectorExecutor->flush($userState); + } + $projectionState = $projectionState + ->withLastPosition($streamPage->lastPosition) + ->withUserState($userState) + ->withStatus(ProjectionInitializationStatus::INITIALIZED); + } while ($processedEvents > 0); + + $storage->savePartition($projectionState); + $transaction->commit(); + } catch (Throwable $e) { + $transaction->rollBack(); + throw $e; + } +} +``` + +**`prepareRebuild()`** — reuses backfill batching but targets rebuild handler: +```php +public function prepareRebuild(): void +{ + $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); + foreach ($streamFilters as $streamFilter) { + $this->prepareBatchesForFilter($streamFilter, RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL); + } +} +``` + +Refactor `prepareBackfillForFilter` → `prepareBatchesForFilter(StreamFilter, string $targetChannel)` shared between `prepareBackfill()` and `prepareRebuild()`. Similarly refactor `sendBackfillMessage` → `sendBatchMessage(array $headers, string $targetChannel)`. + +### 9. Create `RebuildExecutorHandler` +**New file:** `packages/Ecotone/src/Projecting/RebuildExecutorHandler.php` + +Follows `BackfillExecutorHandler` pattern. For each partition, extracts aggregate ID from the composite partition key (`{streamName}:{aggregateType}:{aggregateId}`) and calls `executeRebuild()`: + +```php +class RebuildExecutorHandler +{ + public const REBUILD_EXECUTOR_CHANNEL = 'ecotone.projection.rebuild.executor'; + + public function executeRebuildBatch( + string $projectionName, + ?int $limit = null, + int $offset = 0, + string $streamName = '', + ?string $aggregateType = null, + string $eventStoreReferenceName = '', + ): void { + $projectingManager = $this->projectionRegistry->get($projectionName); + $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); + + foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { + // Extract aggregateId from composite key "{streamName}:{aggregateType}:{aggregateId}" + $prefix = $streamFilter->streamName . ':' . $streamFilter->aggregateType . ':'; + $aggregateId = substr($partition, strlen($prefix)); + + $projectingManager->executeRebuild($partition, $aggregateId, $streamFilter->aggregateType, $streamFilter->streamName); + if ($this->terminationListener->shouldTerminate()) { + break; + } + } + } +} +``` + +For **global projections** (SinglePartitionProvider yields `null`): `executeRebuild(null, null, null, null)` — rebuild handler receives all nulls, user's method declares no parameters or optional ones. + +### 10. Register `RebuildExecutorHandler` in `ProjectingModule` +**File:** `packages/Ecotone/src/Projecting/Config/ProjectingModule.php` + +Register `RebuildExecutorHandler` service definition and message handler following the exact pattern of `BackfillExecutorHandler` registration (lines 155-181), but with: +- `RebuildExecutorHandler::class` service +- `executeRebuildBatch` method +- `RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL` input channel +- Same `HeaderBuilder` mappings using `backfill.*` header names (reused from batch dispatching) + +### 11. Add `ecotone:projection:rebuild` console command +**File:** `packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php` + +```php +#[ConsoleCommand('ecotone:projection:rebuild')] +public function rebuildProjection(string $name): void +``` + +### 12. Update `FlowTestSupport` +**File:** `packages/Ecotone/src/Lite/Test/FlowTestSupport.php` + +Add `rebuildProjection()` method: +```php +public function rebuildProjection(string $projectionName): self +{ + $this->getGateway(ProjectionRegistry::class)->get($projectionName)->prepareRebuild(); + return $this; +} +``` + +### 13. Tests +**New file:** `packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php` + +Follow `BackfillProjectionTest` patterns with `AbstractTicketProjection` base class. The rebuild handler receives partition context. Test cases: + +1. **Partitioned async rebuild (batch=2, 5 partitions)**: Events already processed → rebuild sends 3 batch messages → each run resets partition, calls `#[ProjectionRebuild]`, re-processes events → verify data is rebuilt correctly +2. **Partitioned sync rebuild**: All 5 partitions rebuilt immediately +3. **Global sync rebuild**: Resets global tracking, re-processes all events +4. **Global async rebuild**: Single message, re-processes after running channel +5. **Rebuild calls `#[ProjectionRebuild]` handler with partition context**: Verify `aggregateId`, `aggregateType`, `streamName` are passed to the rebuild method +6. **Rebuild without `#[ProjectionRebuild]` handler works**: Tracking resets, data overwritten by re-processing + +Test projection fixture for partitioned rebuild: +```php +#[ProjectionRebuild] +public function rebuild( + #[PartitionAggregateId] string $aggregateId, + #[PartitionAggregateType] string $aggregateType, + #[PartitionStreamName] string $streamName, +): void { + $this->connection->executeStatement("DELETE FROM {$this->tableName()} WHERE ticket_id = ?", [$aggregateId]); +} +``` + +## Key Files + +| File | Action | +|------|--------| +| `packages/Ecotone/src/Projecting/Attribute/ProjectionRebuild.php` | Create | +| `packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php` | Create | +| `packages/Ecotone/src/Projecting/Attribute/PartitionAggregateType.php` | Create | +| `packages/Ecotone/src/Projecting/Attribute/PartitionStreamName.php` | Create | +| `packages/Ecotone/src/Projecting/RebuildExecutorHandler.php` | Create | +| `packages/Ecotone/src/Projecting/ProjectorExecutor.php` | Add `rebuild()` | +| `packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php` | Implement `rebuild()` | +| `packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php` | Implement `rebuild()` | +| `packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php` | Implement `rebuild()` | +| `packages/Ecotone/src/Projecting/ProjectingHeaders.php` | Add rebuild constants | +| `packages/Ecotone/src/Projecting/ProjectingManager.php` | Add `executeRebuild()`, `prepareRebuild()`, refactor batch helpers | +| `packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php` | Add `rebuildChannel` | +| `packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php` | Scan `#[ProjectionRebuild]` | +| `packages/Ecotone/src/Projecting/Config/ProjectingModule.php` | Register `RebuildExecutorHandler` | +| `packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php` | Add rebuild command | +| `packages/Ecotone/src/Lite/Test/FlowTestSupport.php` | Add `rebuildProjection()` | +| `packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php` | Create | + +## Verification + +1. Run existing backfill tests (no regressions): `vendor/bin/phpunit packages/PdoEventSourcing/tests/Projecting/BackfillProjectionTest.php` +2. Run new rebuild tests: `vendor/bin/phpunit packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php` +3. Run full projecting test suite: `vendor/bin/phpunit packages/PdoEventSourcing/tests/Projecting/` From 6b71a025161f59fa9c631612a2d12de1f85ef418 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 3 Mar 2026 19:25:43 +0000 Subject: [PATCH 2/8] feat: add projection rebuild with reset support and tests --- .claude/commands/create-pr.md | 4 +- .../Attribute/ProjectionRebuild.php | 24 ++ .../EcotoneProjectionExecutorBuilder.php | 20 ++ .../Config/ProjectingAttributeModule.php | 8 + .../Config/ProjectingConsoleCommands.php | 9 + .../Projecting/Config/ProjectingModule.php | 33 ++ .../Projecting/EcotoneProjectorExecutor.php | 23 ++ .../EventStoreChannelAdapterProjection.php | 5 + .../Projecting/InMemory/InMemoryProjector.php | 5 + .../src/Projecting/ProjectingHeaders.php | 4 + .../src/Projecting/ProjectingManager.php | 88 ++++- .../src/Projecting/ProjectorExecutor.php | 1 + .../src/Projecting/RebuildExecutorHandler.php | 40 +++ .../Projecting/RebuildProjectionTest.php | 331 ++++++++++++++++++ plan.md | 272 -------------- 15 files changed, 592 insertions(+), 275 deletions(-) create mode 100644 packages/Ecotone/src/Projecting/Attribute/ProjectionRebuild.php create mode 100644 packages/Ecotone/src/Projecting/RebuildExecutorHandler.php create mode 100644 packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php delete mode 100644 plan.md diff --git a/.claude/commands/create-pr.md b/.claude/commands/create-pr.md index 07bdf0fe9..486ae212e 100644 --- a/.claude/commands/create-pr.md +++ b/.claude/commands/create-pr.md @@ -73,9 +73,9 @@ Keep the title under 70 characters, concise and descriptive. **"Pull Request Contribution Terms"** section: - Ask the user using AskUserQuestion: "Do you agree to the contribution terms outlined in CONTRIBUTING.md?" with options: - "Yes, I agree" — Mark the checkbox with `[X]` - - "No" — Leave the checkbox empty `[ ]` + - "No" — Leave the checkbox empty `[]` - If agreed: `- [X] I have read and agree to the contribution terms outlined in [CONTRIBUTING](https://github.com/ecotoneframework/ecotone-dev/blob/main/CONTRIBUTING.md)` -- If not agreed: `- [ ] I have read and agree to the contribution terms outlined in [CONTRIBUTING](https://github.com/ecotoneframework/ecotone-dev/blob/main/CONTRIBUTING.md)` +- If not agreed: `- [] I have read and agree to the contribution terms outlined in [CONTRIBUTING](https://github.com/ecotoneframework/ecotone-dev/blob/main/CONTRIBUTING.md)` ### Step 4: Review with User diff --git a/packages/Ecotone/src/Projecting/Attribute/ProjectionRebuild.php b/packages/Ecotone/src/Projecting/Attribute/ProjectionRebuild.php new file mode 100644 index 000000000..c1c59cd91 --- /dev/null +++ b/packages/Ecotone/src/Projecting/Attribute/ProjectionRebuild.php @@ -0,0 +1,24 @@ +partitionBatchSize < 1) { + throw new InvalidArgumentException('Rebuild partition batch size must be at least 1'); + } + } +} diff --git a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php index 0ca04eeaf..2c7f1f982 100644 --- a/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php @@ -27,6 +27,7 @@ class EcotoneProjectionExecutorBuilder implements ProjectionExecutorBuilder { private const DEFAULT_EVENT_LOADING_BATCH_SIZE = 1_000; private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100; + private const DEFAULT_REBUILD_PARTITION_BATCH_SIZE = 100; /** * @param AnnotatedDefinition[] $projectionEventHandlers @@ -46,6 +47,9 @@ public function __construct( private ?int $backfillPartitionBatchSize = null, private ?string $backfillAsyncChannelName = null, private bool $partitioned = false, + private ?string $resetChannel = null, + private ?int $rebuildPartitionBatchSize = null, + private ?string $rebuildAsyncChannelName = null, ) { if ($this->partitionHeader && ! $this->automaticInitialization) { throw new ConfigurationException("Cannot set partition header for projection {$this->projectionName} with automatic initialization disabled"); @@ -92,6 +96,11 @@ public function setFlushChannel(string $inputChannel): void $this->flushChannel = $inputChannel; } + public function setResetChannel(string $inputChannel): void + { + $this->resetChannel = $inputChannel; + } + public function setAsyncChannel(string $asynchronousChannelName): void { $this->asyncChannelName = $asynchronousChannelName; @@ -117,6 +126,16 @@ public function backfillAsyncChannelName(): ?string return $this->backfillAsyncChannelName; } + public function rebuildPartitionBatchSize(): int + { + return $this->rebuildPartitionBatchSize ?? self::DEFAULT_REBUILD_PARTITION_BATCH_SIZE; + } + + public function rebuildAsyncChannelName(): ?string + { + return $this->rebuildAsyncChannelName; + } + public function compile(MessagingContainerBuilder $builder): Definition|Reference { $routerProcessor = $this->buildExecutionRouter($builder); @@ -129,6 +148,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc $this->deleteChannel, $this->flushChannel, $this->isLive, + $this->resetChannel, ]); } diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php index 8134c8c9b..5a542cf4e 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php @@ -12,6 +12,7 @@ use Ecotone\AnnotationFinder\AnnotationFinder; use Ecotone\EventSourcing\Attribute\ProjectionDelete; use Ecotone\EventSourcing\Attribute\ProjectionInitialization; +use Ecotone\EventSourcing\Attribute\ProjectionReset; use Ecotone\Messaging\Attribute\Asynchronous; use Ecotone\Messaging\Attribute\ModuleAnnotation; use Ecotone\Messaging\Config\Annotation\AnnotatedDefinitionReference; @@ -36,6 +37,7 @@ use Ecotone\Projecting\Attribute\Polling; use Ecotone\Projecting\Attribute\ProjectionBackfill; use Ecotone\Projecting\Attribute\ProjectionDeployment; +use Ecotone\Projecting\Attribute\ProjectionRebuild; use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\Attribute\ProjectionV2; @@ -80,6 +82,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $projectionAttribute = $annotationRegistrationService->getAttributeForClass($projectionClassName, ProjectionV2::class); $batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionExecution::class); $backfillAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBackfill::class); + $rebuildAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionRebuild::class); $pollingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Polling::class); $streamingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Streaming::class); $projectionDeployment = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionDeployment::class); @@ -100,6 +103,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I backfillPartitionBatchSize: $backfillAttribute?->backfillPartitionBatchSize, backfillAsyncChannelName: $backfillAttribute?->asyncChannelName, partitioned: $partitionAttribute !== null, + rebuildPartitionBatchSize: $rebuildAttribute?->partitionBatchSize, + rebuildAsyncChannelName: $rebuildAttribute?->asyncChannelName, ); $asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName); @@ -140,6 +145,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionInitialization::class), $annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionDelete::class), $annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionFlush::class), + $annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionReset::class), ); foreach ($lifecycleAnnotations as $lifecycleAnnotation) { /** @var ProjectionV2 $projectionAttribute */ @@ -153,6 +159,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I $projectionBuilder->setDeleteChannel($inputChannel); } elseif ($lifecycleAnnotation->getAnnotationForMethod() instanceof ProjectionFlush) { $projectionBuilder->setFlushChannel($inputChannel); + } elseif ($lifecycleAnnotation->getAnnotationForMethod() instanceof ProjectionReset) { + $projectionBuilder->setResetChannel($inputChannel); } diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php index a90ad9951..1dd04dc00 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php @@ -45,6 +45,15 @@ public function backfillProjection(string $name): void $this->registry->get($name)->prepareBackfill(); } + #[ConsoleCommand('ecotone:projection:rebuild')] + public function rebuildProjection(string $name): void + { + if (! $this->registry->has($name)) { + throw new InvalidArgumentException("There is no projection with name {$name}"); + } + $this->registry->get($name)->prepareRebuild(); + } + #[ConsoleCommand('ecotone:projection:delete')] public function deleteProjection(string $name): void { diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 75fae5c9e..16dbad1e7 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -30,6 +30,7 @@ use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder; use Ecotone\Projecting\Attribute\ProjectionFlush; use Ecotone\Projecting\BackfillExecutorHandler; +use Ecotone\Projecting\RebuildExecutorHandler; use Ecotone\Projecting\InMemory\InMemoryProjectionRegistry; use Ecotone\Projecting\PartitionProviderRegistry; use Ecotone\Projecting\ProjectingHeaders; @@ -93,6 +94,8 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO $projectionBuilder->automaticInitialization(), $projectionBuilder->backfillPartitionBatchSize(), $projectionBuilder->backfillAsyncChannelName(), + $projectionBuilder->rebuildPartitionBatchSize(), + $projectionBuilder->rebuildAsyncChannelName(), ]) ); $projectionRegistryMap[$projectionName] = new Reference($projectingManagerReference); @@ -126,6 +129,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO [ HeaderBuilder::createOptional('partitionKeyValue', ProjectingHeaders::PROJECTION_PARTITION_KEY), HeaderBuilder::create('canInitialize', ProjectingHeaders::PROJECTION_CAN_INITIALIZE), + HeaderBuilder::createOptional('shouldReset', 'projection.shouldReset'), ], ) ) @@ -180,6 +184,35 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ->withInputChannelName(BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL) ); + // Register RebuildExecutorHandler and its message handler + $messagingConfiguration->registerServiceDefinition( + RebuildExecutorHandler::class, + new Definition(RebuildExecutorHandler::class, [ + new Reference(ProjectionRegistry::class), + new Reference(TerminationListener::class), + ]) + ); + + $messagingConfiguration->registerMessageHandler( + MessageProcessorActivatorBuilder::create() + ->chainInterceptedProcessor( + MethodInvokerBuilder::create( + RebuildExecutorHandler::class, + InterfaceToCallReference::create(RebuildExecutorHandler::class, 'executeRebuildBatch'), + [ + PayloadBuilder::create('projectionName'), + HeaderBuilder::createOptional('limit', 'rebuild.limit'), + HeaderBuilder::createOptional('offset', 'rebuild.offset'), + HeaderBuilder::createOptional('streamName', 'rebuild.streamName'), + HeaderBuilder::createOptional('aggregateType', 'rebuild.aggregateType'), + HeaderBuilder::createOptional('eventStoreReferenceName', 'rebuild.eventStoreReferenceName'), + ], + ) + ) + ->withEndpointId('rebuild_executor_handler') + ->withInputChannelName(RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL) + ); + // Register console commands $messagingConfiguration->registerServiceDefinition(ProjectingConsoleCommands::class, new Definition(ProjectingConsoleCommands::class, [new Reference(ProjectionRegistry::class)])); } diff --git a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php index 5a6decfc9..66472c73d 100644 --- a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php @@ -28,6 +28,7 @@ public function __construct( private ?string $deleteChannel = null, private ?string $flushChannel = null, private bool $isLive = true, + private ?string $resetChannel = null, ) { } @@ -88,4 +89,26 @@ public function flush(mixed $userState = null): void ], $this->flushChannel); } } + + public function reset(?string $partitionKey = null): void + { + if ($this->resetChannel) { + $headers = [ + ProjectingHeaders::PROJECTION_NAME => $this->projectionName, + ]; + + if ($partitionKey !== null) { + $headers[ProjectingHeaders::REBUILD_PARTITION_KEY] = $partitionKey; + + $parts = explode(':', $partitionKey, 3); + if (count($parts) === 3) { + $headers[ProjectingHeaders::REBUILD_STREAM] = $parts[0]; + $headers[ProjectingHeaders::REBUILD_AGGREGATE_TYPE] = $parts[1]; + $headers[ProjectingHeaders::REBUILD_AGGREGATE_ID] = $parts[2]; + } + } + + $this->messagingEntrypoint->sendWithHeaders([], $headers, $this->resetChannel); + } + } } diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php index 74188e895..6821fee45 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php @@ -77,4 +77,9 @@ public function flush(mixed $userState = null): void { // No flushing needed } + + public function reset(?string $partitionKey = null): void + { + // No reset needed + } } diff --git a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php index 90baf9366..17c3599ef 100644 --- a/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php +++ b/packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php @@ -43,4 +43,9 @@ public function delete(): void public function flush(mixed $userState = null): void { } + + public function reset(?string $partitionKey = null): void + { + $this->projectedEvents = []; + } } diff --git a/packages/Ecotone/src/Projecting/ProjectingHeaders.php b/packages/Ecotone/src/Projecting/ProjectingHeaders.php index 2f5a57b91..2f5ff0422 100644 --- a/packages/Ecotone/src/Projecting/ProjectingHeaders.php +++ b/packages/Ecotone/src/Projecting/ProjectingHeaders.php @@ -20,4 +20,8 @@ class ProjectingHeaders public const MANUAL_INITIALIZATION = 'projection.manual_initialization'; public const PROJECTION_PARTITION_KEY = 'projection.partitionKey'; public const PROJECTION_CAN_INITIALIZE = 'projection.canInitialize'; + public const REBUILD_PARTITION_KEY = 'projection.rebuild.partitionKey'; + public const REBUILD_AGGREGATE_ID = 'projection.rebuild.aggregateId'; + public const REBUILD_AGGREGATE_TYPE = 'projection.rebuild.aggregateType'; + public const REBUILD_STREAM = 'projection.rebuild.stream'; } diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index a92d24083..0ad26a793 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -15,6 +15,7 @@ class ProjectingManager { private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100; + private const DEFAULT_REBUILD_PARTITION_BATCH_SIZE = 100; private ?ProjectionStateStorage $projectionStateStorage = null; @@ -31,6 +32,8 @@ public function __construct( private bool $automaticInitialization = true, private int $backfillPartitionBatchSize = self::DEFAULT_BACKFILL_PARTITION_BATCH_SIZE, private ?string $backfillAsyncChannelName = null, + private int $rebuildPartitionBatchSize = self::DEFAULT_REBUILD_PARTITION_BATCH_SIZE, + private ?string $rebuildAsyncChannelName = null, ) { if ($eventLoadingBatchSize < 1) { throw new InvalidArgumentException('Event loading batch size must be at least 1'); @@ -38,6 +41,9 @@ public function __construct( if ($backfillPartitionBatchSize < 1) { throw new InvalidArgumentException('Backfill partition batch size must be at least 1'); } + if ($rebuildPartitionBatchSize < 1) { + throw new InvalidArgumentException('Rebuild partition batch size must be at least 1'); + } } private function getProjectionStateStorage(): ProjectionStateStorage @@ -62,7 +68,7 @@ public function execute(?string $partitionKeyValue = null, bool $manualInitializ } while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true); } - public function executeSingleBatch(?string $partitionKeyValue = null, bool $canInitialize = false): int + public function executeSingleBatch(?string $partitionKeyValue = null, bool $canInitialize = false, bool $shouldReset = false): int { $transaction = $this->getProjectionStateStorage()->beginTransaction(); try { @@ -72,6 +78,17 @@ public function executeSingleBatch(?string $partitionKeyValue = null, bool $canI return 0; } + if ($shouldReset) { + $this->projectorExecutor->reset($partitionKeyValue); + $projectionState = new ProjectionPartitionState( + $projectionState->projectionName, + $projectionState->partitionKey, + null, + null, + $projectionState->status, + ); + } + $streamSource = $this->streamSourceRegistry->getFor($this->projectionName); $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); @@ -202,6 +219,75 @@ public function backfill(): void $this->prepareBackfill(); } + public function executeWithReset(?string $partitionKeyValue = null): void + { + $first = true; + do { + $processedEvents = $this->messagingEntrypoint->sendWithHeaders( + [], + [ + ProjectingHeaders::PROJECTION_PARTITION_KEY => $partitionKeyValue, + ProjectingHeaders::PROJECTION_CAN_INITIALIZE => true, + 'projection.shouldReset' => $first, + ], + self::batchChannelFor($this->projectionName) + ); + $first = false; + } while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true); + } + + public function prepareRebuild(): void + { + $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); + + foreach ($streamFilters as $streamFilter) { + $this->prepareRebuildForFilter($streamFilter); + } + } + + private function prepareRebuildForFilter(StreamFilter $streamFilter): void + { + $totalPartitions = $this->getPartitionProvider()->count($streamFilter); + + if ($totalPartitions === 0) { + return; + } + + $numberOfBatches = (int) ceil($totalPartitions / $this->rebuildPartitionBatchSize); + + for ($batch = 0; $batch < $numberOfBatches; $batch++) { + $offset = $batch * $this->rebuildPartitionBatchSize; + + $headers = [ + 'rebuild.limit' => $this->rebuildPartitionBatchSize, + 'rebuild.offset' => $offset, + 'rebuild.streamName' => $streamFilter->streamName, + 'rebuild.aggregateType' => $streamFilter->aggregateType, + 'rebuild.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, + ]; + + $this->sendRebuildMessage($headers); + } + } + + private function sendRebuildMessage(array $headers): void + { + if ($this->rebuildAsyncChannelName !== null) { + $this->messagingEntrypoint->sendWithHeaders( + $this->projectionName, + $headers, + $this->rebuildAsyncChannelName, + RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL + ); + } else { + $this->messagingEntrypoint->sendWithHeaders( + $this->projectionName, + $headers, + RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL + ); + } + } + private function loadOrInitializePartitionState(?string $partitionKey, bool $canInitialize): ?ProjectionPartitionState { $storage = $this->getProjectionStateStorage(); diff --git a/packages/Ecotone/src/Projecting/ProjectorExecutor.php b/packages/Ecotone/src/Projecting/ProjectorExecutor.php index 261dffa67..ebaa11419 100644 --- a/packages/Ecotone/src/Projecting/ProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/ProjectorExecutor.php @@ -19,4 +19,5 @@ public function project(Event $event, mixed $userState = null): mixed; public function init(): void; public function delete(): void; public function flush(mixed $userState = null): void; + public function reset(?string $partitionKey = null): void; } diff --git a/packages/Ecotone/src/Projecting/RebuildExecutorHandler.php b/packages/Ecotone/src/Projecting/RebuildExecutorHandler.php new file mode 100644 index 000000000..87ba99feb --- /dev/null +++ b/packages/Ecotone/src/Projecting/RebuildExecutorHandler.php @@ -0,0 +1,40 @@ +projectionRegistry->get($projectionName); + $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); + + foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { + $projectingManager->executeWithReset($partition); + if ($this->terminationListener->shouldTerminate()) { + break; + } + } + } +} diff --git a/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php new file mode 100644 index 000000000..e43b75ccc --- /dev/null +++ b/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php @@ -0,0 +1,331 @@ +connection->executeStatement( + "INSERT INTO {$this->tableName()} VALUES (?,?)", + [$event->getTicketId(), $event->getTicketType()] + ); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement( + "CREATE TABLE IF NOT EXISTS {$this->tableName()} (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))" + ); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement("DROP TABLE IF EXISTS {$this->tableName()}"); + } + + #[ProjectionReset] + public function reset(): void + { + $this->connection->executeStatement("DELETE FROM {$this->tableName()}"); + } + + public function getTickets(): array + { + return $this->connection->executeQuery("SELECT * FROM {$this->tableName()} ORDER BY ticket_id ASC")->fetchAllAssociative(); + } +} + +abstract class AbstractRebuildPartitionedProjection +{ + public function __construct(protected Connection $connection) + { + } + + abstract protected function tableName(): string; + + #[EventHandler] + public function addTicket(TicketWasRegistered $event): void + { + $this->connection->executeStatement( + "INSERT INTO {$this->tableName()} VALUES (?,?)", + [$event->getTicketId(), $event->getTicketType()] + ); + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement( + "CREATE TABLE IF NOT EXISTS {$this->tableName()} (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))" + ); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement("DROP TABLE IF EXISTS {$this->tableName()}"); + } + + #[ProjectionReset] + public function reset( + #[Header(ProjectingHeaders::REBUILD_AGGREGATE_ID)] ?string $aggregateId = null, + ): void { + if ($aggregateId !== null) { + $this->connection->executeStatement("DELETE FROM {$this->tableName()} WHERE ticket_id = ?", [$aggregateId]); + } else { + $this->connection->executeStatement("DELETE FROM {$this->tableName()}"); + } + } + + public function getTickets(): array + { + return $this->connection->executeQuery("SELECT * FROM {$this->tableName()} ORDER BY ticket_id ASC")->fetchAllAssociative(); + } +} + +/** + * licence Enterprise + * @internal + */ +final class RebuildProjectionTest extends ProjectingTestCase +{ + public function test_throws_exception_when_rebuild_batch_size_is_less_than_one(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Rebuild partition batch size must be at least 1'); + + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_batch0_projection'), Partitioned, ProjectionRebuild(partitionBatchSize: 0), FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { + protected function tableName(): string + { + return 'rebuild_batch0_tickets'; + } + }; + + $this->bootstrapEcotone([$projection::class], [$projection], true); + } + + public function test_partitioned_projection_async_rebuild_with_batch_of_2(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_batch2_async'), Partitioned, ProjectionRebuild(partitionBatchSize: 2, asyncChannelName: 'rebuild_channel'), FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { + #[QueryHandler('getRebuildBatch2Tickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_batch2_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('rebuild_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('rebuild_channel') + ); + + $this->createPartitions($ecotone, 5); + self::assertCount(5, $ecotone->sendQueryWithRouting('getRebuildBatch2Tickets')); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_batch2_async']); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('rebuild_channel'); + self::assertCount(3, $messages); + + $ecotone->run('rebuild_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + $ecotone->run('rebuild_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + $ecotone->run('rebuild_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(5, $ecotone->sendQueryWithRouting('getRebuildBatch2Tickets')); + } + + public function test_partitioned_projection_sync_rebuild(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_sync_partitioned'), Partitioned, ProjectionRebuild(partitionBatchSize: 2), FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { + #[QueryHandler('getRebuildSyncPartitionedTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_sync_partitioned_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $this->createPartitions($ecotone, 5); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_sync_partitioned']); + + self::assertCount(5, $ecotone->sendQueryWithRouting('getRebuildSyncPartitionedTickets')); + } + + public function test_global_projection_async_rebuild(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_global_async'), ProjectionRebuild(asyncChannelName: 'rebuild_global_channel'), FromStream(Ticket::class) ] class ($connection) extends AbstractRebuildGlobalProjection { + #[QueryHandler('getRebuildGlobalAsyncTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_global_async_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone( + [$projection::class], + [$projection], + [SimpleMessageChannelBuilder::createQueueChannel('rebuild_global_channel')], + TestConfiguration::createWithDefaults()->withSpyOnChannel('rebuild_global_channel') + ); + + $this->createTickets($ecotone, 3); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_global_async']); + + self::assertCount(3, $ecotone->sendQueryWithRouting('getRebuildGlobalAsyncTickets')); + + $messages = $ecotone->getRecordedMessagePayloadsFrom('rebuild_global_channel'); + self::assertCount(1, $messages); + + $ecotone->run('rebuild_global_channel', ExecutionPollingMetadata::createWithTestingSetup(amountOfMessagesToHandle: 1)); + self::assertCount(3, $ecotone->sendQueryWithRouting('getRebuildGlobalAsyncTickets')); + } + + public function test_global_projection_sync_rebuild(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_global_sync'), ProjectionRebuild(), FromStream(Ticket::class) ] class ($connection) extends AbstractRebuildGlobalProjection { + #[QueryHandler('getRebuildGlobalSyncTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_global_sync_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $this->createTickets($ecotone, 3); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_global_sync']); + + self::assertCount(3, $ecotone->sendQueryWithRouting('getRebuildGlobalSyncTickets')); + } + + public function test_rebuild_resets_existing_data(): void + { + $connection = $this->getConnection(); + $projection = new #[ ProjectionV2('rebuild_resets_data'), Partitioned, ProjectionRebuild(), FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { + #[QueryHandler('getRebuildResetsDataTickets')] + public function query(): array + { + return $this->getTickets(); + } + + protected function tableName(): string + { + return 'rebuild_resets_data_tickets'; + } + }; + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $ecotone->sendCommand(new RegisterTicket('1', 'User1', 'alert')); + $ecotone->sendCommand(new RegisterTicket('2', 'User2', 'info')); + + self::assertCount(2, $ecotone->sendQueryWithRouting('getRebuildResetsDataTickets')); + + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_resets_data']); + + $tickets = $ecotone->sendQueryWithRouting('getRebuildResetsDataTickets'); + self::assertCount(2, $tickets); + self::assertSame('1', $tickets[0]['ticket_id']); + self::assertSame('2', $tickets[1]['ticket_id']); + } + + private function createPartitions(FlowTestSupport $ecotone, int $count): void + { + for ($i = 1; $i <= $count; $i++) { + $ecotone->sendCommand(new RegisterTicket((string) $i, "User{$i}", "type{$i}")); + } + } + + private function createTickets(FlowTestSupport $ecotone, int $count): void + { + for ($i = 1; $i <= $count; $i++) { + $ecotone->sendCommand(new RegisterTicket((string) $i, "User{$i}", 'alert')); + } + } + + private function bootstrapEcotone(array $classesToResolve, array $services, bool|array $channels, ?TestConfiguration $testConfiguration = null): FlowTestSupport + { + return EcotoneLite::bootstrapFlowTestingWithEventStore( + classesToResolve: [...$classesToResolve, Ticket::class, TicketEventConverter::class], + containerOrAvailableServices: [...$services, new TicketEventConverter(), self::getConnectionFactory()], + configuration: ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ + ModulePackageList::DBAL_PACKAGE, + ModulePackageList::EVENT_SOURCING_PACKAGE, + ModulePackageList::ASYNCHRONOUS_PACKAGE, + ])), + runForProductionEventStore: true, + enableAsynchronousProcessing: $channels, + licenceKey: LicenceTesting::VALID_LICENCE, + testConfiguration: $testConfiguration, + ); + } +} diff --git a/plan.md b/plan.md deleted file mode 100644 index 4a6532552..000000000 --- a/plan.md +++ /dev/null @@ -1,272 +0,0 @@ -# Plan: Add `rebuild` Command for ProjectionV2 - -## Context - -ProjectionV2 has `backfill` but lacks a `rebuild` command that resets tracking and re-processes from scratch. For partitioned projections, rebuild must work per-partition: reset that partition's tracking, call the rebuild handler with partition context (aggregate ID, aggregate type, stream name), then re-process all events — all within the same transaction. For global projections, it resets the single global tracking and re-processes. Batching follows `ProjectionBackfill` attribute config. - -## Changes - -### 1. Create `ProjectionRebuild` method attribute -**New file:** `packages/Ecotone/src/Projecting/Attribute/ProjectionRebuild.php` - -```php -#[Attribute(Attribute::TARGET_METHOD)] -class ProjectionRebuild {} -``` - -Follow the pattern of `ProjectionFlush`, `ProjectionDelete` in the same directory. - -### 2. Create parameter attributes for rebuild context -**New files in** `packages/Ecotone/src/Projecting/Attribute/`: - -Each extends `Header` (like `ProjectionName` and `AggregateIdentifier`), returning a specific header name: - -- **`PartitionAggregateId.php`** — `extends Header`, returns `ProjectingHeaders::PARTITION_AGGREGATE_ID` -- **`PartitionAggregateType.php`** — `extends Header`, returns `ProjectingHeaders::PARTITION_AGGREGATE_TYPE` -- **`PartitionStreamName.php`** — `extends Header`, returns `ProjectingHeaders::PARTITION_STREAM_NAME` - -User's rebuild handler uses them as parameter annotations: -```php -#[ProjectionRebuild] -public function rebuild( - #[PartitionAggregateId] string $aggregateId, - #[PartitionAggregateType] string $aggregateType, - #[PartitionStreamName] string $streamName, -): void { - $this->connection->executeStatement("DELETE FROM tickets WHERE ticket_id = ?", [$aggregateId]); -} -``` - -No `HeaderBuilder` converters needed — resolution happens automatically via `Header` attribute. - -### 3. Add partition header constants to `ProjectingHeaders` -**File:** `packages/Ecotone/src/Projecting/ProjectingHeaders.php` - -```php -public const PARTITION_AGGREGATE_ID = 'partition.aggregateId'; -public const PARTITION_AGGREGATE_TYPE = 'partition.aggregateType'; -public const PARTITION_STREAM_NAME = 'partition.streamName'; -``` - -### 4. Add `rebuild()` to `ProjectorExecutor` interface -**File:** `packages/Ecotone/src/Projecting/ProjectorExecutor.php` - -```php -public function rebuild(?string $aggregateId = null, ?string $aggregateType = null, ?string $streamName = null): void; -``` - -### 5. Implement `rebuild()` in all `ProjectorExecutor` implementations - -**`EcotoneProjectorExecutor.php`** — Add `?string $rebuildChannel = null` as the **last** constructor parameter. Implement `rebuild()` dispatching to `$rebuildChannel` with headers: -- `ProjectingHeaders::PROJECTION_NAME` -- `ProjectingHeaders::PARTITION_AGGREGATE_ID` → `$aggregateId` -- `ProjectingHeaders::PARTITION_AGGREGATE_TYPE` → `$aggregateType` -- `ProjectingHeaders::PARTITION_STREAM_NAME` → `$streamName` - -**`InMemoryProjector.php`** — Add `rebuild(): void { $this->projectedEvents = []; }` - -**`EventStoreChannelAdapterProjection.php`** — Add no-op `rebuild(): void {}` - -### 6. Wire `rebuildChannel` through the builder -**File:** `packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php` - -- Add `?string $rebuildChannel = null` property -- Add `setRebuildChannel(?string $rebuildChannel): void` method -- Update `compile()` to pass `$this->rebuildChannel` as last arg to `EcotoneProjectorExecutor` - -### 7. Scan `#[ProjectionRebuild]` in `ProjectingAttributeModule` -**File:** `packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php` - -- Import `Ecotone\Projecting\Attribute\ProjectionRebuild` -- Add to `$lifecycleAnnotations` merge: `findCombined(ProjectionV2::class, ProjectionRebuild::class)` -- Add branch: `instanceof ProjectionRebuild` → `$projectionBuilder->setRebuildChannel($inputChannel)` -- No special parameter converters needed — the `#[PartitionAggregateId]` etc. attributes on user's method parameters handle resolution automatically - -### 8. Add `executeRebuild()` and `prepareRebuild()` to `ProjectingManager` -**File:** `packages/Ecotone/src/Projecting/ProjectingManager.php` - -**`executeRebuild()`** — per-partition, all in one transaction: -```php -public function executeRebuild(?string $partitionKeyValue = null, ?string $aggregateId = null, ?string $aggregateType = null, ?string $streamName = null): void -{ - $transaction = $this->getProjectionStateStorage()->beginTransaction(); - try { - $storage = $this->getProjectionStateStorage(); - - // Lock existing partition or init new one - $projectionState = $storage->loadPartition($this->projectionName, $partitionKeyValue, lock: true); - if ($projectionState === null) { - $storage->initPartition($this->projectionName, $partitionKeyValue); - } - - // Reset partition to initial state (lastPosition=null, userState=null) - $projectionState = new ProjectionPartitionState( - $this->projectionName, - $partitionKeyValue, - null, - null, - ProjectionInitializationStatus::UNINITIALIZED, - ); - $storage->savePartition($projectionState); - - // Call rebuild handler with partition context - $this->projectorExecutor->rebuild($aggregateId, $aggregateType, $streamName); - - // Re-process all events from scratch - $streamSource = $this->streamSourceRegistry->getFor($this->projectionName); - do { - $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); - $userState = $projectionState->userState; - $processedEvents = 0; - foreach ($streamPage->events as $event) { - $userState = $this->projectorExecutor->project($event, $userState); - $processedEvents++; - } - if ($processedEvents > 0) { - $this->projectorExecutor->flush($userState); - } - $projectionState = $projectionState - ->withLastPosition($streamPage->lastPosition) - ->withUserState($userState) - ->withStatus(ProjectionInitializationStatus::INITIALIZED); - } while ($processedEvents > 0); - - $storage->savePartition($projectionState); - $transaction->commit(); - } catch (Throwable $e) { - $transaction->rollBack(); - throw $e; - } -} -``` - -**`prepareRebuild()`** — reuses backfill batching but targets rebuild handler: -```php -public function prepareRebuild(): void -{ - $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); - foreach ($streamFilters as $streamFilter) { - $this->prepareBatchesForFilter($streamFilter, RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL); - } -} -``` - -Refactor `prepareBackfillForFilter` → `prepareBatchesForFilter(StreamFilter, string $targetChannel)` shared between `prepareBackfill()` and `prepareRebuild()`. Similarly refactor `sendBackfillMessage` → `sendBatchMessage(array $headers, string $targetChannel)`. - -### 9. Create `RebuildExecutorHandler` -**New file:** `packages/Ecotone/src/Projecting/RebuildExecutorHandler.php` - -Follows `BackfillExecutorHandler` pattern. For each partition, extracts aggregate ID from the composite partition key (`{streamName}:{aggregateType}:{aggregateId}`) and calls `executeRebuild()`: - -```php -class RebuildExecutorHandler -{ - public const REBUILD_EXECUTOR_CHANNEL = 'ecotone.projection.rebuild.executor'; - - public function executeRebuildBatch( - string $projectionName, - ?int $limit = null, - int $offset = 0, - string $streamName = '', - ?string $aggregateType = null, - string $eventStoreReferenceName = '', - ): void { - $projectingManager = $this->projectionRegistry->get($projectionName); - $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); - - foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { - // Extract aggregateId from composite key "{streamName}:{aggregateType}:{aggregateId}" - $prefix = $streamFilter->streamName . ':' . $streamFilter->aggregateType . ':'; - $aggregateId = substr($partition, strlen($prefix)); - - $projectingManager->executeRebuild($partition, $aggregateId, $streamFilter->aggregateType, $streamFilter->streamName); - if ($this->terminationListener->shouldTerminate()) { - break; - } - } - } -} -``` - -For **global projections** (SinglePartitionProvider yields `null`): `executeRebuild(null, null, null, null)` — rebuild handler receives all nulls, user's method declares no parameters or optional ones. - -### 10. Register `RebuildExecutorHandler` in `ProjectingModule` -**File:** `packages/Ecotone/src/Projecting/Config/ProjectingModule.php` - -Register `RebuildExecutorHandler` service definition and message handler following the exact pattern of `BackfillExecutorHandler` registration (lines 155-181), but with: -- `RebuildExecutorHandler::class` service -- `executeRebuildBatch` method -- `RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL` input channel -- Same `HeaderBuilder` mappings using `backfill.*` header names (reused from batch dispatching) - -### 11. Add `ecotone:projection:rebuild` console command -**File:** `packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php` - -```php -#[ConsoleCommand('ecotone:projection:rebuild')] -public function rebuildProjection(string $name): void -``` - -### 12. Update `FlowTestSupport` -**File:** `packages/Ecotone/src/Lite/Test/FlowTestSupport.php` - -Add `rebuildProjection()` method: -```php -public function rebuildProjection(string $projectionName): self -{ - $this->getGateway(ProjectionRegistry::class)->get($projectionName)->prepareRebuild(); - return $this; -} -``` - -### 13. Tests -**New file:** `packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php` - -Follow `BackfillProjectionTest` patterns with `AbstractTicketProjection` base class. The rebuild handler receives partition context. Test cases: - -1. **Partitioned async rebuild (batch=2, 5 partitions)**: Events already processed → rebuild sends 3 batch messages → each run resets partition, calls `#[ProjectionRebuild]`, re-processes events → verify data is rebuilt correctly -2. **Partitioned sync rebuild**: All 5 partitions rebuilt immediately -3. **Global sync rebuild**: Resets global tracking, re-processes all events -4. **Global async rebuild**: Single message, re-processes after running channel -5. **Rebuild calls `#[ProjectionRebuild]` handler with partition context**: Verify `aggregateId`, `aggregateType`, `streamName` are passed to the rebuild method -6. **Rebuild without `#[ProjectionRebuild]` handler works**: Tracking resets, data overwritten by re-processing - -Test projection fixture for partitioned rebuild: -```php -#[ProjectionRebuild] -public function rebuild( - #[PartitionAggregateId] string $aggregateId, - #[PartitionAggregateType] string $aggregateType, - #[PartitionStreamName] string $streamName, -): void { - $this->connection->executeStatement("DELETE FROM {$this->tableName()} WHERE ticket_id = ?", [$aggregateId]); -} -``` - -## Key Files - -| File | Action | -|------|--------| -| `packages/Ecotone/src/Projecting/Attribute/ProjectionRebuild.php` | Create | -| `packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php` | Create | -| `packages/Ecotone/src/Projecting/Attribute/PartitionAggregateType.php` | Create | -| `packages/Ecotone/src/Projecting/Attribute/PartitionStreamName.php` | Create | -| `packages/Ecotone/src/Projecting/RebuildExecutorHandler.php` | Create | -| `packages/Ecotone/src/Projecting/ProjectorExecutor.php` | Add `rebuild()` | -| `packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php` | Implement `rebuild()` | -| `packages/Ecotone/src/Projecting/InMemory/InMemoryProjector.php` | Implement `rebuild()` | -| `packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreChannelAdapterProjection.php` | Implement `rebuild()` | -| `packages/Ecotone/src/Projecting/ProjectingHeaders.php` | Add rebuild constants | -| `packages/Ecotone/src/Projecting/ProjectingManager.php` | Add `executeRebuild()`, `prepareRebuild()`, refactor batch helpers | -| `packages/Ecotone/src/Projecting/Config/EcotoneProjectionExecutorBuilder.php` | Add `rebuildChannel` | -| `packages/Ecotone/src/Projecting/Config/ProjectingAttributeModule.php` | Scan `#[ProjectionRebuild]` | -| `packages/Ecotone/src/Projecting/Config/ProjectingModule.php` | Register `RebuildExecutorHandler` | -| `packages/Ecotone/src/Projecting/Config/ProjectingConsoleCommands.php` | Add rebuild command | -| `packages/Ecotone/src/Lite/Test/FlowTestSupport.php` | Add `rebuildProjection()` | -| `packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php` | Create | - -## Verification - -1. Run existing backfill tests (no regressions): `vendor/bin/phpunit packages/PdoEventSourcing/tests/Projecting/BackfillProjectionTest.php` -2. Run new rebuild tests: `vendor/bin/phpunit packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php` -3. Run full projecting test suite: `vendor/bin/phpunit packages/PdoEventSourcing/tests/Projecting/` From cb94bfca6c24e6959e487ce7bd46289d9cb97877 Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Tue, 3 Mar 2026 20:51:24 +0100 Subject: [PATCH 3/8] partition aggregate id --- .../Attribute/PartitionAggregateId.php | 25 +++++++++++++++++++ .../Attribute/PartitionAggregateType.php | 25 +++++++++++++++++++ .../Projecting/EcotoneProjectorExecutor.php | 5 ++-- .../src/Projecting/ProjectingHeaders.php | 3 --- .../Projecting/RebuildProjectionTest.php | 5 ++-- 5 files changed, 54 insertions(+), 9 deletions(-) create mode 100644 packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php create mode 100644 packages/Ecotone/src/Projecting/Attribute/PartitionAggregateType.php diff --git a/packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php b/packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php new file mode 100644 index 000000000..1f1a462f9 --- /dev/null +++ b/packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php @@ -0,0 +1,25 @@ +connection->executeStatement("DELETE FROM {$this->tableName()} WHERE ticket_id = ?", [$aggregateId]); From df5e21f24c21f8f825db8d40657e4d75f0c7486f Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Wed, 4 Mar 2026 16:24:32 +0000 Subject: [PATCH 4/8] fix: add rebuild methods to ProjectionExecutorBuilder interface and EventStoreStreamingChannelAdapterBuilder --- .../Projecting/Config/ProjectionExecutorBuilder.php | 2 ++ .../EventStoreStreamingChannelAdapterBuilder.php | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php index c309d20c0..08e7792af 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectionExecutorBuilder.php @@ -19,4 +19,6 @@ public function automaticInitialization(): bool; public function eventLoadingBatchSize(): int; public function backfillPartitionBatchSize(): int; public function backfillAsyncChannelName(): ?string; + public function rebuildPartitionBatchSize(): int; + public function rebuildAsyncChannelName(): ?string; } diff --git a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php index 4dabc0a72..392fae963 100644 --- a/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php +++ b/packages/Ecotone/src/Projecting/EventStoreAdapter/EventStoreStreamingChannelAdapterBuilder.php @@ -65,6 +65,16 @@ public function backfillAsyncChannelName(): ?string return null; } + public function rebuildPartitionBatchSize(): int + { + return 100; + } + + public function rebuildAsyncChannelName(): ?string + { + return null; + } + public function compile(MessagingContainerBuilder $builder): Definition|Reference { return new Definition( From c2465310d84ccdb7694edac9f082af2a3d912a3f Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 5 Mar 2026 08:42:50 +0100 Subject: [PATCH 5/8] Partition key --- .../src/Projecting/AggregatePartitionKey.php | 33 +++++++++++++++++++ .../Config/PartitionHeaderConverter.php | 11 ++++--- .../Projecting/EcotoneProjectorExecutor.php | 8 ++--- 3 files changed, 43 insertions(+), 9 deletions(-) create mode 100644 packages/Ecotone/src/Projecting/AggregatePartitionKey.php diff --git a/packages/Ecotone/src/Projecting/AggregatePartitionKey.php b/packages/Ecotone/src/Projecting/AggregatePartitionKey.php new file mode 100644 index 000000000..8393ee8d5 --- /dev/null +++ b/packages/Ecotone/src/Projecting/AggregatePartitionKey.php @@ -0,0 +1,33 @@ + $parts[0], + 'aggregateType' => $parts[1], + 'aggregateId' => $parts[2], + ]; + } +} diff --git a/packages/Ecotone/src/Projecting/Config/PartitionHeaderConverter.php b/packages/Ecotone/src/Projecting/Config/PartitionHeaderConverter.php index 566c089bb..7cc5bd8f8 100644 --- a/packages/Ecotone/src/Projecting/Config/PartitionHeaderConverter.php +++ b/packages/Ecotone/src/Projecting/Config/PartitionHeaderConverter.php @@ -10,15 +10,16 @@ use Ecotone\Messaging\Handler\ParameterConverter; use Ecotone\Messaging\Message; use Ecotone\Messaging\MessageHeaders; +use Ecotone\Projecting\AggregatePartitionKey; class PartitionHeaderConverter implements ParameterConverter { public function getArgumentFrom(Message $message): string { - $streamName = $message->getHeaders()->get(MessageHeaders::EVENT_STREAM_NAME); - $aggregateType = $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_TYPE); - $aggregateId = $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_ID); - - return "{$streamName}:{$aggregateType}:{$aggregateId}"; + return AggregatePartitionKey::compose( + $message->getHeaders()->get(MessageHeaders::EVENT_STREAM_NAME), + $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_TYPE), + $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_ID), + ); } } diff --git a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php index 69fe7cfc8..215b17035 100644 --- a/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php +++ b/packages/Ecotone/src/Projecting/EcotoneProjectorExecutor.php @@ -100,10 +100,10 @@ public function reset(?string $partitionKey = null): void if ($partitionKey !== null) { $headers[ProjectingHeaders::REBUILD_PARTITION_KEY] = $partitionKey; - $parts = explode(':', $partitionKey, 3); - if (count($parts) === 3) { - $headers[MessageHeaders::EVENT_AGGREGATE_TYPE] = $parts[1]; - $headers[MessageHeaders::EVENT_AGGREGATE_ID] = $parts[2]; + $decomposed = AggregatePartitionKey::decompose($partitionKey); + if ($decomposed !== null) { + $headers[MessageHeaders::EVENT_AGGREGATE_TYPE] = $decomposed['aggregateType']; + $headers[MessageHeaders::EVENT_AGGREGATE_ID] = $decomposed['aggregateId']; } } From b30a6a2f2d124daf33dc6b6893bb9ce43c827c4b Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 5 Mar 2026 20:52:09 +0100 Subject: [PATCH 6/8] robust rebuild --- .../Projecting/Config/ProjectingModule.php | 2 +- .../src/Projecting/ProjectingManager.php | 62 ++++----- .../Projecting/RebuildProjectionTest.php | 122 ++++++++++++++++-- 3 files changed, 146 insertions(+), 40 deletions(-) diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 16dbad1e7..9e5ae4a75 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -125,7 +125,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO ->chainInterceptedProcessor( MethodInvokerBuilder::create( $projectingManagerReference, - InterfaceToCallReference::create(ProjectingManager::class, 'executeSingleBatch'), + InterfaceToCallReference::create(ProjectingManager::class, 'executePartitionBatch'), [ HeaderBuilder::createOptional('partitionKeyValue', ProjectingHeaders::PROJECTION_PARTITION_KEY), HeaderBuilder::create('canInitialize', ProjectingHeaders::PROJECTION_CAN_INITIALIZE), diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index 0ad26a793..5b614b438 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -68,7 +68,7 @@ public function execute(?string $partitionKeyValue = null, bool $manualInitializ } while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true); } - public function executeSingleBatch(?string $partitionKeyValue = null, bool $canInitialize = false, bool $shouldReset = false): int + public function executePartitionBatch(?string $partitionKeyValue = null, bool $canInitialize = false, bool $shouldReset = false): int { $transaction = $this->getProjectionStateStorage()->beginTransaction(); try { @@ -90,30 +90,34 @@ public function executeSingleBatch(?string $partitionKeyValue = null, bool $canI } $streamSource = $this->streamSourceRegistry->getFor($this->projectionName); - $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); - + $totalProcessedEvents = 0; $userState = $projectionState->userState; - $processedEvents = 0; - foreach ($streamPage->events as $event) { - $userState = $this->projectorExecutor->project($event, $userState); - $processedEvents++; - } - if ($processedEvents > 0) { - $this->projectorExecutor->flush($userState); - } - - $projectionState = $projectionState - ->withLastPosition($streamPage->lastPosition) - ->withUserState($userState); - if ($processedEvents === 0 && $canInitialize) { - // If we are forcing execution and there are no new events, we still want to enable the projection if it was uninitialized + do { + $streamPage = $streamSource->load($this->projectionName, $projectionState->lastPosition, $this->eventLoadingBatchSize, $partitionKeyValue); + + $batchProcessedEvents = 0; + foreach ($streamPage->events as $event) { + $userState = $this->projectorExecutor->project($event, $userState); + $batchProcessedEvents++; + } + if ($batchProcessedEvents > 0) { + $this->projectorExecutor->flush($userState); + } + + $totalProcessedEvents += $batchProcessedEvents; + $projectionState = $projectionState + ->withLastPosition($streamPage->lastPosition) + ->withUserState($userState); + } while ($shouldReset && $batchProcessedEvents >= $this->eventLoadingBatchSize); + + if ($totalProcessedEvents === 0 && $canInitialize) { $projectionState = $projectionState->withStatus(ProjectionInitializationStatus::INITIALIZED); } $this->getProjectionStateStorage()->savePartition($projectionState); $transaction->commit(); - return $processedEvents; + return $totalProcessedEvents; } catch (Throwable $e) { $transaction->rollBack(); throw $e; @@ -221,19 +225,15 @@ public function backfill(): void public function executeWithReset(?string $partitionKeyValue = null): void { - $first = true; - do { - $processedEvents = $this->messagingEntrypoint->sendWithHeaders( - [], - [ - ProjectingHeaders::PROJECTION_PARTITION_KEY => $partitionKeyValue, - ProjectingHeaders::PROJECTION_CAN_INITIALIZE => true, - 'projection.shouldReset' => $first, - ], - self::batchChannelFor($this->projectionName) - ); - $first = false; - } while ($processedEvents > 0 && $this->terminationListener->shouldTerminate() !== true); + $this->messagingEntrypoint->sendWithHeaders( + [], + [ + ProjectingHeaders::PROJECTION_PARTITION_KEY => $partitionKeyValue, + ProjectingHeaders::PROJECTION_CAN_INITIALIZE => true, + 'projection.shouldReset' => true, + ], + self::batchChannelFor($this->projectionName) + ); } public function prepareRebuild(): void diff --git a/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php index bf371c4ed..f4e61b534 100644 --- a/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php @@ -20,11 +20,17 @@ use Ecotone\Modelling\Attribute\QueryHandler; use Ecotone\Projecting\Attribute\PartitionAggregateId; use Ecotone\Projecting\Attribute\Partitioned; +use Ecotone\Projecting\Attribute\ProjectionExecution; use Ecotone\Projecting\Attribute\ProjectionRebuild; use Ecotone\Projecting\Attribute\ProjectionV2; use Ecotone\Test\LicenceTesting; use InvalidArgumentException; +use RuntimeException; +use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\ChangeAssignedPerson; +use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\CloseTicket; use Test\Ecotone\EventSourcing\Fixture\Ticket\Command\RegisterTicket; +use Test\Ecotone\EventSourcing\Fixture\Ticket\Event\AssignedPersonWasChanged; +use Test\Ecotone\EventSourcing\Fixture\Ticket\Event\TicketWasClosed; use Test\Ecotone\EventSourcing\Fixture\Ticket\Event\TicketWasRegistered; use Test\Ecotone\EventSourcing\Fixture\Ticket\Ticket; use Test\Ecotone\EventSourcing\Fixture\Ticket\TicketEventConverter; @@ -105,13 +111,9 @@ public function delete(): void #[ProjectionReset] public function reset( - #[PartitionAggregateId] ?string $aggregateId = null, + #[PartitionAggregateId] string $aggregateId, ): void { - if ($aggregateId !== null) { - $this->connection->executeStatement("DELETE FROM {$this->tableName()} WHERE ticket_id = ?", [$aggregateId]); - } else { - $this->connection->executeStatement("DELETE FROM {$this->tableName()}"); - } + $this->connection->executeStatement("DELETE FROM {$this->tableName()} WHERE ticket_id = ?", [$aggregateId]); } public function getTickets(): array @@ -120,6 +122,80 @@ public function getTickets(): array } } +#[ProjectionV2('rebuild_rollback')] +#[Partitioned] +#[ProjectionRebuild] +#[ProjectionExecution(eventLoadingBatchSize: 3)] +#[FromStream(stream: Ticket::class, aggregateType: Ticket::class)] +class RebuildRollbackProjection +{ + public bool $shouldFailOnProjection = false; + public int $projectedEventsCount = 0; + + public function __construct(private Connection $connection) + { + } + + #[EventHandler] + public function onTicketRegistered(TicketWasRegistered $event): void + { + $this->handleEvent($event->getTicketId()); + $this->connection->executeStatement( + 'INSERT INTO rebuild_rollback_tickets VALUES (?,?) ON CONFLICT(ticket_id) DO NOTHING', + [$event->getTicketId(), $event->getTicketType()] + ); + } + + #[EventHandler] + public function onAssignedPersonChanged(AssignedPersonWasChanged $event): void + { + $this->handleEvent($event->getTicketId()); + } + + #[EventHandler] + public function onTicketClosed(TicketWasClosed $event): void + { + $this->handleEvent($event->getTicketId()); + } + + private function handleEvent(string $ticketId): void + { + if ($this->shouldFailOnProjection) { + $this->projectedEventsCount++; + if ($this->projectedEventsCount >= 4) { + throw new RuntimeException('Projection failed on 4th event during rebuild'); + } + } + } + + #[ProjectionInitialization] + public function initialization(): void + { + $this->connection->executeStatement( + 'CREATE TABLE IF NOT EXISTS rebuild_rollback_tickets (ticket_id VARCHAR(36) PRIMARY KEY, ticket_type VARCHAR(25))' + ); + } + + #[ProjectionDelete] + public function delete(): void + { + $this->connection->executeStatement('DROP TABLE IF EXISTS rebuild_rollback_tickets'); + } + + #[ProjectionReset] + public function reset( + #[PartitionAggregateId] string $aggregateId, + ): void { + $this->connection->executeStatement('DELETE FROM rebuild_rollback_tickets WHERE ticket_id = ?', [$aggregateId]); + } + + #[QueryHandler('getRebuildRollbackTickets')] + public function query(): array + { + return $this->connection->executeQuery('SELECT * FROM rebuild_rollback_tickets ORDER BY ticket_id ASC')->fetchAllAssociative(); + } +} + /** * licence Enterprise * @internal @@ -243,7 +319,7 @@ protected function tableName(): string public function test_global_projection_sync_rebuild(): void { $connection = $this->getConnection(); - $projection = new #[ ProjectionV2('rebuild_global_sync'), ProjectionRebuild(), FromStream(Ticket::class) ] class ($connection) extends AbstractRebuildGlobalProjection { + $projection = new #[ ProjectionV2('rebuild_global_sync'), ProjectionRebuild, FromStream(Ticket::class) ] class ($connection) extends AbstractRebuildGlobalProjection { #[QueryHandler('getRebuildGlobalSyncTickets')] public function query(): array { @@ -268,7 +344,7 @@ protected function tableName(): string public function test_rebuild_resets_existing_data(): void { $connection = $this->getConnection(); - $projection = new #[ ProjectionV2('rebuild_resets_data'), Partitioned, ProjectionRebuild(), FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { + $projection = new #[ ProjectionV2('rebuild_resets_data'), Partitioned, ProjectionRebuild, FromStream(stream: Ticket::class, aggregateType: Ticket::class) ] class ($connection) extends AbstractRebuildPartitionedProjection { #[QueryHandler('getRebuildResetsDataTickets')] public function query(): array { @@ -296,6 +372,36 @@ protected function tableName(): string self::assertSame('2', $tickets[1]['ticket_id']); } + public function test_rebuild_rolls_back_on_exception_during_reprojection(): void + { + $connection = $this->getConnection(); + $projection = new RebuildRollbackProjection($connection); + + $ecotone = $this->bootstrapEcotone([$projection::class], [$projection], true); + + $ecotone->sendCommand(new RegisterTicket('1', 'User1', 'alert')); + $ecotone->sendCommand(new ChangeAssignedPerson('1', 'User2')); + $ecotone->sendCommand(new ChangeAssignedPerson('1', 'User3')); + $ecotone->sendCommand(new ChangeAssignedPerson('1', 'User4')); + $ecotone->sendCommand(new CloseTicket('1')); + + self::assertCount(1, $ecotone->sendQueryWithRouting('getRebuildRollbackTickets')); + self::assertTrue($ecotone->sendQueryWithRouting('ticket.isClosed', metadata: ['aggregate.id' => '1'])); + + $projection->shouldFailOnProjection = true; + + $thrownException = false; + try { + $ecotone->runConsoleCommand('ecotone:projection:rebuild', ['name' => 'rebuild_rollback']); + } catch (RuntimeException $e) { + $thrownException = true; + } + + self::assertTrue($thrownException, 'Expected RuntimeException to be thrown during rebuild'); + self::assertCount(1, $ecotone->sendQueryWithRouting('getRebuildRollbackTickets')); + self::assertTrue($ecotone->sendQueryWithRouting('ticket.isClosed', metadata: ['aggregate.id' => '1'])); + } + private function createPartitions(FlowTestSupport $ecotone, int $count): void { for ($i = 1; $i <= $count; $i++) { From 486f7b59d9823da4b9981131db83b43a46c537bd Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Thu, 5 Mar 2026 20:58:13 +0100 Subject: [PATCH 7/8] refactor --- .../Projecting/BackfillExecutorHandler.php | 55 --------- .../Projecting/Config/ProjectingModule.php | 57 +++------- ....php => PartitionBatchExecutorHandler.php} | 13 ++- .../src/Projecting/ProjectingManager.php | 107 +++++------------- 4 files changed, 51 insertions(+), 181 deletions(-) delete mode 100644 packages/Ecotone/src/Projecting/BackfillExecutorHandler.php rename packages/Ecotone/src/Projecting/{RebuildExecutorHandler.php => PartitionBatchExecutorHandler.php} (70%) diff --git a/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php b/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php deleted file mode 100644 index 50186219c..000000000 --- a/packages/Ecotone/src/Projecting/BackfillExecutorHandler.php +++ /dev/null @@ -1,55 +0,0 @@ -projectionRegistry->get($projectionName); - $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); - - foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { - $projectingManager->execute($partition, true); - if ($this->terminationListener->shouldTerminate()) { - break; - } - } - } -} diff --git a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php index 9e5ae4a75..ccb007010 100644 --- a/packages/Ecotone/src/Projecting/Config/ProjectingModule.php +++ b/packages/Ecotone/src/Projecting/Config/ProjectingModule.php @@ -29,8 +29,7 @@ use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvokerBuilder; use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder; use Ecotone\Projecting\Attribute\ProjectionFlush; -use Ecotone\Projecting\BackfillExecutorHandler; -use Ecotone\Projecting\RebuildExecutorHandler; +use Ecotone\Projecting\PartitionBatchExecutorHandler; use Ecotone\Projecting\InMemory\InMemoryProjectionRegistry; use Ecotone\Projecting\PartitionProviderRegistry; use Ecotone\Projecting\ProjectingHeaders; @@ -155,10 +154,10 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO new Definition(InMemoryProjectionRegistry::class, [$projectionRegistryMap]) ); - // Register BackfillExecutorHandler and its message handler + // Register PartitionBatchExecutorHandler and its message handler $messagingConfiguration->registerServiceDefinition( - BackfillExecutorHandler::class, - new Definition(BackfillExecutorHandler::class, [ + PartitionBatchExecutorHandler::class, + new Definition(PartitionBatchExecutorHandler::class, [ new Reference(ProjectionRegistry::class), new Reference(TerminationListener::class), ]) @@ -168,49 +167,21 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO MessageProcessorActivatorBuilder::create() ->chainInterceptedProcessor( MethodInvokerBuilder::create( - BackfillExecutorHandler::class, - InterfaceToCallReference::create(BackfillExecutorHandler::class, 'executeBackfillBatch'), + PartitionBatchExecutorHandler::class, + InterfaceToCallReference::create(PartitionBatchExecutorHandler::class, 'executeBatch'), [ PayloadBuilder::create('projectionName'), - HeaderBuilder::createOptional('limit', 'backfill.limit'), - HeaderBuilder::createOptional('offset', 'backfill.offset'), - HeaderBuilder::createOptional('streamName', 'backfill.streamName'), - HeaderBuilder::createOptional('aggregateType', 'backfill.aggregateType'), - HeaderBuilder::createOptional('eventStoreReferenceName', 'backfill.eventStoreReferenceName'), + HeaderBuilder::createOptional('limit', 'partitionBatch.limit'), + HeaderBuilder::createOptional('offset', 'partitionBatch.offset'), + HeaderBuilder::createOptional('streamName', 'partitionBatch.streamName'), + HeaderBuilder::createOptional('aggregateType', 'partitionBatch.aggregateType'), + HeaderBuilder::createOptional('eventStoreReferenceName', 'partitionBatch.eventStoreReferenceName'), + HeaderBuilder::createOptional('shouldReset', 'partitionBatch.shouldReset'), ], ) ) - ->withEndpointId('backfill_executor_handler') - ->withInputChannelName(BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL) - ); - - // Register RebuildExecutorHandler and its message handler - $messagingConfiguration->registerServiceDefinition( - RebuildExecutorHandler::class, - new Definition(RebuildExecutorHandler::class, [ - new Reference(ProjectionRegistry::class), - new Reference(TerminationListener::class), - ]) - ); - - $messagingConfiguration->registerMessageHandler( - MessageProcessorActivatorBuilder::create() - ->chainInterceptedProcessor( - MethodInvokerBuilder::create( - RebuildExecutorHandler::class, - InterfaceToCallReference::create(RebuildExecutorHandler::class, 'executeRebuildBatch'), - [ - PayloadBuilder::create('projectionName'), - HeaderBuilder::createOptional('limit', 'rebuild.limit'), - HeaderBuilder::createOptional('offset', 'rebuild.offset'), - HeaderBuilder::createOptional('streamName', 'rebuild.streamName'), - HeaderBuilder::createOptional('aggregateType', 'rebuild.aggregateType'), - HeaderBuilder::createOptional('eventStoreReferenceName', 'rebuild.eventStoreReferenceName'), - ], - ) - ) - ->withEndpointId('rebuild_executor_handler') - ->withInputChannelName(RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL) + ->withEndpointId('partition_batch_executor_handler') + ->withInputChannelName(PartitionBatchExecutorHandler::PARTITION_BATCH_EXECUTOR_CHANNEL) ); // Register console commands diff --git a/packages/Ecotone/src/Projecting/RebuildExecutorHandler.php b/packages/Ecotone/src/Projecting/PartitionBatchExecutorHandler.php similarity index 70% rename from packages/Ecotone/src/Projecting/RebuildExecutorHandler.php rename to packages/Ecotone/src/Projecting/PartitionBatchExecutorHandler.php index 87ba99feb..49da5c550 100644 --- a/packages/Ecotone/src/Projecting/RebuildExecutorHandler.php +++ b/packages/Ecotone/src/Projecting/PartitionBatchExecutorHandler.php @@ -9,9 +9,9 @@ use Ecotone\Messaging\Endpoint\Interceptor\TerminationListener; -class RebuildExecutorHandler +class PartitionBatchExecutorHandler { - public const REBUILD_EXECUTOR_CHANNEL = 'ecotone.projection.rebuild.executor'; + public const PARTITION_BATCH_EXECUTOR_CHANNEL = 'ecotone.projection.partition_batch.executor'; public function __construct( private ProjectionRegistry $projectionRegistry, @@ -19,19 +19,24 @@ public function __construct( ) { } - public function executeRebuildBatch( + public function executeBatch( string $projectionName, ?int $limit = null, int $offset = 0, string $streamName = '', ?string $aggregateType = null, string $eventStoreReferenceName = '', + bool $shouldReset = false, ): void { $projectingManager = $this->projectionRegistry->get($projectionName); $streamFilter = new StreamFilter($streamName, $aggregateType, $eventStoreReferenceName); foreach ($projectingManager->getPartitionProvider()->partitions($streamFilter, $limit, $offset) as $partition) { - $projectingManager->executeWithReset($partition); + if ($shouldReset) { + $projectingManager->executeWithReset($partition); + } else { + $projectingManager->execute($partition, true); + } if ($this->terminationListener->shouldTerminate()) { break; } diff --git a/packages/Ecotone/src/Projecting/ProjectingManager.php b/packages/Ecotone/src/Projecting/ProjectingManager.php index 5b614b438..d2e87397f 100644 --- a/packages/Ecotone/src/Projecting/ProjectingManager.php +++ b/packages/Ecotone/src/Projecting/ProjectingManager.php @@ -158,61 +158,9 @@ public function delete(): void $this->projectorExecutor->delete(); } - /** - * Prepares backfill by calculating batches and sending messages to BackfillExecutorHandler. - * Each batch message contains a limit and offset for processing a subset of partitions. - * This enables the backfill to be executed synchronously or asynchronously depending on configuration. - */ public function prepareBackfill(): void { - $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); - - foreach ($streamFilters as $streamFilter) { - $this->prepareBackfillForFilter($streamFilter); - } - } - - private function prepareBackfillForFilter(StreamFilter $streamFilter): void - { - $totalPartitions = $this->getPartitionProvider()->count($streamFilter); - - if ($totalPartitions === 0) { - return; - } - - $numberOfBatches = (int) ceil($totalPartitions / $this->backfillPartitionBatchSize); - - for ($batch = 0; $batch < $numberOfBatches; $batch++) { - $offset = $batch * $this->backfillPartitionBatchSize; - - $headers = [ - 'backfill.limit' => $this->backfillPartitionBatchSize, - 'backfill.offset' => $offset, - 'backfill.streamName' => $streamFilter->streamName, - 'backfill.aggregateType' => $streamFilter->aggregateType, - 'backfill.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, - ]; - - $this->sendBackfillMessage($headers); - } - } - - private function sendBackfillMessage(array $headers): void - { - if ($this->backfillAsyncChannelName !== null) { - $this->messagingEntrypoint->sendWithHeaders( - $this->projectionName, - $headers, - $this->backfillAsyncChannelName, - BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL - ); - } else { - $this->messagingEntrypoint->sendWithHeaders( - $this->projectionName, - $headers, - BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL - ); - } + $this->preparePartitionBatches($this->backfillPartitionBatchSize, $this->backfillAsyncChannelName, false); } /** @@ -238,52 +186,53 @@ public function executeWithReset(?string $partitionKeyValue = null): void public function prepareRebuild(): void { - $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); - - foreach ($streamFilters as $streamFilter) { - $this->prepareRebuildForFilter($streamFilter); - } + $this->preparePartitionBatches($this->rebuildPartitionBatchSize, $this->rebuildAsyncChannelName, true); } - private function prepareRebuildForFilter(StreamFilter $streamFilter): void + private function preparePartitionBatches(int $partitionBatchSize, ?string $asyncChannelName, bool $shouldReset): void { - $totalPartitions = $this->getPartitionProvider()->count($streamFilter); + $streamFilters = $this->streamFilterRegistry->provide($this->projectionName); - if ($totalPartitions === 0) { - return; - } + foreach ($streamFilters as $streamFilter) { + $totalPartitions = $this->getPartitionProvider()->count($streamFilter); + + if ($totalPartitions === 0) { + continue; + } - $numberOfBatches = (int) ceil($totalPartitions / $this->rebuildPartitionBatchSize); + $numberOfBatches = (int) ceil($totalPartitions / $partitionBatchSize); - for ($batch = 0; $batch < $numberOfBatches; $batch++) { - $offset = $batch * $this->rebuildPartitionBatchSize; + for ($batch = 0; $batch < $numberOfBatches; $batch++) { + $offset = $batch * $partitionBatchSize; - $headers = [ - 'rebuild.limit' => $this->rebuildPartitionBatchSize, - 'rebuild.offset' => $offset, - 'rebuild.streamName' => $streamFilter->streamName, - 'rebuild.aggregateType' => $streamFilter->aggregateType, - 'rebuild.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, - ]; + $headers = [ + 'partitionBatch.limit' => $partitionBatchSize, + 'partitionBatch.offset' => $offset, + 'partitionBatch.streamName' => $streamFilter->streamName, + 'partitionBatch.aggregateType' => $streamFilter->aggregateType, + 'partitionBatch.eventStoreReferenceName' => $streamFilter->eventStoreReferenceName, + 'partitionBatch.shouldReset' => $shouldReset, + ]; - $this->sendRebuildMessage($headers); + $this->sendPartitionBatchMessage($headers, $asyncChannelName); + } } } - private function sendRebuildMessage(array $headers): void + private function sendPartitionBatchMessage(array $headers, ?string $asyncChannelName): void { - if ($this->rebuildAsyncChannelName !== null) { + if ($asyncChannelName !== null) { $this->messagingEntrypoint->sendWithHeaders( $this->projectionName, $headers, - $this->rebuildAsyncChannelName, - RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL + $asyncChannelName, + PartitionBatchExecutorHandler::PARTITION_BATCH_EXECUTOR_CHANNEL ); } else { $this->messagingEntrypoint->sendWithHeaders( $this->projectionName, $headers, - RebuildExecutorHandler::REBUILD_EXECUTOR_CHANNEL + PartitionBatchExecutorHandler::PARTITION_BATCH_EXECUTOR_CHANNEL ); } } From cb860ea012049483475341ead32c4a281b2a0f5b Mon Sep 17 00:00:00 2001 From: Dariusz Gafka Date: Fri, 6 Mar 2026 14:31:55 +0100 Subject: [PATCH 8/8] mysql --- .../tests/Projecting/RebuildProjectionTest.php | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php b/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php index f4e61b534..3379a915e 100644 --- a/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php +++ b/packages/PdoEventSourcing/tests/Projecting/RebuildProjectionTest.php @@ -5,6 +5,7 @@ namespace Test\Ecotone\EventSourcing\Projecting; use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Platforms\MySQLPlatform; use Ecotone\EventSourcing\Attribute\FromStream; use Ecotone\EventSourcing\Attribute\ProjectionDelete; use Ecotone\EventSourcing\Attribute\ProjectionInitialization; @@ -140,10 +141,11 @@ public function __construct(private Connection $connection) public function onTicketRegistered(TicketWasRegistered $event): void { $this->handleEvent($event->getTicketId()); - $this->connection->executeStatement( - 'INSERT INTO rebuild_rollback_tickets VALUES (?,?) ON CONFLICT(ticket_id) DO NOTHING', - [$event->getTicketId(), $event->getTicketType()] - ); + if ($this->connection->getDatabasePlatform() instanceof MySQLPlatform) { + $this->connection->executeStatement('INSERT IGNORE INTO rebuild_rollback_tickets VALUES (?,?)', [$event->getTicketId(), $event->getTicketType()]); + } else { + $this->connection->executeStatement('INSERT INTO rebuild_rollback_tickets VALUES (?,?) ON CONFLICT(ticket_id) DO NOTHING', [$event->getTicketId(), $event->getTicketType()]); + } } #[EventHandler]