From a87ce119ecac447de4912f385aefa18b144e2516 Mon Sep 17 00:00:00 2001 From: Jan Wehner Date: Thu, 19 Feb 2026 14:22:16 +0100 Subject: [PATCH 1/4] feat: add option to set dynamic connection and queue --- src/Activity.php | 8 ++--- src/ActivityStub.php | 4 ++- src/ChildWorkflowStub.php | 6 +++- src/Traits/Continues.php | 6 +++- src/Workflow.php | 14 +++++---- src/WorkflowStub.php | 16 +++++++--- ...00_add_queue_column_to_workflows_table.php | 30 +++++++++++++++++++ ...e_connection_column_to_workflows_table.php | 30 +++++++++++++++++++ 8 files changed, 97 insertions(+), 17 deletions(-) create mode 100644 src/migrations/2026_02_19_000000_add_queue_column_to_workflows_table.php create mode 100644 src/migrations/2026_02_19_000001_add_queue_connection_column_to_workflows_table.php diff --git a/src/Activity.php b/src/Activity.php index 55908e27..5bab41d9 100644 --- a/src/Activity.php +++ b/src/Activity.php @@ -54,12 +54,12 @@ public function __construct( ) { $this->arguments = $arguments; - if (property_exists($this, 'connection')) { - $this->onConnection($this->connection); + if (property_exists($this, 'connection') || $this->storedWorkflow->queue_connection) { + $this->onConnection($this->storedWorkflow->queue_connection ?? $this->connection); } - if (property_exists($this, 'queue')) { - $this->onQueue($this->queue); + if (property_exists($this, 'queue') || $this->storedWorkflow->queue) { + $this->onConnection($this->storedWorkflow->queue ?? $this->queue); } $this->afterCommit = true; diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 3eb8cbdf..decac94c 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -66,7 +66,9 @@ public static function make($activity, ...$arguments): PromiseInterface return resolve($result); } - $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); + $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments) + ->onConnection($context->storedWorkflow->queue_connection) + ->onQueue($context->storedWorkflow->queue); ++$context->index; WorkflowStub::setContext($context); diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 36e70bb9..e1714969 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -56,7 +56,11 @@ public static function make($workflow, ...$arguments): PromiseInterface ->wherePivot('parent_index', $context->index) ->first(); - $childWorkflow = $storedChildWorkflow ? $storedChildWorkflow->toWorkflow() : WorkflowStub::make($workflow); + $childWorkflow = $storedChildWorkflow + ? $storedChildWorkflow->toWorkflow() + : WorkflowStub::make( + $workflow, $context->storedWorkflow->queue_connection, $context->storedWorkflow->queue + ); if ($childWorkflow->running() && ! $childWorkflow->created()) { try { diff --git a/src/Traits/Continues.php b/src/Traits/Continues.php index 2294b63a..2e677474 100644 --- a/src/Traits/Continues.php +++ b/src/Traits/Continues.php @@ -22,7 +22,11 @@ public static function continueAsNew(...$arguments): PromiseInterface ->withPivot('parent_index') ->first(); - $newWorkflow = self::make($context->storedWorkflow->class); + $newWorkflow = self::make( + $context->storedWorkflow->class, + $context->storedWorkflow->queue_connection, + $context->storedWorkflow->queue + ); if ($parentWorkflow) { $parentWorkflow->children() diff --git a/src/Workflow.php b/src/Workflow.php index 0942dd99..efac722b 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -76,12 +76,12 @@ public function __construct( $this->arguments = $arguments; - if (property_exists($this, 'connection')) { - $this->onConnection($this->connection); + if (property_exists($this, 'connection') || $this->storedWorkflow->queue_connection) { + $this->onConnection($this->storedWorkflow->queue_connection ?? $this->connection); } - if (property_exists($this, 'queue')) { - $this->onQueue($this->queue); + if (property_exists($this, 'queue') || $this->storedWorkflow->queue) { + $this->onConnection($this->storedWorkflow->queue ?? $this->queue); } $this->afterCommit = true; @@ -287,8 +287,10 @@ public function handle(): void if ($parentWorkflow) { $properties = WorkflowStub::getDefaultProperties($parentWorkflow->class); - $connection = $properties['connection'] ?? config('queue.default'); - $queue = $properties['queue'] ?? config('queue.connections.' . $connection . '.queue', 'default'); + $connection = $parentWorkflow->queue_connection + ?: ($properties['connection'] ?? config('queue.default')); + $queue = $parentWorkflow->queue + ?: ($properties['queue'] ?? config('queue.connections.' . $connection . '.queue', 'default')); ChildWorkflow::dispatch( $parentWorkflow->pivot->parent_index, diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 84e33ac2..36db8a20 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -123,12 +123,18 @@ public function __call($method, $arguments) public static function connection() { - return Arr::get(self::getDefaultProperties(self::$context->storedWorkflow->class), 'connection'); + return self::$context->storedWorkflow->queue_connection ?? Arr::get( + self::getDefaultProperties(self::$context->storedWorkflow->class), + 'connection' + ); } public static function queue() { - return Arr::get(self::getDefaultProperties(self::$context->storedWorkflow->class), 'queue'); + return self::$context->storedWorkflow->queue ?? Arr::get( + self::getDefaultProperties(self::$context->storedWorkflow->class), + 'queue' + ); } public static function getDefaultProperties(string $class): array @@ -140,10 +146,12 @@ public static function getDefaultProperties(string $class): array return self::$defaultPropertiesCache[$class]; } - public static function make($class): static + public static function make($class, ?string $connection = null, ?string $queue = null): static { $storedWorkflow = config('workflows.stored_workflow_model', StoredWorkflow::class)::create([ 'class' => $class, + 'queue_connection' => $connection, + 'queue' => $queue, ]); return new self($storedWorkflow); @@ -396,6 +404,6 @@ private function dispatch(): void $this->storedWorkflow->class::$dispatch( $this->storedWorkflow, ...Serializer::unserialize($this->storedWorkflow->arguments) - ); + )->onConnection(self::connection())->onQueue(self::queue()); } } diff --git a/src/migrations/2026_02_19_000000_add_queue_column_to_workflows_table.php b/src/migrations/2026_02_19_000000_add_queue_column_to_workflows_table.php new file mode 100644 index 00000000..4b7512fe --- /dev/null +++ b/src/migrations/2026_02_19_000000_add_queue_column_to_workflows_table.php @@ -0,0 +1,30 @@ +string('queue') + ->nullable(); + }); + } + + /** + * Reverse the migrations. + */ + public function down(): void + { + Schema::table('workflows', static function (Blueprint $table) { + $table->dropColumn('queue'); + }); + } +}; diff --git a/src/migrations/2026_02_19_000001_add_queue_connection_column_to_workflows_table.php b/src/migrations/2026_02_19_000001_add_queue_connection_column_to_workflows_table.php new file mode 100644 index 00000000..a661a312 --- /dev/null +++ b/src/migrations/2026_02_19_000001_add_queue_connection_column_to_workflows_table.php @@ -0,0 +1,30 @@ +string('queue_connection') + ->nullable(); + }); + } + + /** + * Reverse the migrations. + */ + public function down(): void + { + Schema::table('workflows', static function (Blueprint $table) { + $table->dropColumn('queue_connection'); + }); + } +}; From b864fb7a3b52b214d6a43cfc01f8e89a5206acb8 Mon Sep 17 00:00:00 2001 From: Jan Wehner Date: Thu, 19 Feb 2026 15:49:05 +0100 Subject: [PATCH 2/4] test: add basic test for dynamic job connection/queue --- tests/Feature/DynamicConnectionQueueTest.php | 91 ++++++++++++++++++++ tests/TestCase.php | 6 +- 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 tests/Feature/DynamicConnectionQueueTest.php diff --git a/tests/Feature/DynamicConnectionQueueTest.php b/tests/Feature/DynamicConnectionQueueTest.php new file mode 100644 index 00000000..3b3b3c6c --- /dev/null +++ b/tests/Feature/DynamicConnectionQueueTest.php @@ -0,0 +1,91 @@ + 'redis', + 'connection' => 'workflows', + 'queue' => 'workflows', + 'retry_after' => 90, + 'block_for' => null, + 'after_commit' => false, + ]); + } + + public function testWorkWithDynamicConnectionQueue(): void + { + $workflow = WorkflowStub::make(TestWorkflow::class, 'workflows', 'workflows'); + + $workflow->start(shouldAssert: false); + + $workflow->cancel(); + + while (! $workflow->isCanceled()); + + while ($workflow->running()); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('workflow_activity_other', $workflow->output()); + $this->assertSame([TestActivity::class, TestOtherActivity::class, Signal::class], $workflow->logs() + ->pluck('class') + ->sort() + ->values() + ->toArray()); + + $this->assertSame('workflows', WorkflowStub::connection()); + $this->assertSame('workflows', WorkflowStub::queue()); + } + + public function testChildWorkflowWithDynamicConnectionQueue(): void + { + $workflow = WorkflowStub::make(TestParentWorkflow::class, 'workflows', 'workflows'); + + $workflow->start(); + + while ($workflow->running()); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('workflow_activity_other', $workflow->output()); + $this->assertSame([TestActivity::class, TestChildWorkflow::class], $workflow->logs() + ->pluck('class') + ->sort() + ->values() + ->toArray()); + + $this->assertSame('workflows', WorkflowStub::connection()); + $this->assertSame('workflows', WorkflowStub::queue()); + $this->assertDatabaseHas('workflows', [ + 'class' => TestChildWorkflow::class, + 'queue_connection' => 'workflows', + 'queue' => 'workflows', + ]); + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 9538f9b9..d2b53bca 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -13,6 +13,8 @@ abstract class TestCase extends BaseTestCase { public const NUMBER_OF_WORKERS = 2; + public static string $queueConnection = 'redis'; + private static $workers = []; public static function setUpBeforeClass(): void @@ -44,7 +46,9 @@ public static function setUpBeforeClass(): void } for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { - self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); + self::$workers[$i] = new Process([ + 'php', __DIR__ . '/../vendor/bin/testbench', 'queue:work', self::$queueConnection + ]); self::$workers[$i]->disableOutput(); self::$workers[$i]->start(); } From a284780754c15808d2aa0ac5b17e92ae2e4d6136 Mon Sep 17 00:00:00 2001 From: Jan Wehner Date: Thu, 19 Feb 2026 15:43:54 +0100 Subject: [PATCH 3/4] run ecs --- src/ChildWorkflowStub.php | 4 +++- tests/Feature/DynamicConnectionQueueTest.php | 5 +---- tests/TestCase.php | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index e1714969..f598fd4d 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -59,7 +59,9 @@ public static function make($workflow, ...$arguments): PromiseInterface $childWorkflow = $storedChildWorkflow ? $storedChildWorkflow->toWorkflow() : WorkflowStub::make( - $workflow, $context->storedWorkflow->queue_connection, $context->storedWorkflow->queue + $workflow, + $context->storedWorkflow->queue_connection, + $context->storedWorkflow->queue ); if ($childWorkflow->running() && ! $childWorkflow->created()) { diff --git a/tests/Feature/DynamicConnectionQueueTest.php b/tests/Feature/DynamicConnectionQueueTest.php index 3b3b3c6c..a04b3048 100644 --- a/tests/Feature/DynamicConnectionQueueTest.php +++ b/tests/Feature/DynamicConnectionQueueTest.php @@ -5,16 +5,12 @@ namespace Tests\Feature; use Illuminate\Support\Facades\Config; -use Symfony\Component\Process\Process; use Tests\Fixtures\TestActivity; use Tests\Fixtures\TestChildWorkflow; use Tests\Fixtures\TestOtherActivity; use Tests\Fixtures\TestParentWorkflow; -use Tests\Fixtures\TestSagaWorkflow; -use Tests\Fixtures\TestUndoActivity; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; -use Workflow\Exception; use Workflow\Signal; use Workflow\States\WorkflowCompletedStatus; use Workflow\WorkflowStub; @@ -26,6 +22,7 @@ public static function setUpBeforeClass(): void parent::$queueConnection = 'workflows'; parent::setUpBeforeClass(); } + protected function setUp(): void { parent::setUp(); diff --git a/tests/TestCase.php b/tests/TestCase.php index d2b53bca..df923e9b 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -47,7 +47,7 @@ public static function setUpBeforeClass(): void for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process([ - 'php', __DIR__ . '/../vendor/bin/testbench', 'queue:work', self::$queueConnection + 'php', __DIR__ . '/../vendor/bin/testbench', 'queue:work', self::$queueConnection, ]); self::$workers[$i]->disableOutput(); self::$workers[$i]->start(); From e8dee83f4a0c9a8ef74e35b0d7510bee56a8ea61 Mon Sep 17 00:00:00 2001 From: Jan Wehner Date: Thu, 19 Feb 2026 15:47:03 +0100 Subject: [PATCH 4/4] fix call to wrong method --- src/Activity.php | 2 +- src/Workflow.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Activity.php b/src/Activity.php index 5bab41d9..4d99a84a 100644 --- a/src/Activity.php +++ b/src/Activity.php @@ -59,7 +59,7 @@ public function __construct( } if (property_exists($this, 'queue') || $this->storedWorkflow->queue) { - $this->onConnection($this->storedWorkflow->queue ?? $this->queue); + $this->onQueue($this->storedWorkflow->queue ?? $this->queue); } $this->afterCommit = true; diff --git a/src/Workflow.php b/src/Workflow.php index efac722b..d478f38e 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -81,7 +81,7 @@ public function __construct( } if (property_exists($this, 'queue') || $this->storedWorkflow->queue) { - $this->onConnection($this->storedWorkflow->queue ?? $this->queue); + $this->onQueue($this->storedWorkflow->queue ?? $this->queue); } $this->afterCommit = true;