Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .claude/commands/create-pr.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ Keep the title under 70 characters, concise and descriptive.
**"Pull Request Contribution Terms"** section:
- Ask the user using AskUserQuestion: "Do you agree to the contribution terms outlined in CONTRIBUTING.md?" with options:
- "Yes, I agree" — Mark the checkbox with `[X]`
- "No" — Leave the checkbox empty `[ ]`
- "No" — Leave the checkbox empty `[]`
- If agreed: `- [X] I have read and agree to the contribution terms outlined in [CONTRIBUTING](https://github.com/ecotoneframework/ecotone-dev/blob/main/CONTRIBUTING.md)`
- If not agreed: `- [ ] I have read and agree to the contribution terms outlined in [CONTRIBUTING](https://github.com/ecotoneframework/ecotone-dev/blob/main/CONTRIBUTING.md)`
- If not agreed: `- [] I have read and agree to the contribution terms outlined in [CONTRIBUTING](https://github.com/ecotoneframework/ecotone-dev/blob/main/CONTRIBUTING.md)`

### Step 4: Review with User

Expand Down
33 changes: 33 additions & 0 deletions packages/Ecotone/src/Projecting/AggregatePartitionKey.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting;

class AggregatePartitionKey
{
public static function compose(string $streamName, string $aggregateType, string $aggregateId): string
{
return "{$streamName}:{$aggregateType}:{$aggregateId}";
}

/**
* @return array{streamName: string, aggregateType: string, aggregateId: string}|null
*/
public static function decompose(string $partitionKey): ?array
{
$parts = explode(':', $partitionKey, 3);
if (count($parts) !== 3) {
return null;
}

return [
'streamName' => $parts[0],
'aggregateType' => $parts[1],
'aggregateId' => $parts[2],
];
}
}
25 changes: 25 additions & 0 deletions packages/Ecotone/src/Projecting/Attribute/PartitionAggregateId.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

/*
* licence Apache-2.0
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Attribute;

use Attribute;
use Ecotone\Messaging\Attribute\Parameter\Header;
use Ecotone\Messaging\MessageHeaders;

#[Attribute(Attribute::TARGET_PARAMETER)]
class PartitionAggregateId extends Header
{
public function __construct()
{
}

public function getHeaderName(): string
{
return MessageHeaders::EVENT_AGGREGATE_ID;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

/*
* licence Apache-2.0
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Attribute;

use Attribute;
use Ecotone\Messaging\Attribute\Parameter\Header;
use Ecotone\Messaging\MessageHeaders;

#[Attribute(Attribute::TARGET_PARAMETER)]
class PartitionAggregateType extends Header
{
public function __construct()
{
}

public function getHeaderName(): string
{
return MessageHeaders::EVENT_AGGREGATE_TYPE;
}
}
24 changes: 24 additions & 0 deletions packages/Ecotone/src/Projecting/Attribute/ProjectionRebuild.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

/*
* licence Enterprise
*/
declare(strict_types=1);

namespace Ecotone\Projecting\Attribute;

use Attribute;
use InvalidArgumentException;

#[Attribute(Attribute::TARGET_CLASS)]
class ProjectionRebuild
{
public function __construct(
public readonly int $partitionBatchSize = 100,
public readonly ?string $asyncChannelName = null,
) {
if ($this->partitionBatchSize < 1) {
throw new InvalidArgumentException('Rebuild partition batch size must be at least 1');
}
}
}
55 changes: 0 additions & 55 deletions packages/Ecotone/src/Projecting/BackfillExecutorHandler.php

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class EcotoneProjectionExecutorBuilder implements ProjectionExecutorBuilder
{
private const DEFAULT_EVENT_LOADING_BATCH_SIZE = 1_000;
private const DEFAULT_BACKFILL_PARTITION_BATCH_SIZE = 100;
private const DEFAULT_REBUILD_PARTITION_BATCH_SIZE = 100;

/**
* @param AnnotatedDefinition[] $projectionEventHandlers
Expand All @@ -46,6 +47,9 @@ public function __construct(
private ?int $backfillPartitionBatchSize = null,
private ?string $backfillAsyncChannelName = null,
private bool $partitioned = false,
private ?string $resetChannel = null,
private ?int $rebuildPartitionBatchSize = null,
private ?string $rebuildAsyncChannelName = null,
) {
if ($this->partitionHeader && ! $this->automaticInitialization) {
throw new ConfigurationException("Cannot set partition header for projection {$this->projectionName} with automatic initialization disabled");
Expand Down Expand Up @@ -92,6 +96,11 @@ public function setFlushChannel(string $inputChannel): void
$this->flushChannel = $inputChannel;
}

public function setResetChannel(string $inputChannel): void
{
$this->resetChannel = $inputChannel;
}

public function setAsyncChannel(string $asynchronousChannelName): void
{
$this->asyncChannelName = $asynchronousChannelName;
Expand All @@ -117,6 +126,16 @@ public function backfillAsyncChannelName(): ?string
return $this->backfillAsyncChannelName;
}

public function rebuildPartitionBatchSize(): int
{
return $this->rebuildPartitionBatchSize ?? self::DEFAULT_REBUILD_PARTITION_BATCH_SIZE;
}

public function rebuildAsyncChannelName(): ?string
{
return $this->rebuildAsyncChannelName;
}

public function compile(MessagingContainerBuilder $builder): Definition|Reference
{
$routerProcessor = $this->buildExecutionRouter($builder);
Expand All @@ -129,6 +148,7 @@ public function compile(MessagingContainerBuilder $builder): Definition|Referenc
$this->deleteChannel,
$this->flushChannel,
$this->isLive,
$this->resetChannel,
]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
use Ecotone\Messaging\Handler\ParameterConverter;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Projecting\AggregatePartitionKey;

class PartitionHeaderConverter implements ParameterConverter
{
public function getArgumentFrom(Message $message): string
{
$streamName = $message->getHeaders()->get(MessageHeaders::EVENT_STREAM_NAME);
$aggregateType = $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_TYPE);
$aggregateId = $message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_ID);

return "{$streamName}:{$aggregateType}:{$aggregateId}";
return AggregatePartitionKey::compose(
$message->getHeaders()->get(MessageHeaders::EVENT_STREAM_NAME),
$message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_TYPE),
$message->getHeaders()->get(MessageHeaders::EVENT_AGGREGATE_ID),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Ecotone\AnnotationFinder\AnnotationFinder;
use Ecotone\EventSourcing\Attribute\ProjectionDelete;
use Ecotone\EventSourcing\Attribute\ProjectionInitialization;
use Ecotone\EventSourcing\Attribute\ProjectionReset;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Messaging\Attribute\ModuleAnnotation;
use Ecotone\Messaging\Config\Annotation\AnnotatedDefinitionReference;
Expand All @@ -36,6 +37,7 @@
use Ecotone\Projecting\Attribute\Polling;
use Ecotone\Projecting\Attribute\ProjectionBackfill;
use Ecotone\Projecting\Attribute\ProjectionDeployment;
use Ecotone\Projecting\Attribute\ProjectionRebuild;
use Ecotone\Projecting\Attribute\ProjectionExecution;
use Ecotone\Projecting\Attribute\ProjectionFlush;
use Ecotone\Projecting\Attribute\ProjectionV2;
Expand Down Expand Up @@ -80,6 +82,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
$projectionAttribute = $annotationRegistrationService->getAttributeForClass($projectionClassName, ProjectionV2::class);
$batchSizeAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionExecution::class);
$backfillAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionBackfill::class);
$rebuildAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionRebuild::class);
$pollingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Polling::class);
$streamingAttribute = $annotationRegistrationService->findAttributeForClass($projectionClassName, Streaming::class);
$projectionDeployment = $annotationRegistrationService->findAttributeForClass($projectionClassName, ProjectionDeployment::class);
Expand All @@ -100,6 +103,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
backfillPartitionBatchSize: $backfillAttribute?->backfillPartitionBatchSize,
backfillAsyncChannelName: $backfillAttribute?->asyncChannelName,
partitioned: $partitionAttribute !== null,
rebuildPartitionBatchSize: $rebuildAttribute?->partitionBatchSize,
rebuildAsyncChannelName: $rebuildAttribute?->asyncChannelName,
);

$asynchronousChannelName = self::getProjectionAsynchronousChannel($annotationRegistrationService, $projectionClassName);
Expand Down Expand Up @@ -140,6 +145,7 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
$annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionInitialization::class),
$annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionDelete::class),
$annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionFlush::class),
$annotationRegistrationService->findCombined(ProjectionV2::class, ProjectionReset::class),
);
foreach ($lifecycleAnnotations as $lifecycleAnnotation) {
/** @var ProjectionV2 $projectionAttribute */
Expand All @@ -153,6 +159,8 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
$projectionBuilder->setDeleteChannel($inputChannel);
} elseif ($lifecycleAnnotation->getAnnotationForMethod() instanceof ProjectionFlush) {
$projectionBuilder->setFlushChannel($inputChannel);
} elseif ($lifecycleAnnotation->getAnnotationForMethod() instanceof ProjectionReset) {
$projectionBuilder->setResetChannel($inputChannel);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ public function backfillProjection(string $name): void
$this->registry->get($name)->prepareBackfill();
}

#[ConsoleCommand('ecotone:projection:rebuild')]
public function rebuildProjection(string $name): void
{
if (! $this->registry->has($name)) {
throw new InvalidArgumentException("There is no projection with name {$name}");
}
$this->registry->get($name)->prepareRebuild();
}

#[ConsoleCommand('ecotone:projection:delete')]
public function deleteProjection(string $name): void
{
Expand Down
32 changes: 18 additions & 14 deletions packages/Ecotone/src/Projecting/Config/ProjectingModule.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvokerBuilder;
use Ecotone\Messaging\Handler\ServiceActivator\MessageProcessorActivatorBuilder;
use Ecotone\Projecting\Attribute\ProjectionFlush;
use Ecotone\Projecting\BackfillExecutorHandler;
use Ecotone\Projecting\PartitionBatchExecutorHandler;
use Ecotone\Projecting\InMemory\InMemoryProjectionRegistry;
use Ecotone\Projecting\PartitionProviderRegistry;
use Ecotone\Projecting\ProjectingHeaders;
Expand Down Expand Up @@ -93,6 +93,8 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
$projectionBuilder->automaticInitialization(),
$projectionBuilder->backfillPartitionBatchSize(),
$projectionBuilder->backfillAsyncChannelName(),
$projectionBuilder->rebuildPartitionBatchSize(),
$projectionBuilder->rebuildAsyncChannelName(),
])
);
$projectionRegistryMap[$projectionName] = new Reference($projectingManagerReference);
Expand Down Expand Up @@ -122,10 +124,11 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
->chainInterceptedProcessor(
MethodInvokerBuilder::create(
$projectingManagerReference,
InterfaceToCallReference::create(ProjectingManager::class, 'executeSingleBatch'),
InterfaceToCallReference::create(ProjectingManager::class, 'executePartitionBatch'),
[
HeaderBuilder::createOptional('partitionKeyValue', ProjectingHeaders::PROJECTION_PARTITION_KEY),
HeaderBuilder::create('canInitialize', ProjectingHeaders::PROJECTION_CAN_INITIALIZE),
HeaderBuilder::createOptional('shouldReset', 'projection.shouldReset'),
],
)
)
Expand All @@ -151,10 +154,10 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
new Definition(InMemoryProjectionRegistry::class, [$projectionRegistryMap])
);

// Register BackfillExecutorHandler and its message handler
// Register PartitionBatchExecutorHandler and its message handler
$messagingConfiguration->registerServiceDefinition(
BackfillExecutorHandler::class,
new Definition(BackfillExecutorHandler::class, [
PartitionBatchExecutorHandler::class,
new Definition(PartitionBatchExecutorHandler::class, [
new Reference(ProjectionRegistry::class),
new Reference(TerminationListener::class),
])
Expand All @@ -164,20 +167,21 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
MessageProcessorActivatorBuilder::create()
->chainInterceptedProcessor(
MethodInvokerBuilder::create(
BackfillExecutorHandler::class,
InterfaceToCallReference::create(BackfillExecutorHandler::class, 'executeBackfillBatch'),
PartitionBatchExecutorHandler::class,
InterfaceToCallReference::create(PartitionBatchExecutorHandler::class, 'executeBatch'),
[
PayloadBuilder::create('projectionName'),
HeaderBuilder::createOptional('limit', 'backfill.limit'),
HeaderBuilder::createOptional('offset', 'backfill.offset'),
HeaderBuilder::createOptional('streamName', 'backfill.streamName'),
HeaderBuilder::createOptional('aggregateType', 'backfill.aggregateType'),
HeaderBuilder::createOptional('eventStoreReferenceName', 'backfill.eventStoreReferenceName'),
HeaderBuilder::createOptional('limit', 'partitionBatch.limit'),
HeaderBuilder::createOptional('offset', 'partitionBatch.offset'),
HeaderBuilder::createOptional('streamName', 'partitionBatch.streamName'),
HeaderBuilder::createOptional('aggregateType', 'partitionBatch.aggregateType'),
HeaderBuilder::createOptional('eventStoreReferenceName', 'partitionBatch.eventStoreReferenceName'),
HeaderBuilder::createOptional('shouldReset', 'partitionBatch.shouldReset'),
],
)
)
->withEndpointId('backfill_executor_handler')
->withInputChannelName(BackfillExecutorHandler::BACKFILL_EXECUTOR_CHANNEL)
->withEndpointId('partition_batch_executor_handler')
->withInputChannelName(PartitionBatchExecutorHandler::PARTITION_BATCH_EXECUTOR_CHANNEL)
);

// Register console commands
Expand Down
Loading