Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/Activity.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public function __construct(
) {
$this->arguments = $arguments;

if (property_exists($this, 'connection')) {
$this->onConnection($this->connection);
if (property_exists($this, 'connection') || $this->storedWorkflow->queue_connection) {
$this->onConnection($this->storedWorkflow->queue_connection ?? $this->connection);
}

if (property_exists($this, 'queue')) {
$this->onQueue($this->queue);
if (property_exists($this, 'queue') || $this->storedWorkflow->queue) {
$this->onQueue($this->storedWorkflow->queue ?? $this->queue);
}

$this->afterCommit = true;
Expand Down
4 changes: 3 additions & 1 deletion src/ActivityStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public static function make($activity, ...$arguments): PromiseInterface
return resolve($result);
}

$activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments);
$activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments)
->onConnection($context->storedWorkflow->queue_connection)
->onQueue($context->storedWorkflow->queue);

++$context->index;
WorkflowStub::setContext($context);
Expand Down
8 changes: 7 additions & 1 deletion src/ChildWorkflowStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ public static function make($workflow, ...$arguments): PromiseInterface
->wherePivot('parent_index', $context->index)
->first();

$childWorkflow = $storedChildWorkflow ? $storedChildWorkflow->toWorkflow() : WorkflowStub::make($workflow);
$childWorkflow = $storedChildWorkflow
? $storedChildWorkflow->toWorkflow()
: WorkflowStub::make(
$workflow,
$context->storedWorkflow->queue_connection,
$context->storedWorkflow->queue
);

if ($childWorkflow->running() && ! $childWorkflow->created()) {
try {
Expand Down
6 changes: 5 additions & 1 deletion src/Traits/Continues.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ public static function continueAsNew(...$arguments): PromiseInterface
->withPivot('parent_index')
->first();

$newWorkflow = self::make($context->storedWorkflow->class);
$newWorkflow = self::make(
$context->storedWorkflow->class,
$context->storedWorkflow->queue_connection,
$context->storedWorkflow->queue
);

if ($parentWorkflow) {
$parentWorkflow->children()
Expand Down
14 changes: 8 additions & 6 deletions src/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ public function __construct(

$this->arguments = $arguments;

if (property_exists($this, 'connection')) {
$this->onConnection($this->connection);
if (property_exists($this, 'connection') || $this->storedWorkflow->queue_connection) {
$this->onConnection($this->storedWorkflow->queue_connection ?? $this->connection);
}

if (property_exists($this, 'queue')) {
$this->onQueue($this->queue);
if (property_exists($this, 'queue') || $this->storedWorkflow->queue) {
$this->onQueue($this->storedWorkflow->queue ?? $this->queue);
}

$this->afterCommit = true;
Expand Down Expand Up @@ -287,8 +287,10 @@ public function handle(): void

if ($parentWorkflow) {
$properties = WorkflowStub::getDefaultProperties($parentWorkflow->class);
$connection = $properties['connection'] ?? config('queue.default');
$queue = $properties['queue'] ?? config('queue.connections.' . $connection . '.queue', 'default');
$connection = $parentWorkflow->queue_connection
?: ($properties['connection'] ?? config('queue.default'));
$queue = $parentWorkflow->queue
?: ($properties['queue'] ?? config('queue.connections.' . $connection . '.queue', 'default'));

ChildWorkflow::dispatch(
$parentWorkflow->pivot->parent_index,
Expand Down
16 changes: 12 additions & 4 deletions src/WorkflowStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,18 @@ public function __call($method, $arguments)

public static function connection()
{
return Arr::get(self::getDefaultProperties(self::$context->storedWorkflow->class), 'connection');
return self::$context->storedWorkflow->queue_connection ?? Arr::get(
self::getDefaultProperties(self::$context->storedWorkflow->class),
'connection'
);
}

public static function queue()
{
return Arr::get(self::getDefaultProperties(self::$context->storedWorkflow->class), 'queue');
return self::$context->storedWorkflow->queue ?? Arr::get(
self::getDefaultProperties(self::$context->storedWorkflow->class),
'queue'
);
}

public static function getDefaultProperties(string $class): array
Expand All @@ -140,10 +146,12 @@ public static function getDefaultProperties(string $class): array
return self::$defaultPropertiesCache[$class];
}

public static function make($class): static
public static function make($class, ?string $connection = null, ?string $queue = null): static
{
$storedWorkflow = config('workflows.stored_workflow_model', StoredWorkflow::class)::create([
'class' => $class,
'queue_connection' => $connection,
'queue' => $queue,
]);

return new self($storedWorkflow);
Expand Down Expand Up @@ -396,6 +404,6 @@ private function dispatch(): void
$this->storedWorkflow->class::$dispatch(
$this->storedWorkflow,
...Serializer::unserialize($this->storedWorkflow->arguments)
);
)->onConnection(self::connection())->onQueue(self::queue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class() extends Migration {
/**
* Run the migrations.
*/
public function up(): void
{
Schema::table('workflows', static function (Blueprint $blueprint): void {
$blueprint->string('queue')
->nullable();
});
}

/**
* Reverse the migrations.
*/
public function down(): void
{
Schema::table('workflows', static function (Blueprint $table) {
$table->dropColumn('queue');
});
}
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class() extends Migration {
/**
* Run the migrations.
*/
public function up(): void
{
Schema::table('workflows', static function (Blueprint $blueprint): void {
$blueprint->string('queue_connection')
->nullable();
});
}

/**
* Reverse the migrations.
*/
public function down(): void
{
Schema::table('workflows', static function (Blueprint $table) {
$table->dropColumn('queue_connection');
});
}
};
88 changes: 88 additions & 0 deletions tests/Feature/DynamicConnectionQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

declare(strict_types=1);

namespace Tests\Feature;

use Illuminate\Support\Facades\Config;
use Tests\Fixtures\TestActivity;
use Tests\Fixtures\TestChildWorkflow;
use Tests\Fixtures\TestOtherActivity;
use Tests\Fixtures\TestParentWorkflow;
use Tests\Fixtures\TestWorkflow;
use Tests\TestCase;
use Workflow\Signal;
use Workflow\States\WorkflowCompletedStatus;
use Workflow\WorkflowStub;

final class DynamicConnectionQueueTest extends TestCase
{
public static function setUpBeforeClass(): void
{
parent::$queueConnection = 'workflows';
parent::setUpBeforeClass();
}

protected function setUp(): void
{
parent::setUp();
Config::set('database.redis.workflows', Config::get('database.redis.default'));
Config::set('queue.connections.workflows', [
'driver' => 'redis',
'connection' => 'workflows',
'queue' => 'workflows',
'retry_after' => 90,
'block_for' => null,
'after_commit' => false,
]);
}

public function testWorkWithDynamicConnectionQueue(): void
{
$workflow = WorkflowStub::make(TestWorkflow::class, 'workflows', 'workflows');

$workflow->start(shouldAssert: false);

$workflow->cancel();

while (! $workflow->isCanceled());

while ($workflow->running());

$this->assertSame(WorkflowCompletedStatus::class, $workflow->status());
$this->assertSame('workflow_activity_other', $workflow->output());
$this->assertSame([TestActivity::class, TestOtherActivity::class, Signal::class], $workflow->logs()
->pluck('class')
->sort()
->values()
->toArray());

$this->assertSame('workflows', WorkflowStub::connection());
$this->assertSame('workflows', WorkflowStub::queue());
}

public function testChildWorkflowWithDynamicConnectionQueue(): void
{
$workflow = WorkflowStub::make(TestParentWorkflow::class, 'workflows', 'workflows');

$workflow->start();

while ($workflow->running());

$this->assertSame(WorkflowCompletedStatus::class, $workflow->status());
$this->assertSame('workflow_activity_other', $workflow->output());
$this->assertSame([TestActivity::class, TestChildWorkflow::class], $workflow->logs()
->pluck('class')
->sort()
->values()
->toArray());

$this->assertSame('workflows', WorkflowStub::connection());
$this->assertSame('workflows', WorkflowStub::queue());
$this->assertDatabaseHas('workflows', [
'class' => TestChildWorkflow::class,
'queue_connection' => 'workflows',
'queue' => 'workflows',
]);
}
}
6 changes: 5 additions & 1 deletion tests/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ abstract class TestCase extends BaseTestCase
{
public const NUMBER_OF_WORKERS = 2;

public static string $queueConnection = 'redis';

private static $workers = [];

public static function setUpBeforeClass(): void
Expand Down Expand Up @@ -44,7 +46,9 @@ public static function setUpBeforeClass(): void
}

for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) {
self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']);
self::$workers[$i] = new Process([
'php', __DIR__ . '/../vendor/bin/testbench', 'queue:work', self::$queueConnection,
]);
self::$workers[$i]->disableOutput();
self::$workers[$i]->start();
}
Expand Down
Loading