diff --git a/src/Exception.php b/src/Exception.php index 765ef77..3cfda11 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -62,7 +62,12 @@ public function handle() public function middleware() { return [ - new WithoutOverlappingMiddleware($this->storedWorkflow->id, WithoutOverlappingMiddleware::ACTIVITY), + new WithoutOverlappingMiddleware( + $this->storedWorkflow->id, + WithoutOverlappingMiddleware::ACTIVITY, + 0, + 15 + ), ]; } } diff --git a/src/Middleware/WithoutOverlappingMiddleware.php b/src/Middleware/WithoutOverlappingMiddleware.php index 1bbcb29..4e3460c 100644 --- a/src/Middleware/WithoutOverlappingMiddleware.php +++ b/src/Middleware/WithoutOverlappingMiddleware.php @@ -112,7 +112,8 @@ public function lock($job) $locked = $this->compareAndSet( $this->getActivitySemaphoreKey(), $activitySemaphores, - array_merge($activitySemaphores, [$job->key]) + array_merge($activitySemaphores, [$job->key]), + $this->expiresAfter ); if ($locked) { if ($this->expiresAfter) { @@ -177,7 +178,13 @@ private function unlockActivity($job): bool $remaining = array_values( array_diff($this->cache->get($this->getActivitySemaphoreKey(), []), [$job->key]) ); - $this->cache->put($this->getActivitySemaphoreKey(), $remaining); + if ($remaining === []) { + $this->cache->forget($this->getActivitySemaphoreKey()); + } elseif ($this->expiresAfter) { + $this->cache->put($this->getActivitySemaphoreKey(), $remaining, $this->expiresAfter); + } else { + $this->cache->put($this->getActivitySemaphoreKey(), $remaining); + } $this->cache->forget($job->key); foreach ($remaining as $semaphore) { @@ -210,7 +217,9 @@ private function compareAndSet($key, $expectedValue, $newValue, $expiresAfter = $currentValue = is_int($expectedValue) ? (int) $currentValue : $currentValue; if ($currentValue === $expectedValue) { - if ($expiresAfter) { + if ($newValue === 0 || $newValue === []) { + $this->cache->forget($key); + } elseif ($expiresAfter) { $this->cache->put($key, $newValue, $expiresAfter); } else { $this->cache->put($key, $newValue); diff --git a/src/Signal.php b/src/Signal.php index f64efbf..531719b 100644 --- a/src/Signal.php +++ b/src/Signal.php @@ -38,7 +38,12 @@ public function __construct( public function middleware() { return [ - new WithoutOverlappingMiddleware($this->storedWorkflow->id, WithoutOverlappingMiddleware::WORKFLOW), + new WithoutOverlappingMiddleware( + $this->storedWorkflow->id, + WithoutOverlappingMiddleware::WORKFLOW, + 0, + 15 + ), ]; } diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index dac9eef..fa0d5d5 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -22,11 +22,11 @@ public function testMiddleware(): void )); $middleware = collect($exception->middleware()) - ->map(static fn ($middleware) => is_object($middleware) ? get_class($middleware) : $middleware) ->values(); $this->assertCount(1, $middleware); - $this->assertSame([WithoutOverlappingMiddleware::class], $middleware->all()); + $this->assertSame(WithoutOverlappingMiddleware::class, get_class($middleware[0])); + $this->assertSame(15, $middleware[0]->expiresAfter); } public function testExceptionWorkflowRunning(): void diff --git a/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php b/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php index 168a5e3..11d19c8 100644 --- a/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php +++ b/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php @@ -69,7 +69,7 @@ public function testAllowsOnlyOneWorkflowInstance(): void }); }); - $this->assertSame(0, Cache::get($middleware1->getWorkflowSemaphoreKey())); + $this->assertNull(Cache::get($middleware1->getWorkflowSemaphoreKey())); $this->assertNull(Cache::get($middleware1->getActivitySemaphoreKey())); } @@ -111,7 +111,7 @@ public function testAllowsMultipleActivityInstances(): void }); $this->assertNull(Cache::get($middleware1->getWorkflowSemaphoreKey())); - $this->assertSame(0, count(Cache::get($middleware1->getActivitySemaphoreKey()))); + $this->assertSame(0, count(Cache::get($middleware1->getActivitySemaphoreKey(), []))); } public function testUnknownTypeDoesNotCallNext(): void @@ -290,6 +290,41 @@ public function testUnlockActivityReturnsFalseWhenOtherActivityKeyExists(): void $this->assertSame([], Cache::get($middleware->getActivitySemaphoreKey(), [])); } + public function testUnlockActivityAppliesTtlWhenExpiresAfterIsConfigured(): void + { + $job = new \stdClass(); + $job->key = 'test-activity-key'; + + $lock = $this->mock(Lock::class, static function (MockInterface $mock) { + $mock->shouldReceive('get') + ->once() + ->andReturn(true); + $mock->shouldReceive('release') + ->once(); + }); + + $cache = $this->mock(Repository::class, static function (MockInterface $mock) use ($lock, $job) { + $mock->shouldReceive('lock') + ->once() + ->andReturn($lock); + $mock->shouldReceive('get') + ->with('laravel-workflow-overlap:1:activity', []) + ->andReturn([$job->key]); + $mock->shouldReceive('forget') + ->with('laravel-workflow-overlap:1:activity') + ->once(); + $mock->shouldReceive('forget') + ->with($job->key) + ->once(); + }); + + $middleware = new WithoutOverlappingMiddleware(1, WithoutOverlappingMiddleware::ACTIVITY, 0, 60); + + $result = $middleware->unlock($job); + + $this->assertTrue($result); + } + public function testUnlockActivityRetriesOnLockFailure(): void { $job = new \stdClass(); @@ -315,8 +350,8 @@ public function testUnlockActivityRetriesOnLockFailure(): void $mock->shouldReceive('get') ->with('laravel-workflow-overlap:1:activity', []) ->andReturn([$job->key]); - $mock->shouldReceive('put') - ->with('laravel-workflow-overlap:1:activity', []) + $mock->shouldReceive('forget') + ->with('laravel-workflow-overlap:1:activity') ->once(); $mock->shouldReceive('forget') ->with($job->key) @@ -426,4 +461,53 @@ public function testCompareAndSetReturnsFalseOnValueMismatch(): void $this->fail('Should not call next when CAS value mismatch'); }); } + + public function testActivityLockAppliesTtlToSemaphoreKeyWhenConfigured(): void + { + $job = new \stdClass(); + + $lock = $this->mock(Lock::class, static function (MockInterface $mock) { + $mock->shouldReceive('get') + ->once() + ->andReturn(true); + $mock->shouldReceive('release') + ->once(); + }); + + $cache = $this->mock(Repository::class, static function (MockInterface $mock) use ($lock) { + $mock->shouldReceive('get') + ->with('laravel-workflow-overlap:1:workflow', 0) + ->once() + ->andReturn(0); + $mock->shouldReceive('get') + ->with('laravel-workflow-overlap:1:activity', []) + ->twice() + ->andReturn([], []); + $mock->shouldReceive('lock') + ->with('laravel-workflow-overlap:1', 5) + ->once() + ->andReturn($lock); + $mock->shouldReceive('put') + ->withArgs(static fn ($key, $value, $ttl): bool => $key === 'laravel-workflow-overlap:1:activity' + && is_array($value) + && count($value) === 1 + && $ttl === 60) + ->once(); + $mock->shouldReceive('put') + ->withArgs( + static fn ($key, $value, $ttl): bool => str_starts_with( + $key, + 'laravel-workflow-overlap:1:activity:' + ) + && $value === 1 + && $ttl === 60 + ) + ->once(); + }); + + $middleware = new WithoutOverlappingMiddleware(1, WithoutOverlappingMiddleware::ACTIVITY, 0, 60); + + $this->assertTrue($middleware->lock($job)); + $this->assertTrue(isset($job->key)); + } } diff --git a/tests/Unit/SignalTest.php b/tests/Unit/SignalTest.php index 6470acc..5fb7b2f 100644 --- a/tests/Unit/SignalTest.php +++ b/tests/Unit/SignalTest.php @@ -20,11 +20,11 @@ public function testMiddleware(): void $signal = new Signal(new StoredWorkflow()); $middleware = collect($signal->middleware()) - ->map(static fn ($middleware) => is_object($middleware) ? get_class($middleware) : $middleware) ->values(); $this->assertCount(1, $middleware); - $this->assertSame([WithoutOverlappingMiddleware::class], $middleware->all()); + $this->assertSame(WithoutOverlappingMiddleware::class, get_class($middleware[0])); + $this->assertSame(15, $middleware[0]->expiresAfter); } public function testSignalWorkflowRunning(): void