Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/step-bind-preserves-metadata.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Preserve the `this` binding of bound step proxies across workflow serialization, so passing `useStep(...).bind(thisArg)` as a step argument no longer loses the receiver.
5 changes: 5 additions & 0 deletions .changeset/swc-arguments-not-closure.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/swc-plugin": patch
---

Fix `arguments` being incorrectly captured as a closure variable in nested `function`-form step bodies, which previously produced invalid output.
5 changes: 5 additions & 0 deletions .changeset/swc-lexical-this-capture.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/swc-plugin": patch
---

Support `this` references inside nested arrow `"use step"` functions. Requires the enclosing class to have custom serialization.
41 changes: 40 additions & 1 deletion packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2112,6 +2112,11 @@ describe('e2e', () => {
// 3. counter.multiply(3) -> 5 * 3 = 15
// 4. counter.describe('test counter') -> { label: 'test counter', value: 5 }
// 5. Create Counter(100), call counter2.add(50) -> 100 + 50 = 150
// 6. counter.makeAdder(7).add(2) -> 5 + 2 + 7 = 14 (lexical `this`,
// direct invocation — `bind(this)` carries `thisVal` to the queue)
// 7. invokeAdderFromStep(counter.makeAdder(7).add, 3) -> 5 + 3 + 7 = 15
// (lexical `this` round-tripped through step-arg serialization —
// the reducer captures `__boundThis`, the reviver re-binds)
const run = await start(await e2e('instanceMethodStepWorkflow'), [5]);
const returnValue = await run.returnValue;

Expand All @@ -2121,6 +2126,8 @@ describe('e2e', () => {
multiplied: 15, // 5 * 3
description: { label: 'test counter', value: 5 },
added2: 150, // 100 + 50
adderResult: 14, // 5 + 2 + 7 (lexical `this` capture)
adderViaStep: 15, // 5 + 3 + 7 (lexical `this` survives serialization)
});

// Verify the run completed successfully
Expand All @@ -2134,9 +2141,15 @@ describe('e2e', () => {
multiplied: 15,
description: { label: 'test counter', value: 5 },
added2: 150,
adderResult: 14,
adderViaStep: 15,
});

// Verify the steps were executed (should have 4 steps: add, multiply, describe, add)
// Verify the steps were executed:
// - 4 Counter instance method steps (add, multiply, describe, add)
// - 2 lexical-`this` arrow steps from `makeAdder` (direct + via-step)
// - 1 invokeAdderFromStep wrapper (which itself triggers another
// makeAdder arrow step inside it)
const { json: steps } = await cliInspectJson(
`steps --runId ${run.runId}`
);
Expand All @@ -2151,6 +2164,32 @@ describe('e2e', () => {
expect(counterSteps.every((s: any) => s.status === 'completed')).toBe(
true
);

// The lexical-`this` arrow step inside `Counter#makeAdder` is
// hoisted by the SWC plugin under an `_anonymousStep` name. It
// ran once as its own step (`adder.add(2)` invoked directly from
// the workflow). The second call (inside `invokeAdderFromStep`)
// executes inline because steps invoked from another step body
// run inline rather than queueing a new step — so we only see one
// `_anonymousStep` event in the log, even though the body executed
// twice. Asserting `=== 1` here pins down both:
// 1. the direct invocation actually creates a step (i.e. the
// `bind(this)` proxy still goes through `useStep`), and
// 2. the round-tripped proxy correctly runs inline rather than
// somehow re-queuing a duplicate step.
const adderArrowSteps = steps.filter((s: any) =>
s.stepName.includes('_anonymousStep')
);
expect(adderArrowSteps.length).toBe(1);
expect(adderArrowSteps[0].status).toBe('completed');

// The `invokeAdderFromStep` wrapper itself runs as its own step;
// its body invokes the round-tripped `add` proxy inline.
const invokeAdderSteps = steps.filter((s: any) =>
s.stepName.includes('invokeAdderFromStep')
);
expect(invokeAdderSteps.length).toBe(1);
expect(invokeAdderSteps[0].status).toBe('completed');
}
);

Expand Down
142 changes: 142 additions & 0 deletions packages/core/src/serialization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2299,6 +2299,148 @@ describe('step function serialization', () => {
expect(result).toBe('Result: 21');
});

it('should dehydrate and hydrate step function with closureVars + boundThis combined', async () => {
// The end-to-end shape exercised by the SWC plugin's lexical-`this`
// capture: a nested arrow step closes over both lexical `this` AND a
// surrounding closure variable. After serialization, the step-bundle
// reviver must run the registered body inside the closure-vars
// AsyncLocalStorage frame *and* invoke it with `apply(boundThis,
// args)`.
const stepName = 'step//workflows/test.ts//addToInstance';

const { __private_getClosureVars } = await import(
'./step/get-closure-vars.js'
);
const { contextStorage } = await import('./step/context-storage.js');

const stepFn = async function (this: { value: number }, amount: number) {
const { delta } = __private_getClosureVars() as { delta: number };
return this.value + amount + delta;
};
registerStepFunction(stepName, stepFn);

Object.defineProperty(stepFn, 'stepId', {
value: stepName,
writable: false,
enumerable: false,
configurable: false,
});
Object.defineProperty(stepFn, '__closureVarsFn', {
value: () => ({ delta: 7 }),
writable: false,
enumerable: false,
configurable: false,
});
// Simulate the `__boundThis` marker that the step proxy's overridden
// `.bind` (in step.ts) would attach. Plain object instead of a real
// class instance so the test focuses on the reducer/reviver plumbing.
const instance = { value: 5 };
Object.defineProperty(stepFn, '__boundThis', {
value: instance,
writable: false,
enumerable: false,
configurable: false,
});

// Round-trip through the workflow→step serialization pipeline.
const dehydrated = await dehydrateStepArguments(
[stepFn, 3],
mockRunId,
noEncryptionKey,
globalThis
);
const hydrated = (await hydrateStepArguments(
dehydrated,
'test-run-123',
noEncryptionKey,
[]
)) as [(amount: number) => Promise<number>, number];

expect(typeof hydrated[0]).toBe('function');
expect(hydrated[1]).toBe(3);

// Invoke the rehydrated step function inside a step-context frame
// (otherwise the closure-vars wrapper throws). The wrapper must
// bind `this` to `instance` *and* expose `delta = 7` via
// `__private_getClosureVars()`.
const result = await contextStorage.run(
{
stepMetadata: {
stepName,
stepId: 'test-step',
stepStartedAt: new Date(),
attempt: 1,
},
workflowMetadata: {
workflowName: 'workflow//workflows/test.ts//testWorkflow',
workflowRunId: 'test-run',
workflowStartedAt: new Date(),
url: 'http://localhost:3000',
features: { encryption: false },
},
ops: [],
},
() => hydrated[0](3)
);

// value(5) + amount(3) + delta(7) = 15
expect(result).toBe(15);
});

it('should preserve `boundArgs` (partial application) across serialization', async () => {
// The step proxy's overridden `.bind` also stashes prefilled args
// (`useStep(...).bind(thisArg, x)`) so partial application survives
// the round trip. The SWC plugin only ever emits `.bind(this)` today,
// so this codifies the safety net for future hand-written callers.
const stepName = 'step//workflows/test.ts//partialApply';

const stepFn = async function (
this: { tag: string },
prefilled: number,
runtimeArg: number
) {
return `${this.tag}:${prefilled}+${runtimeArg}`;
};
registerStepFunction(stepName, stepFn);

Object.defineProperty(stepFn, 'stepId', {
value: stepName,
writable: false,
enumerable: false,
configurable: false,
});
Object.defineProperty(stepFn, '__boundThis', {
value: { tag: 'bound' },
writable: false,
enumerable: false,
configurable: false,
});
Object.defineProperty(stepFn, '__boundArgs', {
value: [10],
writable: false,
enumerable: false,
configurable: false,
});

const dehydrated = await dehydrateStepArguments(
[stepFn],
mockRunId,
noEncryptionKey,
globalThis
);
const hydrated = (await hydrateStepArguments(
dehydrated,
'test-run-123',
noEncryptionKey,
[]
)) as [(runtimeArg: number) => Promise<string>];

// The hydrated proxy should already have `prefilled = 10` baked in,
// so the caller only supplies `runtimeArg`.
const result = await hydrated[0](32);
expect(result).toBe('bound:10+32');
});

it('should serialize step function to object through reducer', () => {
const stepName = 'step//workflows/test.ts//anotherStep';
const stepFn = async () => 'result';
Expand Down
75 changes: 47 additions & 28 deletions packages/core/src/serialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1001,10 +1001,29 @@ function getStepRevivers(
...getCommonRevivers(global),

// StepFunction reviver for step context - returns raw step function
// with closure variable support via AsyncLocalStorage
// with closure variable support via AsyncLocalStorage.
//
// Handles four independent flags from the serialized payload:
// - `closureVars`: invoke the body inside an AsyncLocalStorage frame
// so the SWC-emitted `WORKFLOW_STEP_CONTEXT_STORAGE` IIFE in the
// hoisted body can pull the closure variables back out.
// - `boundThis`: a `this` value captured by
// `useStep(...).bind(this)` in the workflow bundle (lexical-`this`
// arrow steps). The wrapper invokes the body via
// `stepFn.apply(boundThis, args)` so the body sees the same
// `this` it would have had in the workflow bundle. Property
// presence — not truthiness — is significant because
// `bind(null)` and `bind(undefined)` are both legal and should
// round-trip faithfully.
// - `boundArgs`: prefilled args from
// `useStep(...).bind(thisArg, x, y)`. Prepended to the call args
// so partial application survives serialization.
StepFunction: (value) => {
const stepId = value.stepId;
const closureVars = value.closureVars;
const hasBoundThis = 'boundThis' in value;
const boundThis = hasBoundThis ? value.boundThis : undefined;
const boundArgs = Array.isArray(value.boundArgs) ? value.boundArgs : [];

const stepFn = getStepFunction(stepId);
if (!stepFn) {
Expand All @@ -1016,47 +1035,47 @@ function getStepRevivers(
);
}

// If closure variables were serialized, return a wrapper function
// that sets up AsyncLocalStorage context when invoked
if (closureVars) {
const wrappedStepFn = ((...args: any[]) => {
// Get the current context from AsyncLocalStorage
const currentContext = contextStorage.getStore();
// Fast path: nothing to wrap.
if (!closureVars && !hasBoundThis && boundArgs.length === 0) {
return stepFn;
}

const wrappedStepFn = function (this: unknown, ...args: any[]) {
const callThis = hasBoundThis ? boundThis : this;
const callArgs = boundArgs.length > 0 ? [...boundArgs, ...args] : args;
if (closureVars) {
const currentContext = contextStorage.getStore();
if (!currentContext) {
throw new WorkflowRuntimeError(
'Cannot call step function with closure variables outside step context'
);
}

// Create a new context with the closure variables merged in
const newContext = {
...currentContext,
closureVars,
};

// Run the step function with the new context that includes closure vars
return contextStorage.run(newContext, () => stepFn(...args));
}) as any;

// Copy properties from original step function
Object.defineProperty(wrappedStepFn, 'name', {
value: stepFn.name,
});
Object.defineProperty(wrappedStepFn, 'stepId', {
value: stepId,
writable: false,
enumerable: false,
configurable: false,
});
if (stepFn.maxRetries !== undefined) {
wrappedStepFn.maxRetries = stepFn.maxRetries;
return contextStorage.run(newContext, () =>
stepFn.apply(callThis, callArgs)
);
}
return stepFn.apply(callThis, callArgs);
} as any;

return wrappedStepFn;
// Copy properties from original step function
Object.defineProperty(wrappedStepFn, 'name', {
value: stepFn.name,
});
Object.defineProperty(wrappedStepFn, 'stepId', {
value: stepId,
writable: false,
enumerable: false,
configurable: false,
});
if (stepFn.maxRetries !== undefined) {
wrappedStepFn.maxRetries = stepFn.maxRetries;
}

return stepFn;
return wrappedStepFn;
},

WorkflowFunction: (value) =>
Expand Down
Loading
Loading