Skip to content

Commit 132fef0

Browse files
authored
fix(redis): tighten stale TCP connection detection and add fast lease deadline (#3311)
* fix(redis): tighten stale TCP connection detection and add fast lease deadline
* revert(redis): restore original retryStrategy logging
* fix(redis): clear deadline timer after Promise.race to prevent memory leak
* fix(redis): downgrade lease fallback log to warn — unavailable is expected fallback
1 parent 2ae8145 commit 132fef0

File tree

3 files changed

+47
-37
lines changed

3 files changed

+47
-37
lines changed

apps/sim/lib/core/config/redis.test.ts

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@ describe('redis config', () => {
2929
getRedisClient()
3030

3131
mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
32-
await vi.advanceTimersByTimeAsync(30_000)
33-
await vi.advanceTimersByTimeAsync(30_000)
34-
await vi.advanceTimersByTimeAsync(30_000)
32+
await vi.advanceTimersByTimeAsync(15_000)
33+
await vi.advanceTimersByTimeAsync(15_000)
3534

3635
expect(listener).toHaveBeenCalledTimes(1)
3736
})
@@ -44,9 +43,9 @@ describe('redis config', () => {
4443
getRedisClient()
4544
mockRedisInstance.ping.mockResolvedValue('PONG')
4645

47-
await vi.advanceTimersByTimeAsync(30_000)
48-
await vi.advanceTimersByTimeAsync(30_000)
49-
await vi.advanceTimersByTimeAsync(30_000)
46+
await vi.advanceTimersByTimeAsync(15_000)
47+
await vi.advanceTimersByTimeAsync(15_000)
48+
await vi.advanceTimersByTimeAsync(15_000)
5049

5150
expect(listener).not.toHaveBeenCalled()
5251
})
@@ -58,34 +57,29 @@ describe('redis config', () => {
5857

5958
getRedisClient()
6059

61-
// 2 failures then a success — should reset counter
60+
// 1 failure then a success — should reset counter
6261
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
63-
await vi.advanceTimersByTimeAsync(30_000)
64-
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
65-
await vi.advanceTimersByTimeAsync(30_000)
62+
await vi.advanceTimersByTimeAsync(15_000)
6663
mockRedisInstance.ping.mockResolvedValueOnce('PONG')
67-
await vi.advanceTimersByTimeAsync(30_000)
64+
await vi.advanceTimersByTimeAsync(15_000)
6865

69-
// 2 more failures — should NOT trigger reconnect (counter was reset)
70-
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
71-
await vi.advanceTimersByTimeAsync(30_000)
66+
// 1 more failure — should NOT trigger reconnect (counter was reset)
7267
mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
73-
await vi.advanceTimersByTimeAsync(30_000)
68+
await vi.advanceTimersByTimeAsync(15_000)
7469

7570
expect(listener).not.toHaveBeenCalled()
7671
})
7772

78-
it('should call disconnect(true) after 3 consecutive PING failures', async () => {
73+
it('should call disconnect(true) after 2 consecutive PING failures', async () => {
7974
const { getRedisClient } = await import('./redis')
8075
getRedisClient()
8176

8277
mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
83-
await vi.advanceTimersByTimeAsync(30_000)
84-
await vi.advanceTimersByTimeAsync(30_000)
78+
await vi.advanceTimersByTimeAsync(15_000)
8579

8680
expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
8781

88-
await vi.advanceTimersByTimeAsync(30_000)
82+
await vi.advanceTimersByTimeAsync(15_000)
8983
expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)
9084
})
9185

@@ -100,9 +94,8 @@ describe('redis config', () => {
10094

10195
getRedisClient()
10296
mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
103-
await vi.advanceTimersByTimeAsync(30_000)
104-
await vi.advanceTimersByTimeAsync(30_000)
105-
await vi.advanceTimersByTimeAsync(30_000)
97+
await vi.advanceTimersByTimeAsync(15_000)
98+
await vi.advanceTimersByTimeAsync(15_000)
10699

107100
expect(badListener).toHaveBeenCalledTimes(1)
108101
expect(goodListener).toHaveBeenCalledTimes(1)
@@ -119,7 +112,7 @@ describe('redis config', () => {
119112

120113
// After closing, PING failures should not trigger disconnect
121114
mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
122-
await vi.advanceTimersByTimeAsync(30_000 * 5)
115+
await vi.advanceTimersByTimeAsync(15_000 * 5)
123116
expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
124117
})
125118
})

apps/sim/lib/core/config/redis.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ let pingFailures = 0
1111
let pingInterval: NodeJS.Timeout | null = null
1212
let pingInFlight = false
1313

14-
const PING_INTERVAL_MS = 30_000
15-
const MAX_PING_FAILURES = 3
14+
const PING_INTERVAL_MS = 15_000
15+
const MAX_PING_FAILURES = 2
1616

1717
/** Callbacks invoked when the PING health check forces a reconnect. */
1818
const reconnectListeners: Array<() => void> = []
@@ -42,7 +42,7 @@ function startPingHealthCheck(redis: Redis): void {
4242
})
4343

4444
if (pingFailures >= MAX_PING_FAILURES) {
45-
logger.error('Redis PING failed 3 consecutive times — forcing reconnect', {
45+
logger.error('Redis PING failed consecutive times — forcing reconnect', {
4646
consecutiveFailures: pingFailures,
4747
})
4848
pingFailures = 0

apps/sim/lib/execution/isolated-vm.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER =
7171
MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER
7272
const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000
7373
const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner'
74+
const LEASE_REDIS_DEADLINE_MS = 200
7475
const QUEUE_RETRY_DELAY_MS = 1000
7576
const DISTRIBUTED_LEASE_GRACE_MS = 30000
7677

@@ -292,21 +293,37 @@ async function tryAcquireDistributedLease(
292293
return 1
293294
`
294295

295-
try {
296-
const result = await redis.eval(
297-
script,
298-
1,
299-
key,
300-
now.toString(),
301-
DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
302-
expiresAt.toString(),
303-
leaseId,
304-
leaseTtlMs.toString()
296+
let deadlineTimer: NodeJS.Timeout | undefined
297+
const deadline = new Promise<never>((_, reject) => {
298+
deadlineTimer = setTimeout(
299+
() => reject(new Error(`Redis lease timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)),
300+
LEASE_REDIS_DEADLINE_MS
305301
)
302+
})
303+
304+
try {
305+
const result = await Promise.race([
306+
redis.eval(
307+
script,
308+
1,
309+
key,
310+
now.toString(),
311+
DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
312+
expiresAt.toString(),
313+
leaseId,
314+
leaseTtlMs.toString()
315+
),
316+
deadline,
317+
])
306318
return Number(result) === 1 ? 'acquired' : 'limit_exceeded'
307319
} catch (error) {
308-
logger.error('Failed to acquire distributed owner lease', { ownerKey, error })
320+
logger.warn('Failed to acquire distributed owner lease — falling back to local execution', {
321+
ownerKey,
322+
error,
323+
})
309324
return 'unavailable'
325+
} finally {
326+
clearTimeout(deadlineTimer)
310327
}
311328
}
312329

0 commit comments

Comments
 (0)