Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import {
encodeWorkflowIdConflictPolicy,
WorkflowIdConflictPolicy,
compilePriority,
LoadedDataConverter,
} from '@temporalio/common';
import { withSerializationContext } from '@temporalio/common/lib/converter/serialization-context';
import { encodeUserMetadata } from '@temporalio/common/lib/internal-non-workflow/codec-helpers';
import { encodeUnifiedSearchAttributes } from '@temporalio/common/lib/converter/payload-search-attributes';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
Expand Down Expand Up @@ -527,6 +529,14 @@ export class WorkflowClient extends BaseClient {
return this.connection.workflowService;
}

protected dataConverterWithWorkflowContext(workflowId: string): LoadedDataConverter {
return withSerializationContext(this.dataConverter, {
type: 'workflow',
namespace: this.options.namespace,
workflowId,
});
}

protected async _start<T extends Workflow>(
workflowTypeOrFunc: string | T,
options: WorkflowStartOptions<T>,
Expand Down Expand Up @@ -793,6 +803,7 @@ export class WorkflowClient extends BaseClient {
runId?: string,
opts?: WorkflowResultOptions
): Promise<WorkflowResultType<T>> {
const dataConverter = this.dataConverterWithWorkflowContext(workflowId);
const followRuns = opts?.followRuns ?? true;
const execution: temporal.api.common.v1.IWorkflowExecution = { workflowId, runId };
const req: GetWorkflowExecutionHistoryRequest = {
Expand Down Expand Up @@ -832,7 +843,7 @@ export class WorkflowClient extends BaseClient {
// Note that we can only return one value from our workflow function in JS.
// Ignore any other payloads in result
const [result] = await decodeArrayFromPayloads(
this.dataConverter,
dataConverter,
ev.workflowExecutionCompletedEventAttributes.result?.payloads
);
return result as any;
Expand All @@ -845,16 +856,13 @@ export class WorkflowClient extends BaseClient {
const { failure, retryState } = ev.workflowExecutionFailedEventAttributes;
throw new WorkflowFailedError(
'Workflow execution failed',
await decodeOptionalFailureToOptionalError(this.dataConverter, failure),
await decodeOptionalFailureToOptionalError(dataConverter, failure),
decodeRetryState(retryState)
);
} else if (ev.workflowExecutionCanceledEventAttributes) {
const failure = new CancelledFailure(
'Workflow canceled',
await decodeArrayFromPayloads(
this.dataConverter,
ev.workflowExecutionCanceledEventAttributes.details?.payloads
)
await decodeArrayFromPayloads(dataConverter, ev.workflowExecutionCanceledEventAttributes.details?.payloads)
);
failure.stack = '';
throw new WorkflowFailedError('Workflow execution cancelled', failure, RetryState.NON_RETRYABLE_FAILURE);
Expand Down Expand Up @@ -941,13 +949,14 @@ export class WorkflowClient extends BaseClient {
* Used as the final function of the query interceptor chain
*/
protected async _queryWorkflowHandler(input: WorkflowQueryInput): Promise<unknown> {
const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!);
const req: temporal.api.workflowservice.v1.IQueryWorkflowRequest = {
queryRejectCondition: input.queryRejectCondition,
namespace: this.options.namespace,
execution: input.workflowExecution,
query: {
queryType: input.queryType,
queryArgs: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
queryArgs: { payloads: await encodeToPayloads(dataConverter, ...input.args) },
header: { fields: input.headers },
},
};
Expand All @@ -973,13 +982,14 @@ export class WorkflowClient extends BaseClient {
throw new TypeError('Invalid response from server');
}
// We ignore anything but the first result
return await decodeFromPayloadsAtIndex(this.dataConverter, 0, response.queryResult?.payloads);
return await decodeFromPayloadsAtIndex(dataConverter, 0, response.queryResult?.payloads);
}

protected async _createUpdateWorkflowRequest(
lifecycleStage: temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage,
input: WorkflowStartUpdateInput
): Promise<temporal.api.workflowservice.v1.IUpdateWorkflowExecutionRequest> {
const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!);
const updateId = input.options?.updateId ?? uuid4();
return {
namespace: this.options.namespace,
Expand All @@ -996,7 +1006,7 @@ export class WorkflowClient extends BaseClient {
input: {
header: { fields: input.headers },
name: input.updateName,
args: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
args: { payloads: await encodeToPayloads(dataConverter, ...input.args) },
},
},
};
Expand Down Expand Up @@ -1140,6 +1150,7 @@ export class WorkflowClient extends BaseClient {
workflowRunId?: string,
outcome?: temporal.api.update.v1.IOutcome
): WorkflowUpdateHandle<Ret> {
const dataConverter = this.dataConverterWithWorkflowContext(workflowId);
return {
updateId,
workflowId,
Expand All @@ -1150,10 +1161,10 @@ export class WorkflowClient extends BaseClient {
if (completedOutcome.failure) {
throw new WorkflowUpdateFailedError(
'Workflow Update failed',
await decodeOptionalFailureToOptionalError(this.dataConverter, completedOutcome.failure)
await decodeOptionalFailureToOptionalError(dataConverter, completedOutcome.failure)
);
} else {
return await decodeFromPayloadsAtIndex<Ret>(this.dataConverter, 0, completedOutcome.success?.payloads);
return await decodeFromPayloadsAtIndex<Ret>(dataConverter, 0, completedOutcome.success?.payloads);
}
},
};
Expand Down Expand Up @@ -1194,6 +1205,7 @@ export class WorkflowClient extends BaseClient {
* Used as the final function of the signal interceptor chain
*/
protected async _signalWorkflowHandler(input: WorkflowSignalInput): Promise<void> {
const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!);
const req: temporal.api.workflowservice.v1.ISignalWorkflowExecutionRequest = {
identity: this.options.identity,
namespace: this.options.namespace,
Expand All @@ -1202,7 +1214,7 @@ export class WorkflowClient extends BaseClient {
// control is unused,
signalName: input.signalName,
header: { fields: input.headers },
input: { payloads: await encodeToPayloads(this.dataConverter, ...input.args) },
input: { payloads: await encodeToPayloads(dataConverter, ...input.args) },
};
try {
await this.workflowService.signalWorkflowExecution(req);
Expand All @@ -1219,6 +1231,7 @@ export class WorkflowClient extends BaseClient {
protected async _signalWithStartWorkflowHandler(input: WorkflowSignalWithStartInput): Promise<string> {
const { identity } = this.options;
const { options, workflowType, signalName, signalArgs, headers } = input;
const dataConverter = this.dataConverterWithWorkflowContext(options.workflowId);
const req: temporal.api.workflowservice.v1.ISignalWithStartWorkflowExecutionRequest = {
namespace: this.options.namespace,
identity,
Expand All @@ -1227,9 +1240,9 @@ export class WorkflowClient extends BaseClient {
workflowIdReusePolicy: encodeWorkflowIdReusePolicy(options.workflowIdReusePolicy),
workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(options.workflowIdConflictPolicy),
workflowType: { name: workflowType },
input: { payloads: await encodeToPayloads(this.dataConverter, ...options.args) },
input: { payloads: await encodeToPayloads(dataConverter, ...options.args) },
signalName,
signalInput: { payloads: await encodeToPayloads(this.dataConverter, ...signalArgs) },
signalInput: { payloads: await encodeToPayloads(dataConverter, ...signalArgs) },
taskQueue: {
kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL,
name: options.taskQueue,
Expand All @@ -1239,7 +1252,7 @@ export class WorkflowClient extends BaseClient {
workflowTaskTimeout: options.workflowTaskTimeout,
workflowStartDelay: options.startDelay,
retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined,
memo: options.memo ? { fields: await encodeMapToPayloads(this.dataConverter, options.memo) } : undefined,
memo: options.memo ? { fields: await encodeMapToPayloads(dataConverter, options.memo) } : undefined,
searchAttributes:
options.searchAttributes || options.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated
? {
Expand All @@ -1248,7 +1261,7 @@ export class WorkflowClient extends BaseClient {
: undefined,
cronSchedule: options.cronSchedule,
header: { fields: headers },
userMetadata: await encodeUserMetadata(this.dataConverter, options.staticSummary, options.staticDetails),
userMetadata: await encodeUserMetadata(dataConverter, options.staticSummary, options.staticDetails),
priority: options.priority ? compilePriority(options.priority) : undefined,
versioningOverride: options.versioningOverride ?? undefined,
};
Expand Down Expand Up @@ -1299,6 +1312,7 @@ export class WorkflowClient extends BaseClient {
protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise<StartWorkflowExecutionRequest> {
const { options: opts, workflowType, headers } = input;
const { identity, namespace } = this.options;
const dataConverter = this.dataConverterWithWorkflowContext(opts.workflowId);
const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol];
const supportsEagerStart = (this.connection as InternalConnectionLike)?.[InternalConnectionLikeSymbol]
?.supportsEagerStart;
Expand All @@ -1318,7 +1332,7 @@ export class WorkflowClient extends BaseClient {
workflowIdReusePolicy: encodeWorkflowIdReusePolicy(opts.workflowIdReusePolicy),
workflowIdConflictPolicy: encodeWorkflowIdConflictPolicy(opts.workflowIdConflictPolicy),
workflowType: { name: workflowType },
input: { payloads: await encodeToPayloads(this.dataConverter, ...opts.args) },
input: { payloads: await encodeToPayloads(dataConverter, ...opts.args) },
taskQueue: {
kind: temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL,
name: opts.taskQueue,
Expand All @@ -1328,7 +1342,7 @@ export class WorkflowClient extends BaseClient {
workflowTaskTimeout: opts.workflowTaskTimeout,
workflowStartDelay: opts.startDelay,
retryPolicy: opts.retry ? compileRetryPolicy(opts.retry) : undefined,
memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined,
memo: opts.memo ? { fields: await encodeMapToPayloads(dataConverter, opts.memo) } : undefined,
searchAttributes:
opts.searchAttributes || opts.typedSearchAttributes // eslint-disable-line @typescript-eslint/no-deprecated
? {
Expand All @@ -1337,7 +1351,7 @@ export class WorkflowClient extends BaseClient {
: undefined,
cronSchedule: opts.cronSchedule,
header: { fields: headers },
userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails),
userMetadata: await encodeUserMetadata(dataConverter, opts.staticSummary, opts.staticDetails),
priority: opts.priority ? compilePriority(opts.priority) : undefined,
versioningOverride: opts.versioningOverride ?? undefined,
requestEagerExecution: opts.requestEagerStart,
Expand All @@ -1353,12 +1367,13 @@ export class WorkflowClient extends BaseClient {
protected async _terminateWorkflowHandler(
input: WorkflowTerminateInput
): Promise<TerminateWorkflowExecutionResponse> {
const dataConverter = this.dataConverterWithWorkflowContext(input.workflowExecution.workflowId!);
const req: temporal.api.workflowservice.v1.ITerminateWorkflowExecutionRequest = {
namespace: this.options.namespace,
identity: this.options.identity,
...input,
details: {
payloads: input.details ? await encodeToPayloads(this.dataConverter, ...input.details) : undefined,
payloads: input.details ? await encodeToPayloads(dataConverter, ...input.details) : undefined,
},
firstExecutionRunId: input.firstExecutionRunId,
};
Expand Down
12 changes: 12 additions & 0 deletions packages/common/src/converter/failure-converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { isError } from '../type-helpers';
import { msOptionalToTs } from '../time';
import { encode } from '../encoding';
import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from './payload-converter';
import type { SerializationContext } from './serialization-context';

// Can't import proto enums into the workflow sandbox, use this helper type and enum converter instead.
const NexusHandlerErrorRetryBehavior = {
Expand Down Expand Up @@ -109,6 +110,17 @@ export interface FailureConverter {
* The returned error must be an instance of `TemporalFailure`.
*/
failureToError(err: ProtoFailure, payloadConverter: PayloadConverter): Error;

/**
* Optionally return a converter bound to the current serialization context.
*
* The SDK may call this for individual failure conversion operations across
* different workflows and activities. Implementations should treat this as
* side-effect free and may return `this` when no rebinding is needed.
*
* @experimental Serialization context is an experimental feature and may change.
*/
withContext?(context: SerializationContext): FailureConverter;
}

/**
Expand Down
12 changes: 12 additions & 0 deletions packages/common/src/converter/payload-converter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { decode, encode } from '../encoding';
import { PayloadConverterError, ValueError } from '../errors';
import { Payload } from '../interfaces';
import type { SerializationContext } from './serialization-context';
import { encodingKeys, encodingTypes, METADATA_ENCODING_KEY } from './types';

/**
Expand All @@ -25,6 +26,17 @@ export interface PayloadConverter {
* Converts a {@link Payload} back to a value.
*/
fromPayload<T>(payload: Payload): T;

/**
* Optionally return a converter bound to the current serialization context.
*
* The SDK may call this for individual conversion operations across different
* workflows and activities. Implementations should treat this as side-effect
* free and may return `this` when no rebinding is needed.
*
* @experimental Serialization context is an experimental feature and may change.
*/
withContext?(context: SerializationContext): PayloadConverter;
}

/**
Expand Down
107 changes: 107 additions & 0 deletions packages/common/src/converter/serialization-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import type { LoadedDataConverter } from './data-converter';
import type { FailureConverter } from './failure-converter';
import type { PayloadConverter } from './payload-converter';

/**
* Context for payloads owned by a workflow.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Context for payloads owned by a workflow.
* Context provided to data converters for payloads owned by a workflow.

*
* This is used when converting payloads sent to or received from a workflow.
* If a workflow interacts with a child workflow or an external workflow, this
* context refers to that target workflow.
*
* @experimental Serialization context is an experimental feature and may change.
*/
export interface WorkflowSerializationContext {
/** Always `'workflow'` for workflow-owned payloads. */
type: 'workflow';
/** Namespace of the workflow that owns the payload. */
namespace: string;
/** Workflow ID of the workflow that owns the payload. */
workflowId: string;
}

/**
* Context for payloads owned by an activity.
*
* This is used when converting activity arguments, results, heartbeat details,
* and activity-related failures.
*
* @experimental Serialization context is an experimental feature and may change.
*/
export interface ActivitySerializationContext {
/** Always `'activity'` for activity-owned payloads. */
type: 'activity';
/** Namespace of the activity that owns the payload. */
namespace: string;
/**
* Activity ID of the activity that owns the payload.
*
* This may be omitted when context is supplied manually and the caller does
* not know the activity ID.
*/
activityId?: string;
/** Workflow ID of the workflow that scheduled the activity, when known. */
workflowId?: string;
/** Whether the activity is a local activity started from a workflow. */
isLocal: boolean;
}

/**
* Context passed to payload and failure converters.
*
* The context describes the workflow or activity whose payload is being converted.
* For example:
* - `client.workflow.start()` uses the target workflow's context.
* - `executeChild()` uses the child workflow's context, not the parent's.
* - `scheduleActivity()` uses the scheduled activity's context.
*
* @experimental Serialization context is an experimental feature and may change.
*/
export type SerializationContext = WorkflowSerializationContext | ActivitySerializationContext;

/**
* Return a payload converter bound to `context` if the converter supports context binding.
*/
export function withPayloadConverterContext(
converter: PayloadConverter,
context: SerializationContext
): PayloadConverter {
return converter.withContext?.(context) ?? converter;
}

/**
* Return a failure converter bound to `context` if the converter supports context binding.
*/
export function withFailureConverterContext(
converter: FailureConverter,
context: SerializationContext
): FailureConverter {
return converter.withContext?.(context) ?? converter;
}

/**
* Return a loaded data converter with its payload and failure converters bound to `context`.
*
* Internal helper for non-workflow code paths. Workflow-isolate code should bind the individual
* payload or failure converter directly to avoid pulling unnecessary code into the workflow bundle.
*
* NOTE: this does *not* bind `context` to payload codecs
*/
// ts-prune-ignore-next
export function withSerializationContext(
converter: LoadedDataConverter,
context: SerializationContext
): LoadedDataConverter {
const payloadConverter = withPayloadConverterContext(converter.payloadConverter, context);
const failureConverter = withFailureConverterContext(converter.failureConverter, context);

if (payloadConverter === converter.payloadConverter && failureConverter === converter.failureConverter) {
return converter;
}

return {
payloadConverter,
failureConverter,
payloadCodecs: converter.payloadCodecs,
};
}
5 changes: 5 additions & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ export * from './converter/data-converter';
export * from './converter/failure-converter';
export * from './converter/payload-codec';
export * from './converter/payload-converter';
export type {
ActivitySerializationContext,
SerializationContext,
WorkflowSerializationContext,
} from './converter/serialization-context';
export * from './converter/types';
export * from './deprecated-time';
export * from './errors';
Expand Down
Loading
Loading