From 92393c1f8897c93adec6a714e2a55969b366696b Mon Sep 17 00:00:00 2001 From: Gareth Jones Date: Sun, 1 Mar 2026 16:38:51 -0800 Subject: [PATCH 1/2] feat(pipeline): add multi-step agent pipeline execution Add pipeline feature for chaining agent calls where output flows between steps automatically via {{steps.<name>.output}} interpolation. - YAML schema with sequential and parallel step execution - Parse, validate, interpolate, and run modules with full test coverage - CLI: `poe-code pipeline run <file>` and `poe-code pipeline validate` - SDK: `pipeline.run()` for programmatic use - 46 unit tests covering parsing, validation, interpolation, and execution --- src/cli/commands/pipeline.ts | 107 ++++++++++ src/cli/program.ts | 2 + src/index.ts | 8 + src/sdk/pipeline/index.ts | 29 +++ src/sdk/pipeline/interpolate.test.ts | 97 +++++++++ src/sdk/pipeline/interpolate.ts | 28 +++ src/sdk/pipeline/parse.test.ts | 214 +++++++++++++++++++ src/sdk/pipeline/parse.ts | 183 ++++++++++++++++ src/sdk/pipeline/run.test.ts | 306 +++++++++++++++++++++++++++ src/sdk/pipeline/run.ts | 181 ++++++++++++++++ src/sdk/pipeline/types.ts | 54 +++++ src/sdk/pipeline/validate.test.ts | 201 ++++++++++++++++++ src/sdk/pipeline/validate.ts | 87 ++++++++ 13 files changed, 1497 insertions(+) create mode 100644 src/cli/commands/pipeline.ts create mode 100644 src/sdk/pipeline/index.ts create mode 100644 src/sdk/pipeline/interpolate.test.ts create mode 100644 src/sdk/pipeline/interpolate.ts create mode 100644 src/sdk/pipeline/parse.test.ts create mode 100644 src/sdk/pipeline/parse.ts create mode 100644 src/sdk/pipeline/run.test.ts create mode 100644 src/sdk/pipeline/run.ts create mode 100644 src/sdk/pipeline/types.ts create mode 100644 src/sdk/pipeline/validate.test.ts create mode 100644 src/sdk/pipeline/validate.ts diff --git a/src/cli/commands/pipeline.ts b/src/cli/commands/pipeline.ts new file mode 100644 index 00000000..07d6a59b --- /dev/null +++ b/src/cli/commands/pipeline.ts @@ -0,0 +1,107 @@ +import path from 
"node:path"; +import { readFile } from "node:fs/promises"; +import type { Command } from "commander"; +import type { CliContainer } from "../container.js"; +import { + resolveCommandFlags +} from "./shared.js"; +import { parsePipeline } from "../../sdk/pipeline/parse.js"; +import { validatePipeline } from "../../sdk/pipeline/validate.js"; +import { runPipeline } from "../../sdk/pipeline/run.js"; +import { isParallelGroup } from "../../sdk/pipeline/types.js"; +import type { PipelineDefinition } from "../../sdk/pipeline/types.js"; + +/** Registers the "pipeline" command group ("run" and "validate") on the given program. */ export function registerPipelineCommand( + program: Command, + _container: CliContainer +): void { + const pipelineCmd = program + .command("pipeline") + .description("Run multi-step agent pipelines."); + + pipelineCmd + .command("run") + .description("Run a pipeline from a YAML file.") + .argument("<file>", "Path to pipeline YAML file") + .option("-C, --cwd <dir>", "Override working directory for all steps") /* the <dir> placeholder makes --cwd take a value; without it Commander parses a boolean flag */ + .action(async function (this: Command, file: string) { + const flags = resolveCommandFlags(program); + const commandOptions = this.opts<{ cwd?: string }>(); + + const filePath = path.resolve(file); + const yamlContent = await readFile(filePath, "utf8"); + const pipeline = parsePipeline(yamlContent); + validatePipeline(pipeline); + + const cwd = commandOptions.cwd + ? 
path.resolve(commandOptions.cwd) + : process.cwd(); + + if (flags.dryRun) { + renderDryRun(pipeline); + return; + } + + const result = await runPipeline(pipeline, { cwd }); + + if (result.summary.success) { + const duration = formatDuration(result.summary.totalDuration); + console.log( + `\nPipeline completed: ${result.summary.completedSteps} steps (${duration})` + ); + } else { + const duration = formatDuration(result.summary.totalDuration); + console.error( + `\nPipeline aborted (${result.summary.completedSteps}/${result.summary.totalSteps} steps completed, ${duration})` + ); + process.exitCode = 1; + } + }); + + pipelineCmd + .command("validate") + .description("Validate a pipeline YAML file without running it.") + .argument("<file>", "Path to pipeline YAML file") + .action(async function (this: Command, file: string) { /* "this" must be the TypeScript this-pseudo-parameter: Commander passes (file, options, command) positionally, so a real first parameter would swallow the path */ + const filePath = path.resolve(file); + const yamlContent = await readFile(filePath, "utf8"); + const pipeline = parsePipeline(yamlContent); + validatePipeline(pipeline); + console.log(`Pipeline "${pipeline.name}" is valid.`); + }); +} + +function renderDryRun(pipeline: PipelineDefinition): void { + console.log(`Pipeline: ${pipeline.name}`); + if (pipeline.description) { + console.log(` ${pipeline.description}`); + } + console.log(""); + + let stepIndex = 1; + for (const entry of pipeline.steps) { + if (isParallelGroup(entry)) { + const names = entry.parallel + .map((s) => { + const agent = s.agent ?? pipeline.defaults?.agent ?? "?"; + const mode = s.mode ?? pipeline.defaults?.mode ?? "yolo"; + return `${s.name} (${agent} · ${mode})`; + }) + .join(" + "); + console.log(` ${stepIndex}. [parallel] ${names}`); + stepIndex += entry.parallel.length; + } else { + const agent = entry.agent ?? pipeline.defaults?.agent ?? "?"; + const mode = entry.mode ?? pipeline.defaults?.mode ?? "yolo"; + console.log(` ${stepIndex}. 
${entry.name} (${agent} · ${mode})`); + stepIndex += 1; + } + } +} + +function formatDuration(ms: number): string { + if (ms < 1000) { + return `${ms}ms`; + } + return `${(ms / 1000).toFixed(1)}s`; +} diff --git a/src/cli/program.ts b/src/cli/program.ts index 7cd494b8..1511ff2d 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -23,6 +23,7 @@ import { registerVersionOption } from "./commands/version.js"; import { registerRalphCommand } from "./commands/ralph.js"; import { registerUsageCommand } from "./commands/usage.js"; import { registerModelsCommand } from "./commands/models.js"; +import { registerPipelineCommand } from "./commands/pipeline.js"; import packageJson from "../../package.json" with { type: "json" }; import { throwCommandNotFound } from "./command-not-found.js"; import { @@ -320,6 +321,7 @@ function bootstrapProgram(container: CliContainer): Command { registerRalphCommand(program, container); registerUsageCommand(program, container); registerModelsCommand(program, container); + registerPipelineCommand(program, container); program.allowExcessArguments().action(function (this: Command) { const args = this.args; diff --git a/src/index.ts b/src/index.ts index 223695f5..8eaceddd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -14,6 +14,14 @@ export type { GenerateResult, MediaGenerateResult } from "./sdk/types.js"; +export { pipeline } from "./sdk/pipeline/index.js"; +export type { + PipelineDefinition, + PipelineStep, + PipelineResult, + PipelineStepResult, + PipelineSummary +} from "./sdk/pipeline/types.js"; async function main(): Promise { const [{ createProgram }, { createCliMain }] = await Promise.all([ diff --git a/src/sdk/pipeline/index.ts b/src/sdk/pipeline/index.ts new file mode 100644 index 00000000..9529277e --- /dev/null +++ b/src/sdk/pipeline/index.ts @@ -0,0 +1,29 @@ +export { parsePipeline } from "./parse.js"; +export { validatePipeline } from "./validate.js"; +export { interpolate } from "./interpolate.js"; +export { runPipeline, 
type RunPipelineOptions } from "./run.js"; +export type { + PipelineDefinition, + PipelineDefaults, + PipelineStep, + PipelineStepEntry, + PipelineParallelGroup, + PipelineStepResult, + PipelineResult, + PipelineSummary +} from "./types.js"; +export { isParallelGroup } from "./types.js"; + +import { runPipeline } from "./run.js"; +import type { PipelineDefinition, PipelineResult } from "./types.js"; + +export const pipeline = { + async run( + definition: PipelineDefinition, + options?: { cwd?: string } + ): Promise { + return runPipeline(definition, { + cwd: options?.cwd ?? process.cwd() + }); + } +}; diff --git a/src/sdk/pipeline/interpolate.test.ts b/src/sdk/pipeline/interpolate.test.ts new file mode 100644 index 00000000..8fc22755 --- /dev/null +++ b/src/sdk/pipeline/interpolate.test.ts @@ -0,0 +1,97 @@ +import { describe, it, expect } from "vitest"; +import { interpolate } from "./interpolate.js"; +import type { PipelineStepResult } from "./types.js"; + +describe("interpolate", () => { + it("replaces step output references", () => { + const steps: Record = { + research: { output: "Found 3 bugs", exitCode: 0, duration: 5000 } + }; + const result = interpolate( + "Fix these: {{steps.research.output}}", + steps, + { name: "test", cwd: "/project" } + ); + expect(result).toBe("Fix these: Found 3 bugs"); + }); + + it("replaces step exitCode references", () => { + const steps: Record = { + build: { output: "", exitCode: 1, duration: 3000 } + }; + const result = interpolate( + "Build exited with {{steps.build.exitCode}}", + steps, + { name: "test", cwd: "/project" } + ); + expect(result).toBe("Build exited with 1"); + }); + + it("replaces pipeline.name", () => { + const result = interpolate( + "Running {{pipeline.name}}", + {}, + { name: "my-pipeline", cwd: "/project" } + ); + expect(result).toBe("Running my-pipeline"); + }); + + it("replaces pipeline.cwd", () => { + const result = interpolate( + "Working in {{pipeline.cwd}}", + {}, + { name: "test", cwd: "/my/project" 
} + ); + expect(result).toBe("Working in /my/project"); + }); + + it("replaces multiple references in one prompt", () => { + const steps: Record = { + a: { output: "output-a", exitCode: 0, duration: 1000 }, + b: { output: "output-b", exitCode: 0, duration: 2000 } + }; + const result = interpolate( + "A: {{steps.a.output}}, B: {{steps.b.output}}, Pipeline: {{pipeline.name}}", + steps, + { name: "multi", cwd: "/project" } + ); + expect(result).toBe("A: output-a, B: output-b, Pipeline: multi"); + }); + + it("returns prompt unchanged when no references", () => { + const result = interpolate( + "Just a prompt with no refs", + {}, + { name: "test", cwd: "/project" } + ); + expect(result).toBe("Just a prompt with no refs"); + }); + + it("handles multiline output in references", () => { + const steps: Record = { + research: { + output: "Line 1\nLine 2\nLine 3", + exitCode: 0, + duration: 5000 + } + }; + const result = interpolate( + "Results:\n{{steps.research.output}}", + steps, + { name: "test", cwd: "/project" } + ); + expect(result).toBe("Results:\nLine 1\nLine 2\nLine 3"); + }); + + it("handles duplicate references to same step", () => { + const steps: Record = { + step1: { output: "hello", exitCode: 0, duration: 1000 } + }; + const result = interpolate( + "{{steps.step1.output}} and again {{steps.step1.output}}", + steps, + { name: "test", cwd: "/project" } + ); + expect(result).toBe("hello and again hello"); + }); +}); diff --git a/src/sdk/pipeline/interpolate.ts b/src/sdk/pipeline/interpolate.ts new file mode 100644 index 00000000..7a3e381b --- /dev/null +++ b/src/sdk/pipeline/interpolate.ts @@ -0,0 +1,28 @@ +import type { PipelineStepResult } from "./types.js"; + +export interface InterpolationContext { + name: string; + cwd: string; +} + +export function interpolate( + prompt: string, + completedSteps: Record, + pipeline: InterpolationContext +): string { + return prompt.replace( + /\{\{(steps\.([^.}]+)\.(output|exitCode)|pipeline\.(name|cwd))\}\}/g, + (_match, 
_full, stepName, stepField, pipelineField) => { + if (pipelineField !== undefined) { + return String(pipeline[pipelineField as keyof InterpolationContext]); + } + + const stepResult = completedSteps[stepName as string]; + if (!stepResult) { + return _match; + } + + return String(stepResult[stepField as keyof PipelineStepResult]); + } + ); +} diff --git a/src/sdk/pipeline/parse.test.ts b/src/sdk/pipeline/parse.test.ts new file mode 100644 index 00000000..7ebcca77 --- /dev/null +++ b/src/sdk/pipeline/parse.test.ts @@ -0,0 +1,214 @@ +import { describe, it, expect } from "vitest"; +import { parsePipeline } from "./parse.js"; + +describe("parsePipeline", () => { + it("parses a minimal pipeline", () => { + const yaml = ` +name: quick-review +steps: + - name: review + agent: claude-code + prompt: Review this code +`; + const result = parsePipeline(yaml); + expect(result).toEqual({ + name: "quick-review", + steps: [ + { name: "review", agent: "claude-code", prompt: "Review this code" } + ] + }); + }); + + it("parses a pipeline with defaults", () => { + const yaml = ` +name: test-pipeline +description: A test pipeline +defaults: + agent: claude-code + mode: read + model: sonnet +steps: + - name: step1 + prompt: Do something +`; + const result = parsePipeline(yaml); + expect(result).toEqual({ + name: "test-pipeline", + description: "A test pipeline", + defaults: { agent: "claude-code", mode: "read", model: "sonnet" }, + steps: [{ name: "step1", prompt: "Do something" }] + }); + }); + + it("parses parallel groups", () => { + const yaml = ` +name: parallel-test +steps: + - name: first + agent: claude-code + prompt: First step + - parallel: + - name: a + agent: codex + prompt: Step A + - name: b + agent: claude-code + prompt: Step B + - name: last + agent: claude-code + prompt: Last step +`; + const result = parsePipeline(yaml); + expect(result.steps).toHaveLength(3); + expect(result.steps[1]).toEqual({ + parallel: [ + { name: "a", agent: "codex", prompt: "Step A" }, + { name: 
"b", agent: "claude-code", prompt: "Step B" } + ] + }); + }); + + it("parses steps with all optional fields", () => { + const yaml = ` +name: full +steps: + - name: step1 + agent: codex + prompt: Do it + mode: edit + model: o3-pro + args: + - --flag + - value + cwd: /tmp/project +`; + const result = parsePipeline(yaml); + expect(result.steps[0]).toEqual({ + name: "step1", + agent: "codex", + prompt: "Do it", + mode: "edit", + model: "o3-pro", + args: ["--flag", "value"], + cwd: "/tmp/project" + }); + }); + + it("throws on missing name", () => { + const yaml = ` +steps: + - name: step1 + agent: claude-code + prompt: Do it +`; + expect(() => parsePipeline(yaml)).toThrow("name"); + }); + + it("throws on missing steps", () => { + const yaml = ` +name: no-steps +`; + expect(() => parsePipeline(yaml)).toThrow("steps"); + }); + + it("throws on empty steps", () => { + const yaml = ` +name: empty +steps: [] +`; + expect(() => parsePipeline(yaml)).toThrow("steps"); + }); + + it("throws on step missing name", () => { + const yaml = ` +name: test +steps: + - agent: claude-code + prompt: Do it +`; + expect(() => parsePipeline(yaml)).toThrow("name"); + }); + + it("throws on step missing prompt", () => { + const yaml = ` +name: test +steps: + - name: step1 + agent: claude-code +`; + expect(() => parsePipeline(yaml)).toThrow("prompt"); + }); + + it("throws on duplicate step names", () => { + const yaml = ` +name: test +steps: + - name: step1 + agent: claude-code + prompt: First + - name: step1 + agent: claude-code + prompt: Second +`; + expect(() => parsePipeline(yaml)).toThrow('Duplicate step name "step1"'); + }); + + it("throws on duplicate names across parallel groups", () => { + const yaml = ` +name: test +steps: + - name: step1 + agent: claude-code + prompt: First + - parallel: + - name: step1 + agent: codex + prompt: Second + - name: step2 + agent: claude-code + prompt: Third +`; + expect(() => parsePipeline(yaml)).toThrow('Duplicate step name "step1"'); + }); + + it("throws 
on parallel group with fewer than 2 steps", () => { + const yaml = ` +name: test +steps: + - parallel: + - name: only + agent: claude-code + prompt: Alone +`; + expect(() => parsePipeline(yaml)).toThrow("at least 2 steps"); + }); + + it("throws on invalid YAML", () => { + expect(() => parsePipeline(":::invalid")).toThrow(); + }); + + it("throws on invalid mode value", () => { + const yaml = ` +name: test +steps: + - name: step1 + agent: claude-code + prompt: Do it + mode: invalid +`; + expect(() => parsePipeline(yaml)).toThrow("mode"); + }); + + it("throws on invalid defaults mode", () => { + const yaml = ` +name: test +defaults: + mode: invalid +steps: + - name: step1 + agent: claude-code + prompt: Do it +`; + expect(() => parsePipeline(yaml)).toThrow("mode"); + }); +}); diff --git a/src/sdk/pipeline/parse.ts b/src/sdk/pipeline/parse.ts new file mode 100644 index 00000000..571c4829 --- /dev/null +++ b/src/sdk/pipeline/parse.ts @@ -0,0 +1,183 @@ +import { parse as parseYaml } from "yaml"; +import type { + PipelineDefinition, + PipelineDefaults, + PipelineStep, + PipelineStepEntry +} from "./types.js"; + +const VALID_MODES = new Set(["yolo", "edit", "read"]); + +export function parsePipeline(yamlContent: string): PipelineDefinition { + const raw = parseYaml(yamlContent) as Record; + + if (!raw || typeof raw !== "object") { + throw new Error("Pipeline YAML must be a mapping"); + } + + const name = requireString(raw, "name", "Pipeline"); + const description = optionalString(raw, "description"); + const defaults = parseDefaults(raw.defaults); + const steps = parseSteps(raw.steps); + + return { + name, + ...(description !== undefined ? { description } : {}), + ...(defaults !== undefined ? 
{ defaults } : {}), + steps + }; +} + +function parseDefaults(raw: unknown): PipelineDefaults | undefined { + if (raw === undefined || raw === null) { + return undefined; + } + + if (typeof raw !== "object" || Array.isArray(raw)) { + throw new Error("Pipeline defaults must be a mapping"); + } + + const obj = raw as Record; + const agent = optionalString(obj, "agent"); + const mode = optionalString(obj, "mode"); + const model = optionalString(obj, "model"); + + if (mode !== undefined && !VALID_MODES.has(mode)) { + throw new Error( + `Invalid defaults mode "${mode}". Must be one of: ${[...VALID_MODES].join(", ")}` + ); + } + + const result: PipelineDefaults = {}; + if (agent !== undefined) result.agent = agent; + if (mode !== undefined) result.mode = mode as PipelineDefaults["mode"]; + if (model !== undefined) result.model = model; + + return Object.keys(result).length > 0 ? result : undefined; +} + +function parseSteps(raw: unknown): PipelineStepEntry[] { + if (!Array.isArray(raw)) { + throw new Error("Pipeline steps must be an array"); + } + + if (raw.length === 0) { + throw new Error("Pipeline steps must contain at least one step"); + } + + const seenNames = new Set(); + const entries: PipelineStepEntry[] = []; + + for (const item of raw) { + if (typeof item !== "object" || item === null) { + throw new Error("Each step must be a mapping"); + } + + const obj = item as Record; + + if ("parallel" in obj) { + const group = parseParallelGroup(obj.parallel, seenNames); + entries.push(group); + } else { + const step = parseStep(obj, seenNames); + entries.push(step); + } + } + + return entries; +} + +function parseParallelGroup( + raw: unknown, + seenNames: Set +): { parallel: PipelineStep[] } { + if (!Array.isArray(raw)) { + throw new Error("Parallel group must be an array"); + } + + if (raw.length < 2) { + throw new Error("Parallel group must contain at least 2 steps"); + } + + const steps: PipelineStep[] = []; + for (const item of raw) { + if (typeof item !== "object" 
|| item === null) { + throw new Error("Each parallel step must be a mapping"); + } + steps.push(parseStep(item as Record, seenNames)); + } + + return { parallel: steps }; +} + +function parseStep( + obj: Record, + seenNames: Set +): PipelineStep { + const name = requireString(obj, "name", "Step"); + const prompt = requireString(obj, "prompt", `Step "${name}"`); + + if (seenNames.has(name)) { + throw new Error(`Duplicate step name "${name}"`); + } + seenNames.add(name); + + const agent = optionalString(obj, "agent"); + const mode = optionalString(obj, "mode"); + const model = optionalString(obj, "model"); + const cwd = optionalString(obj, "cwd"); + const args = parseArgs(obj.args); + + if (mode !== undefined && !VALID_MODES.has(mode)) { + throw new Error( + `Invalid mode "${mode}" on step "${name}". Must be one of: ${[...VALID_MODES].join(", ")}` + ); + } + + const step: PipelineStep = { name, prompt }; + if (agent !== undefined) step.agent = agent; + if (mode !== undefined) step.mode = mode as PipelineStep["mode"]; + if (model !== undefined) step.model = model; + if (args !== undefined) step.args = args; + if (cwd !== undefined) step.cwd = cwd; + + return step; +} + +function parseArgs(raw: unknown): string[] | undefined { + if (raw === undefined || raw === null) { + return undefined; + } + + if (!Array.isArray(raw)) { + throw new Error("Step args must be an array"); + } + + return raw.map((item) => String(item)); +} + +function requireString( + obj: Record, + field: string, + context: string +): string { + const value = obj[field]; + if (typeof value !== "string" || value.trim().length === 0) { + throw new Error(`${context} requires a non-empty "${field}" field`); + } + return value; +} + +function optionalString( + obj: Record, + field: string +): string | undefined { + const value = obj[field]; + if (value === undefined || value === null) { + return undefined; + } + if (typeof value !== "string") { + throw new Error(`"${field}" must be a string`); + } + return 
value; +} diff --git a/src/sdk/pipeline/run.test.ts b/src/sdk/pipeline/run.test.ts new file mode 100644 index 00000000..cd1b55e1 --- /dev/null +++ b/src/sdk/pipeline/run.test.ts @@ -0,0 +1,306 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import type { PipelineDefinition } from "./types.js"; + +vi.mock("../spawn.js", () => ({ + spawn: vi.fn() +})); + +vi.mock("@poe-code/agent-spawn", () => ({ + renderAcpStream: vi.fn(async (events: AsyncIterable) => { + for await (const ignoredEvent of events) { + // drain + } + }) +})); + +import { runPipeline } from "./run.js"; +import { spawn } from "../spawn.js"; +import { renderAcpStream } from "@poe-code/agent-spawn"; + +function emptyAsyncIterable(): AsyncIterable { + return (async function* () {})(); +} + +function mockSpawnSuccess(output: string) { + return { + events: (async function* () { + yield { event: "agent_message", text: output }; + })(), + result: Promise.resolve({ + stdout: output, + stderr: "", + exitCode: 0 + }) + }; +} + +function mockSpawnFailure(exitCode: number, stderr = "") { + return { + events: emptyAsyncIterable(), + result: Promise.resolve({ + stdout: "", + stderr, + exitCode + }) + }; +} + +beforeEach(() => { + vi.mocked(spawn).mockReset(); + vi.mocked(renderAcpStream).mockReset(); + vi.mocked(renderAcpStream).mockImplementation(async (events) => { + for await (const ignoredEvent of events) { + // drain + } + }); +}); + +describe("runPipeline", () => { + it("runs a single sequential step", async () => { + vi.mocked(spawn).mockReturnValueOnce(mockSpawnSuccess("review output")); + + const pipeline: PipelineDefinition = { + name: "test", + steps: [ + { name: "review", agent: "claude-code", prompt: "Review this code" } + ] + }; + + const result = await runPipeline(pipeline, { cwd: "/project" }); + + expect(result.steps.review.output).toBe("review output"); + expect(result.steps.review.exitCode).toBe(0); + expect(result.summary.totalSteps).toBe(1); + 
expect(result.summary.completedSteps).toBe(1); + expect(result.summary.success).toBe(true); + expect(spawn).toHaveBeenCalledWith("claude-code", { + prompt: "Review this code", + cwd: "/project", + mode: "yolo" + }); + }); + + it("runs sequential steps with interpolation", async () => { + vi.mocked(spawn) + .mockReturnValueOnce(mockSpawnSuccess("Found 3 bugs")) + .mockReturnValueOnce(mockSpawnSuccess("Fixed 3 bugs")); + + const pipeline: PipelineDefinition = { + name: "fix-flow", + steps: [ + { name: "research", agent: "claude-code", prompt: "Find bugs" }, + { + name: "fix", + agent: "codex", + prompt: "Fix: {{steps.research.output}}" + } + ] + }; + + const result = await runPipeline(pipeline, { cwd: "/project" }); + + expect(result.steps.research.output).toBe("Found 3 bugs"); + expect(result.steps.fix.output).toBe("Fixed 3 bugs"); + expect(result.summary.completedSteps).toBe(2); + expect(spawn).toHaveBeenCalledTimes(2); + expect(spawn).toHaveBeenNthCalledWith(2, "codex", { + prompt: "Fix: Found 3 bugs", + cwd: "/project", + mode: "yolo" + }); + }); + + it("aborts on step failure", async () => { + vi.mocked(spawn) + .mockReturnValueOnce(mockSpawnSuccess("ok")) + .mockReturnValueOnce(mockSpawnFailure(1, "rate limit exceeded")); + + const pipeline: PipelineDefinition = { + name: "test", + steps: [ + { name: "step1", agent: "claude-code", prompt: "First" }, + { name: "step2", agent: "codex", prompt: "Second" }, + { name: "step3", agent: "claude-code", prompt: "Third" } + ] + }; + + const result = await runPipeline(pipeline, { cwd: "/project" }); + + expect(result.summary.success).toBe(false); + expect(result.summary.completedSteps).toBe(1); + expect(result.summary.totalSteps).toBe(3); + expect(result.steps.step1.exitCode).toBe(0); + expect(result.steps.step2.exitCode).toBe(1); + expect(result.steps.step3).toBeUndefined(); + expect(spawn).toHaveBeenCalledTimes(2); + }); + + it("runs parallel steps concurrently", async () => { + vi.mocked(spawn) + 
.mockReturnValueOnce(mockSpawnSuccess("research output")) + .mockReturnValueOnce(mockSpawnSuccess("fix-a output")) + .mockReturnValueOnce(mockSpawnSuccess("fix-b output")); + + const pipeline: PipelineDefinition = { + name: "parallel-test", + steps: [ + { name: "research", agent: "claude-code", prompt: "Research" }, + { + parallel: [ + { + name: "fix-a", + agent: "codex", + prompt: "Fix A: {{steps.research.output}}" + }, + { + name: "fix-b", + agent: "claude-code", + prompt: "Fix B: {{steps.research.output}}" + } + ] + } + ] + }; + + const result = await runPipeline(pipeline, { cwd: "/project" }); + + expect(result.steps.research.output).toBe("research output"); + expect(result.steps["fix-a"].output).toBe("fix-a output"); + expect(result.steps["fix-b"].output).toBe("fix-b output"); + expect(result.summary.completedSteps).toBe(3); + expect(result.summary.success).toBe(true); + }); + + it("uses pipeline defaults for agent and mode", () => { + vi.mocked(spawn).mockReturnValueOnce(mockSpawnSuccess("output")); + + const pipeline: PipelineDefinition = { + name: "test", + defaults: { agent: "claude-code", mode: "read", model: "sonnet" }, + steps: [{ name: "step1", prompt: "Do it" }] + }; + + runPipeline(pipeline, { cwd: "/project" }); + + expect(spawn).toHaveBeenCalledWith("claude-code", { + prompt: "Do it", + cwd: "/project", + mode: "read", + model: "sonnet" + }); + }); + + it("step overrides pipeline defaults", () => { + vi.mocked(spawn).mockReturnValueOnce(mockSpawnSuccess("output")); + + const pipeline: PipelineDefinition = { + name: "test", + defaults: { agent: "claude-code", mode: "read" }, + steps: [ + { + name: "step1", + agent: "codex", + mode: "edit", + model: "o3-pro", + prompt: "Do it" + } + ] + }; + + runPipeline(pipeline, { cwd: "/project" }); + + expect(spawn).toHaveBeenCalledWith("codex", { + prompt: "Do it", + cwd: "/project", + mode: "edit", + model: "o3-pro" + }); + }); + + it("forwards step args", () => { + 
vi.mocked(spawn).mockReturnValueOnce(mockSpawnSuccess("output")); + + const pipeline: PipelineDefinition = { + name: "test", + steps: [ + { + name: "step1", + agent: "claude-code", + prompt: "Do it", + args: ["--flag", "value"] + } + ] + }; + + runPipeline(pipeline, { cwd: "/project" }); + + expect(spawn).toHaveBeenCalledWith("claude-code", { + prompt: "Do it", + cwd: "/project", + mode: "yolo", + args: ["--flag", "value"] + }); + }); + + it("uses step cwd when specified", () => { + vi.mocked(spawn).mockReturnValueOnce(mockSpawnSuccess("output")); + + const pipeline: PipelineDefinition = { + name: "test", + steps: [ + { + name: "step1", + agent: "claude-code", + prompt: "Do it", + cwd: "/other/dir" + } + ] + }; + + runPipeline(pipeline, { cwd: "/project" }); + + expect(spawn).toHaveBeenCalledWith("claude-code", { + prompt: "Do it", + cwd: "/other/dir", + mode: "yolo" + }); + }); + + it("aborts parallel group on any failure", async () => { + vi.mocked(spawn) + .mockReturnValueOnce(mockSpawnSuccess("ok")) + .mockReturnValueOnce(mockSpawnFailure(1, "error")) + .mockReturnValueOnce(mockSpawnSuccess("ok too")); + + const pipeline: PipelineDefinition = { + name: "test", + steps: [ + { + parallel: [ + { name: "a", agent: "codex", prompt: "A" }, + { name: "b", agent: "claude-code", prompt: "B" } + ] + }, + { name: "after", agent: "claude-code", prompt: "After" } + ] + }; + + const result = await runPipeline(pipeline, { cwd: "/project" }); + + expect(result.summary.success).toBe(false); + expect(result.steps.after).toBeUndefined(); + }); + + it("renders events from sequential steps", async () => { + vi.mocked(spawn).mockReturnValueOnce(mockSpawnSuccess("output")); + + const pipeline: PipelineDefinition = { + name: "test", + steps: [{ name: "step1", agent: "claude-code", prompt: "Do it" }] + }; + + await runPipeline(pipeline, { cwd: "/project" }); + + expect(renderAcpStream).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/sdk/pipeline/run.ts b/src/sdk/pipeline/run.ts 
new file mode 100644 index 00000000..97b1d29b --- /dev/null +++ b/src/sdk/pipeline/run.ts @@ -0,0 +1,181 @@ +import type { + AcpEvent, + AgentMessageEvent +} from "@poe-code/agent-spawn"; +import { renderAcpStream } from "@poe-code/agent-spawn"; +import { spawn } from "../spawn.js"; +import type { SpawnOptions } from "../types.js"; +import { interpolate } from "./interpolate.js"; +import { validatePipeline } from "./validate.js"; +import { + isParallelGroup, + type PipelineDefinition, + type PipelineResult, + type PipelineStep, + type PipelineStepResult +} from "./types.js"; + +export interface RunPipelineOptions { + cwd: string; +} + +export async function runPipeline( + pipeline: PipelineDefinition, + options: RunPipelineOptions +): Promise { + validatePipeline(pipeline); + + const stepResults: Record = {}; + const totalSteps = countSteps(pipeline); + let completedSteps = 0; + let success = true; + const startTime = Date.now(); + + for (const entry of pipeline.steps) { + if (!success) break; + + if (isParallelGroup(entry)) { + const results = await runParallelGroup( + entry.parallel, + pipeline, + stepResults, + options + ); + + for (const [name, result] of Object.entries(results)) { + stepResults[name] = result; + if (result.exitCode === 0) { + completedSteps += 1; + } else { + success = false; + } + } + } else { + const result = await runStep(entry, pipeline, stepResults, options); + stepResults[entry.name] = result; + + if (result.exitCode === 0) { + completedSteps += 1; + } else { + success = false; + } + } + } + + return { + steps: stepResults, + summary: { + totalSteps, + completedSteps, + totalDuration: Date.now() - startTime, + success + } + }; +} + +async function runStep( + step: PipelineStep, + pipeline: PipelineDefinition, + completedSteps: Record, + options: RunPipelineOptions +): Promise { + const agent = step.agent ?? 
pipeline.defaults?.agent; + if (!agent) { + throw new Error(`Step "${step.name}" has no agent`); + } + + const prompt = interpolate(step.prompt, completedSteps, { + name: pipeline.name, + cwd: options.cwd + }); + + const mode = step.mode ?? pipeline.defaults?.mode ?? "yolo"; + const model = step.model ?? pipeline.defaults?.model; + const cwd = step.cwd ?? options.cwd; + + const spawnOptions: SpawnOptions = { + prompt, + cwd, + mode, + ...(model ? { model } : {}), + ...(step.args ? { args: step.args } : {}) + }; + + const startTime = Date.now(); + const { events, result } = spawn(agent, spawnOptions); + const { teed, getOutput, done } = teeAcpStream(events); + + await renderAcpStream(teed); + const final = await result; + await done; + + return { + output: getOutput(), + exitCode: final.exitCode, + duration: Date.now() - startTime + }; +} + +async function runParallelGroup( + steps: PipelineStep[], + pipeline: PipelineDefinition, + completedSteps: Record, + options: RunPipelineOptions +): Promise> { + const promises = steps.map((step) => + runStep(step, pipeline, completedSteps, options).then( + (result) => ({ name: step.name, result }) + ) + ); + + const settled = await Promise.all(promises); + const results: Record = {}; + + for (const { name, result } of settled) { + results[name] = result; + } + + return results; +} + +function countSteps(pipeline: PipelineDefinition): number { + let count = 0; + for (const entry of pipeline.steps) { + if (isParallelGroup(entry)) { + count += entry.parallel.length; + } else { + count += 1; + } + } + return count; +} + +function teeAcpStream(events: AsyncIterable): { + teed: AsyncIterable; + getOutput: () => string; + done: Promise; +} { + const chunks: string[] = []; + let resolveDone: (() => void) | undefined; + const done = new Promise((resolve) => { + resolveDone = resolve; + }); + const teed = (async function* () { + try { + for await (const event of events) { + if (event.event === "agent_message") { + chunks.push((event as 
AgentMessageEvent).text); + } + yield event; + } + } finally { + resolveDone?.(); + } + })(); + + return { + teed, + getOutput: () => chunks.join(""), + done + }; +} diff --git a/src/sdk/pipeline/types.ts b/src/sdk/pipeline/types.ts new file mode 100644 index 00000000..87e56ec7 --- /dev/null +++ b/src/sdk/pipeline/types.ts @@ -0,0 +1,54 @@ +import type { SpawnMode } from "@poe-code/agent-spawn"; + +export interface PipelineDefinition { + name: string; + description?: string; + defaults?: PipelineDefaults; + steps: PipelineStepEntry[]; +} + +export interface PipelineDefaults { + agent?: string; + mode?: SpawnMode; + model?: string; +} + +export type PipelineStepEntry = PipelineStep | PipelineParallelGroup; + +export interface PipelineStep { + name: string; + agent?: string; + prompt: string; + mode?: SpawnMode; + model?: string; + args?: string[]; + cwd?: string; +} + +export interface PipelineParallelGroup { + parallel: PipelineStep[]; +} + +export interface PipelineStepResult { + output: string; + exitCode: number; + duration: number; +} + +export interface PipelineResult { + steps: Record; + summary: PipelineSummary; +} + +export interface PipelineSummary { + totalSteps: number; + completedSteps: number; + totalDuration: number; + success: boolean; +} + +export function isParallelGroup( + entry: PipelineStepEntry +): entry is PipelineParallelGroup { + return "parallel" in entry; +} diff --git a/src/sdk/pipeline/validate.test.ts b/src/sdk/pipeline/validate.test.ts new file mode 100644 index 00000000..687a3481 --- /dev/null +++ b/src/sdk/pipeline/validate.test.ts @@ -0,0 +1,201 @@ +import { describe, it, expect } from "vitest"; +import { validatePipeline } from "./validate.js"; +import type { PipelineDefinition } from "./types.js"; + +function minimal(overrides: Partial = {}): PipelineDefinition { + return { + name: "test", + steps: [ + { name: "step1", agent: "claude-code", prompt: "Do it" } + ], + ...overrides + }; +} + +describe("validatePipeline", () => { + 
it("passes for a valid minimal pipeline", () => { + expect(() => validatePipeline(minimal())).not.toThrow(); + }); + + it("resolves agent from defaults when step has no agent", () => { + const pipeline = minimal({ + defaults: { agent: "claude-code" }, + steps: [{ name: "step1", prompt: "Do it" }] + }); + expect(() => validatePipeline(pipeline)).not.toThrow(); + }); + + it("throws when step has no agent and no default", () => { + const pipeline = minimal({ + steps: [{ name: "step1", prompt: "Do it" }] + }); + expect(() => validatePipeline(pipeline)).toThrow( + 'Step "step1" has no agent' + ); + }); + + it("validates sequential step references to earlier steps", () => { + const pipeline = minimal({ + steps: [ + { name: "step1", agent: "claude-code", prompt: "First" }, + { + name: "step2", + agent: "claude-code", + prompt: "Based on: {{steps.step1.output}}" + } + ] + }); + expect(() => validatePipeline(pipeline)).not.toThrow(); + }); + + it("rejects forward reference in sequential steps", () => { + const pipeline = minimal({ + steps: [ + { + name: "step1", + agent: "claude-code", + prompt: "Based on: {{steps.step2.output}}" + }, + { name: "step2", agent: "claude-code", prompt: "Second" } + ] + }); + expect(() => validatePipeline(pipeline)).toThrow( + 'Step "step1" references "step2" which has not completed before it' + ); + }); + + it("allows parallel steps to reference earlier sequential steps", () => { + const pipeline = minimal({ + steps: [ + { name: "research", agent: "claude-code", prompt: "Analyze" }, + { + parallel: [ + { + name: "fix-a", + agent: "codex", + prompt: "Fix A: {{steps.research.output}}" + }, + { + name: "fix-b", + agent: "claude-code", + prompt: "Fix B: {{steps.research.output}}" + } + ] + } + ] + }); + expect(() => validatePipeline(pipeline)).not.toThrow(); + }); + + it("rejects parallel sibling references", () => { + const pipeline = minimal({ + steps: [ + { + parallel: [ + { name: "fix-a", agent: "codex", prompt: "Fix A" }, + { + name: "fix-b", 
+ agent: "claude-code", + prompt: "Based on: {{steps.fix-a.output}}" + } + ] + } + ] + }); + expect(() => validatePipeline(pipeline)).toThrow( + 'Step "fix-b" references "fix-a" which has not completed before it' + ); + }); + + it("allows step after parallel group to reference parallel steps", () => { + const pipeline = minimal({ + steps: [ + { + parallel: [ + { name: "a", agent: "codex", prompt: "A" }, + { name: "b", agent: "claude-code", prompt: "B" } + ] + }, + { + name: "final", + agent: "claude-code", + prompt: "A: {{steps.a.output}}, B: {{steps.b.output}}" + } + ] + }); + expect(() => validatePipeline(pipeline)).not.toThrow(); + }); + + it("warns on unknown step references", () => { + const pipeline = minimal({ + steps: [ + { + name: "step1", + agent: "claude-code", + prompt: "Ref: {{steps.nonexistent.output}}" + } + ] + }); + expect(() => validatePipeline(pipeline)).toThrow( + 'Step "step1" references unknown step "nonexistent"' + ); + }); + + it("validates exitCode references", () => { + const pipeline = minimal({ + steps: [ + { name: "step1", agent: "claude-code", prompt: "First" }, + { + name: "step2", + agent: "claude-code", + prompt: "Code: {{steps.step1.exitCode}}" + } + ] + }); + expect(() => validatePipeline(pipeline)).not.toThrow(); + }); + + it("validates pipeline variable references", () => { + const pipeline = minimal({ + steps: [ + { + name: "step1", + agent: "claude-code", + prompt: "Pipeline: {{pipeline.name}} in {{pipeline.cwd}}" + } + ] + }); + expect(() => validatePipeline(pipeline)).not.toThrow(); + }); + + it("validates agent in parallel group steps with defaults", () => { + const pipeline = minimal({ + defaults: { agent: "claude-code" }, + steps: [ + { + parallel: [ + { name: "a", prompt: "A" }, + { name: "b", prompt: "B" } + ] + } + ] + }); + expect(() => validatePipeline(pipeline)).not.toThrow(); + }); + + it("throws when parallel step has no agent and no default", () => { + const pipeline = minimal({ + steps: [ + { + parallel: [ + { 
name: "a", agent: "codex", prompt: "A" }, + { name: "b", prompt: "B" } + ] + } + ] + }); + expect(() => validatePipeline(pipeline)).toThrow( + 'Step "b" has no agent' + ); + }); +}); diff --git a/src/sdk/pipeline/validate.ts b/src/sdk/pipeline/validate.ts new file mode 100644 index 00000000..3bb3a8e9 --- /dev/null +++ b/src/sdk/pipeline/validate.ts @@ -0,0 +1,87 @@ +import { + isParallelGroup, + type PipelineDefinition, + type PipelineStep, + type PipelineStepEntry +} from "./types.js"; + +const REFERENCE_PATTERN = /\{\{steps\.([^.}]+)\.(output|exitCode)\}\}/g; + +export function validatePipeline(pipeline: PipelineDefinition): void { + const defaultAgent = pipeline.defaults?.agent; + const completedSteps = new Set(); + const allStepNames = collectAllStepNames(pipeline.steps); + + for (const entry of pipeline.steps) { + if (isParallelGroup(entry)) { + for (const step of entry.parallel) { + validateStepAgent(step, defaultAgent); + validateStepReferences(step, completedSteps, allStepNames); + } + for (const step of entry.parallel) { + completedSteps.add(step.name); + } + } else { + validateStepAgent(entry, defaultAgent); + validateStepReferences(entry, completedSteps, allStepNames); + completedSteps.add(entry.name); + } + } +} + +function validateStepAgent(step: PipelineStep, defaultAgent?: string): void { + if (!step.agent && !defaultAgent) { + throw new Error( + `Step "${step.name}" has no agent and no default agent is configured` + ); + } +} + +function validateStepReferences( + step: PipelineStep, + completedSteps: Set, + allStepNames: Set +): void { + const references = extractReferences(step.prompt); + + for (const ref of references) { + if (!allStepNames.has(ref)) { + throw new Error( + `Step "${step.name}" references unknown step "${ref}"` + ); + } + + if (!completedSteps.has(ref)) { + throw new Error( + `Step "${step.name}" references "${ref}" which has not completed before it` + ); + } + } +} + +function extractReferences(prompt: string): Set { + const refs 
= new Set(); + let match: RegExpExecArray | null; + REFERENCE_PATTERN.lastIndex = 0; + while ((match = REFERENCE_PATTERN.exec(prompt)) !== null) { + const name = match[1]; + if (name !== undefined) { + refs.add(name); + } + } + return refs; +} + +function collectAllStepNames(entries: PipelineStepEntry[]): Set { + const names = new Set(); + for (const entry of entries) { + if (isParallelGroup(entry)) { + for (const step of entry.parallel) { + names.add(step.name); + } + } else { + names.add(entry.name); + } + } + return names; +} From cbdc1d8cc985efd732e4aa6a9702f2041b3413a5 Mon Sep 17 00:00:00 2001 From: Gareth Jones Date: Sun, 1 Mar 2026 16:52:33 -0800 Subject: [PATCH 2/2] fix(pipeline): address code review findings - Fix validate subcommand parameter binding (was broken) - Use container.fs.readFile instead of node:fs/promises import - Add CLI command tests (validate, run, dry-run, failure exit code) - Add pipeline commands to help text - Use design system for CLI output instead of console.log - Remove proxy function, make cwd optional in RunPipelineOptions - Remove double validation (CLI + runPipeline) - Add interpolate edge case test for unresolved references - Export runPipeline/parsePipeline directly from SDK --- src/cli/commands/pipeline-command.test.ts | 212 ++++++++++++++++++++++ src/cli/commands/pipeline.ts | 54 ++++-- src/cli/program.ts | 10 + src/index.ts | 8 +- src/sdk/pipeline/index.ts | 14 -- src/sdk/pipeline/interpolate.test.ts | 9 + src/sdk/pipeline/run.ts | 10 +- 7 files changed, 277 insertions(+), 40 deletions(-) create mode 100644 src/cli/commands/pipeline-command.test.ts diff --git a/src/cli/commands/pipeline-command.test.ts b/src/cli/commands/pipeline-command.test.ts new file mode 100644 index 00000000..b9b903a7 --- /dev/null +++ b/src/cli/commands/pipeline-command.test.ts @@ -0,0 +1,212 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { Volume, createFsFromVolume } from "memfs"; +import path from 
"node:path"; +import type { FileSystem } from "../utils/file-system.js"; + +const renderAcpStreamMock = vi.hoisted( + () => + vi.fn(async (events: AsyncIterable) => { + for await (const ignoredEvent of events) { + // noop + } + }) +); + +vi.mock("@poe-code/agent-spawn", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + renderAcpStream: renderAcpStreamMock + }; +}); + +vi.mock("../../sdk/spawn.js", () => ({ + spawn: vi.fn() +})); + +import { createProgram } from "../program.js"; +import { spawn as sdkSpawn } from "../../sdk/spawn.js"; + +const cwd = "/repo"; +const homeDir = "/home/test"; + +function createMemFs(files?: Record): FileSystem { + const vol = new Volume(); + vol.mkdirSync(homeDir, { recursive: true }); + vol.mkdirSync(cwd, { recursive: true }); + if (files) { + for (const [filePath, content] of Object.entries(files)) { + const dir = path.dirname(filePath); + vol.mkdirSync(dir, { recursive: true }); + vol.writeFileSync(filePath, content, "utf8"); + } + } + return createFsFromVolume(vol).promises as unknown as FileSystem; +} + +function mockSpawnSuccess(output: string) { + return { + events: (async function* () { + yield { event: "agent_message", text: output }; + })(), + result: Promise.resolve({ + stdout: output, + stderr: "", + exitCode: 0 + }) + }; +} + +const validPipeline = ` +name: test-pipeline +steps: + - name: review + agent: claude-code + prompt: Review this code +`; + +const invalidPipeline = ` +steps: + - name: review + agent: claude-code + prompt: Review +`; + +const pipelinePath = "/repo/pipeline.yaml"; + +describe("pipeline command", () => { + const originalEnv = { ...process.env }; + + beforeEach(() => { + vi.clearAllMocks(); + process.env = { ...originalEnv, FORCE_COLOR: "1" }; + }); + + afterEach(() => { + process.env = { ...originalEnv }; + }); + + describe("pipeline validate", () => { + it("reports valid pipeline", async () => { + const fs = createMemFs({ [pipelinePath]: validPipeline }); + 
const program = createProgram({ + fs, + prompts: vi.fn().mockResolvedValue({}), + env: { cwd, homeDir }, + logger: () => {}, + suppressCommanderOutput: true + }); + + await program.parseAsync([ + "node", + "cli", + "pipeline", + "validate", + pipelinePath + ]); + }); + + it("throws on invalid pipeline", async () => { + const fs = createMemFs({ [pipelinePath]: invalidPipeline }); + const program = createProgram({ + fs, + prompts: vi.fn().mockResolvedValue({}), + env: { cwd, homeDir }, + logger: () => {}, + suppressCommanderOutput: true + }); + + await expect( + program.parseAsync([ + "node", + "cli", + "pipeline", + "validate", + pipelinePath + ]) + ).rejects.toThrow("name"); + }); + }); + + describe("pipeline run", () => { + it("runs a pipeline and spawns agents", async () => { + vi.mocked(sdkSpawn).mockReturnValue(mockSpawnSuccess("review output")); + + const fs = createMemFs({ [pipelinePath]: validPipeline }); + const program = createProgram({ + fs, + prompts: vi.fn().mockResolvedValue({}), + env: { cwd, homeDir }, + logger: () => {}, + suppressCommanderOutput: true + }); + + await program.parseAsync([ + "node", + "cli", + "pipeline", + "run", + pipelinePath + ]); + + expect(sdkSpawn).toHaveBeenCalledWith("claude-code", expect.objectContaining({ + prompt: "Review this code", + mode: "yolo" + })); + }); + + it("shows dry run without spawning", async () => { + const fs = createMemFs({ [pipelinePath]: validPipeline }); + const program = createProgram({ + fs, + prompts: vi.fn().mockResolvedValue({}), + env: { cwd, homeDir }, + logger: () => {}, + suppressCommanderOutput: true + }); + + await program.parseAsync([ + "node", + "cli", + "--dry-run", + "pipeline", + "run", + pipelinePath + ]); + + expect(sdkSpawn).not.toHaveBeenCalled(); + }); + + it("sets exit code on failure", async () => { + vi.mocked(sdkSpawn).mockReturnValue({ + events: (async function* () {})(), + result: Promise.resolve({ + stdout: "", + stderr: "error", + exitCode: 1 + }) + }); + + const fs = 
createMemFs({ [pipelinePath]: validPipeline }); + const program = createProgram({ + fs, + prompts: vi.fn().mockResolvedValue({}), + env: { cwd, homeDir }, + logger: () => {}, + suppressCommanderOutput: true + }); + + const originalExitCode = process.exitCode; + await program.parseAsync([ + "node", + "cli", + "pipeline", + "run", + pipelinePath + ]); + + expect(process.exitCode).toBe(1); + process.exitCode = originalExitCode; + }); + }); +}); diff --git a/src/cli/commands/pipeline.ts b/src/cli/commands/pipeline.ts index 07d6a59b..a825d9c9 100644 --- a/src/cli/commands/pipeline.ts +++ b/src/cli/commands/pipeline.ts @@ -1,9 +1,10 @@ import path from "node:path"; -import { readFile } from "node:fs/promises"; import type { Command } from "commander"; import type { CliContainer } from "../container.js"; +import { text } from "@poe-code/design-system"; import { - resolveCommandFlags + resolveCommandFlags, + createExecutionResources } from "./shared.js"; import { parsePipeline } from "../../sdk/pipeline/parse.js"; import { validatePipeline } from "../../sdk/pipeline/validate.js"; @@ -13,7 +14,7 @@ import type { PipelineDefinition } from "../../sdk/pipeline/types.js"; export function registerPipelineCommand( program: Command, - _container: CliContainer + container: CliContainer ): void { const pipelineCmd = program .command("pipeline") @@ -29,9 +30,8 @@ export function registerPipelineCommand( const commandOptions = this.opts<{ cwd?: string }>(); const filePath = path.resolve(file); - const yamlContent = await readFile(filePath, "utf8"); + const yamlContent = await container.fs.readFile(filePath, "utf8"); const pipeline = parsePipeline(yamlContent); - validatePipeline(pipeline); const cwd = commandOptions.cwd ? 
path.resolve(commandOptions.cwd) @@ -42,41 +42,59 @@ export function registerPipelineCommand( return; } + const resources = createExecutionResources( + container, + flags, + "pipeline" + ); + resources.logger.intro(`pipeline ${pipeline.name}`); + const result = await runPipeline(pipeline, { cwd }); if (result.summary.success) { const duration = formatDuration(result.summary.totalDuration); - console.log( - `\nPipeline completed: ${result.summary.completedSteps} steps (${duration})` + resources.logger.info( + `Pipeline completed: ${result.summary.completedSteps} steps (${duration})` ); } else { const duration = formatDuration(result.summary.totalDuration); - console.error( - `\nPipeline aborted (${result.summary.completedSteps}/${result.summary.totalSteps} steps completed, ${duration})` + resources.logger.info( + `Pipeline aborted (${result.summary.completedSteps}/${result.summary.totalSteps} steps completed, ${duration})` ); process.exitCode = 1; } + + resources.context.finalize(); }); pipelineCmd .command("validate") .description("Validate a pipeline YAML file without running it.") .argument("", "Path to pipeline YAML file") - .action(async function (_this: Command, file: string) { + .action(async function (this: Command, file: string) { const filePath = path.resolve(file); - const yamlContent = await readFile(filePath, "utf8"); + const yamlContent = await container.fs.readFile(filePath, "utf8"); const pipeline = parsePipeline(yamlContent); validatePipeline(pipeline); - console.log(`Pipeline "${pipeline.name}" is valid.`); + + const resources = createExecutionResources( + container, + resolveCommandFlags(program), + "pipeline" + ); + resources.logger.info(`Pipeline "${pipeline.name}" is valid.`); + resources.context.finalize(); }); } function renderDryRun(pipeline: PipelineDefinition): void { - console.log(`Pipeline: ${pipeline.name}`); + const lines: string[] = [ + text.heading(`Pipeline: ${pipeline.name}`) + ]; if (pipeline.description) { - console.log(` 
${pipeline.description}`); + lines.push(` ${pipeline.description}`); } - console.log(""); + lines.push(""); let stepIndex = 1; for (const entry of pipeline.steps) { @@ -88,15 +106,17 @@ function renderDryRun(pipeline: PipelineDefinition): void { return `${s.name} (${agent} · ${mode})`; }) .join(" + "); - console.log(` ${stepIndex}. [parallel] ${names}`); + lines.push(` ${stepIndex}. ${text.muted("[parallel]")} ${names}`); stepIndex += entry.parallel.length; } else { const agent = entry.agent ?? pipeline.defaults?.agent ?? "?"; const mode = entry.mode ?? pipeline.defaults?.mode ?? "yolo"; - console.log(` ${stepIndex}. ${entry.name} (${agent} · ${mode})`); + lines.push(` ${stepIndex}. ${text.command(entry.name)} ${text.muted(`(${agent} · ${mode})`)}`); stepIndex += 1; } } + + process.stdout.write(lines.join("\n") + "\n"); } function formatDuration(ms: number): string { diff --git a/src/cli/program.ts b/src/cli/program.ts index 1511ff2d..61c9a031 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -128,6 +128,16 @@ function formatHelpText(input: { name: "usage list", args: "", description: "Display usage history" + }, + { + name: "pipeline run", + args: "", + description: "Run a multi-step agent pipeline" + }, + { + name: "pipeline validate", + args: "", + description: "Validate a pipeline YAML file" } ]; const nameWidth = Math.max(0, ...commandRows.map((row) => row.name.length)); diff --git a/src/index.ts b/src/index.ts index 8eaceddd..db8b4ecb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -14,14 +14,16 @@ export type { GenerateResult, MediaGenerateResult } from "./sdk/types.js"; -export { pipeline } from "./sdk/pipeline/index.js"; +export { runPipeline } from "./sdk/pipeline/index.js"; +export { parsePipeline } from "./sdk/pipeline/index.js"; export type { PipelineDefinition, PipelineStep, PipelineResult, PipelineStepResult, - PipelineSummary -} from "./sdk/pipeline/types.js"; + PipelineSummary, + RunPipelineOptions +} from "./sdk/pipeline/index.js"; async 
function main(): Promise { const [{ createProgram }, { createCliMain }] = await Promise.all([ diff --git a/src/sdk/pipeline/index.ts b/src/sdk/pipeline/index.ts index 9529277e..80d28138 100644 --- a/src/sdk/pipeline/index.ts +++ b/src/sdk/pipeline/index.ts @@ -13,17 +13,3 @@ export type { PipelineSummary } from "./types.js"; export { isParallelGroup } from "./types.js"; - -import { runPipeline } from "./run.js"; -import type { PipelineDefinition, PipelineResult } from "./types.js"; - -export const pipeline = { - async run( - definition: PipelineDefinition, - options?: { cwd?: string } - ): Promise { - return runPipeline(definition, { - cwd: options?.cwd ?? process.cwd() - }); - } -}; diff --git a/src/sdk/pipeline/interpolate.test.ts b/src/sdk/pipeline/interpolate.test.ts index 8fc22755..18300ff9 100644 --- a/src/sdk/pipeline/interpolate.test.ts +++ b/src/sdk/pipeline/interpolate.test.ts @@ -94,4 +94,13 @@ describe("interpolate", () => { ); expect(result).toBe("hello and again hello"); }); + + it("leaves unresolved step references unchanged", () => { + const result = interpolate( + "Ref: {{steps.missing.output}}", + {}, + { name: "test", cwd: "/project" } + ); + expect(result).toBe("Ref: {{steps.missing.output}}"); + }); }); diff --git a/src/sdk/pipeline/run.ts b/src/sdk/pipeline/run.ts index 97b1d29b..9c0627ee 100644 --- a/src/sdk/pipeline/run.ts +++ b/src/sdk/pipeline/run.ts @@ -6,7 +6,6 @@ import { renderAcpStream } from "@poe-code/agent-spawn"; import { spawn } from "../spawn.js"; import type { SpawnOptions } from "../types.js"; import { interpolate } from "./interpolate.js"; -import { validatePipeline } from "./validate.js"; import { isParallelGroup, type PipelineDefinition, @@ -16,14 +15,13 @@ import { } from "./types.js"; export interface RunPipelineOptions { - cwd: string; + cwd?: string; } export async function runPipeline( pipeline: PipelineDefinition, - options: RunPipelineOptions + options: RunPipelineOptions = {} ): Promise { - 
validatePipeline(pipeline); const stepResults: Record = {}; const totalSteps = countSteps(pipeline); @@ -84,14 +82,14 @@ async function runStep( throw new Error(`Step "${step.name}" has no agent`); } + const cwd = step.cwd ?? options.cwd ?? process.cwd(); const prompt = interpolate(step.prompt, completedSteps, { name: pipeline.name, - cwd: options.cwd + cwd }); const mode = step.mode ?? pipeline.defaults?.mode ?? "yolo"; const model = step.model ?? pipeline.defaults?.model; - const cwd = step.cwd ?? options.cwd; const spawnOptions: SpawnOptions = { prompt,