|
| 1 | +import { logger, runs, streams, task, wait } from "@trigger.dev/sdk/v3"; |
| 2 | + |
| 3 | +/** |
| 4 | + * Multi-agent coordination via input streams. |
| 5 | + * |
| 6 | + * Demonstrates the pattern from the Agent Relay blog post: instead of a human |
| 7 | + * copy-pasting context between agents, the agents talk to each other directly. |
| 8 | + * |
| 9 | + * Three "agents" (tasks) collaborate to write and review a document: |
| 10 | + * - Planner: breaks the work into steps and sends them to the worker |
| 11 | + * - Worker: executes each step, sends results to the reviewer |
| 12 | + * - Reviewer: checks each result, sends feedback back to the worker |
| 13 | + * |
| 14 | + * The coordinator task wires them together — no human in the loop. |
| 15 | + */ |
| 16 | + |
| 17 | +// -- Input streams for inter-agent communication -- |
| 18 | + |
| 19 | +/** Coordinator → Agent: tells an agent who its peers are */ |
| 20 | +const connect = streams.input<{ workerRunId: string }>({ |
| 21 | + id: "connect", |
| 22 | +}); |
| 23 | + |
| 24 | +/** Planner → Worker: steps to execute */ |
| 25 | +const planSteps = streams.input<{ step: number; instruction: string }>({ |
| 26 | + id: "plan-steps", |
| 27 | +}); |
| 28 | + |
| 29 | +/** Planner → Worker: signal that all steps have been sent */ |
| 30 | +const planComplete = streams.input<{ totalSteps: number }>({ |
| 31 | + id: "plan-complete", |
| 32 | +}); |
| 33 | + |
| 34 | +/** Worker → Reviewer: completed work for review */ |
| 35 | +const workResult = streams.input<{ step: number; output: string }>({ |
| 36 | + id: "work-result", |
| 37 | +}); |
| 38 | + |
| 39 | +/** Reviewer → Worker: feedback on completed work */ |
| 40 | +const reviewFeedback = streams.input<{ |
| 41 | + step: number; |
| 42 | + approved: boolean; |
| 43 | + comment: string; |
| 44 | +}>({ |
| 45 | + id: "review-feedback", |
| 46 | +}); |
| 47 | + |
| 48 | +// -- Mock "AI" functions (replace with real LLM calls) -- |
| 49 | + |
| 50 | +function mockPlan(topic: string) { |
| 51 | + return [ |
| 52 | + { step: 1, instruction: `Research the topic: ${topic}` }, |
| 53 | + { step: 2, instruction: `Write an outline for: ${topic}` }, |
| 54 | + { step: 3, instruction: `Draft the introduction for: ${topic}` }, |
| 55 | + ]; |
| 56 | +} |
| 57 | + |
| 58 | +function mockWork(instruction: string): string { |
| 59 | + return `[Completed] ${instruction} — Lorem ipsum dolor sit amet.`; |
| 60 | +} |
| 61 | + |
| 62 | +function mockReview(output: string): { approved: boolean; comment: string } { |
| 63 | + const approved = !output.includes("outline"); |
| 64 | + return { |
| 65 | + approved, |
| 66 | + comment: approved ? "Looks good." : "Needs more detail in the structure.", |
| 67 | + }; |
| 68 | +} |
| 69 | + |
| 70 | +// -- Agent tasks -- |
| 71 | + |
| 72 | +/** |
| 73 | + * Planner agent: receives a topic, breaks it into steps, sends them to the worker. |
| 74 | + */ |
| 75 | +export const plannerAgent = task({ |
| 76 | + id: "agent-planner", |
| 77 | + run: async (payload: { topic: string; workerRunId: string }) => { |
| 78 | + const steps = mockPlan(payload.topic); |
| 79 | + logger.info("Planner: created plan", { stepCount: steps.length }); |
| 80 | + |
| 81 | + for (const step of steps) { |
| 82 | + await planSteps.send(payload.workerRunId, step); |
| 83 | + logger.info("Planner: sent step", { step: step.step }); |
| 84 | + await wait.for({ seconds: 1 }); |
| 85 | + } |
| 86 | + |
| 87 | + await planComplete.send(payload.workerRunId, { totalSteps: steps.length }); |
| 88 | + logger.info("Planner: all steps sent"); |
| 89 | + |
| 90 | + return { steps: steps.length }; |
| 91 | + }, |
| 92 | +}); |
| 93 | + |
| 94 | +/** |
| 95 | + * Worker agent: receives steps from the planner, executes them, sends results |
| 96 | + * to the reviewer, and incorporates feedback. |
| 97 | + */ |
| 98 | +export const workerAgent = task({ |
| 99 | + id: "agent-worker", |
| 100 | + run: async (payload: { reviewerRunId: string }) => { |
| 101 | + const completedSteps: Array<{ |
| 102 | + step: number; |
| 103 | + output: string; |
| 104 | + feedback: string; |
| 105 | + }> = []; |
| 106 | + |
| 107 | + let totalSteps: number | null = null; |
| 108 | + let stepsReceived = 0; |
| 109 | + |
| 110 | + // Listen for plan completion signal |
| 111 | + planComplete.on((data) => { |
| 112 | + totalSteps = data.totalSteps; |
| 113 | + logger.info("Worker: plan complete signal received", { totalSteps }); |
| 114 | + }); |
| 115 | + |
| 116 | + // Listen for review feedback |
| 117 | + reviewFeedback.on((data) => { |
| 118 | + logger.info("Worker: received feedback", { step: data.step, approved: data.approved }); |
| 119 | + const entry = completedSteps.find((s) => s.step === data.step); |
| 120 | + if (entry) { |
| 121 | + entry.feedback = data.comment; |
| 122 | + } |
| 123 | + }); |
| 124 | + |
| 125 | + // Process steps as they arrive |
| 126 | + planSteps.on(async (data) => { |
| 127 | + stepsReceived++; |
| 128 | + logger.info("Worker: received step", { step: data.step, instruction: data.instruction }); |
| 129 | + |
| 130 | + const output = mockWork(data.instruction); |
| 131 | + completedSteps.push({ step: data.step, output, feedback: "" }); |
| 132 | + |
| 133 | + // Send result to reviewer |
| 134 | + await workResult.send(payload.reviewerRunId, { step: data.step, output }); |
| 135 | + logger.info("Worker: sent result to reviewer", { step: data.step }); |
| 136 | + }); |
| 137 | + |
| 138 | + // Wait until all steps are received and processed |
| 139 | + while (totalSteps === null || stepsReceived < totalSteps) { |
| 140 | + await wait.for({ seconds: 1 }); |
| 141 | + } |
| 142 | + |
| 143 | + // Give reviewer time to send feedback |
| 144 | + await wait.for({ seconds: 5 }); |
| 145 | + |
| 146 | + logger.info("Worker: all done", { completedSteps }); |
| 147 | + return { completedSteps }; |
| 148 | + }, |
| 149 | +}); |
| 150 | + |
| 151 | +/** |
| 152 | + * Reviewer agent: receives work results, reviews them, sends feedback to the |
| 153 | + * worker, and reports final results to the coordinator. |
| 154 | + * |
| 155 | + * The reviewer doesn't know the worker's run ID at spawn time — it receives |
| 156 | + * it via the `connect` input stream once the coordinator has spawned both agents. |
| 157 | + */ |
| 158 | +export const reviewerAgent = task({ |
| 159 | + id: "agent-reviewer", |
| 160 | + run: async (payload: { expectedSteps: number }) => { |
| 161 | + // Wait for the coordinator to tell us who the worker is |
| 162 | + const { workerRunId } = await connect.once({ timeoutMs: 30_000 }).unwrap(); |
| 163 | + logger.info("Reviewer: connected to worker", { workerRunId }); |
| 164 | + |
| 165 | + const reviews: Array<{ step: number; approved: boolean; comment: string }> = []; |
| 166 | + |
| 167 | + // Review each piece of work as it arrives |
| 168 | + workResult.on(async (data) => { |
| 169 | + logger.info("Reviewer: checking step", { step: data.step }); |
| 170 | + |
| 171 | + const review = mockReview(data.output); |
| 172 | + reviews.push({ step: data.step, ...review }); |
| 173 | + |
| 174 | + // Send feedback back to worker |
| 175 | + await reviewFeedback.send(workerRunId, { |
| 176 | + step: data.step, |
| 177 | + ...review, |
| 178 | + }); |
| 179 | + logger.info("Reviewer: sent feedback", { step: data.step, approved: review.approved }); |
| 180 | + }); |
| 181 | + |
| 182 | + // Wait until all steps are reviewed |
| 183 | + while (reviews.length < payload.expectedSteps) { |
| 184 | + await wait.for({ seconds: 1 }); |
| 185 | + } |
| 186 | + |
| 187 | + const approved = reviews.filter((r) => r.approved).length; |
| 188 | + const rejected = reviews.filter((r) => !r.approved).length; |
| 189 | + const summary = `Reviewed ${reviews.length} steps. ${approved} approved, ${rejected} need revision.`; |
| 190 | + |
| 191 | + logger.info("Reviewer: done", { summary }); |
| 192 | + return { reviews, summary }; |
| 193 | + }, |
| 194 | +}); |
| 195 | + |
| 196 | +/** |
| 197 | + * Coordinator: wires the agents together and collects results. |
| 198 | + * |
| 199 | + * This is the orchestrator — it spawns the agents, connects them via input |
| 200 | + * streams, and waits for everything to complete. No human in the loop. |
| 201 | + */ |
| 202 | +export const agentRelayCoordinator = task({ |
| 203 | + id: "agent-relay-coordinator", |
| 204 | + run: async (payload: { topic?: string }) => { |
| 205 | + const topic = payload.topic ?? "The future of multi-agent systems"; |
| 206 | + logger.info("Coordinator: starting multi-agent workflow", { topic }); |
| 207 | + |
| 208 | + // Spawn worker and reviewer (order doesn't matter — they wait for connections) |
| 209 | + const reviewerHandle = await reviewerAgent.trigger({ expectedSteps: 3 }); |
| 210 | + const workerHandle = await workerAgent.trigger({ |
| 211 | + reviewerRunId: reviewerHandle.id, |
| 212 | + }); |
| 213 | + |
| 214 | + logger.info("Coordinator: agents spawned", { |
| 215 | + workerId: workerHandle.id, |
| 216 | + reviewerId: reviewerHandle.id, |
| 217 | + }); |
| 218 | + |
| 219 | + // Tell the reviewer who the worker is so it can send feedback |
| 220 | + await connect.send(reviewerHandle.id, { workerRunId: workerHandle.id }); |
| 221 | + |
| 222 | + // Spawn the planner — it sends steps directly to the worker |
| 223 | + const plannerHandle = await plannerAgent.trigger({ |
| 224 | + topic, |
| 225 | + workerRunId: workerHandle.id, |
| 226 | + }); |
| 227 | + |
| 228 | + logger.info("Coordinator: planner spawned, waiting for completion", { |
| 229 | + plannerId: plannerHandle.id, |
| 230 | + }); |
| 231 | + |
| 232 | + // Wait for all agents to complete |
| 233 | + const [plannerRun, workerRun, reviewerRun] = await Promise.all([ |
| 234 | + runs.poll(plannerHandle, { pollIntervalMs: 2000 }), |
| 235 | + runs.poll(workerHandle, { pollIntervalMs: 2000 }), |
| 236 | + runs.poll(reviewerHandle, { pollIntervalMs: 2000 }), |
| 237 | + ]); |
| 238 | + |
| 239 | + logger.info("Coordinator: all agents complete", { |
| 240 | + planner: plannerRun.output, |
| 241 | + worker: workerRun.output, |
| 242 | + reviewer: reviewerRun.output, |
| 243 | + }); |
| 244 | + |
| 245 | + return { |
| 246 | + topic, |
| 247 | + planner: plannerRun.output, |
| 248 | + worker: workerRun.output, |
| 249 | + reviewer: reviewerRun.output, |
| 250 | + }; |
| 251 | + }, |
| 252 | +}); |
0 commit comments