Skip to content

Commit db7bdea

Browse files
committed
adopt s2-lite, upgrade s2 package with support
1 parent 96908e2 commit db7bdea

File tree

13 files changed

+385
-78
lines changed

13 files changed

+385
-78
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,6 +1344,8 @@ const EnvironmentSchema = z
13441344

13451345
REALTIME_STREAMS_S2_BASIN: z.string().optional(),
13461346
REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(),
1347+
REALTIME_STREAMS_S2_ENDPOINT: z.string().optional(),
1348+
REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS: z.enum(["true", "false"]).default("false"),
13471349
REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS: z.coerce
13481350
.number()
13491351
.int()

apps/webapp/app/services/realtime/s2realtimeStreams.server.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ export type S2RealtimeStreamsOptions = {
1010
accessToken: string; // "Bearer" token issued in S2 console
1111
streamPrefix?: string; // defaults to ""
1212

13+
// Custom endpoint for s2-lite (self-hosted)
14+
endpoint?: string; // e.g., "http://localhost:4566/v1"
15+
16+
// Skip access token issuance (s2-lite doesn't support /access-tokens)
17+
skipAccessTokens?: boolean;
18+
1319
// Read behavior
1420
s2WaitSeconds?: number;
1521

@@ -37,8 +43,11 @@ type S2AppendAck = {
3743
export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
3844
private readonly basin: string;
3945
private readonly baseUrl: string;
46+
private readonly accountUrl: string;
47+
private readonly endpoint?: string;
4048
private readonly token: string;
4149
private readonly streamPrefix: string;
50+
private readonly skipAccessTokens: boolean;
4251

4352
private readonly s2WaitSeconds: number;
4453

@@ -56,9 +65,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
5665

5766
constructor(opts: S2RealtimeStreamsOptions) {
5867
this.basin = opts.basin;
59-
this.baseUrl = `https://${this.basin}.b.aws.s2.dev/v1`;
68+
this.baseUrl = opts.endpoint ?? `https://${this.basin}.b.aws.s2.dev/v1`;
69+
this.accountUrl = opts.endpoint ?? `https://aws.s2.dev/v1`;
70+
this.endpoint = opts.endpoint;
6071
this.token = opts.accessToken;
6172
this.streamPrefix = opts.streamPrefix ?? "";
73+
this.skipAccessTokens = opts.skipAccessTokens ?? false;
6274

6375
this.s2WaitSeconds = opts.s2WaitSeconds ?? 60;
6476

@@ -80,17 +92,20 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
8092
runId: string,
8193
streamId: string
8294
): Promise<{ responseHeaders?: Record<string, string> }> {
83-
const id = randomUUID();
84-
85-
const accessToken = await this.getS2AccessToken(id);
95+
const accessToken = this.skipAccessTokens
96+
? this.token
97+
: await this.getS2AccessToken(randomUUID());
8698

8799
return {
88100
responseHeaders: {
89101
"X-S2-Access-Token": accessToken,
90-
"X-S2-Stream-Name": `/runs/${runId}/${streamId}`,
102+
"X-S2-Stream-Name": this.skipAccessTokens
103+
? this.toStreamName(runId, streamId)
104+
: `/runs/${runId}/${streamId}`,
91105
"X-S2-Basin": this.basin,
92106
"X-S2-Flush-Interval-Ms": this.flushIntervalMs.toString(),
93107
"X-S2-Max-Retries": this.maxRetries.toString(),
108+
...(this.endpoint ? { "X-S2-Endpoint": this.endpoint } : {}),
94109
},
95110
};
96111
}
@@ -142,6 +157,7 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
142157
Authorization: `Bearer ${this.token}`,
143158
Accept: "text/event-stream",
144159
"S2-Format": "raw",
160+
"S2-Basin": this.basin,
145161
},
146162
}
147163
);
@@ -236,7 +252,8 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
236252
headers: {
237253
Authorization: `Bearer ${this.token}`,
238254
"Content-Type": "application/json",
239-
"S2-Format": "raw", // UTF-8 JSON encoding (no base64 overhead) when your data is text. :contentReference[oaicite:8]{index=8}
255+
"S2-Format": "raw",
256+
"S2-Basin": this.basin,
240257
},
241258
body: JSON.stringify(body),
242259
});
@@ -265,7 +282,7 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
265282

266283
private async s2IssueAccessToken(id: string): Promise<string> {
267284
// POST /v1/access-tokens
268-
const res = await fetch(`https://aws.s2.dev/v1/access-tokens`, {
285+
const res = await fetch(`${this.accountUrl}/access-tokens`, {
269286
method: "POST",
270287
headers: {
271288
Authorization: `Bearer ${this.token}`,
@@ -316,6 +333,7 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
316333
Authorization: `Bearer ${this.token}`,
317334
Accept: "text/event-stream",
318335
"S2-Format": "raw",
336+
"S2-Basin": this.basin,
319337
},
320338
signal: opts.signal,
321339
});

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ export function getRealtimeStreamInstance(
4040
return new S2RealtimeStreams({
4141
basin: env.REALTIME_STREAMS_S2_BASIN,
4242
accessToken: env.REALTIME_STREAMS_S2_ACCESS_TOKEN,
43+
endpoint: env.REALTIME_STREAMS_S2_ENDPOINT,
44+
skipAccessTokens: env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true",
4345
streamPrefix: [
4446
"org",
4547
environment.organization.id,

apps/webapp/app/v3/services/deployment.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ export class DeploymentService extends BaseService {
368368
);
369369

370370
return fromPromise(
371-
stream.append(events.map((event) => AppendRecord.make(JSON.stringify(event)))),
371+
stream.append(events.map((event) => AppendRecord.string({ body: JSON.stringify(event) }))),
372372
(error) => ({
373373
type: "failed_to_append_to_event_log" as const,
374374
cause: error,

apps/webapp/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
"@remix-run/serve": "2.1.0",
108108
"@remix-run/server-runtime": "2.1.0",
109109
"@remix-run/v1-meta": "^0.1.3",
110-
"@s2-dev/streamstore": "^0.17.2",
110+
"@s2-dev/streamstore": "^0.22.5",
111111
"@sentry/remix": "9.46.0",
112112
"@slack/web-api": "7.9.1",
113113
"@socket.io/redis-adapter": "^8.3.0",

docker/docker-compose.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,30 @@ services:
143143
networks:
144144
- app_network
145145

146+
s2:
147+
image: ghcr.io/s2-streamstore/s2
148+
command: lite
149+
ports:
150+
- "4566:80"
151+
networks:
152+
- app_network
153+
healthcheck:
154+
test: ["CMD-SHELL", "wget -qO- http://localhost:80/v1/basins?limit=1 || exit 1"]
155+
interval: 2s
156+
timeout: 3s
157+
retries: 5
158+
start_period: 3s
159+
160+
s2-init:
161+
image: curlimages/curl:latest
162+
depends_on:
163+
s2:
164+
condition: service_healthy
165+
networks:
166+
- app_network
167+
restart: "no"
168+
entrypoint: ["sh", "-c", "curl -sf -X PUT http://s2:80/v1/basins/trigger-local -H 'Content-Type: application/json' -d '{\"config\":{\"create_stream_on_append\":true,\"create_stream_on_read\":true}}' && echo ' Basin trigger-local ready' || echo ' Basin trigger-local already exists'"]
169+
146170
toxiproxy:
147171
container_name: toxiproxy
148172
image: ghcr.io/shopify/toxiproxy:latest

packages/core/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,16 @@
177177
"@opentelemetry/core": "2.0.1",
178178
"@opentelemetry/exporter-logs-otlp-http": "0.203.0",
179179
"@opentelemetry/exporter-metrics-otlp-http": "0.203.0",
180-
"@opentelemetry/host-metrics": "^0.37.0",
181180
"@opentelemetry/exporter-trace-otlp-http": "0.203.0",
181+
"@opentelemetry/host-metrics": "^0.37.0",
182182
"@opentelemetry/instrumentation": "0.203.0",
183183
"@opentelemetry/resources": "2.0.1",
184184
"@opentelemetry/sdk-logs": "0.203.0",
185185
"@opentelemetry/sdk-metrics": "2.0.1",
186186
"@opentelemetry/sdk-trace-base": "2.0.1",
187187
"@opentelemetry/sdk-trace-node": "2.0.1",
188188
"@opentelemetry/semantic-conventions": "1.36.0",
189-
"@s2-dev/streamstore": "0.17.3",
189+
"@s2-dev/streamstore": "0.22.5",
190190
"dequal": "^2.0.3",
191191
"eventsource": "^3.0.5",
192192
"eventsource-parser": "^3.0.0",

packages/core/src/v3/realtimeStreams/streamInstance.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ export class StreamInstance<T> implements StreamsWriter {
5252
basin: parsedResponse.basin,
5353
stream: parsedResponse.streamName ?? this.options.key,
5454
accessToken: parsedResponse.accessToken,
55+
endpoint: parsedResponse.endpoint,
5556
source: this.options.source,
5657
signal: this.options.signal,
5758
debug: this.options.debug,
@@ -103,6 +104,7 @@ type ParsedStreamResponse =
103104
version: "v2";
104105
accessToken: string;
105106
basin: string;
107+
endpoint?: string;
106108
flushIntervalMs?: number;
107109
maxRetries?: number;
108110
streamName?: string;
@@ -123,6 +125,7 @@ function parseCreateStreamResponse(
123125
return { version: "v1" };
124126
}
125127

128+
const endpoint = headers?.["x-s2-endpoint"];
126129
const flushIntervalMs = headers?.["x-s2-flush-interval-ms"];
127130
const maxRetries = headers?.["x-s2-max-retries"];
128131
const streamName = headers?.["x-s2-stream-name"];
@@ -131,6 +134,7 @@ function parseCreateStreamResponse(
131134
version: "v2",
132135
accessToken,
133136
basin,
137+
endpoint,
134138
flushIntervalMs: flushIntervalMs ? parseInt(flushIntervalMs) : undefined,
135139
maxRetries: maxRetries ? parseInt(maxRetries) : undefined,
136140
streamName,

packages/core/src/v3/realtimeStreams/streamsWriterV2.ts

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ export type StreamsWriterV2Options<T = any> = {
66
basin: string;
77
stream: string;
88
accessToken: string;
9+
endpoint?: string; // Custom S2 endpoint (for s2-lite)
910
source: ReadableStream<T>;
1011
signal?: AbortSignal;
1112
flushIntervalMs?: number; // Used as lingerDuration for BatchTransform (default 200ms)
1213
maxRetries?: number; // Not used with appendSession, kept for compatibility
1314
debug?: boolean; // Enable debug logging (default false)
14-
maxQueuedBytes?: number; // Max queued bytes for appendSession (default 10MB)
15+
maxInflightBytes?: number; // Max queued bytes for appendSession (default 10MB)
1516
};
1617

1718
/**
@@ -50,18 +51,28 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
5051
private streamPromise: Promise<void>;
5152
private readonly flushIntervalMs: number;
5253
private readonly debug: boolean;
53-
private readonly maxQueuedBytes: number;
54+
private readonly maxInflightBytes: number;
5455
private aborted = false;
5556
private sessionWritable: WritableStream<any> | null = null;
5657

5758
constructor(private options: StreamsWriterV2Options<T>) {
5859
this.debug = options.debug ?? false;
59-
this.s2Client = new S2({ accessToken: options.accessToken });
60+
this.s2Client = new S2({
61+
accessToken: options.accessToken,
62+
...(options.endpoint
63+
? {
64+
endpoints: {
65+
account: options.endpoint,
66+
basin: options.endpoint,
67+
},
68+
}
69+
: {}),
70+
});
6071
this.flushIntervalMs = options.flushIntervalMs ?? 200;
61-
this.maxQueuedBytes = options.maxQueuedBytes ?? 1024 * 1024 * 10; // 10MB default
72+
this.maxInflightBytes = options.maxInflightBytes ?? 1024 * 1024 * 10; // 10MB default
6273

6374
this.log(
64-
`[S2MetadataStream] Initializing: basin=${options.basin}, stream=${options.stream}, flushIntervalMs=${this.flushIntervalMs}, maxQueuedBytes=${this.maxQueuedBytes}`
75+
`[S2MetadataStream] Initializing: basin=${options.basin}, stream=${options.stream}, flushIntervalMs=${this.flushIntervalMs}, maxInflightBytes=${this.maxInflightBytes}`
6576
);
6677

6778
// Check if already aborted
@@ -124,7 +135,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
124135
const stream = basin.stream(this.options.stream);
125136

126137
const session = await stream.appendSession({
127-
maxQueuedBytes: this.maxQueuedBytes,
138+
maxInflightBytes: this.maxInflightBytes,
128139
});
129140

130141
this.sessionWritable = session.writable;
@@ -141,7 +152,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
141152
return;
142153
}
143154
// Convert each chunk to JSON string and wrap in AppendRecord
144-
controller.enqueue(AppendRecord.make(JSON.stringify({ data: chunk, id: nanoid(7) })));
155+
controller.enqueue(AppendRecord.string({ body: JSON.stringify({ data: chunk, id: nanoid(7) }) }));
145156
},
146157
})
147158
)
@@ -158,9 +169,9 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
158169
const lastAcked = session.lastAckedPosition();
159170

160171
if (lastAcked?.end) {
161-
const recordsWritten = lastAcked.end.seq_num;
172+
const recordsWritten = lastAcked.end.seqNum;
162173
this.log(
163-
`[S2MetadataStream] Written ${recordsWritten} records, ending at seq_num=${lastAcked.end.seq_num}`
174+
`[S2MetadataStream] Written ${recordsWritten} records, ending at seqNum=${lastAcked.end.seqNum}`
164175
);
165176
}
166177
} catch (error) {

0 commit comments

Comments
 (0)