From 3829686d54d1838c6299795ef672905b5f0d500a Mon Sep 17 00:00:00 2001 From: aegorov Date: Sun, 7 Apr 2024 01:57:32 +0300 Subject: [PATCH 1/3] Add delay for queue message handle --- src/Message/Message.php | 43 +++++++++++++++++++++++++ src/QueueProvider.php | 9 +++++- tests/Integration/QueueProviderTest.php | 18 ++++++++++- tests/Message/MessageTest.php | 42 ++++++++++++++++++++++++ 4 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 src/Message/Message.php create mode 100644 tests/Message/MessageTest.php diff --git a/src/Message/Message.php b/src/Message/Message.php new file mode 100644 index 0000000..612d620 --- /dev/null +++ b/src/Message/Message.php @@ -0,0 +1,43 @@ +delay > 0) { + $this->metadata['delay'] = $delay; + } + } + + public function withDelay(int $delay): self + { + $message = clone $this; + $message->metadata['delay'] = $delay; + return $message; + } + + public function getHandlerName(): string + { + return $this->handlerName; + } + + public function getData(): mixed + { + return $this->data; + } + + public function getMetadata(): array + { + return $this->metadata; + } +} diff --git a/src/QueueProvider.php b/src/QueueProvider.php index d065f4c..5ed38ce 100644 --- a/src/QueueProvider.php +++ b/src/QueueProvider.php @@ -28,7 +28,13 @@ public function pushMessage(string $message, array $metadata = []): int $this->checkConnection(); $id = $this->getId(); $this->redis->hset("$this->channelName.messages", (string) $id, $message); - $this->redis->lpush("$this->channelName.waiting", $id); + + $delay = isset($metadata['delay']) && is_int($metadata['delay']) ? $metadata['delay'] : 0; + if ($delay > 0) { + $this->redis->zadd("$this->channelName.delayed", time() + $delay, $id); + } else { + $this->redis->lpush("$this->channelName.waiting", $id); + } return $id; } @@ -93,6 +99,7 @@ public function delete(string $id): void { $this->checkConnection(); $this->redis->zrem("$this->channelName.reserved", $id); + $this->redis->zrem("$this->channelName.delayed", $id); $this->redis->hdel("$this->channelName.messages", $id); $this->redis->hdel("$this->channelName.attempts", $id); } diff --git a/tests/Integration/QueueProviderTest.php b/tests/Integration/QueueProviderTest.php index 0d0a0cd..09d2281 100644 --- a/tests/Integration/QueueProviderTest.php +++ b/tests/Integration/QueueProviderTest.php @@ -5,6 +5,7 @@ namespace Yiisoft\Queue\Redis\Tests\Integration; use PHPUnit\Framework\TestCase; +use Yiisoft\Queue\Redis\Message\Message; use Yiisoft\Queue\Redis\QueueProvider; use Yiisoft\Queue\Redis\QueueProviderInterface; @@ -19,7 +20,7 @@ public function testGetId(QueueProvider $provider) $this->assertGreaterThan(0, $id); } - public function test__construct() + public function test__construct(): QueueProvider { $redis = new \Redis(); $connected = $redis->connect('redis'); @@ -28,4 +29,19 @@ public function test__construct() $this->assertInstanceOf(QueueProviderInterface::class, $provider); return $provider; } + + /** + * @depends test__construct + */ + public function testDelay(QueueProvider $provider): void + { + $message = new Message('test', ['key' => 'value'], [], 2); + $id = $provider->pushMessage(json_encode($message->getData(), JSON_THROW_ON_ERROR), $message->getMetadata()); + $this->assertGreaterThan(0, $id); + $reserv = $provider->reserve($id); + $this->assertNull($reserv); + sleep(3); + $reserv = $provider->reserve($id); + $this->assertNotNull($reserv); + } } diff --git a/tests/Message/MessageTest.php b/tests/Message/MessageTest.php new file mode 100644 index 0000000..32376a5 --- /dev/null +++ b/tests/Message/MessageTest.php @@ -0,0 +1,42 @@ +assertEquals('handler', $message->getHandlerName()); + } + + public function testGetData(): void + { + $message = new Message('handler', 'data', []); + $this->assertEquals('data', $message->getData()); + } + + public function testGetMetadata(): void + { + $metadata = ['key' => 'value']; + $message = new Message('handler', 'data', $metadata); + $this->assertEquals($metadata, $message->getMetadata()); + + $message = new Message('handler', 'data', $metadata, 2); + $metadata['delay'] = 2; + $this->assertEquals($metadata, $message->getMetadata()); + } + + public function testWithDelay(): void + { + $message = new Message('handler', 'data', []); + $delayedMessage = $message->withDelay(5); + + $this->assertNotSame($message, $delayedMessage); + $this->assertEquals(5, $delayedMessage->getMetadata()['delay']); + } +} From 70f2c8f3195dce61bf98b002e62b02b39951a0e1 Mon Sep 17 00:00:00 2001 From: aegorov Date: Sun, 7 Apr 2024 02:21:16 +0300 Subject: [PATCH 2/3] move MessageTest.php to Unit --- tests/{ => Unit}/Message/MessageTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename tests/{ => Unit}/Message/MessageTest.php (96%) diff --git a/tests/Message/MessageTest.php b/tests/Unit/Message/MessageTest.php similarity index 96% rename from tests/Message/MessageTest.php rename to tests/Unit/Message/MessageTest.php index 32376a5..08958e4 100644 --- a/tests/Message/MessageTest.php +++ b/tests/Unit/Message/MessageTest.php @@ -1,7 +1,7 @@ Date: Thu, 17 Jul 2025 18:00:11 +0300 Subject: [PATCH 3/3] Updating dependencies. --- src/Adapter.php | 35 ++++++++++++++++-------------- src/Message/Message.php | 5 +++++ src/QueueProvider.php | 5 +++++ src/QueueProviderInterface.php | 2 ++ tests/Integration/QueueTest.php | 8 +++---- tests/Unit/Message/MessageTest.php | 14 ++++++++++++ tests/Unit/QueueProviderTest.php | 17 +++++++++++++++ tests/Unit/QueueTest.php | 15 +++++++++++++ 8 files changed, 81 insertions(+), 20 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index c503559..9978023 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -4,9 +4,10 @@ namespace Yiisoft\Queue\Redis; +use BackedEnum; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Cli\LoopInterface; -use Yiisoft\Queue\Enum\JobStatus; +use Yiisoft\Queue\JobStatus; use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\Message\MessageSerializerInterface; @@ -14,11 +15,12 @@ final class Adapter implements AdapterInterface { public function __construct( - private QueueProviderInterface $provider, + private QueueProviderInterface $provider, private MessageSerializerInterface $serializer, - private LoopInterface $loop, - private int $timeout = 3 - ) { + private LoopInterface $loop, + private int $timeout = 3 + ) + { } public function runExisting(callable $handlerCallback): void @@ -45,23 +47,21 @@ public function status(int|string $id): JobStatus } if ($this->provider->existInReserved($id)) { - return JobStatus::reserved(); + return JobStatus::RESERVED; } if ($this->provider->existInWaiting($id)) { - return JobStatus::waiting(); + return JobStatus::WAITING; } - return JobStatus::done(); + return JobStatus::DONE; } public function push(MessageInterface $message): MessageInterface { $payload = $this->serializer->serialize($message); $id = $this->provider->pushMessage($payload, $message->getMetadata()); - $envelope = IdEnvelope::fromMessage($message); - $envelope->setId($id); - return $envelope; + return new IdEnvelope($message, $id); } public function subscribe(callable $handlerCallback): void @@ -79,10 +79,11 @@ public function subscribe(callable $handlerCallback): void } } - public function withChannel(string $channel): AdapterInterface + public function withChannel(BackedEnum|string $channel): AdapterInterface { $adapter = clone $this; - $adapter->provider = $this->provider->withChannelName($channel); + $channelName = is_string($channel) ? $channel : (string) $channel->value; + $adapter->provider = $this->provider->withChannelName($channelName); return $adapter; } @@ -94,9 +95,11 @@ private function reserve(): ?IdEnvelope } $message = $this->serializer->unserialize($reserve->payload); - $envelope = IdEnvelope::fromMessage($message); - $envelope->setId($reserve->id); + return new IdEnvelope($message, $reserve->id); + } - return $envelope; + public function getChannel(): string + { + return $this->provider->getChannelName(); } } diff --git a/src/Message/Message.php b/src/Message/Message.php index 612d620..a760ef2 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -40,4 +40,9 @@ public function getMetadata(): array { return $this->metadata; } + + public static function fromData(string $handlerName, mixed $data, array $metadata = []): self + { + return new self($handlerName, $data, $metadata); + } } diff --git a/src/QueueProvider.php b/src/QueueProvider.php index 5ed38ce..a5d6c9b 100644 --- a/src/QueueProvider.php +++ b/src/QueueProvider.php @@ -152,4 +152,9 @@ private function checkConnection(): void throw new NotConnectedRedisException('Redis is not connected.'); } } + + public function getChannelName(): string + { + return $this->channelName; + } } diff --git a/src/QueueProviderInterface.php b/src/QueueProviderInterface.php index 1bce6fc..f59b523 100644 --- a/src/QueueProviderInterface.php +++ b/src/QueueProviderInterface.php @@ -20,4 +20,6 @@ public function existInWaiting(int $id): bool; public function existInReserved(int $id): bool; public function withChannelName(string $channelName): self; + + public function getChannelName(): string; } diff --git a/tests/Integration/QueueTest.php b/tests/Integration/QueueTest.php index 2882e40..196a1b3 100644 --- a/tests/Integration/QueueTest.php +++ b/tests/Integration/QueueTest.php @@ -6,7 +6,7 @@ use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Cli\LoopInterface; -use Yiisoft\Queue\Enum\JobStatus; +use Yiisoft\Queue\JobStatus; use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Message\Message; use Yiisoft\Queue\Message\MessageInterface; @@ -53,12 +53,12 @@ public function testStatus(): void ); $status = $adapter->status($message->getId()); - $this->assertEquals(JobStatus::waiting(), $status); + $this->assertEquals(JobStatus::WAITING, $status); $queue->run(); $status = $adapter->status($message->getId()); - $this->assertEquals(JobStatus::done(), $status); + $this->assertEquals(JobStatus::DONE, $status); $mockReserved = $this->createMock(QueueProviderInterface::class); $mockReserved->method('existInReserved')->willReturn(true); @@ -66,7 +66,7 @@ public function testStatus(): void $queue = $this->getDefaultQueue($adapter); $status = $adapter->status('1'); - $this->assertEquals(JobStatus::reserved(), $status); + $this->assertEquals(JobStatus::RESERVED, $status); } public function testListen(): void diff --git a/tests/Unit/Message/MessageTest.php b/tests/Unit/Message/MessageTest.php index 08958e4..beca2b2 100644 --- a/tests/Unit/Message/MessageTest.php +++ b/tests/Unit/Message/MessageTest.php @@ -39,4 +39,18 @@ public function testWithDelay(): void $this->assertNotSame($message, $delayedMessage); $this->assertEquals(5, $delayedMessage->getMetadata()['delay']); } + + public function testFromData(): void + { + $handlerName = 'test-handler'; + $data = ['key' => 'value']; + $metadata = ['custom' => 'metadata']; + + $message = Message::fromData($handlerName, $data, $metadata); + + $this->assertInstanceOf(Message::class, $message); + $this->assertEquals($handlerName, $message->getHandlerName()); + $this->assertEquals($data, $message->getData()); + $this->assertEquals($metadata, $message->getMetadata()); + } } diff --git a/tests/Unit/QueueProviderTest.php b/tests/Unit/QueueProviderTest.php index 8458639..a3bcdf9 100644 --- a/tests/Unit/QueueProviderTest.php +++ b/tests/Unit/QueueProviderTest.php @@ -61,4 +61,21 @@ public function testRedisException(): void $this->expectException(\RuntimeException::class); $provider->getId(); } + + /** + * @throws \PHPUnit\Framework\MockObject\Exception + */ + public function testGetChannelName(): void + { + // Тестируем значение по умолчанию + $redis = $this->createMock(\Redis::class); + $redis->method('isConnected')->willReturn(true); + $provider = new QueueProvider($redis); + $this->assertEquals('yii-queue', $provider->getChannelName()); + + // Тестируем пользовательское имя канала + $customChannelName = 'custom-channel'; + $provider = new QueueProvider($redis, $customChannelName); + $this->assertEquals($customChannelName, $provider->getChannelName()); + } } diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 18494e3..abbee71 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -52,4 +52,19 @@ public function testAdapterNullMessage() }); $this->assertTrue($notUseHandler); } + + public function testGetChannel(): void + { + $expectedChannelName = 'test-channel'; + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->method('getChannelName')->willReturn($expectedChannelName); + + $adapter = new Adapter( + $queueProvider, + $this->createMock(MessageSerializerInterface::class), + $this->createMock(LoopInterface::class) + ); + + $this->assertEquals($expectedChannelName, $adapter->getChannel()); + } }