From f0472fa6db638c522e80f81e1ae3eb9a66ada0d9 Mon Sep 17 00:00:00 2001 From: Copilot <223556219+Copilot@users.noreply.github.com> Date: Thu, 19 Mar 2026 08:32:55 +0000 Subject: [PATCH] fix: cancel pending timers when purging orchestration instances The InMemoryOrchestrationBackend.purge() method deleted orchestration instances from the store but did not cancel their pending setTimeout handles. This caused timer handles to leak, keeping the Node.js event loop alive unnecessarily and wasting resources until the timers eventually fired and found no instance to act on. The fix adds per-instance timer tracking via an instanceTimers map. When purge() is called, all pending timers for that instance are cancelled and removed from both the instance-level and global timer tracking sets. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../src/testing/in-memory-backend.ts | 35 ++++++++++ .../test/in-memory-backend.spec.ts | 67 +++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/packages/durabletask-js/src/testing/in-memory-backend.ts b/packages/durabletask-js/src/testing/in-memory-backend.ts index 2c96acb..64f6824 100644 --- a/packages/durabletask-js/src/testing/in-memory-backend.ts +++ b/packages/durabletask-js/src/testing/in-memory-backend.ts @@ -62,6 +62,7 @@ export class InMemoryOrchestrationBackend { private readonly activityQueue: ActivityWorkItem[] = []; private readonly stateWaiters: Map = new Map(); private readonly pendingTimers: Set> = new Set(); + private readonly instanceTimers: Map>> = new Map(); private nextCompletionToken: number = 1; private readonly maxHistorySize: number; @@ -217,6 +218,7 @@ export class InMemoryOrchestrationBackend { this.instances.delete(instanceId); this.stateWaiters.delete(instanceId); + this.cancelInstanceTimers(instanceId); return true; } @@ -394,6 +396,7 @@ export class InMemoryOrchestrationBackend { clearTimeout(timer); } this.pendingTimers.clear(); + this.instanceTimers.clear(); } /** @@ -543,6 +546,7 @@ export class InMemoryOrchestrationBackend { const timerHandle = setTimeout(() => { this.pendingTimers.delete(timerHandle); + this.removeInstanceTimer(instance.instanceId, timerHandle); const currentInstance = this.instances.get(instance.instanceId); if (currentInstance && !this.isTerminalStatus(currentInstance.status)) { const timerFiredEvent = pbh.newTimerFiredEvent(timerId, fireAt); @@ -552,6 +556,7 @@ export class InMemoryOrchestrationBackend { } }, delay); this.pendingTimers.add(timerHandle); + this.addInstanceTimer(instance.instanceId, timerHandle); } private processCreateSubOrchestrationAction(instance: OrchestrationInstance, action: pb.OrchestratorAction): void { @@ -638,6 +643,36 @@ export class InMemoryOrchestrationBackend { } } + private addInstanceTimer(instanceId: string, timerHandle: ReturnType): void { + let timers = this.instanceTimers.get(instanceId); + if (!timers) { + timers = new Set(); + this.instanceTimers.set(instanceId, timers); + } + timers.add(timerHandle); + } + + private removeInstanceTimer(instanceId: string, timerHandle: ReturnType): void { + const timers = this.instanceTimers.get(instanceId); + if (timers) { + timers.delete(timerHandle); + if (timers.size === 0) { + this.instanceTimers.delete(instanceId); + } + } + } + + private cancelInstanceTimers(instanceId: string): void { + const timers = this.instanceTimers.get(instanceId); + if (timers) { + for (const timer of timers) { + clearTimeout(timer); + this.pendingTimers.delete(timer); + } + this.instanceTimers.delete(instanceId); + } + } + private notifyWaiters(instanceId: string): void { const instance = this.instances.get(instanceId); const waiters = this.stateWaiters.get(instanceId); diff --git a/packages/durabletask-js/test/in-memory-backend.spec.ts b/packages/durabletask-js/test/in-memory-backend.spec.ts index 66fe6a9..a4d6d3b 100644 --- a/packages/durabletask-js/test/in-memory-backend.spec.ts +++ b/packages/durabletask-js/test/in-memory-backend.spec.ts @@ -356,6 +356,73 @@ describe("In-Memory Backend", () => { expect(state).toBeUndefined(); }); + it("should cancel pending timers when purging a terminated orchestration", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + // Create a timer far in the future — it will still be pending when we terminate + yield ctx.createTimer(3600); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + // Wait for the orchestration to start so the timer action is processed by the backend + await client.waitForOrchestrationStart(id, false, 5); + + // Terminate while the long timer is still pending + await client.terminateOrchestration(id, "terminated"); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.TERMINATED); + + // Timer should still be pending before purge + const pendingTimersBefore = (backend as any).pendingTimers.size; + expect(pendingTimersBefore).toBeGreaterThan(0); + + // Purge the terminated orchestration + const result = await client.purgeOrchestration(id); + expect(result.deletedInstanceCount).toEqual(1); + + // After purge, pending timers for this instance should be cancelled + expect((backend as any).pendingTimers.size).toBe(0); + expect((backend as any).instanceTimers.size).toBe(0); + }); + + it("should cancel pending timers for only the purged orchestration", async () => { + const timerOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.createTimer(3600); + return "done"; + }; + + const waitOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.createTimer(7200); + return "done"; + }; + + worker.addOrchestrator(timerOrchestrator); + worker.addOrchestrator(waitOrchestrator); + await worker.start(); + + // Start two orchestrations that both create long timers + const id1 = await client.scheduleNewOrchestration(timerOrchestrator); + const id2 = await client.scheduleNewOrchestration(waitOrchestrator); + + await client.waitForOrchestrationStart(id1, false, 5); + await client.waitForOrchestrationStart(id2, false, 5); + + // Terminate and purge only the first orchestration + await client.terminateOrchestration(id1, "terminated"); + await client.waitForOrchestrationCompletion(id1, false, 10); + + const result = await client.purgeOrchestration(id1); + expect(result.deletedInstanceCount).toEqual(1); + + // The second orchestration's timer should still be pending + expect((backend as any).pendingTimers.size).toBe(1); + expect((backend as any).instanceTimers.has(id2)).toBe(true); + expect((backend as any).instanceTimers.has(id1)).toBe(false); + }); + it("should allow reusing instance IDs after reset", async () => { const orchestrator: TOrchestrator = async (_: OrchestrationContext, input: number) => { return input * 2;