Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions packages/durabletask-js/src/testing/in-memory-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export class InMemoryOrchestrationBackend {
private readonly activityQueue: ActivityWorkItem[] = [];
private readonly stateWaiters: Map<string, StateWaiter[]> = new Map();
private readonly pendingTimers: Set<ReturnType<typeof setTimeout>> = new Set();
private readonly instanceTimers: Map<string, Set<ReturnType<typeof setTimeout>>> = new Map();
private nextCompletionToken: number = 1;
private readonly maxHistorySize: number;

Expand Down Expand Up @@ -217,6 +218,7 @@ export class InMemoryOrchestrationBackend {

this.instances.delete(instanceId);
this.stateWaiters.delete(instanceId);
this.cancelInstanceTimers(instanceId);
return true;
}

Expand Down Expand Up @@ -394,6 +396,7 @@ export class InMemoryOrchestrationBackend {
clearTimeout(timer);
}
this.pendingTimers.clear();
this.instanceTimers.clear();
}

/**
Expand Down Expand Up @@ -543,6 +546,7 @@ export class InMemoryOrchestrationBackend {

const timerHandle = setTimeout(() => {
this.pendingTimers.delete(timerHandle);
this.removeInstanceTimer(instance.instanceId, timerHandle);
const currentInstance = this.instances.get(instance.instanceId);
if (currentInstance && !this.isTerminalStatus(currentInstance.status)) {
const timerFiredEvent = pbh.newTimerFiredEvent(timerId, fireAt);
Expand All @@ -552,6 +556,7 @@ export class InMemoryOrchestrationBackend {
}
}, delay);
this.pendingTimers.add(timerHandle);
this.addInstanceTimer(instance.instanceId, timerHandle);
}

private processCreateSubOrchestrationAction(instance: OrchestrationInstance, action: pb.OrchestratorAction): void {
Expand Down Expand Up @@ -638,6 +643,36 @@ export class InMemoryOrchestrationBackend {
}
}

private addInstanceTimer(instanceId: string, timerHandle: ReturnType<typeof setTimeout>): void {
let timers = this.instanceTimers.get(instanceId);
if (!timers) {
timers = new Set();
this.instanceTimers.set(instanceId, timers);
}
timers.add(timerHandle);
}

private removeInstanceTimer(instanceId: string, timerHandle: ReturnType<typeof setTimeout>): void {
const timers = this.instanceTimers.get(instanceId);
if (timers) {
timers.delete(timerHandle);
if (timers.size === 0) {
this.instanceTimers.delete(instanceId);
}
}
}

private cancelInstanceTimers(instanceId: string): void {
const timers = this.instanceTimers.get(instanceId);
if (timers) {
for (const timer of timers) {
clearTimeout(timer);
this.pendingTimers.delete(timer);
}
this.instanceTimers.delete(instanceId);
}
}

private notifyWaiters(instanceId: string): void {
const instance = this.instances.get(instanceId);
const waiters = this.stateWaiters.get(instanceId);
Expand Down
67 changes: 67 additions & 0 deletions packages/durabletask-js/test/in-memory-backend.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,73 @@ describe("In-Memory Backend", () => {
expect(state).toBeUndefined();
});

it("should cancel pending timers when purging a terminated orchestration", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
// Create a timer far in the future — it will still be pending when we terminate
yield ctx.createTimer(3600);
return "done";
};

worker.addOrchestrator(orchestrator);
await worker.start();

const id = await client.scheduleNewOrchestration(orchestrator);
// Wait for the orchestration to start so the timer action is processed by the backend
await client.waitForOrchestrationStart(id, false, 5);

// Terminate while the long timer is still pending
await client.terminateOrchestration(id, "terminated");
const state = await client.waitForOrchestrationCompletion(id, true, 10);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.TERMINATED);

// Timer should still be pending before purge
const pendingTimersBefore = (backend as any).pendingTimers.size;
expect(pendingTimersBefore).toBeGreaterThan(0);

// Purge the terminated orchestration
const result = await client.purgeOrchestration(id);
expect(result.deletedInstanceCount).toEqual(1);

// After purge, pending timers for this instance should be cancelled
expect((backend as any).pendingTimers.size).toBe(0);
expect((backend as any).instanceTimers.size).toBe(0);
});

it("should cancel pending timers for only the purged orchestration", async () => {
const timerOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield ctx.createTimer(3600);
return "done";
};

const waitOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield ctx.createTimer(7200);
return "done";
};

worker.addOrchestrator(timerOrchestrator);
worker.addOrchestrator(waitOrchestrator);
await worker.start();

// Start two orchestrations that both create long timers
const id1 = await client.scheduleNewOrchestration(timerOrchestrator);
const id2 = await client.scheduleNewOrchestration(waitOrchestrator);

await client.waitForOrchestrationStart(id1, false, 5);
await client.waitForOrchestrationStart(id2, false, 5);

// Terminate and purge only the first orchestration
await client.terminateOrchestration(id1, "terminated");
await client.waitForOrchestrationCompletion(id1, false, 10);

const result = await client.purgeOrchestration(id1);
expect(result.deletedInstanceCount).toEqual(1);

// The second orchestration's timer should still be pending
expect((backend as any).pendingTimers.size).toBe(1);
expect((backend as any).instanceTimers.has(id2)).toBe(true);
expect((backend as any).instanceTimers.has(id1)).toBe(false);
});

it("should allow reusing instance IDs after reset", async () => {
const orchestrator: TOrchestrator = async (_: OrchestrationContext, input: number) => {
return input * 2;
Expand Down
Loading