From 1635a17ab1ae25a582bb62edafd36798b97c03fb Mon Sep 17 00:00:00 2001 From: "G.R.Dalenoort" Date: Fri, 30 Nov 2018 20:24:59 +0100 Subject: [PATCH 1/6] Make consumer interruptable --- src/Consumer.php | 4 ++-- src/Driver/FlatFile/Driver.php | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/Consumer.php b/src/Consumer.php index 2fefc087..386dbf74 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -39,8 +39,6 @@ public function __construct(Router $router, EventDispatcherInterface $dispatcher */ public function consume(Queue $queue, array $options = []) { - declare(ticks=1); - $this->bind(); while ($this->tick($queue, $options)) { @@ -59,6 +57,8 @@ public function consume(Queue $queue, array $options = []) */ public function tick(Queue $queue, array $options = []) { + pcntl_signal_dispatch(); + $this->configure($options); if ($this->shutdown) { diff --git a/src/Driver/FlatFile/Driver.php b/src/Driver/FlatFile/Driver.php index afd91e0c..4af87c14 100644 --- a/src/Driver/FlatFile/Driver.php +++ b/src/Driver/FlatFile/Driver.php @@ -109,8 +109,15 @@ public function popMessage($queueName, $duration = 5) $files = $this->getJobFiles($queueName); } - usleep(1000); - } + $nano = time_nanosleep(0, 1000000); + if ($nano === false) { + // fallback when time_nanosleep fails + usleep(1000); + } elseif (is_array($nano)) { + // Driver Interrupted by a signal + return array(null, null); + } + } return [null, null]; } From 4a96a05adfaa09f9d5df3a8b24788e91be40aa00 Mon Sep 17 00:00:00 2001 From: "G.R.Dalenoort" Date: Fri, 30 Nov 2018 21:27:17 +0100 Subject: [PATCH 2/6] while tick pcntl_signal_dispatch --- src/Consumer.php | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Consumer.php b/src/Consumer.php index 386dbf74..1e6ed9a4 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -42,7 +42,7 @@ public function consume(Queue $queue, array $options = []) $this->bind(); while ($this->tick($queue, $options)) { - // NO op + pcntl_signal_dispatch(); } } @@ -57,8 +57,6 @@ public function consume(Queue $queue, array $options = []) */ public function tick(Queue $queue, array $options = []) { - pcntl_signal_dispatch(); - $this->configure($options); if ($this->shutdown) { From 292552b88ccbd217c72c82a7ad7fcbabab2de022 Mon Sep 17 00:00:00 2001 From: "G.R.Dalenoort" Date: Fri, 30 Nov 2018 21:46:43 +0100 Subject: [PATCH 3/6] \Throwable is root interface since PHP7, catch \Exception first --- src/Consumer.php | 12 +++++++++--- src/Driver/FlatFile/Driver.php | 18 +++++++++--------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/Consumer.php b/src/Consumer.php index 1e6ed9a4..651eb52c 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -36,13 +36,14 @@ public function __construct(Router $router, EventDispatcherInterface $dispatcher * * @param Queue $queue * @param array $options + * @throws \Throwable */ public function consume(Queue $queue, array $options = []) { $this->bind(); while ($this->tick($queue, $options)) { - pcntl_signal_dispatch(); + pcntl_signal_dispatch(); } } @@ -54,6 +55,7 @@ public function consume(Queue $queue, array $options = []) * @param array $options * * @return bool + * @throws \Throwable */ public function tick(Queue $queue, array $options = []) { @@ -132,15 +134,17 @@ public function invoke(Envelope $envelope, Queue $queue) $queue->acknowledge($envelope); $this->dispatcher->dispatch(BernardEvents::ACKNOWLEDGE, new EnvelopeEvent($envelope, $queue)); - } catch (\Throwable $error) { - $this->rejectDispatch($error, $envelope, $queue); + } catch (\Exception $exception) { $this->rejectDispatch($exception, $envelope, $queue); + } catch (\Throwable $error) { + $this->rejectDispatch($error, $envelope, $queue); } } /** * @param array $options + * @return array */ protected function configure(array $options) { @@ -151,6 +155,8 @@ protected function configure(array $options) $this->options = array_filter($options) + $this->options; $this->options['max-runtime'] += microtime(true); $this->configured = true; + + return $this->options; } /** diff --git a/src/Driver/FlatFile/Driver.php b/src/Driver/FlatFile/Driver.php index 4af87c14..d4b7a83f 100644 --- a/src/Driver/FlatFile/Driver.php +++ b/src/Driver/FlatFile/Driver.php @@ -109,15 +109,15 @@ public function popMessage($queueName, $duration = 5) $files = $this->getJobFiles($queueName); } - $nano = time_nanosleep(0, 1000000); - if ($nano === false) { - // fallback when time_nanosleep fails - usleep(1000); - } elseif (is_array($nano)) { - // Driver Interrupted by a signal - return array(null, null); - } - } + $nano = time_nanosleep(0, 1000000); + if ($nano === false) { + // fallback when time_nanosleep fails + usleep(1000); + } elseif (is_array($nano)) { + // Interrupted by a pcntl_signal + return array(null, null); + } + } return [null, null]; } From ffe3027dd94d0349c8f11b1ef4e38f8d5d77fb67 Mon Sep 17 00:00:00 2001 From: "G.R.Dalenoort" Date: Fri, 30 Nov 2018 22:01:54 +0100 Subject: [PATCH 4/6] throwable php7 | php5 --- src/Consumer.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Consumer.php b/src/Consumer.php index 651eb52c..522613ce 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -135,10 +135,12 @@ public function invoke(Envelope $envelope, Queue $queue) $this->dispatcher->dispatch(BernardEvents::ACKNOWLEDGE, new EnvelopeEvent($envelope, $queue)); - } catch (\Exception $exception) { - $this->rejectDispatch($exception, $envelope, $queue); } catch (\Throwable $error) { + // php 7 $this->rejectDispatch($error, $envelope, $queue); + } catch (\Exception $exception) { + // php 5 + $this->rejectDispatch($exception, $envelope, $queue); } } From 00cc4eebdb406c9bb541886016f1a850c3e4adb0 Mon Sep 17 00:00:00 2001 From: "G.R.Dalenoort" Date: Thu, 18 Feb 2021 08:43:59 +0100 Subject: [PATCH 5/6] composer require ext-pcntl for consumer --- composer.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 8b6f64b1..96428e2f 100644 --- a/composer.json +++ b/composer.json @@ -9,7 +9,8 @@ "php": ">=7.4", "beberlei/assert": "^2.1 || ^3.0", "bernard/normalt": "^1.0", - "symfony/event-dispatcher": "^3.0 || ^4.0" + "symfony/event-dispatcher": "^3.0 || ^4.0", + "ext-pcntl": "*" }, "require-dev" : { "doctrine/dbal": "^2.5", From 9af85458ea2a1587d9b39e8d5a55ca16f1b6ffe2 Mon Sep 17 00:00:00 2001 From: "G.R.Dalenoort" Date: Thu, 18 Feb 2021 09:37:06 +0100 Subject: [PATCH 6/6] Update flatfile unit tests - deprecated assertDirectoryNotExists => assertDirectoryDoesNotExist - add missing property private $BaseDir --- tests/Driver/FlatFile/DriverTest.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/Driver/FlatFile/DriverTest.php b/tests/Driver/FlatFile/DriverTest.php index da15d41e..16810ebe 100644 --- a/tests/Driver/FlatFile/DriverTest.php +++ b/tests/Driver/FlatFile/DriverTest.php @@ -14,6 +14,8 @@ class DriverTest extends \PHPUnit\Framework\TestCase */ private $driver; + private $baseDir; + protected function setUp(): void { $this->baseDir = sys_get_temp_dir().\DIRECTORY_SEPARATOR.'bernard-flat'; @@ -49,7 +51,7 @@ public function testRemove() $this->driver->removeQueue('send-newsletter'); - $this->assertDirectoryNotExists($this->baseDir.\DIRECTORY_SEPARATOR.'send-newsletter'); + $this->assertDirectoryDoesNotExist($this->baseDir.\DIRECTORY_SEPARATOR.'send-newsletter'); } public function testRemoveQueueWithPoppedMessage() @@ -60,7 +62,7 @@ public function testRemoveQueueWithPoppedMessage() $this->driver->removeQueue('send-newsletter'); - $this->assertDirectoryNotExists($this->baseDir.\DIRECTORY_SEPARATOR.'send-newsletter'); + $this->assertDirectoryDoesNotExist($this->baseDir.\DIRECTORY_SEPARATOR.'send-newsletter'); } public function testPushMessage()