Skip to content

Commit 0dfc2b5

Browse files
committed
fix(redis): tighten stale TCP connection detection and add fast lease deadline
1 parent 2ae8145 commit 0dfc2b5

File tree

3 files changed

+47
-37
lines changed

3 files changed

+47
-37
lines changed

apps/sim/lib/core/config/redis.test.ts

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@ describe('redis config', () => {
2929
getRedisClient()
3030

3131
mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
32-
await vi.advanceTimersByTimeAsync(30_000)
33-
await vi.advanceTimersByTimeAsync(30_000)
34-
await vi.advanceTimersByTimeAsync(30_000)
32+
await vi.advanceTimersByTimeAsync(15_000)
33+
await vi.advanceTimersByTimeAsync(15_000)
3534

3635
expect(listener).toHaveBeenCalledTimes(1)
3736
})
@@ -44,9 +43,9 @@ describe('redis config', () => {
4443
getRedisClient()
4544
mockRedisInstance.ping.mockResolvedValue('PONG')
4645

47-
await vi.advanceTimersByTimeAsync(30_000)
48-
await vi.advanceTimersByTimeAsync(30_000)
49-
await vi.advanceTimersByTimeAsync(30_000)
46+
await vi.advanceTimersByTimeAsync(15_000)
47+
await vi.advanceTimersByTimeAsync(15_000)
48+
await vi.advanceTimersByTimeAsync(15_000)
5049

5150
expect(listener).not.toHaveBeenCalled()
5251
})
@@ -58,34 +57,29 @@ describe('redis config', () => {
5857

5958
getRedisClient()
6059

61-
// 2 failures then a success — should reset counter
60+
// 1 failure then a success — should reset counter
6261
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
63-
await vi.advanceTimersByTimeAsync(30_000)
64-
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
65-
await vi.advanceTimersByTimeAsync(30_000)
62+
await vi.advanceTimersByTimeAsync(15_000)
6663
mockRedisInstance.ping.mockResolvedValueOnce('PONG')
67-
await vi.advanceTimersByTimeAsync(30_000)
64+
await vi.advanceTimersByTimeAsync(15_000)
6865

69-
// 2 more failures — should NOT trigger reconnect (counter was reset)
70-
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
71-
await vi.advanceTimersByTimeAsync(30_000)
66+
// 1 more failure — should NOT trigger reconnect (counter was reset)
7267
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
73-
await vi.advanceTimersByTimeAsync(30_000)
68+
await vi.advanceTimersByTimeAsync(15_000)
7469

7570
expect(listener).not.toHaveBeenCalled()
7671
})
7772

78-
it('should call disconnect(true) after 3 consecutive PING failures', async () => {
73+
it('should call disconnect(true) after 2 consecutive PING failures', async () => {
7974
const { getRedisClient } = await import('./redis')
8075
getRedisClient()
8176

8277
mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
83-
await vi.advanceTimersByTimeAsync(30_000)
84-
await vi.advanceTimersByTimeAsync(30_000)
78+
await vi.advanceTimersByTimeAsync(15_000)
8579

8680
expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
8781

88-
await vi.advanceTimersByTimeAsync(30_000)
82+
await vi.advanceTimersByTimeAsync(15_000)
8983
expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)
9084
})
9185

@@ -100,9 +94,8 @@ describe('redis config', () => {
10094

10195
getRedisClient()
10296
mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
103-
await vi.advanceTimersByTimeAsync(30_000)
104-
await vi.advanceTimersByTimeAsync(30_000)
105-
await vi.advanceTimersByTimeAsync(30_000)
97+
await vi.advanceTimersByTimeAsync(15_000)
98+
await vi.advanceTimersByTimeAsync(15_000)
10699

107100
expect(badListener).toHaveBeenCalledTimes(1)
108101
expect(goodListener).toHaveBeenCalledTimes(1)
@@ -119,7 +112,7 @@ describe('redis config', () => {
119112

120113
// After closing, PING failures should not trigger disconnect
121114
mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
122-
await vi.advanceTimersByTimeAsync(30_000 * 5)
115+
await vi.advanceTimersByTimeAsync(15_000 * 5)
123116
expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
124117
})
125118
})

apps/sim/lib/core/config/redis.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ let pingFailures = 0
1111
let pingInterval: NodeJS.Timeout | null = null
1212
let pingInFlight = false
1313

14-
const PING_INTERVAL_MS = 30_000
15-
const MAX_PING_FAILURES = 3
14+
const PING_INTERVAL_MS = 15_000
15+
const MAX_PING_FAILURES = 2
1616

1717
/** Callbacks invoked when the PING health check forces a reconnect. */
1818
const reconnectListeners: Array<() => void> = []
@@ -42,7 +42,7 @@ function startPingHealthCheck(redis: Redis): void {
4242
})
4343

4444
if (pingFailures >= MAX_PING_FAILURES) {
45-
logger.error('Redis PING failed 3 consecutive times — forcing reconnect', {
45+
logger.error('Redis PING failed too many consecutive times — forcing reconnect', {
4646
consecutiveFailures: pingFailures,
4747
})
4848
pingFailures = 0
@@ -89,7 +89,13 @@ export function getRedisClient(): Redis | null {
8989

9090
retryStrategy: (times) => {
9191
if (times > 10) {
92-
logger.error(`Redis reconnection attempt ${times}`, { nextRetryMs: 30000 })
92+
// Log at the transition point and every 20 attempts (~10 min) to avoid unbounded log volume
93+
if (times === 11 || times % 20 === 0) {
94+
logger.error('Redis reconnection stalled — retrying every 30s', {
95+
attempt: times,
96+
nextRetryMs: 30000,
97+
})
98+
}
9399
return 30000
94100
}
95101
const base = Math.min(1000 * 2 ** (times - 1), 10000)

apps/sim/lib/execution/isolated-vm.ts

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER =
7171
MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER
7272
const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000
7373
const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner'
74+
const LEASE_REDIS_DEADLINE_MS = 200
7475
const QUEUE_RETRY_DELAY_MS = 1000
7576
const DISTRIBUTED_LEASE_GRACE_MS = 30000
7677

@@ -292,17 +293,27 @@ async function tryAcquireDistributedLease(
292293
return 1
293294
`
294295

295-
try {
296-
const result = await redis.eval(
297-
script,
298-
1,
299-
key,
300-
now.toString(),
301-
DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
302-
expiresAt.toString(),
303-
leaseId,
304-
leaseTtlMs.toString()
296+
const deadline = new Promise<never>((_, reject) =>
297+
setTimeout(
298+
() => reject(new Error(`Redis lease timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)),
299+
LEASE_REDIS_DEADLINE_MS
305300
)
301+
)
302+
303+
try {
304+
const result = await Promise.race([
305+
redis.eval(
306+
script,
307+
1,
308+
key,
309+
now.toString(),
310+
DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
311+
expiresAt.toString(),
312+
leaseId,
313+
leaseTtlMs.toString()
314+
),
315+
deadline,
316+
])
306317
return Number(result) === 1 ? 'acquired' : 'limit_exceeded'
307318
} catch (error) {
308319
logger.error('Failed to acquire distributed owner lease', { ownerKey, error })

0 commit comments

Comments
 (0)