Skip to content

feat: add projection rebuild command for ProjectionV2#646

Merged
dgafka merged 9 commits intomainfrom
feat/projection-rebuilding
Mar 6, 2026
Merged

feat: add projection rebuild command for ProjectionV2#646
dgafka merged 9 commits intomainfrom
feat/projection-rebuilding

Conversation

@dgafka
Copy link
Member

@dgafka dgafka commented Mar 3, 2026

Why is this change proposed?

ProjectionV2 supports backfill for catching up on historical events, but lacks a way to fully rebuild a projection from scratch — resetting its tracking state and re-processing all events. This is essential for recovering from corrupted projection state, applying schema changes, or fixing bugs in projection logic without registering another version of the projection. It's especially useful for partitioned projections that can be rebuilt in the background without long running blocking transaction (as it rebuilds one aggregate at a time).

Description of Changes

  • Add #[ProjectionRebuild] class attribute to configure rebuild behavior (partition batch size, async channel)
  • Add #[ProjectionReset] lifecycle method support — called per-partition during rebuild with partition context headers (aggregate ID, type, stream name)
  • Add ecotone:projection:rebuild console command that triggers rebuild via ProjectingManager::prepareRebuild()
  • Add RebuildExecutorHandler that iterates partitions and executes reset + re-process per partition within a transaction
  • Extend ProjectorExecutor interface with reset() method implemented across all executors

Usage

#[ProjectionV2('ticket_list')]
#[Partitioned]
#[ProjectionRebuild(partitionBatchSize: 100, asyncChannelName: 'rebuild')]
#[FromStream(stream: Ticket::class, aggregateType: Ticket::class)]
class TicketListProjection
{
    #[ProjectionReset]
    public function reset(
        #[PartitionAggregateId] ?string $aggregateId = null,
    ): void {
        if ($aggregateId !== null) {
            $this->connection->executeStatement("DELETE FROM tickets WHERE ticket_id = ?", [$aggregateId]);
        } else {
            $this->connection->executeStatement("DELETE FROM tickets");
        }
    }

    #[EventHandler]
    public function addTicket(TicketWasRegistered $event): void { /* ... */ }
}

Trigger via console: ecotone:projection:rebuild --name=ticket_list

Use case scenarios

  1. Bug fix in projection logic — After fixing a bug in an event handler, rebuild the projection to correct all historical data without downtime
  2. Schema migration — When adding a new column to the projection table, rebuild re-processes all events to populate the new column
  3. Partitioned cleanup — For partitioned projections, rebuild resets and re-processes each partition independently, calling the reset handler with partition context so you can surgically clean up per-aggregate data
sequenceDiagram
    participant Console as ecotone:projection:rebuild
    participant Manager as ProjectingManager
    participant Executor as RebuildExecutorHandler
    participant Projection as ProjectorExecutor
    
    Console->>Manager: prepareRebuild()
    Manager->>Manager: Batch partitions
    loop Each batch (sync or async)
        Manager->>Executor: executeRebuildBatch()
        loop Each partition
            Executor->>Projection: reset(partitionKey)
            Note over Projection: Calls #[ProjectionReset] with<br/>partition context headers
            Executor->>Projection: Re-process all events
        end
    end
Loading

Pull Request Contribution Terms

  • I have read and agree to the contribution terms outlined in CONTRIBUTING.

@dgafka dgafka merged commit 106f12d into main Mar 6, 2026
8 checks passed
@dgafka dgafka deleted the feat/projection-rebuilding branch March 6, 2026 16:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant