Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/durabletask-js/src/worker/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const CATEGORY_ENTITIES = "Microsoft.DurableTask.Worker.Entities";
/** @internal */ export const EVENT_ACTIVITY_EXECUTION_ERROR = 734;
/** @internal */ export const EVENT_ACTIVITY_RESPONSE_ERROR = 735;
/** @internal */ export const EVENT_STREAM_ERROR_INFO = 736;
/** @internal */ export const EVENT_RETRY_HANDLER_EXCEPTION = 737;

// ── Entity-specific Event IDs (800+ range) ──────────────────────────────────

Expand Down Expand Up @@ -194,6 +195,24 @@ export function retryingTask(logger: Logger, instanceId: string, name: string, a
}, `${instanceId}: Evaluating custom retry handler for failed '${name}' task. Attempt = ${attempt}.`);
}

/**
* Logs that a retry handler or handleFailure predicate threw an exception.
* The task will not be retried and will fail with its original error.
*/
export function retryHandlerException(logger: Logger, instanceId: string, name: string, error: unknown): void {
const msg = toErrorMessage(error);
emitLog(logger, "warn", {
eventId: EVENT_RETRY_HANDLER_EXCEPTION,
category: CATEGORY_ORCHESTRATIONS,
properties: {
instanceId,
name,
error: msg,
...(error instanceof Error && error.stack ? { stack: error.stack } : {}),
},
}, `${instanceId}: Retry evaluation for '${name}' threw an exception and will not be retried: ${msg}`);
}

// ═══════════════════════════════════════════════════════════════════════════════
// JS-specific Worker Lifecycle Logs (Event IDs 700+)
// ═══════════════════════════════════════════════════════════════════════════════
Expand Down
22 changes: 20 additions & 2 deletions packages/durabletask-js/src/worker/orchestration-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,16 @@ export class OrchestrationExecutor {
): Promise<boolean> {
if (task instanceof RetryableTask) {
task.recordFailure(errorMessage, failureDetails);
const nextDelayMs = task.computeNextDelayInMilliseconds(ctx._currentUtcDatetime);
let nextDelayMs: number | undefined;
try {
nextDelayMs = task.computeNextDelayInMilliseconds(ctx._currentUtcDatetime);
} catch (e: unknown) {
// The retry policy's handleFailure predicate threw an exception.
// Treat this as "don't retry" so the task fails with its original error
// rather than crashing the entire orchestration.
WorkerLogs.retryHandlerException(this._logger, ctx._instanceId, task.taskName, e);
return false;
}

if (nextDelayMs !== undefined) {
WorkerLogs.retryingTask(this._logger, ctx._instanceId, task.taskName, task.attemptCount);
Expand All @@ -776,7 +785,16 @@ export class OrchestrationExecutor {
}
} else if (task instanceof RetryHandlerTask) {
task.recordFailure(errorMessage, failureDetails);
const retryResult = await task.shouldRetry(ctx._currentUtcDatetime);
let retryResult: boolean | number;
try {
retryResult = await task.shouldRetry(ctx._currentUtcDatetime);
} catch (e: unknown) {
// The user-provided retry handler threw an exception.
// Treat this as "don't retry" so the task fails with its original error
// rather than crashing the entire orchestration.
WorkerLogs.retryHandlerException(this._logger, ctx._instanceId, task.taskName, e);
return false;
}

// Only retry when the handler explicitly returns true or a finite number.
// Using a positive check (=== true || finite number) instead of !== false
Expand Down
109 changes: 109 additions & 0 deletions packages/durabletask-js/test/orchestration_executor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,115 @@ describe("Orchestration Executor", () => {
const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
});

it("should fail the task (not crash orchestration) when retry handler throws an exception", async () => {
const { result: startResult, replay } = await startOrchestration(
async function* (ctx: OrchestrationContext): any {
const retryHandler = async (_retryCtx: any) => {
throw new Error("Handler exploded!");
};
return yield ctx.callActivity("flakyActivity", undefined, { retry: retryHandler });
},
);

expect(startResult.actions[0].hasScheduletask()).toBe(true);

// Activity fails → handler throws → should NOT retry, task should fail with original error
const result = await replay(
[newTaskScheduledEvent(1, "flakyActivity")],
[newTaskFailedEvent(1, new Error("Original activity error"))],
);
const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
// The orchestration fails with the original task error, not the handler exception
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("Original activity error");
});

it("should allow orchestrator to catch TaskFailedError when retry handler throws", async () => {
const { result: startResult, replay } = await startOrchestration(
async function* (ctx: OrchestrationContext): any {
const retryHandler = async (_retryCtx: any) => {
throw new Error("Handler exploded!");
};
try {
yield ctx.callActivity("flakyActivity", undefined, { retry: retryHandler });
} catch (e: any) {
return "caught: " + e.message;
}
},
);

expect(startResult.actions[0].hasScheduletask()).toBe(true);

// Activity fails → handler throws → task fails → orchestrator catches the error
const result = await replay(
[newTaskScheduledEvent(1, "flakyActivity")],
[newTaskFailedEvent(1, new Error("Original activity error"))],
);
const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(completeAction?.getResult()?.getValue()).toContain("Original activity error");
});

it("should fail the task (not crash orchestration) when handleFailure predicate throws", async () => {
const { RetryPolicy } = await import("../src/task/retry/retry-policy");

const { result: startResult, replay } = await startOrchestration(
async function* (ctx: OrchestrationContext): any {
const retryPolicy = new RetryPolicy({
maxNumberOfAttempts: 3,
firstRetryIntervalInMilliseconds: 1000,
handleFailure: (_failure: any) => {
throw new Error("Predicate exploded!");
},
});
return yield ctx.callActivity("flakyActivity", undefined, { retry: retryPolicy });
},
);

expect(startResult.actions[0].hasScheduletask()).toBe(true);

// Activity fails → handleFailure throws → should NOT retry, task should fail with original error
const result = await replay(
[newTaskScheduledEvent(1, "flakyActivity")],
[newTaskFailedEvent(1, new Error("Original activity error"))],
);
const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("Original activity error");
});

it("should allow orchestrator to catch TaskFailedError when handleFailure predicate throws", async () => {
const { RetryPolicy } = await import("../src/task/retry/retry-policy");

const { result: startResult, replay } = await startOrchestration(
async function* (ctx: OrchestrationContext): any {
const retryPolicy = new RetryPolicy({
maxNumberOfAttempts: 3,
firstRetryIntervalInMilliseconds: 1000,
handleFailure: (_failure: any) => {
throw new Error("Predicate exploded!");
},
});
try {
yield ctx.callActivity("flakyActivity", undefined, { retry: retryPolicy });
} catch (e: any) {
return "caught: " + e.message;
}
},
);

expect(startResult.actions[0].hasScheduletask()).toBe(true);

// Activity fails → predicate throws → task fails → orchestrator catches the error
const result = await replay(
[newTaskScheduledEvent(1, "flakyActivity")],
[newTaskFailedEvent(1, new Error("Original activity error"))],
);
const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(completeAction?.getResult()?.getValue()).toContain("Original activity error");
});
});

it("should complete immediately when whenAll is called with an empty task array", async () => {
Expand Down
Loading