Skip to content

Commit 16a6fab

Browse files
committed
Better span experience, show input streams on dashboard, cleanup the data format
1 parent 57f2488 commit 16a6fab

File tree

7 files changed

+112
-47
lines changed

7 files changed

+112
-47
lines changed

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,41 @@ export class SpanPresenter extends BasePresenter {
629629
},
630630
};
631631
}
632+
case "input-stream": {
633+
if (!span.entity.id) {
634+
logger.error(`SpanPresenter: No input stream id`, {
635+
spanId,
636+
inputStreamId: span.entity.id,
637+
});
638+
return { ...data, entity: null };
639+
}
640+
641+
const [runId, streamId] = span.entity.id.split(":");
642+
643+
if (!runId || !streamId) {
644+
logger.error(`SpanPresenter: Invalid input stream id`, {
645+
spanId,
646+
inputStreamId: span.entity.id,
647+
});
648+
return { ...data, entity: null };
649+
}
650+
651+
// Translate user-facing stream ID to internal S2 stream name
652+
const s2StreamKey = `$trigger.input:${streamId}`;
653+
654+
return {
655+
...data,
656+
entity: {
657+
type: "realtime-stream" as const,
658+
object: {
659+
runId,
660+
streamKey: s2StreamKey,
661+
displayName: streamId,
662+
metadata: undefined,
663+
},
664+
},
665+
};
666+
}
632667
default:
633668
return { ...data, entity: null };
634669
}

apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -110,24 +110,19 @@ const { action, loader } = createActionApiRoute(
110110

111111
if (records.length > 0) {
112112
const record = records[0]!;
113-
try {
114-
const parsed = JSON.parse(record.data) as { data: unknown };
115-
116-
// Data exists — complete the waitpoint immediately
117-
await engine.completeWaitpoint({
118-
id: result.waitpoint.id,
119-
output: {
120-
value: JSON.stringify(parsed.data),
121-
type: "application/json",
122-
isError: false,
123-
},
124-
});
125-
126-
// Clean up the Redis cache since we completed it ourselves
127-
await deleteInputStreamWaitpoint(run.friendlyId, body.streamId);
128-
} catch {
129-
// Skip malformed records
130-
}
113+
114+
// Record data is the raw user payload — no wrapper to unwrap
115+
await engine.completeWaitpoint({
116+
id: result.waitpoint.id,
117+
output: {
118+
value: record.data,
119+
type: "application/json",
120+
isError: false,
121+
},
122+
});
123+
124+
// Clean up the Redis cache since we completed it ourselves
125+
await deleteInputStreamWaitpoint(run.friendlyId, body.streamId);
131126
}
132127
}
133128
} catch {

apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,9 @@ const { action } = createActionApiRoute(
6767
run.realtimeStreamsVersion
6868
);
6969

70-
// Build the input stream record
70+
// Build the input stream record (raw user data, no wrapper)
7171
const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`;
72-
const record = JSON.stringify({
73-
data: body.data.data,
74-
ts: Date.now(),
75-
id: recordId,
76-
});
72+
const record = JSON.stringify(body.data.data);
7773

7874
// Append the record to the per-stream S2 stream (auto-creates on first write)
7975
await realtimeStream.appendPart(

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,6 +1348,7 @@ function SpanEntity({ span }: { span: Span }) {
13481348
runId={span.entity.object.runId}
13491349
streamKey={span.entity.object.streamKey}
13501350
metadata={span.entity.object.metadata}
1351+
displayName={span.entity.object.displayName}
13511352
/>
13521353
);
13531354
}

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,12 @@ export function RealtimeStreamViewer({
9898
runId,
9999
streamKey,
100100
metadata,
101+
displayName,
101102
}: {
102103
runId: string;
103104
streamKey: string;
104105
metadata: Record<string, unknown> | undefined;
106+
displayName?: string;
105107
}) {
106108
const organization = useOrganization();
107109
const project = useProject();
@@ -244,8 +246,8 @@ export function RealtimeStreamViewer({
244246
variant="small/bright"
245247
className="mb-0 flex min-w-0 items-center gap-1 truncate whitespace-nowrap"
246248
>
247-
<span>Stream:</span>
248-
<span className="truncate font-mono text-text-dimmed">{streamKey}</span>
249+
<span>{displayName ? "Input stream:" : "Stream:"}</span>
250+
<span className="truncate font-mono text-text-dimmed">{displayName ?? streamKey}</span>
249251
</Paragraph>
250252
</div>
251253
<div className="flex w-full flex-wrap items-center justify-between gap-3 @[300px]:w-auto @[300px]:flex-nowrap">
@@ -487,6 +489,9 @@ function useRealtimeStream(resourcePath: string, startIndex?: number) {
487489
const [isConnected, setIsConnected] = useState(false);
488490

489491
useEffect(() => {
492+
setChunks([]);
493+
setError(null);
494+
490495
const abortController = new AbortController();
491496
let reader: ReadableStreamDefaultReader<SSEStreamPart<unknown>> | null = null;
492497

packages/core/src/v3/inputStreams/manager.ts

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,6 @@ type OnceWaiter = {
1010
timeoutHandle?: ReturnType<typeof setTimeout>;
1111
};
1212

13-
/**
14-
* InputStreamRecord is the shape of records on a per-stream S2 stream.
15-
*/
16-
interface InputStreamRecord {
17-
data: unknown;
18-
ts: number;
19-
id: string;
20-
}
2113

2214
type TailState = {
2315
abortController: AbortController;
@@ -195,7 +187,7 @@ export class StandardInputStreamManager implements InputStreamManager {
195187

196188
async #runTail(runId: string, streamId: string, signal: AbortSignal): Promise<void> {
197189
try {
198-
const stream = await this.apiClient.fetchStream<InputStreamRecord>(
190+
const stream = await this.apiClient.fetchStream<unknown>(
199191
runId,
200192
`input/${streamId}`,
201193
{
@@ -225,21 +217,19 @@ export class StandardInputStreamManager implements InputStreamManager {
225217
for await (const record of stream) {
226218
if (signal.aborted) break;
227219

228-
// S2 SSE returns record bodies as JSON strings; parse into InputStreamRecord
229-
let parsed: InputStreamRecord;
220+
// S2 SSE returns record bodies as JSON strings; parse if needed
221+
let data: unknown;
230222
if (typeof record === "string") {
231223
try {
232-
parsed = JSON.parse(record) as InputStreamRecord;
224+
data = JSON.parse(record);
233225
} catch {
234-
continue;
226+
data = record;
235227
}
236-
} else if (record.data !== undefined) {
237-
parsed = record;
238228
} else {
239-
continue;
229+
data = record;
240230
}
241231

242-
this.#dispatch(streamId, parsed.data);
232+
this.#dispatch(streamId, data);
243233
}
244234
} catch (error) {
245235
// AbortError is expected when disconnecting

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,29 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
703703
);
704704
},
705705
once(options) {
706-
return inputStreams.once(opts.id, options) as Promise<TData>;
706+
const ctx = taskContext.ctx;
707+
const runId = ctx?.run.id;
708+
709+
return tracer.startActiveSpan(
710+
`inputStream.once()`,
711+
async () => {
712+
return inputStreams.once(opts.id, options) as Promise<TData>;
713+
},
714+
{
715+
attributes: {
716+
[SemanticInternalAttributes.STYLE_ICON]: "streams",
717+
[SemanticInternalAttributes.ENTITY_TYPE]: "input-stream",
718+
...(runId
719+
? { [SemanticInternalAttributes.ENTITY_ID]: `${runId}:${opts.id}` }
720+
: {}),
721+
streamId: opts.id,
722+
...accessoryAttributes({
723+
items: [{ text: opts.id, variant: "normal" }],
724+
style: "codepath",
725+
}),
726+
},
727+
}
728+
);
707729
},
708730
peek() {
709731
return inputStreams.peek(opts.id) as TData | undefined;
@@ -732,6 +754,9 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
732754
lastSeqNum: inputStreams.lastSeqNum(opts.id),
733755
});
734756

757+
// Set the entity ID now that we have the waitpoint ID
758+
span.setAttribute(SemanticInternalAttributes.ENTITY_ID, response.waitpointId);
759+
735760
// 2. Block the run on the waitpoint
736761
const waitResponse = await apiClient.waitForWaitpointToken({
737762
runFriendlyId: ctx.run.id,
@@ -775,7 +800,7 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
775800
...accessoryAttributes({
776801
items: [
777802
{
778-
text: `input:${opts.id}`,
803+
text: opts.id,
779804
variant: "normal",
780805
},
781806
],
@@ -792,8 +817,26 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
792817
});
793818
},
794819
async send(runId, data, options) {
795-
const apiClient = apiClientManager.clientOrThrow();
796-
await apiClient.sendInputStream(runId, opts.id, data, options?.requestOptions);
820+
return tracer.startActiveSpan(
821+
`inputStream.send()`,
822+
async () => {
823+
const apiClient = apiClientManager.clientOrThrow();
824+
await apiClient.sendInputStream(runId, opts.id, data, options?.requestOptions);
825+
},
826+
{
827+
attributes: {
828+
[SemanticInternalAttributes.STYLE_ICON]: "streams",
829+
[SemanticInternalAttributes.ENTITY_TYPE]: "input-stream",
830+
[SemanticInternalAttributes.ENTITY_ID]: `${runId}:${opts.id}`,
831+
streamId: opts.id,
832+
runId,
833+
...accessoryAttributes({
834+
items: [{ text: opts.id, variant: "normal" }],
835+
style: "codepath",
836+
}),
837+
},
838+
}
839+
);
797840
},
798841
};
799842
}

0 commit comments

Comments
 (0)