diff --git a/src/Activity.php b/src/Activity.php index 55908e27..4d99a84a 100644 --- a/src/Activity.php +++ b/src/Activity.php @@ -54,12 +54,12 @@ public function __construct( ) { $this->arguments = $arguments; - if (property_exists($this, 'connection')) { - $this->onConnection($this->connection); + if (property_exists($this, 'connection') || $this->storedWorkflow->queue_connection) { + $this->onConnection($this->storedWorkflow->queue_connection ?? $this->connection); } - if (property_exists($this, 'queue')) { - $this->onQueue($this->queue); + if (property_exists($this, 'queue') || $this->storedWorkflow->queue) { + $this->onQueue($this->storedWorkflow->queue ?? $this->queue); } $this->afterCommit = true; diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 3eb8cbdf..decac94c 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -66,7 +66,9 @@ public static function make($activity, ...$arguments): PromiseInterface return resolve($result); } - $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); + $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments) + ->onConnection($context->storedWorkflow->queue_connection) + ->onQueue($context->storedWorkflow->queue); ++$context->index; WorkflowStub::setContext($context); diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 36e70bb9..f598fd4d 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -56,7 +56,13 @@ public static function make($workflow, ...$arguments): PromiseInterface ->wherePivot('parent_index', $context->index) ->first(); - $childWorkflow = $storedChildWorkflow ? $storedChildWorkflow->toWorkflow() : WorkflowStub::make($workflow); + $childWorkflow = $storedChildWorkflow + ? $storedChildWorkflow->toWorkflow() + : WorkflowStub::make( + $workflow, + $context->storedWorkflow->queue_connection, + $context->storedWorkflow->queue + ); if ($childWorkflow->running() && ! $childWorkflow->created()) { try { diff --git a/src/Traits/Continues.php b/src/Traits/Continues.php index 2294b63a..2e677474 100644 --- a/src/Traits/Continues.php +++ b/src/Traits/Continues.php @@ -22,7 +22,11 @@ public static function continueAsNew(...$arguments): PromiseInterface ->withPivot('parent_index') ->first(); - $newWorkflow = self::make($context->storedWorkflow->class); + $newWorkflow = self::make( + $context->storedWorkflow->class, + $context->storedWorkflow->queue_connection, + $context->storedWorkflow->queue + ); if ($parentWorkflow) { $parentWorkflow->children() diff --git a/src/Workflow.php b/src/Workflow.php index 0942dd99..d478f38e 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -76,12 +76,12 @@ public function __construct( $this->arguments = $arguments; - if (property_exists($this, 'connection')) { - $this->onConnection($this->connection); + if (property_exists($this, 'connection') || $this->storedWorkflow->queue_connection) { + $this->onConnection($this->storedWorkflow->queue_connection ?? $this->connection); } - if (property_exists($this, 'queue')) { - $this->onQueue($this->queue); + if (property_exists($this, 'queue') || $this->storedWorkflow->queue) { + $this->onQueue($this->storedWorkflow->queue ?? $this->queue); } $this->afterCommit = true; @@ -287,8 +287,10 @@ public function handle(): void if ($parentWorkflow) { $properties = WorkflowStub::getDefaultProperties($parentWorkflow->class); - $connection = $properties['connection'] ?? config('queue.default'); - $queue = $properties['queue'] ?? config('queue.connections.' . $connection . '.queue', 'default'); + $connection = $parentWorkflow->queue_connection + ?: ($properties['connection'] ?? config('queue.default')); + $queue = $parentWorkflow->queue + ?: ($properties['queue'] ?? config('queue.connections.' . $connection . '.queue', 'default')); ChildWorkflow::dispatch( $parentWorkflow->pivot->parent_index, diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 84e33ac2..36db8a20 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -123,12 +123,18 @@ public function __call($method, $arguments) public static function connection() { - return Arr::get(self::getDefaultProperties(self::$context->storedWorkflow->class), 'connection'); + return self::$context->storedWorkflow->queue_connection ?? Arr::get( + self::getDefaultProperties(self::$context->storedWorkflow->class), + 'connection' + ); } public static function queue() { - return Arr::get(self::getDefaultProperties(self::$context->storedWorkflow->class), 'queue'); + return self::$context->storedWorkflow->queue ?? Arr::get( + self::getDefaultProperties(self::$context->storedWorkflow->class), + 'queue' + ); } public static function getDefaultProperties(string $class): array @@ -140,10 +146,12 @@ public static function getDefaultProperties(string $class): array return self::$defaultPropertiesCache[$class]; } - public static function make($class): static + public static function make($class, ?string $connection = null, ?string $queue = null): static { $storedWorkflow = config('workflows.stored_workflow_model', StoredWorkflow::class)::create([ 'class' => $class, + 'queue_connection' => $connection, + 'queue' => $queue, ]); return new self($storedWorkflow); @@ -396,6 +404,6 @@ private function dispatch(): void $this->storedWorkflow->class::$dispatch( $this->storedWorkflow, ...Serializer::unserialize($this->storedWorkflow->arguments) - ); + )->onConnection(self::connection())->onQueue(self::queue()); } } diff --git a/src/migrations/2026_02_19_000000_add_queue_column_to_workflows_table.php b/src/migrations/2026_02_19_000000_add_queue_column_to_workflows_table.php new file mode 100644 index 00000000..4b7512fe --- /dev/null +++ b/src/migrations/2026_02_19_000000_add_queue_column_to_workflows_table.php @@ -0,0 +1,30 @@ +string('queue') + ->nullable(); + }); + } + + /** + * Reverse the migrations. + */ + public function down(): void + { + Schema::table('workflows', static function (Blueprint $table) { + $table->dropColumn('queue'); + }); + } +}; diff --git a/src/migrations/2026_02_19_000001_add_queue_connection_column_to_workflows_table.php b/src/migrations/2026_02_19_000001_add_queue_connection_column_to_workflows_table.php new file mode 100644 index 00000000..a661a312 --- /dev/null +++ b/src/migrations/2026_02_19_000001_add_queue_connection_column_to_workflows_table.php @@ -0,0 +1,30 @@ +string('queue_connection') + ->nullable(); + }); + } + + /** + * Reverse the migrations. + */ + public function down(): void + { + Schema::table('workflows', static function (Blueprint $table) { + $table->dropColumn('queue_connection'); + }); + } +}; diff --git a/tests/Feature/DynamicConnectionQueueTest.php b/tests/Feature/DynamicConnectionQueueTest.php new file mode 100644 index 00000000..a04b3048 --- /dev/null +++ b/tests/Feature/DynamicConnectionQueueTest.php @@ -0,0 +1,88 @@ + 'redis', + 'connection' => 'workflows', + 'queue' => 'workflows', + 'retry_after' => 90, + 'block_for' => null, + 'after_commit' => false, + ]); + } + + public function testWorkWithDynamicConnectionQueue(): void + { + $workflow = WorkflowStub::make(TestWorkflow::class, 'workflows', 'workflows'); + + $workflow->start(shouldAssert: false); + + $workflow->cancel(); + + while (! $workflow->isCanceled()); + + while ($workflow->running()); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('workflow_activity_other', $workflow->output()); + $this->assertSame([TestActivity::class, TestOtherActivity::class, Signal::class], $workflow->logs() + ->pluck('class') + ->sort() + ->values() + ->toArray()); + + $this->assertSame('workflows', WorkflowStub::connection()); + $this->assertSame('workflows', WorkflowStub::queue()); + } + + public function testChildWorkflowWithDynamicConnectionQueue(): void + { + $workflow = WorkflowStub::make(TestParentWorkflow::class, 'workflows', 'workflows'); + + $workflow->start(); + + while ($workflow->running()); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('workflow_activity_other', $workflow->output()); + $this->assertSame([TestActivity::class, TestChildWorkflow::class], $workflow->logs() + ->pluck('class') + ->sort() + ->values() + ->toArray()); + + $this->assertSame('workflows', WorkflowStub::connection()); + $this->assertSame('workflows', WorkflowStub::queue()); + $this->assertDatabaseHas('workflows', [ + 'class' => TestChildWorkflow::class, + 'queue_connection' => 'workflows', + 'queue' => 'workflows', + ]); + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 9538f9b9..df923e9b 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -13,6 +13,8 @@ abstract class TestCase extends BaseTestCase { public const NUMBER_OF_WORKERS = 2; + public static string $queueConnection = 'redis'; + private static $workers = []; public static function setUpBeforeClass(): void @@ -44,7 +46,9 @@ public static function setUpBeforeClass(): void } for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { - self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); + self::$workers[$i] = new Process([ + 'php', __DIR__ . '/../vendor/bin/testbench', 'queue:work', self::$queueConnection, + ]); self::$workers[$i]->disableOutput(); self::$workers[$i]->start(); }