Skip to content

Commit 01a9aef

Browse files
committed
add files
0 parents  commit 01a9aef

File tree

6 files changed

+314
-0
lines changed

6 files changed

+314
-0
lines changed

PhpFpmConnectionFactory.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use Enqueue\Dsn\Dsn;
5+
use hollodotme\FastCGI\Client;
6+
use hollodotme\FastCGI\SocketConnections\NetworkSocket;
7+
use hollodotme\FastCGI\SocketConnections\UnixDomainSocket;
8+
use Interop\Queue\ConnectionFactory;
9+
use Interop\Queue\Context;
10+
11+
class PhpFpmConnectionFactory implements ConnectionFactory
12+
{
13+
/**
14+
* @var string
15+
*/
16+
private $dsn;
17+
18+
public function __construct(string $dsn)
19+
{
20+
$this->dsn = $dsn;
21+
}
22+
23+
public function createContext(): Context
24+
{
25+
$dsn = new Dsn($this->dsn);
26+
if ('unix' == $dsn->getSchemeProtocol()) {
27+
$socket = new UnixDomainSocket($dsn->getPath());
28+
} else if ('tcp' == $dsn->getSchemeProtocol()) {
29+
$socket = new NetworkSocket($dsn->getHost(), $dsn->getPort());
30+
} else {
31+
throw new \LogicException('Protocol is not supported');
32+
}
33+
34+
return new PhpFpmContext(new Client($socket));
35+
}
36+
}

PhpFpmConsumer.php

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use Interop\Queue\Consumer;
5+
use Interop\Queue\Message;
6+
use Interop\Queue\Queue;
7+
8+
class PhpFpmConsumer implements Consumer
9+
{
10+
/**
11+
* @var PhpFpmDestination
12+
*/
13+
private $destination;
14+
15+
public function __construct(PhpFpmDestination $destination)
16+
{
17+
$this->destination = $destination;
18+
}
19+
20+
public function getQueue(): Queue
21+
{
22+
return $this->destination;
23+
}
24+
25+
public function receive(int $timeout = 0): ?Message
26+
{
27+
return $this->receiveNoWait();
28+
}
29+
30+
public function receiveNoWait(): ?Message
31+
{
32+
if ($_POST['message']) {
33+
return PhpFpmMessage::jsonUnserialize($_POST['message']);
34+
}
35+
36+
return null;
37+
}
38+
39+
public function acknowledge(Message $message): void
40+
{
41+
}
42+
43+
public function reject(Message $message, bool $requeue = false): void
44+
{
45+
}
46+
}

PhpFpmContext.php

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use hollodotme\FastCGI\Client;
5+
use Interop\Queue\ConnectionFactory;
6+
use Interop\Queue\Consumer;
7+
use Interop\Queue\Context;
8+
use Interop\Queue\Destination;
9+
use Interop\Queue\Exception\PurgeQueueNotSupportedException;
10+
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
11+
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
12+
use Interop\Queue\Message;
13+
use Interop\Queue\Producer;
14+
use Interop\Queue\Queue;
15+
use Interop\Queue\SubscriptionConsumer;
16+
use Interop\Queue\Topic;
17+
18+
class PhpFpmContext implements Context
19+
{
20+
/**
21+
* @var Client
22+
*/
23+
private $cgiClient;
24+
25+
public function __construct(Client $cgiClient)
26+
{
27+
$this->cgiClient = $cgiClient;
28+
}
29+
30+
public function createMessage(string $body = '', array $properties = [], array $headers = []): Message
31+
{
32+
return new PhpFpmMessage($body, $properties, $headers);
33+
}
34+
35+
public function createTopic(string $topicName): Topic
36+
{
37+
return new PhpFpmDestination($topicName);
38+
}
39+
40+
public function createQueue(string $queueName): Queue
41+
{
42+
return new PhpFpmDestination($queueName);
43+
}
44+
45+
public function createProducer(): Producer
46+
{
47+
return new PhpFpmProducer($this->cgiClient);
48+
}
49+
50+
public function createConsumer(Destination $destination): Consumer
51+
{
52+
return new PhpFpmConsumer($destination);
53+
}
54+
55+
public function close(): void
56+
{
57+
}
58+
59+
public function createTemporaryQueue(): Queue
60+
{
61+
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
62+
}
63+
64+
public function createSubscriptionConsumer(): SubscriptionConsumer
65+
{
66+
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
67+
}
68+
69+
public function purgeQueue(Queue $queue): void
70+
{
71+
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
72+
}
73+
}

PhpFpmDestination.php

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use Interop\Queue\ConnectionFactory;
5+
use Interop\Queue\Context;
6+
use Interop\Queue\Queue;
7+
use Interop\Queue\Topic;
8+
9+
class PhpFpmDestination implements Queue, Topic
10+
{
11+
/**
12+
* @var string
13+
*/
14+
private $name;
15+
16+
public function __construct(string $name)
17+
{
18+
$this->name = $name;
19+
}
20+
21+
public function getName(): string
22+
{
23+
return $this->name;
24+
}
25+
26+
public function getQueueName(): string
27+
{
28+
return $this->getName();
29+
}
30+
31+
public function getTopicName(): string
32+
{
33+
return $this->getName();
34+
}
35+
}

PhpFpmMessage.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use Interop\Queue\Message;
5+
use Interop\Queue\MessageTrait;
6+
7+
class PhpFpmMessage implements Message, \JsonSerializable
8+
{
9+
public function __construct(string $body = '', array $properties = [], array $headers = [])
10+
{
11+
$this->body = $body;
12+
$this->properties = $properties;
13+
$this->headers = $headers;
14+
15+
$this->redelivered = false;
16+
}
17+
18+
use MessageTrait;
19+
20+
public function jsonSerialize(): array
21+
{
22+
return [
23+
'body' => $this->getBody(),
24+
'properties' => $this->getProperties(),
25+
'headers' => $this->getHeaders(),
26+
];
27+
}
28+
29+
public static function jsonUnserialize(string $json): self
30+
{
31+
$data = json_decode($json, true);
32+
if (JSON_ERROR_NONE !== json_last_error()) {
33+
throw new \InvalidArgumentException(sprintf(
34+
'The malformed json given. Error %s and message %s',
35+
json_last_error(),
36+
json_last_error_msg()
37+
));
38+
}
39+
40+
return new self($data['body'], $data['properties'], $data['headers']);
41+
}
42+
}

PhpFpmProducer.php

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
namespace Makasim\PhpFpm;
3+
4+
use hollodotme\FastCGI\Client;
5+
use hollodotme\FastCGI\Requests\PostRequest;
6+
use Interop\Queue\Destination;
7+
use Interop\Queue\Exception\DeliveryDelayNotSupportedException;
8+
use Interop\Queue\Exception\Exception;
9+
use Interop\Queue\Exception\PriorityNotSupportedException;
10+
use Interop\Queue\Exception\TimeToLiveNotSupportedException;
11+
use Interop\Queue\Message;
12+
use Interop\Queue\Producer;
13+
14+
class PhpFpmProducer implements Producer
15+
{
16+
/**
17+
* @var Client
18+
*/
19+
private $cgiClient;
20+
21+
public function __construct(Client $cgiClient)
22+
{
23+
$this->cgiClient = $cgiClient;
24+
}
25+
26+
/**
27+
* @param PhpFpmDestination $destination
28+
* @param PhpFpmMessage $message
29+
*/
30+
public function send(Destination $destination, Message $message): void
31+
{
32+
$request = new PostRequest($destination->getName(), http_build_query(['message' => json_encode($message)]));
33+
34+
try {
35+
$this->cgiClient->sendAsyncRequest($request);
36+
} catch (\Exception $e) {
37+
throw new Exception($e->getMessage(), $e->getCode(), $e);
38+
}
39+
}
40+
41+
public function setDeliveryDelay(int $deliveryDelay = null): Producer
42+
{
43+
if (null !== $deliveryDelay) {
44+
throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
45+
}
46+
47+
return $this;
48+
}
49+
50+
public function getDeliveryDelay(): ?int
51+
{
52+
return null;
53+
}
54+
55+
public function setPriority(int $priority = null): Producer
56+
{
57+
if (null !== $priority) {
58+
throw PriorityNotSupportedException::providerDoestNotSupportIt();
59+
}
60+
61+
return $this;
62+
}
63+
64+
public function getPriority(): ?int
65+
{
66+
return null;
67+
}
68+
69+
public function setTimeToLive(int $timeToLive = null): Producer
70+
{
71+
if (null !== $timeToLive) {
72+
throw TimeToLiveNotSupportedException::providerDoestNotSupportIt();
73+
}
74+
75+
return $this;
76+
}
77+
78+
public function getTimeToLive(): ?int
79+
{
80+
return null;
81+
}
82+
}

0 commit comments

Comments
 (0)