Skip to content

Commit 91de91b

Browse files
authored
Merge pull request #76 from utopia-php/feat/redis-resilience-retries
feat(redis): survive transient Redis outages with bounded reconnects
2 parents 0fbc7d7 + 1bee005 commit 91de91b

4 files changed

Lines changed: 380 additions & 14 deletions

File tree

src/Queue/Broker/Redis.php

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,73 @@
1111
class Redis implements Publisher, Consumer
1212
{
1313
private const int POP_TIMEOUT = 2;
14+
private const int RECONNECT_BACKOFF_MS = 100;
15+
private const int RECONNECT_MAX_BACKOFF_MS = 5_000;
1416

1517
private bool $closed = false;
18+
/**
19+
* @var (callable(Queue, \Throwable, int, int): void)|null
20+
*/
21+
private $reconnectCallback = null;
22+
/**
23+
* @var (callable(Queue, int): void)|null
24+
*/
25+
private $reconnectSuccessCallback = null;
1626

1727
public function __construct(private readonly Connection $connection)
1828
{
1929
}
2030

31+
/**
 * Registers (or clears) the hook that consume() fires whenever a Redis error
 * makes it schedule another reconnect attempt.
 *
 * The hook is called with the queue being consumed, the exception that was
 * caught, the 1-based attempt counter and the delay (in milliseconds) that
 * will be slept before the next pop is tried.
 *
 * @param (callable(Queue, \Throwable, int, int): void)|null $callback Hook to store; null removes it.
 * @return self This broker instance, to allow chaining.
 */
public function setReconnectCallback(?callable $callback): self
{
    $this->reconnectCallback = $callback;

    return $this;
}
37+
38+
/**
 * Registers (or clears) the hook that consume() fires once a pop succeeds
 * again after one or more failed attempts.
 *
 * The hook is called with the queue being consumed and the number of
 * attempts that were needed before the pop went through.
 *
 * @param (callable(Queue, int): void)|null $callback Hook to store; null removes it.
 * @return self This broker instance, to allow chaining.
 */
public function setReconnectSuccessCallback(?callable $callback): self
{
    $this->reconnectSuccessCallback = $callback;

    return $this;
}
44+
2145
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
2246
{
47+
$reconnectBackoffMs = self::RECONNECT_BACKOFF_MS;
48+
$reconnectAttempt = 0;
49+
2350
while (!$this->closed) {
2451
/**
2552
* Waiting for next Job.
2653
*/
2754
try {
2855
$nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT);
29-
} catch (\RedisException $e) {
56+
if ($reconnectAttempt > 0) {
57+
$this->triggerReconnectSuccessCallback($queue, $reconnectAttempt);
58+
}
59+
60+
$reconnectBackoffMs = self::RECONNECT_BACKOFF_MS;
61+
$reconnectAttempt = 0;
62+
} catch (\RedisException|\RedisClusterException $e) {
3063
if ($this->closed) {
3164
break;
3265
}
3366

34-
throw $e;
67+
$reconnectAttempt++;
68+
69+
try {
70+
$this->connection->close();
71+
} catch (\Throwable) {
72+
}
73+
74+
$sleepMs = \mt_rand(0, $reconnectBackoffMs);
75+
$this->triggerReconnectCallback($queue, $e, $reconnectAttempt, $sleepMs);
76+
77+
\usleep($sleepMs * 1000);
78+
$reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $reconnectBackoffMs * 2);
79+
80+
continue;
3581
}
3682

3783
if (!$nextMessage) {
@@ -104,6 +150,30 @@ public function close(): void
104150
$this->closed = true;
105151
}
106152

153+
/**
 * Fires the reconnect hook, if one is registered, and shields the caller from it.
 *
 * @param Queue      $queue   Queue whose consumption hit the error.
 * @param \Throwable $error   The error handed on to the hook.
 * @param int        $attempt Attempt counter handed on to the hook.
 * @param int        $sleepMs Delay in milliseconds handed on to the hook.
 */
private function triggerReconnectCallback(Queue $queue, \Throwable $error, int $attempt, int $sleepMs): void
{
    $hook = $this->reconnectCallback;

    if (\is_callable($hook)) {
        try {
            $hook($queue, $error, $attempt, $sleepMs);
        } catch (\Throwable) {
            // Intentionally ignored: anything thrown by the hook stops here.
        }
    }
}
164+
165+
/**
 * Fires the reconnect-success hook, if one is registered, and shields the caller from it.
 *
 * @param Queue $queue    Queue whose consumption has resumed.
 * @param int   $attempts Attempt count handed on to the hook.
 */
private function triggerReconnectSuccessCallback(Queue $queue, int $attempts): void
{
    $hook = $this->reconnectSuccessCallback;

    if (\is_callable($hook)) {
        try {
            $hook($queue, $attempts);
        } catch (\Throwable) {
            // Intentionally ignored: anything thrown by the hook stops here.
        }
    }
}
176+
107177
public function enqueue(Queue $queue, array $payload, bool $priority = false): bool
108178
{
109179
$payload = [

src/Queue/Connection/Redis.php

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
class Redis implements Connection
88
{
9+
protected const int CONNECT_MAX_ATTEMPTS = 5;
10+
protected const int CONNECT_BACKOFF_MS = 100;
11+
protected const int CONNECT_MAX_BACKOFF_MS = 3_000;
12+
913
protected string $host;
1014
protected int $port;
1115
protected ?string $user;
@@ -178,8 +182,12 @@ public function ping(): bool
178182

179183
/**
 * Closes the held Redis client, if there is one, and drops the reference to it.
 *
 * NOTE(review): this span is a rendered diff. Statements preceded by a "-"
 * gutter marker are the pre-commit body; statements preceded by a "+" marker
 * are the post-commit body, which swallows any Throwable raised by the
 * client's close() and resets $this->redis to null in a finally clause.
 */
public function close(): void
180184
{
181-
$this->redis?->close();
182-
$this->redis = null;
185+
try {
186+
$this->redis?->close();
187+
} catch (\Throwable) {
// Deliberately empty: an error while closing is not propagated.
188+
} finally {
// Executed whether or not close() threw, so the reference is always cleared.
189+
$this->redis = null;
190+
}
183191
}
184192

185193
protected function getRedis(): \Redis
@@ -188,15 +196,58 @@ protected function getRedis(): \Redis
188196
return $this->redis;
189197
}
190198

191-
$this->redis = new \Redis();
192-
193199
$connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout;
194-
$this->redis->connect($this->host, $this->port, $connectTimeout);
195200

196-
if ($this->readTimeout >= 0) {
197-
$this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout);
201+
for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) {
202+
$redis = new \Redis();
203+
$connected = false;
204+
205+
try {
206+
$redis->connect($this->host, $this->port, $connectTimeout);
207+
$connected = true;
208+
209+
if ($this->readTimeout >= 0) {
210+
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout);
211+
}
212+
213+
$this->redis = $redis;
214+
return $this->redis;
215+
} catch (\RedisException $e) {
216+
if ($connected) {
217+
try {
218+
$redis->close();
219+
} catch (\Throwable) {
220+
}
221+
}
222+
223+
if ($attempt === self::CONNECT_MAX_ATTEMPTS) {
224+
throw new \RedisException(
225+
\sprintf(
226+
'Failed to connect to Redis at %s:%d after %d attempts: %s',
227+
$this->host,
228+
$this->port,
229+
self::CONNECT_MAX_ATTEMPTS,
230+
$e->getMessage(),
231+
),
232+
(int)$e->getCode(),
233+
$e,
234+
);
235+
}
236+
237+
// Exponential backoff with full jitter to avoid thundering herd on recovery.
238+
$backoffMs = \min(
239+
self::CONNECT_MAX_BACKOFF_MS,
240+
self::CONNECT_BACKOFF_MS * (2 ** ($attempt - 1)),
241+
);
242+
\usleep(\mt_rand(0, $backoffMs) * 1000);
243+
}
198244
}
199245

200-
return $this->redis;
246+
throw new \RedisException(\sprintf(
247+
'Unreachable: Redis connect loop for %s:%d exited after %d attempts without success or exception.',
248+
$this->host,
249+
$this->port,
250+
self::CONNECT_MAX_ATTEMPTS,
251+
));
201252
}
202253
}

src/Queue/Connection/RedisCluster.php

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
class RedisCluster implements Connection
88
{
9+
protected const int CONNECT_MAX_ATTEMPTS = 5;
10+
protected const int CONNECT_BACKOFF_MS = 100;
11+
protected const int CONNECT_MAX_BACKOFF_MS = 3_000;
12+
913
protected array $seeds;
1014
protected float $connectTimeout;
1115
protected float $readTimeout;
@@ -175,8 +179,12 @@ public function ping(): bool
175179

176180
/**
 * Closes the held Redis cluster client, if there is one, and drops the reference to it.
 *
 * NOTE(review): this span is a rendered diff. Statements preceded by a "-"
 * gutter marker are the pre-commit body; statements preceded by a "+" marker
 * are the post-commit body, which swallows any Throwable raised by the
 * client's close() and resets $this->redis to null in a finally clause.
 */
public function close(): void
177181
{
178-
$this->redis?->close();
179-
$this->redis = null;
182+
try {
183+
$this->redis?->close();
184+
} catch (\Throwable) {
// Deliberately empty: an error while closing is not propagated.
185+
} finally {
// Executed whether or not close() threw, so the reference is always cleared.
186+
$this->redis = null;
187+
}
180188
}
181189

182190
protected function getRedis(): \RedisCluster
@@ -187,7 +195,38 @@ protected function getRedis(): \RedisCluster
187195

188196
$connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout;
189197
$readTimeout = $this->readTimeout < 0 ? 0 : $this->readTimeout;
190-
$this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout);
191-
return $this->redis;
198+
199+
for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) {
200+
try {
201+
$this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout);
202+
return $this->redis;
203+
} catch (\RedisClusterException $e) {
204+
if ($attempt === self::CONNECT_MAX_ATTEMPTS) {
205+
throw new \RedisClusterException(
206+
\sprintf(
207+
'Failed to connect to Redis cluster nodes [%s] after %d attempts: %s',
208+
\implode(', ', $this->seeds),
209+
self::CONNECT_MAX_ATTEMPTS,
210+
$e->getMessage(),
211+
),
212+
(int)$e->getCode(),
213+
$e,
214+
);
215+
}
216+
217+
// Exponential backoff with full jitter to avoid thundering herd on recovery.
218+
$backoffMs = \min(
219+
self::CONNECT_MAX_BACKOFF_MS,
220+
self::CONNECT_BACKOFF_MS * (2 ** ($attempt - 1)),
221+
);
222+
\usleep(\mt_rand(0, $backoffMs) * 1000);
223+
}
224+
}
225+
226+
throw new \RedisClusterException(\sprintf(
227+
'Unreachable: Redis cluster connect loop for nodes [%s] exited after %d attempts without success or exception.',
228+
\implode(', ', $this->seeds),
229+
self::CONNECT_MAX_ATTEMPTS,
230+
));
192231
}
193232
}

0 commit comments

Comments
 (0)