Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,23 @@

namespace Yiisoft\Queue\Redis;

use BackedEnum;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

final class Adapter implements AdapterInterface
{
public function __construct(
private QueueProviderInterface $provider,
private QueueProviderInterface $provider,
private MessageSerializerInterface $serializer,
private LoopInterface $loop,
private int $timeout = 3
) {
private LoopInterface $loop,
private int $timeout = 3
)
{
}

public function runExisting(callable $handlerCallback): void
Expand All @@ -45,23 +47,21 @@ public function status(int|string $id): JobStatus
}

if ($this->provider->existInReserved($id)) {
return JobStatus::reserved();
return JobStatus::RESERVED;
}

if ($this->provider->existInWaiting($id)) {
return JobStatus::waiting();
return JobStatus::WAITING;
}

return JobStatus::done();
return JobStatus::DONE;
}

public function push(MessageInterface $message): MessageInterface
{
$payload = $this->serializer->serialize($message);
$id = $this->provider->pushMessage($payload, $message->getMetadata());
$envelope = IdEnvelope::fromMessage($message);
$envelope->setId($id);
return $envelope;
return new IdEnvelope($message, $id);
}

public function subscribe(callable $handlerCallback): void
Expand All @@ -79,10 +79,11 @@ public function subscribe(callable $handlerCallback): void
}
}

public function withChannel(string $channel): AdapterInterface
public function withChannel(BackedEnum|string $channel): AdapterInterface
{
$adapter = clone $this;
$adapter->provider = $this->provider->withChannelName($channel);
$channelName = is_string($channel) ? $channel : (string) $channel->value;
$adapter->provider = $this->provider->withChannelName($channelName);
return $adapter;
}

Expand All @@ -94,9 +95,11 @@ private function reserve(): ?IdEnvelope
}

$message = $this->serializer->unserialize($reserve->payload);
$envelope = IdEnvelope::fromMessage($message);
$envelope->setId($reserve->id);
return new IdEnvelope($message, $reserve->id);
}

return $envelope;
public function getChannel(): string
{
return $this->provider->getChannelName();
}
}
15 changes: 10 additions & 5 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Redis\Message;
Expand All @@ -10,10 +9,11 @@ final class Message implements MessageInterface
{
public function __construct(
private string $handlerName,
private mixed $data,
private array $metadata,
private int $delay = 0 //delay in seconds
) {
private mixed $data,
private array $metadata,
private int $delay = 0 //delay in seconds
)
{
if ($this->delay > 0) {
$this->metadata['delay'] = $delay;
}
Expand All @@ -40,4 +40,9 @@ public function getMetadata(): array
{
return $this->metadata;
}

public static function fromData(string $handlerName, mixed $data, array $metadata = []): self
{
return new self($handlerName, $data, $metadata);
}
}
5 changes: 5 additions & 0 deletions src/QueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class QueueProvider implements QueueProviderInterface
{
private const DEFAULT_CHANNEL_NAME = 'yii-queue';

Check failure on line 12 in src/QueueProvider.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MissingClassConstType

src/QueueProvider.php:12:19: MissingClassConstType: Class constant "Yiisoft\Queue\Redis\QueueProvider::DEFAULT_CHANNEL_NAME" should have a declared type. (see https://psalm.dev/359)

/**
* @throws RedisException
Expand Down Expand Up @@ -152,4 +152,9 @@
throw new NotConnectedRedisException('Redis is not connected.');
}
}

public function getChannelName(): string
{
return $this->channelName;
}
}
2 changes: 2 additions & 0 deletions src/QueueProviderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ public function existInWaiting(int $id): bool;
public function existInReserved(int $id): bool;

public function withChannelName(string $channelName): self;

public function getChannelName(): string;
}
8 changes: 4 additions & 4 deletions tests/Integration/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Message\MessageInterface;
Expand Down Expand Up @@ -53,20 +53,20 @@ public function testStatus(): void
);

$status = $adapter->status($message->getId());
$this->assertEquals(JobStatus::waiting(), $status);
$this->assertEquals(JobStatus::WAITING, $status);

$queue->run();

$status = $adapter->status($message->getId());
$this->assertEquals(JobStatus::done(), $status);
$this->assertEquals(JobStatus::DONE, $status);

$mockReserved = $this->createMock(QueueProviderInterface::class);
$mockReserved->method('existInReserved')->willReturn(true);
$adapter = new Adapter($mockReserved, new JsonMessageSerializer(), $this->getLoop());
$queue = $this->getDefaultQueue($adapter);

$status = $adapter->status('1');
$this->assertEquals(JobStatus::reserved(), $status);
$this->assertEquals(JobStatus::RESERVED, $status);
}

public function testListen(): void
Expand Down
15 changes: 14 additions & 1 deletion tests/Unit/Message/MessageTest.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?php

declare(strict_types=1);

namespace Unit\Message;
Expand Down Expand Up @@ -40,4 +39,18 @@ public function testWithDelay(): void
$this->assertNotSame($message, $delayedMessage);
$this->assertEquals(5, $delayedMessage->getMetadata()['delay']);
}

public function testFromData(): void
{
$handlerName = 'test-handler';
$data = ['key' => 'value'];
$metadata = ['custom' => 'metadata'];

$message = Message::fromData($handlerName, $data, $metadata);

$this->assertInstanceOf(Message::class, $message);
$this->assertEquals($handlerName, $message->getHandlerName());
$this->assertEquals($data, $message->getData());
$this->assertEquals($metadata, $message->getMetadata());
}
}
17 changes: 17 additions & 0 deletions tests/Unit/QueueProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,21 @@ public function testRedisException(): void
$this->expectException(\RuntimeException::class);
$provider->getId();
}

/**
* @throws \PHPUnit\Framework\MockObject\Exception
*/
public function testGetChannelName(): void
{
// Тестируем значение по умолчанию
$redis = $this->createMock(\Redis::class);
$redis->method('isConnected')->willReturn(true);
$provider = new QueueProvider($redis);
$this->assertEquals('yii-queue', $provider->getChannelName());

// Тестируем пользовательское имя канала
$customChannelName = 'custom-channel';
$provider = new QueueProvider($redis, $customChannelName);
$this->assertEquals($customChannelName, $provider->getChannelName());
}
}
15 changes: 15 additions & 0 deletions tests/Unit/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,19 @@ public function testAdapterNullMessage()
});
$this->assertTrue($notUseHandler);
}

public function testGetChannel(): void
{
$expectedChannelName = 'test-channel';
$queueProvider = $this->createMock(QueueProviderInterface::class);
$queueProvider->method('getChannelName')->willReturn($expectedChannelName);

$adapter = new Adapter(
$queueProvider,
$this->createMock(MessageSerializerInterface::class),
$this->createMock(LoopInterface::class)
);

$this->assertEquals($expectedChannelName, $adapter->getChannel());
}
}
Loading