diff --git a/examples/README.md b/examples/README.md index 33ec938d..3c7fc0dc 100644 --- a/examples/README.md +++ b/examples/README.md @@ -9,6 +9,7 @@ Standalone, runnable examples demonstrating the mppx HTTP 402 payment flow. | [charge](./charge/) | Payment-gated image generation API | | [session/multi-fetch](./session/multi-fetch/) | Multiple paid requests over a single payment channel | | [session/sse](./session/sse/) | Pay-per-token LLM streaming with SSE | +| [session/ws](./session/ws/) | Pay-per-token LLM streaming with WebSocket | | [stripe](./stripe/) | Stripe SPT charge with automatic client | ## Running Examples diff --git a/examples/session/ws/README.md b/examples/session/ws/README.md new file mode 100644 index 00000000..cdb5920d --- /dev/null +++ b/examples/session/ws/README.md @@ -0,0 +1,42 @@ +# Stream: WebSocket + +Pay-per-token LLM-style streaming over WebSocket using the experimental Tempo session websocket helper. + +The client performs an HTTP `402` probe, opens a payment channel, upgrades to WebSocket, and then streams tokens while automatically responding to `payment-need-voucher` control frames. + +## Setup + +```bash +npx gitpick wevm/mppx/examples/session/ws +pnpm i +``` + +For local demos from this repository, use the workspace version instead: + +```bash +pnpm install +pnpm dev:example +``` + +Then choose `session/ws`. + +## Usage + +Start the server: + +```bash +pnpm dev +``` + +In a separate terminal, run the client: + +```bash +pnpm client +pnpm client "What is the meaning of life?" +``` + +## Notes + +- The WebSocket flow currently uses HTTP for the initial `402` challenge probe. +- During the stream, vouchers are sent in-band over the socket. +- After the stream ends, the demo calls `close()` to settle the channel and print the final receipt. diff --git a/examples/session/ws/package.json b/examples/session/ws/package.json new file mode 100644 index 00000000..112566a1 --- /dev/null +++ b/examples/session/ws/package.json @@ -0,0 +1,22 @@ +{ + "name": "session-ws", + "private": true, + "type": "module", + "scripts": { + "check:types": "tsgo -b", + "dev": "vite", + "client": "tsx src/client.ts" + }, + "dependencies": { + "@remix-run/node-fetch-server": "^0.13.0", + "@types/node": "^25.5.0", + "@types/ws": "^8.18.1", + "@typescript/native-preview": "7.0.0-dev.20260323.1", + "mppx": "workspace:*", + "tsx": "^4.21.0", + "typescript": "~5.9.3", + "viem": "^2.47.6", + "vite": "latest", + "ws": "^8.20.0" + } +} diff --git a/examples/session/ws/src/client.ts b/examples/session/ws/src/client.ts new file mode 100644 index 00000000..21b8272b --- /dev/null +++ b/examples/session/ws/src/client.ts @@ -0,0 +1,173 @@ +// WebSocket Streaming Payment Client — Example + +// +// This example demonstrates the client side of a metered WebSocket session. +// The websocket transport is still bootstrapped by an HTTP 402 challenge: +// +// 1. Client probes `/ws/chat` over HTTP and receives a `402` challenge +// 2. Client opens an on-chain channel and creates the first credential +// 3. Client opens a WebSocket and sends that credential as the first frame +// 4. Server streams tokens and emits `payment-need-voucher` frames when the +// current cumulative voucher is exhausted +// 5. Client signs and sends voucher updates over the same socket +// 6. Client sends a final `close()` credential to settle on-chain + +// `tempo` from 'mppx/client' provides the session manager used for this demo. +import { tempo } from 'mppx/client' +import { createClient, type Hex, http } from 'viem' +import { generatePrivateKey, privateKeyToAccount } from 'viem/accounts' +import { tempoModerato } from 'viem/chains' +import { Actions } from 'viem/tempo' +import { WebSocket } from 'ws' + +// The server URL. The websocket URL is derived from this base. +const BASE_URL = process.env.BASE_URL ?? 'http://localhost:5173' + +// pathUSD on Tempo testnet. +const currency = '0x20c0000000000000000000000000000000000000' as const + +// The per-token price configured on the server. +const PRICE_PER_TOKEN = '0.000075' + +// Client Account Setup + +// +// Generate a demo payer account unless the caller provides a persistent key via +// `PRIVATE_KEY`. Reusing a real key is convenient when presenting multiple demo +// runs and wanting a stable wallet address. +const account = privateKeyToAccount((process.env.PRIVATE_KEY as Hex) ?? generatePrivateKey()) + +// The client needs a viem client with the payer account attached because Tempo +// session credentials include signed vouchers and, on first use, a signed open +// transaction for the payment channel. +const client = createClient({ + account, + chain: tempoModerato, + pollingInterval: 1_000, + transport: http(), +}) + +// Fund the payer account via the public testnet faucet so it has pathUSD for +// the channel deposit and enough gas to get through the open/close lifecycle. +console.log(`Client account: ${account.address}`) +console.log('Funding account via faucet...') +await Actions.faucet.fundSync(client, { account, timeout: 30_000 }) + +// Helper to query the payer's current pathUSD balance. +const getBalance = () => Actions.token.getBalance(client, { account, token: currency }) + +// Format raw 6-decimal token values for terminal output. +const fmt = (value: bigint) => `${Number(value) / 1e6} pathUSD` + +const balanceBefore = await getBalance() +console.log(`Balance: ${fmt(balanceBefore)}`) + +// Step 1: Create a Session Manager + +// +// `tempo.session()` returns a stateful session manager. For WebSocket flows it +// still handles the hard parts: HTTP challenge probing, channel open, voucher +// creation, cumulative accounting, and final close. +// +// We pass the `ws` package's constructor explicitly because Node 18 does not +// provide a reliable global `WebSocket` in the same way browsers do. +const session = tempo.session({ + account, + client, + maxDeposit: '1', + webSocket: WebSocket as any, +}) + +// Step 2: Build the WebSocket URL + +// +// The example derives the socket URL from `BASE_URL` so the same code works +// against localhost or a remote deployment. The prompt is sent as a query +// parameter because the websocket content request itself has no HTTP body. +const prompt = process.argv[2] ?? 'Tell me something interesting' +const url = new URL(BASE_URL) +url.protocol = url.protocol === 'https:' ? 'wss:' : 'ws:' +url.pathname = '/ws/chat' +url.searchParams.set('prompt', prompt) + +console.log(`\n--- Channel ---`) +console.log(`Max deposit: 1 pathUSD`) +console.log(`Price per token: ${PRICE_PER_TOKEN} pathUSD`) +console.log(`Endpoint: ${url}`) + +// Step 3: Open the Paid WebSocket Session + +// +// `session.ws()` performs the initial HTTP 402 probe, creates the first +// payment credential, opens the websocket, and sends the auth frame. +// +// The optional `onReceipt` callback gives us visibility into the voucher and +// spend progression while the stream is active. +let receiptCount = 0 +const socket = await session.ws(url, { + onReceipt(receipt) { + receiptCount++ + console.log( + `\n[receipt ${receiptCount}] spent=${fmt(BigInt(receipt.spent))} accepted=${fmt(BigInt(receipt.acceptedCumulative))}`, + ) + }, +}) + +// Step 4: Read Streamed Tokens + +// +// Application data arrives as ordinary websocket text messages. Payment control +// frames (`payment-need-voucher`, `payment-receipt`) are intercepted internally +// by `session.ws()`, so the demo loop here only needs to print the content. +console.log(`\n--- Streaming (prompt: "${prompt}") ---`) + +let tokenCount = 0 +await new Promise((resolve, reject) => { + socket.addEventListener('message', (event) => { + if (typeof event.data !== 'string') return + tokenCount++ + process.stdout.write(event.data) + }) + socket.addEventListener('close', () => resolve(), { once: true }) + socket.addEventListener('error', () => reject(new Error('websocket stream failed')), { + once: true, + }) +}) + +// Step 5: Close and Settle + +// +// Once the content stream ends, we send a final `close` credential so the +// server can settle the channel and return a final receipt with the accepted +// cumulative amount and, when available, the close transaction hash. +console.log(`\n\nTokens: ${tokenCount}`) +console.log(`Voucher cumulative: ${fmt(session.cumulative)}`) + +console.log(`\n--- Settlement ---`) +const closeReceipt = await session.close() +if (closeReceipt) { + console.log(` Channel: ${closeReceipt.channelId}`) + console.log(` Settled: ${fmt(BigInt(closeReceipt.acceptedCumulative))}`) + console.log(` Tokens: ${closeReceipt.units}`) + if (closeReceipt.txHash) console.log(` Settle tx: ${closeReceipt.txHash}`) +} + +// Give the settlement transaction a few seconds to finalize before checking +// the post-session balance so the summary is easier to interpret live. +await new Promise((resolve) => setTimeout(resolve, 5_000)) + +const balanceAfter = await getBalance() +const totalSpent = balanceBefore - balanceAfter + +// Step 6: Summary + +// +// `session.cumulative` is the total voucher amount the client authorized. +// The balance delta is usually larger because it also includes gas for the +// open/close transactions. +console.log(`\n--- Summary ---`) +console.log(` Tokens streamed: ${tokenCount}`) +console.log(` Voucher total: ${fmt(session.cumulative)}`) +console.log(` Balance before: ${fmt(balanceBefore)}`) +console.log(` Balance after: ${fmt(balanceAfter)}`) +console.log(` Total spent: ${fmt(totalSpent)} (voucher + gas)`) diff --git a/examples/session/ws/src/server.ts b/examples/session/ws/src/server.ts new file mode 100644 index 00000000..5334790c --- /dev/null +++ b/examples/session/ws/src/server.ts @@ -0,0 +1,240 @@ +// WebSocket Streaming Payment Server — Example + +// +// This example demonstrates the server side of a metered WebSocket session +// using mppx's Tempo `session` method plus the experimental websocket helper. +// +// The flow is intentionally pragmatic and transport-specific: +// +// 1. Client sends a normal HTTP GET to `/ws/chat` +// → server responds `402 Payment Required` with `WWW-Authenticate` +// +// 2. Client opens a WebSocket to `/ws/chat` +// → first socket frame contains the same signed `Payment` credential +// +// 3. Server verifies that credential by routing it back through the normal +// Tempo `session()` verification path +// +// 4. Server streams application data over the socket, charging per token +// and sending `payment-need-voucher` control frames whenever it needs +// more cumulative voucher coverage +// +// 5. Client responds with signed voucher updates over the same socket and +// eventually sends a final `close` credential to settle the channel +// +import type * as node_http from 'node:http' +import type * as node_net from 'node:net' + +// `Mppx` is the server-side payment handler. `tempo` provides the Tempo +// payment method plus the websocket helper used for this demo. +import { Mppx, Store, tempo } from 'mppx/server' +import { createClient, http } from 'viem' +import { generatePrivateKey, privateKeyToAccount } from 'viem/accounts' +import { tempoModerato } from 'viem/chains' +import { Actions } from 'viem/tempo' +import { WebSocketServer } from 'ws' + +// Server Account Setup + +// +// Generate a fresh account for the demo server. This account is the payee — +// the recipient that receives settled channel funds after the stream ends. +const account = privateKeyToAccount(generatePrivateKey()) + +// pathUSD — TIP-20 token on Tempo testnet (Moderato). +// All payment amounts in this example are denominated in pathUSD with 6 decimals. +const currency = '0x20c0000000000000000000000000000000000000' as const + +// Price charged per streamed token. Every application token requires one +// successful `stream.charge()` call before it is yielded to the client. +const pricePerToken = '0.000075' + +// `Mppx.create()` requires a secret key so challenge IDs can be verified +// statelessly. The example ships with a default demo key so `pnpm dev` works +// out of the box, but still allows override via `MPP_SECRET_KEY`. +const secretKey = process.env.MPP_SECRET_KEY ?? 'mppx-demo-websocket-secret' + +// Viem Client + +// +// The server-side viem client carries the payee account because the Tempo +// session method needs signing capabilities for settlement and close flows. +const client = createClient({ + account, + chain: tempoModerato, + pollingInterval: 1_000, + transport: http(), +}) + +// Shared Channel Store + +// +// The websocket stream and the credential verification path must see the same +// channel state. `Store.memory()` is enough for a local demo because both the +// route handler and websocket helper run in the same process. +const store = Store.memory() + +// Mppx Server Instance + +// +// `tempo.session()` still owns the actual payment semantics here: +// challenge issuance, channel open verification, voucher verification, and +// cooperative close. The websocket helper is only responsible for transport. +const mppx = Mppx.create({ + methods: [ + tempo.session({ + account, + currency, + getClient: () => client, + store, + testnet: true, + }), + ], + secretKey, +}) + +// This route is the canonical payment entrypoint. It is used twice: +// +// - directly for the initial HTTP 402 probe +// - indirectly by the websocket helper, which constructs synthetic POST +// requests carrying the websocket-supplied credentials +const route = mppx.session({ + amount: pricePerToken, + unitType: 'token', +}) + +// WebSocket Upgrade Handler + +// +// `tempo.Ws.serve()` does not perform the HTTP upgrade itself. We use `ws` +// only for the low-level upgrade mechanics, then hand the accepted socket to +// mppx's Tempo websocket helper for payment-aware streaming. +const wsServer = new WebSocketServer({ noServer: true }) +wsServer.on('connection', (socket, req) => { + const url = new URL(req.url ?? '/ws/chat', `ws://${req.headers.host ?? 'localhost:5173'}`) + const prompt = url.searchParams.get('prompt') ?? 'Tell me something interesting' + + // `tempo.Ws.serve()` bridges websocket control frames to the existing + // Tempo session lifecycle. When it receives an in-band authorization frame, + // it verifies it through `route`. Once paid, it runs this generator and + // emits application chunks interleaved with payment control frames. + void tempo.Ws.serve({ + socket, + store, + url, + route, + generate: async function* (stream) { + for await (const token of generateTokens(prompt)) { + await stream.charge() + yield token + } + }, + }) +}) + +// Fund the server account so it can submit settlement transactions during the +// demo. In production this would be a long-lived account with managed funds. +console.log(`Server recipient: ${account.address}`) +await Actions.faucet.fundSync(client, { account, timeout: 30_000 }) +console.log('Server account funded') + +// HTTP Request Handler + +// +// This handler only needs to cover the HTTP 402 probe endpoint. For actual +// content streaming, the client must use the websocket upgrade path. +export async function handler(request: Request): Promise { + const url = new URL(request.url) + + if (url.pathname === '/api/health') return Response.json({ status: 'ok' }) + + if (url.pathname === '/ws/chat') { + // The initial client request arrives here without credentials and receives + // a 402 challenge. Later, management-style HTTP requests could also be + // verified here if you choose to mix transports. + const result = await route(request) + if (result.status === 402) return result.challenge + + // Once a route has already been paid over HTTP, return a simple message so + // it is obvious that WebSocket is the intended content path for this demo. + return result.withReceipt( + new Response( + 'Use the websocket endpoint for streaming. HTTP is only used for the 402 probe.', + ), + ) + } + + return null +} + +// Vite/Node Upgrade Bridge + +// +// Vite gives us the raw Node HTTP server. We intercept upgrade requests for +// `/ws/chat`, let `ws` turn them into WebSocket connections, and then hand the +// socket to the `connection` handler above. +export function handleUpgrade( + req: node_http.IncomingMessage, + socket: node_net.Socket, + head: Buffer, +) { + if (req.url?.startsWith('/ws/chat') !== true) return + + wsServer.handleUpgrade(req, socket as any, head, (websocket) => { + wsServer.emit('connection', websocket, req) + }) +} + +// Mock Token Generator + +// +// Simulates an LLM-style token stream. Each yielded string is treated as one +// billable unit because the websocket generator calls `stream.charge()` once +// before yielding each token. +async function* generateTokens(prompt: string): AsyncGenerator { + const words = [ + 'The', + ' question', + ' you', + ' asked', + ' -- "', + prompt, + '" --', + ' is', + ' a', + ' good', + ' one.', + '\n\n', + 'WebSockets', + ' let', + ' us', + ' keep', + ' a', + ' single', + ' paid', + ' connection', + ' open', + ' while', + ' vouchers', + ' move', + ' in-band.', + '\n\n', + 'That', + ' makes', + ' the', + ' demo', + ' feel', + ' much', + ' more', + ' live', + ' than', + ' the', + ' HTTP-only', + ' loop.', + ] + + for (const word of words) { + yield word + await new Promise((resolve) => setTimeout(resolve, 20 + Math.random() * 60)) + } +} diff --git a/examples/session/ws/tsconfig.json b/examples/session/ws/tsconfig.json new file mode 100644 index 00000000..808b9ab4 --- /dev/null +++ b/examples/session/ws/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "target": "ESNext", + "module": "ESNext", + "moduleResolution": "bundler", + "strict": true, + "skipLibCheck": true, + "lib": ["ESNext", "DOM"], + "types": ["node"], + "noEmit": true + }, + "include": ["src/**/*"] +} diff --git a/examples/session/ws/vite.config.ts b/examples/session/ws/vite.config.ts new file mode 100644 index 00000000..69202f03 --- /dev/null +++ b/examples/session/ws/vite.config.ts @@ -0,0 +1,34 @@ +import { createRequest, sendResponse } from '@remix-run/node-fetch-server' +import { defineConfig } from 'vite' + +import { handleUpgrade, handler } from './src/server.ts' + +export default defineConfig({ + plugins: [ + { + name: 'api', + configureServer(server) { + server.httpServer?.on('upgrade', handleUpgrade) + + // oxlint-disable-next-line no-async-endpoint-handlers + server.middlewares.use(async (req, res, next) => { + const request = createRequest(req, res) + const response = await handler(request) + if (response) await sendResponse(res, response) + else next() + }) + + server.httpServer?.once('listening', () => { + const addr = server.httpServer!.address() + const host = + typeof addr === 'object' && addr ? `localhost:${addr.port}` : 'localhost:5173' + setTimeout(() => { + console.log(`\n WebSocket demo:`) + console.log(` pnpm --dir examples/session/ws client`) + console.log(` ws://${host}/ws/chat\n`) + }, 100) + }) + }, + }, + ], +}) diff --git a/package.json b/package.json index 3cc47763..abd8d1fa 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "@playwright/test": "^1.58.2", "@types/express": "^5.0.6", "@types/node": "^25.5.0", + "@types/ws": "^8.18.1", "@typescript/native-preview": "7.0.0-dev.20260323.1", "browserslist": "^4.28.1", "elysia": "^1.4.27", @@ -51,6 +52,7 @@ "typescript": "~5.9.3", "viem": "^2.47.6", "vp": "npm:vite-plus@~0.1.14", + "ws": "^8.20.0", "zile": "^0.0.19" }, "packageManager": "pnpm@10.28.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c283d3a4..ef2d7cbf 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -67,6 +67,9 @@ importers: '@types/node': specifier: ^25.5.0 version: 25.5.0 + '@types/ws': + specifier: ^8.18.1 + version: 8.18.1 '@typescript/native-preview': specifier: 7.0.0-dev.20260323.1 version: 7.0.0-dev.20260323.1 @@ -118,6 +121,9 @@ importers: vp: specifier: npm:vite-plus@~0.1.14 version: vite-plus@0.1.14(@types/node@25.5.0)(bufferutil@4.1.0)(esbuild@0.27.2)(tsx@4.21.0)(typescript@5.9.3)(utf-8-validate@5.0.10)(vite@8.0.3(@types/node@25.5.0)(esbuild@0.27.2)(tsx@4.21.0)(yaml@2.8.3))(yaml@2.8.3) + ws: + specifier: ^8.20.0 + version: 8.20.0(bufferutil@4.1.0)(utf-8-validate@5.0.10) zile: specifier: ^0.0.19 version: 0.0.19(@typescript/native-preview@7.0.0-dev.20260323.1)(typescript@5.9.3) @@ -252,6 +258,39 @@ importers: specifier: latest version: 8.0.3(@types/node@25.5.0)(esbuild@0.27.2)(tsx@4.21.0)(yaml@2.8.3) + examples/session/ws: + dependencies: + '@remix-run/node-fetch-server': + specifier: ^0.13.0 + version: 0.13.0 + '@types/node': + specifier: ^25.5.0 + version: 25.5.2 + '@types/ws': + specifier: ^8.18.1 + version: 8.18.1 + '@typescript/native-preview': + specifier: 7.0.0-dev.20260323.1 + version: 7.0.0-dev.20260323.1 + mppx: + specifier: workspace:* + version: link:../../.. + tsx: + specifier: ^4.21.0 + version: 4.21.0 + typescript: + specifier: ~5.9.3 + version: 5.9.3 + viem: + specifier: ^2.47.5 + version: 2.47.6(bufferutil@4.1.0)(typescript@5.9.3)(utf-8-validate@5.0.10)(zod@4.3.6) + vite: + specifier: latest + version: 8.0.3(@types/node@25.5.2)(esbuild@0.27.2)(tsx@4.21.0)(yaml@2.8.3) + ws: + specifier: ^8.20.0 + version: 8.20.0(bufferutil@4.1.0)(utf-8-validate@5.0.10) + examples/stripe: dependencies: '@remix-run/node-fetch-server': @@ -1408,6 +1447,9 @@ packages: '@types/node@25.5.0': resolution: {integrity: sha512-jp2P3tQMSxWugkCUKLRPVUpGaL5MVFwF8RDuSRztfwgN1wmqJeMSbKlnEtQqU8UrhTmzEmZdu2I6v2dpp7XIxw==} + '@types/node@25.5.2': + resolution: {integrity: sha512-tO4ZIRKNC+MDWV4qKVZe3Ql/woTnmHDr5JD8UI5hn2pwBrHEwOEMZK7WlNb5RKB6EoJ02gwmQS9OrjuFnZYdpg==} + '@types/qs@6.14.0': resolution: {integrity: sha512-eOunJqu0K1923aExK6y8p6fsihYEn/BYuQ4g0CxAAgFc4b/ZLN4CrsRZ55srTdqoiLzU2B2evC+apEIxprEzkQ==} @@ -1437,6 +1479,9 @@ packages: '@types/ssh2@1.15.5': resolution: {integrity: sha512-N1ASjp/nXH3ovBHddRJpli4ozpk6UdDYIX4RJWFa9L1YKnzdhTlVmiGHm4DZnj/jLbqZpes4aeR30EFGQtvhQQ==} + '@types/ws@8.18.1': + resolution: {integrity: sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==} + '@typescript/native-preview-darwin-arm64@7.0.0-dev.20260323.1': resolution: {integrity: sha512-C3tQdgMaYn57xzRUWek+zNKMiP0z9j7fqbCjr0wlyiLNIh5jMwDArjBDKHlqSu58FKvZg7baqbqB5Mcepb3s6w==} cpu: [arm64] @@ -3659,8 +3704,8 @@ packages: utf-8-validate: optional: true - ws@8.19.0: - resolution: {integrity: sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==} + ws@8.20.0: + resolution: {integrity: sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==} engines: {node: '>=10.0.0'} peerDependencies: bufferutil: ^4.0.1 @@ -4744,7 +4789,7 @@ snapshots: '@types/body-parser@1.19.6': dependencies: '@types/connect': 3.4.38 - '@types/node': 25.5.0 + '@types/node': 25.5.2 '@types/chai@5.2.3': dependencies: @@ -4753,7 +4798,7 @@ snapshots: '@types/connect@3.4.38': dependencies: - '@types/node': 25.5.0 + '@types/node': 25.5.2 '@types/debug@4.1.12': dependencies: @@ -4763,13 +4808,13 @@ snapshots: '@types/docker-modem@3.0.6': dependencies: - '@types/node': 25.5.0 + '@types/node': 25.5.2 '@types/ssh2': 1.15.5 '@types/dockerode@3.3.47': dependencies: '@types/docker-modem': 3.0.6 - '@types/node': 25.5.0 + '@types/node': 25.5.2 '@types/ssh2': 1.15.5 '@types/esrecurse@4.3.1': {} @@ -4778,7 +4823,7 @@ snapshots: '@types/express-serve-static-core@5.1.1': dependencies: - '@types/node': 25.5.0 + '@types/node': 25.5.2 '@types/qs': 6.14.0 '@types/range-parser': 1.2.7 '@types/send': 1.2.1 @@ -4805,6 +4850,10 @@ snapshots: dependencies: undici-types: 7.18.2 + '@types/node@25.5.2': + dependencies: + undici-types: 7.18.2 + '@types/qs@6.14.0': {} '@types/range-parser@1.2.7': {} @@ -4819,26 +4868,30 @@ snapshots: '@types/send@1.2.1': dependencies: - '@types/node': 25.5.0 + '@types/node': 25.5.2 '@types/serve-static@2.2.0': dependencies: '@types/http-errors': 2.0.5 - '@types/node': 25.5.0 + '@types/node': 25.5.2 '@types/ssh2-streams@0.1.13': dependencies: - '@types/node': 25.5.0 + '@types/node': 25.5.2 '@types/ssh2@0.5.52': dependencies: - '@types/node': 25.5.0 + '@types/node': 25.5.2 '@types/ssh2-streams': 0.1.13 '@types/ssh2@1.15.5': dependencies: '@types/node': 18.19.130 + '@types/ws@8.18.1': + dependencies: + '@types/node': 25.5.2 + '@typescript/native-preview-darwin-arm64@7.0.0-dev.20260323.1': optional: true @@ -4922,7 +4975,7 @@ snapshots: tinyexec: 1.0.4 tinyglobby: 0.2.15 vite: 8.0.3(@types/node@25.5.0)(esbuild@0.27.2)(tsx@4.21.0)(yaml@2.8.3) - ws: 8.19.0(bufferutil@4.1.0)(utf-8-validate@5.0.10) + ws: 8.20.0(bufferutil@4.1.0)(utf-8-validate@5.0.10) optionalDependencies: '@types/node': 25.5.0 transitivePeerDependencies: @@ -6442,7 +6495,7 @@ snapshots: '@protobufjs/path': 1.1.2 '@protobufjs/pool': 1.1.0 '@protobufjs/utf8': 1.1.0 - '@types/node': 25.5.0 + '@types/node': 25.5.2 long: 5.3.2 proxy-addr@2.0.7: @@ -6756,7 +6809,7 @@ snapshots: stripe@17.7.0: dependencies: - '@types/node': 25.5.0 + '@types/node': 25.5.2 qs: 6.14.2 strtok3@10.3.4: @@ -7035,6 +7088,20 @@ snapshots: tsx: 4.21.0 yaml: 2.8.3 + vite@8.0.3(@types/node@25.5.2)(esbuild@0.27.2)(tsx@4.21.0)(yaml@2.8.3): + dependencies: + lightningcss: 1.32.0 + picomatch: 4.0.4 + postcss: 8.5.8 + rolldown: 1.0.0-rc.12 + tinyglobby: 0.2.15 + optionalDependencies: + '@types/node': 25.5.2 + esbuild: 0.27.2 + fsevents: 2.3.3 + tsx: 4.21.0 + yaml: 2.8.3 + wagmi@3.5.0(@metamask/sdk@0.33.1(bufferutil@4.1.0)(utf-8-validate@5.0.10))(@tanstack/query-core@5.90.20)(@tanstack/react-query@5.90.21(react@19.2.4))(@types/react@19.2.14)(ox@0.14.7(typescript@5.9.3)(zod@4.3.6))(react@19.2.4)(typescript@5.9.3)(viem@2.47.6(bufferutil@4.1.0)(typescript@5.9.3)(utf-8-validate@5.0.10)(zod@4.3.6)): dependencies: '@tanstack/react-query': 5.90.21(react@19.2.4) @@ -7109,7 +7176,7 @@ snapshots: bufferutil: 4.1.0 utf-8-validate: 5.0.10 - ws@8.19.0(bufferutil@4.1.0)(utf-8-validate@5.0.10): + ws@8.20.0(bufferutil@4.1.0)(utf-8-validate@5.0.10): optionalDependencies: bufferutil: 4.1.0 utf-8-validate: 5.0.10 diff --git a/src/tempo/client/SessionManager.ts b/src/tempo/client/SessionManager.ts index 0445b434..75435c3f 100644 --- a/src/tempo/client/SessionManager.ts +++ b/src/tempo/client/SessionManager.ts @@ -1,16 +1,44 @@ import type { Hex } from 'ox' import type { Address } from 'viem' -import type * as Challenge from '../../Challenge.js' +import * as Challenge from '../../Challenge.js' import * as Fetch from '../../client/internal/Fetch.js' +import * as PaymentCredential from '../../Credential.js' import type * as Account from '../../viem/Account.js' import type * as Client from '../../viem/Client.js' import { deserializeSessionReceipt } from '../session/Receipt.js' import { parseEvent } from '../session/Sse.js' -import type { SessionReceipt } from '../session/Types.js' +import type { SessionCredentialPayload, SessionReceipt } from '../session/Types.js' +import * as Ws from '../session/Ws.js' import type { ChannelEntry } from './ChannelOps.js' import { session as sessionPlugin } from './Session.js' +type WebSocketConstructor = { + new (url: string | URL, protocols?: string | string[]): WebSocket +} + +type ReceiptWaiter = { + predicate: (receipt: SessionReceipt) => boolean + reject(error: Error): void + resolve(receipt: SessionReceipt): void +} + +type CloseReadyWaiter = { + reject(error: Error): void + resolve(receipt: SessionReceipt): void +} + +const WebSocketReadyState = { + CONNECTING: 0, + OPEN: 1, + CLOSING: 2, + CLOSED: 3, +} as const + +// Browser-style WebSocket clients may only initiate close with 1000 or 3000-4999. +// Keep protocol/policy close codes on the server side and use an app-defined code here. +const ClientWebSocketProtocolErrorCloseCode = 3008 + export type SessionManager = { readonly channelId: Hex.Hex | undefined readonly cumulative: bigint @@ -25,6 +53,14 @@ export type SessionManager = { signal?: AbortSignal | undefined }, ): Promise> + ws( + input: string | URL, + init?: { + onReceipt?: ((receipt: SessionReceipt) => void) | undefined + protocols?: string | string[] | undefined + signal?: AbortSignal | undefined + }, + ): Promise close(): Promise } @@ -56,11 +92,20 @@ export type PaymentResponse = Response & { */ export function sessionManager(parameters: sessionManager.Parameters): SessionManager { const fetchFn = parameters.fetch ?? globalThis.fetch + const WebSocketImpl = + parameters.webSocket ?? + (globalThis as typeof globalThis & { WebSocket?: WebSocketConstructor }).WebSocket let channel: ChannelEntry | null = null let lastChallenge: Challenge.Challenge | null = null let lastUrl: RequestInfo | URL | null = null let spent = 0n + let activeSocketChallenge: Challenge.Challenge | null = null + let activeSocketChannelId: Hex.Hex | null = null + let activeSocket: WebSocket | null = null + let closeReadyReceipt: SessionReceipt | null = null + let closeReadyWaiter: CloseReadyWaiter | null = null + let receiptWaiter: ReceiptWaiter | null = null const method = sessionPlugin({ account: parameters.account, @@ -90,6 +135,64 @@ export function sessionManager(parameters: sessionManager.Parameters): SessionMa spent = spent > next ? spent : next } + function waitForReceipt(predicate: (receipt: SessionReceipt) => boolean = () => true) { + if (receiptWaiter) throw new Error('receipt wait already in progress') + return new Promise((resolve, reject) => { + receiptWaiter = { predicate, resolve, reject } + }) + } + + function waitForCloseReady() { + if (closeReadyReceipt) return Promise.resolve(closeReadyReceipt) + if (closeReadyWaiter) throw new Error('close-ready wait already in progress') + return new Promise((resolve, reject) => { + closeReadyWaiter = { resolve, reject } + }) + } + + function settleReceipt(receipt: SessionReceipt) { + if (!receiptWaiter) return + if (!receiptWaiter.predicate(receipt)) return + const waiter = receiptWaiter + receiptWaiter = null + waiter.resolve(receipt) + } + + function settleCloseReady(receipt: SessionReceipt) { + closeReadyReceipt = receipt + if (!closeReadyWaiter) return + const waiter = closeReadyWaiter + closeReadyWaiter = null + waiter.resolve(receipt) + } + + function rejectReceipt(error: Error) { + if (!receiptWaiter) return + const waiter = receiptWaiter + receiptWaiter = null + waiter.reject(error) + } + + function rejectCloseReady(error: Error) { + if (!closeReadyWaiter) return + const waiter = closeReadyWaiter + closeReadyWaiter = null + waiter.reject(error) + } + + function getFallbackCloseAmount(challenge: Challenge.Challenge, channelId: Hex.Hex): string { + if ( + closeReadyReceipt && + closeReadyReceipt.challengeId === challenge.id && + closeReadyReceipt.channelId === channelId + ) { + return closeReadyReceipt.spent + } + + const cumulative = channel?.channelId === channelId ? channel.cumulativeAmount : 0n + return (cumulative > spent ? cumulative : spent).toString() + } + function toPaymentResponse(response: Response): PaymentResponse { const receiptHeader = response.headers.get('Payment-Receipt') const receipt = receiptHeader ? deserializeSessionReceipt(receiptHeader) : null @@ -108,6 +211,106 @@ export function sessionManager(parameters: sessionManager.Parameters): SessionMa return toPaymentResponse(response) } + function createManagedSocket(socket: WebSocket) { + type EventType = 'close' | 'error' | 'message' | 'open' + type Listener = { + once: boolean + value: ((event: any) => void) | { handleEvent(event: any): void } + } + const listeners = new Map>() + let emittedClose = false + let readyState = socket.readyState + + const add = ( + type: EventType, + listener: ((event: any) => void) | { handleEvent(event: any): void }, + options?: boolean | AddEventListenerOptions, + ) => { + let set = listeners.get(type) + if (!set) { + set = new Set() + listeners.set(type, set) + } + set.add({ + once: typeof options === 'object' ? options.once === true : false, + value: listener, + }) + } + + const remove = ( + type: EventType, + listener: ((event: any) => void) | { handleEvent(event: any): void }, + ) => { + const set = listeners.get(type) + if (!set) return + for (const entry of set) { + if (entry.value === listener) set.delete(entry) + } + } + + const emit = (type: EventType, event: any) => { + if (type === 'close') { + if (emittedClose) return + emittedClose = true + readyState = WebSocketReadyState.CLOSED + } + if (type === 'open') readyState = WebSocketReadyState.OPEN + + const property = `on${type}` as const + const handler = (managed as Record)[property] + if (typeof handler === 'function') handler(event) + + const set = listeners.get(type) + if (!set) return + for (const entry of Array.from(set)) { + if (typeof entry.value === 'function') entry.value(event) + else entry.value.handleEvent(event) + if (entry.once) set.delete(entry) + } + } + + const managed = { + addEventListener: add, + close(code?: number, reason?: string) { + socket.close(code, reason) + }, + get bufferedAmount() { + return socket.bufferedAmount + }, + get extensions() { + return socket.extensions + }, + on(type: EventType, listener: (...args: any[]) => void) { + add(type, listener) + }, + onclose: null as ((event: any) => void) | null, + onerror: null as ((event: any) => void) | null, + onmessage: null as ((event: any) => void) | null, + onopen: null as ((event: any) => void) | null, + off(type: EventType, listener: (...args: any[]) => void) { + remove(type, listener) + }, + get protocol() { + return socket.protocol + }, + get readyState() { + return readyState + }, + removeEventListener: remove, + send(data: string) { + socket.send(data) + }, + get url() { + return socket.url + }, + } + + return { + emit, + socket: managed as unknown as WebSocket, + } + } + const self: SessionManager = { get channelId() { return channel?.channelId @@ -240,15 +443,228 @@ export function sessionManager(parameters: sessionManager.Parameters): SessionMa return iterate() }, + async ws(input, init) { + if (!WebSocketImpl) { + throw new Error( + 'No WebSocket implementation available. Pass `webSocket` to sessionManager() in this runtime.', + ) + } + + const { onReceipt, protocols, signal } = init ?? {} + const wsUrl = new URL(input.toString()) + const httpUrl = new URL(wsUrl.toString()) + if (httpUrl.protocol === 'ws:') httpUrl.protocol = 'http:' + if (httpUrl.protocol === 'wss:') httpUrl.protocol = 'https:' + + lastUrl = httpUrl.toString() + const probe = await fetchFn(httpUrl, signal ? { signal } : undefined) + if (probe.status !== 402) { + throw new Error( + `Expected a 402 payment challenge from ${httpUrl}, received ${probe.status} instead.`, + ) + } + + const challenge = Challenge.fromResponseList(probe).find( + (item) => item.method === method.name && item.intent === method.intent, + ) + if (!challenge) { + throw new Error( + 'No payment challenge received from HTTP endpoint for this WebSocket URL. The server may not require payment or did not advertise a challenge.', + ) + } + lastChallenge = challenge + + const credential = await method.createCredential({ + challenge: challenge as never, + context: {}, + }) + + closeReadyReceipt = null + activeSocketChallenge = challenge + const openCredential = PaymentCredential.deserialize(credential) + activeSocketChannelId = openCredential.payload.channelId + const rawSocket = new WebSocketImpl(wsUrl, protocols) + activeSocket = rawSocket + const managedSocket = createManagedSocket(rawSocket) + + const failSocketFlow = (message: string) => { + rejectReceipt(new Error(message)) + rejectCloseReady(new Error(message)) + if ( + rawSocket.readyState === WebSocketReadyState.CONNECTING || + rawSocket.readyState === WebSocketReadyState.OPEN + ) { + rawSocket.close(ClientWebSocketProtocolErrorCloseCode, message) + } + } + + const isExpectedReceipt = (receipt: SessionReceipt) => + receipt.challengeId === challenge.id && receipt.channelId === activeSocketChannelId + + const socketOpened = new Promise((resolve, reject) => { + const onOpen = () => { + rawSocket.removeEventListener('error', onError) + managedSocket.emit('open', { type: 'open' }) + resolve() + } + const onError = () => { + rawSocket.removeEventListener('open', onOpen) + reject(new Error(`WebSocket connection to ${wsUrl} failed to open.`)) + } + rawSocket.addEventListener('open', onOpen, { once: true }) + rawSocket.addEventListener('error', onError, { once: true }) + }) + + rawSocket.addEventListener('close', (event) => { + if (activeSocket === rawSocket) activeSocket = null + if (activeSocketChallenge === challenge) activeSocketChallenge = null + if (activeSocketChannelId === openCredential.payload.channelId) activeSocketChannelId = null + rejectReceipt(new Error('WebSocket closed before the payment flow completed.')) + rejectCloseReady(new Error('WebSocket closed before the payment flow completed.')) + managedSocket.emit('close', { + code: (event as CloseEvent).code ?? 1000, + reason: (event as CloseEvent).reason ?? '', + type: 'close', + wasClean: true, + }) + }) + + rawSocket.addEventListener('error', () => { + managedSocket.emit('error', { type: 'error' }) + }) + + rawSocket.addEventListener('message', async (event) => { + const raw = typeof event.data === 'string' ? event.data : undefined + if (!raw) return + + const message = Ws.parseMessage(raw) + if (!message) { + managedSocket.emit('message', { data: raw, type: 'message' }) + return + } + + switch (message.mpp) { + case 'authorization': + break + case 'message': + managedSocket.emit('message', { data: message.data, type: 'message' }) + break + case 'payment-close-ready': + if (!isExpectedReceipt(message.data)) { + failSocketFlow('received mismatched payment-close-ready frame') + break + } + if (BigInt(message.data.spent) > (channel?.cumulativeAmount ?? 0n)) { + failSocketFlow('received payment-close-ready beyond local voucher state') + break + } + updateSpentFromReceipt(message.data) + onReceipt?.(message.data) + settleCloseReady(message.data) + managedSocket.emit('close', { code: 1000, reason: 'stream complete', type: 'close' }) + break + case 'payment-error': + rejectReceipt(new Error(message.message)) + rejectCloseReady(new Error(message.message)) + break + case 'payment-need-voucher': { + if (message.data.channelId !== activeSocketChannelId) { + failSocketFlow('received mismatched payment-need-voucher frame') + break + } + const required = BigInt(message.data.requiredCumulative) + const nextCumulative = + (channel?.cumulativeAmount ?? 0n) > required + ? (channel?.cumulativeAmount ?? 0n) + : required + if (channel?.channelId === activeSocketChannelId) + channel.cumulativeAmount = nextCumulative + + const voucher = await method.createCredential({ + challenge: challenge as never, + context: { + action: 'voucher', + channelId: activeSocketChannelId, + cumulativeAmountRaw: nextCumulative.toString(), + }, + }) + rawSocket.send(Ws.formatAuthorizationMessage(voucher)) + break + } + case 'payment-receipt': + if (!isExpectedReceipt(message.data)) { + failSocketFlow('received mismatched payment-receipt frame') + break + } + updateSpentFromReceipt(message.data) + onReceipt?.(message.data) + settleReceipt(message.data) + break + } + }) + + if (signal) { + signal.addEventListener( + 'abort', + () => { + rejectReceipt(new Error('WebSocket payment flow aborted.')) + rejectCloseReady(new Error('WebSocket payment flow aborted.')) + rawSocket.close() + }, + { once: true }, + ) + } + + await socketOpened + rawSocket.send(Ws.formatAuthorizationMessage(credential)) + await waitForReceipt() + return managedSocket.socket + }, + async close() { - if (!channel?.opened || !lastChallenge) return undefined + const closeChallenge = activeSocketChallenge ?? lastChallenge + const closeChannelId = activeSocketChannelId ?? channel?.channelId + if (!channel?.opened || !closeChallenge || !closeChannelId) return undefined + if (activeSocket?.readyState === WebSocketReadyState.OPEN) { + const ready = + closeReadyReceipt ?? + (await (async () => { + activeSocket.send(Ws.formatCloseRequestMessage()) + return waitForCloseReady() + })()) + const readySpent = BigInt(ready.spent) + if (readySpent > (channel.cumulativeAmount > spent ? channel.cumulativeAmount : spent)) { + throw new Error('close-ready spent exceeds local voucher state') + } + + const credential = await method.createCredential({ + challenge: closeChallenge as never, + context: { + action: 'close', + channelId: closeChannelId, + cumulativeAmountRaw: readySpent.toString(), + }, + }) + + const pendingReceipt = waitForReceipt( + (receipt) => + Boolean(receipt.txHash) && + receipt.challengeId === closeChallenge.id && + receipt.channelId === closeChannelId, + ) + activeSocket.send(Ws.formatAuthorizationMessage(credential)) + const receipt = await pendingReceipt + activeSocket.close() + closeReadyReceipt = null + return receipt + } const credential = await method.createCredential({ - challenge: lastChallenge as never, + challenge: closeChallenge as never, context: { action: 'close', - channelId: channel.channelId, - cumulativeAmountRaw: spent.toString(), + channelId: closeChannelId, + cumulativeAmountRaw: getFallbackCloseAmount(closeChallenge, closeChannelId), }, }) @@ -283,5 +699,7 @@ export declare namespace sessionManager { fetch?: typeof globalThis.fetch | undefined /** Maximum deposit in human-readable units (e.g. `'10'` for 10 tokens). Converted to raw units via `decimals`. */ maxDeposit?: string | undefined + /** Optional websocket constructor for runtimes without a global WebSocket. */ + webSocket?: WebSocketConstructor | undefined } } diff --git a/src/tempo/server/Methods.ts b/src/tempo/server/Methods.ts index 1b9096a3..deeb7e06 100644 --- a/src/tempo/server/Methods.ts +++ b/src/tempo/server/Methods.ts @@ -1,3 +1,4 @@ +import * as Ws_ from '../session/Ws.js' import { charge as charge_ } from './Charge.js' import { session as session_, settle as settle_ } from './Session.js' @@ -29,4 +30,6 @@ export namespace tempo { export const session = session_ /** One-shot settle: reads highest voucher from storage and submits on-chain. */ export const settle = settle_ + /** Experimental websocket helpers for Tempo sessions. */ + export const Ws = Ws_ } diff --git a/src/tempo/server/Session.test.ts b/src/tempo/server/Session.test.ts index 83642a80..f5704518 100644 --- a/src/tempo/server/Session.test.ts +++ b/src/tempo/server/Session.test.ts @@ -1,3 +1,5 @@ +import * as node_http from 'node:http' + import type { z } from 'mppx' import { Challenge, Credential } from 'mppx' import { Mppx as Mppx_server, tempo as tempo_server } from 'mppx/server' @@ -13,7 +15,9 @@ import { import { waitForTransactionReceipt } from 'viem/actions' import { Addresses } from 'viem/tempo' import { beforeAll, beforeEach, describe, expect, expectTypeOf, test } from 'vp/test' +import { WebSocketServer } from 'ws' import { nodeEnv } from '~test/config.js' +import * as Http from '~test/Http.js' const isLocalnet = nodeEnv === 'localnet' import { @@ -32,6 +36,7 @@ import { InsufficientBalanceError, InvalidSignatureError, } from '../../Errors.js' +import * as NodeRequest from '../../server/Request.js' import * as Store from '../../Store.js' import { sessionManager } from '../client/SessionManager.js' import { @@ -42,6 +47,7 @@ import type * as Methods from '../Methods.js' import * as ChannelStore from '../session/ChannelStore.js' import type { SessionReceipt } from '../session/Types.js' import { signVoucher } from '../session/Voucher.js' +import * as TempoWs from '../session/Ws.js' import { charge, session, settle } from './Session.js' const payer = accounts[2] @@ -3077,6 +3083,524 @@ describe.runIf(isLocalnet)('session', () => { } }) }) + + describe('WebSocket', () => { + test('open -> stream -> need-voucher -> resume -> close', async () => { + const backingStore = Store.memory() + const routeHandler = Mppx_server.create({ + methods: [ + tempo_server.session({ + store: backingStore, + getClient: () => client, + account: recipientAccount, + currency, + escrowContract, + chainId: chain.id, + }), + ], + realm: 'api.example.com', + secretKey: 'secret', + }).session({ amount: '1', decimals: 6, unitType: 'token' }) + + let voucherUpdates = 0 + const route = async (request: Request) => { + if (request.method === 'POST' && request.headers.has('Authorization')) { + try { + const credential = Credential.fromRequest(request) + if (credential.payload?.action === 'voucher') voucherUpdates++ + } catch {} + } + return routeHandler(request) + } + + const httpHandler = NodeRequest.toNodeListener(async (request) => { + const result = await route(request) + if (result.status === 402) return result.challenge + return result.withReceipt(new Response('ok')) + }) + + const nodeServer = node_http.createServer(httpHandler) + const wsServer = new WebSocketServer({ noServer: true }) + + await new Promise((resolve) => nodeServer.listen(0, resolve)) + const { port } = nodeServer.address() as { port: number } + const server = Http.wrapServer(nodeServer, { port, url: `http://localhost:${port}` }) + + wsServer.on('connection', (socket: import('ws').WebSocket) => { + void TempoWs.serve({ + socket, + store: backingStore, + url: `${server.url}/ws`, + route, + generate: async function* (stream: TempoWs.SessionController) { + await stream.charge() + yield 'chunk-1' + await stream.charge() + yield 'chunk-2' + await stream.charge() + yield 'chunk-3' + }, + }) + }) + + nodeServer.on('upgrade', (req, socket, head) => { + if (req.url !== '/ws') { + socket.destroy() + return + } + + wsServer.handleUpgrade(req, socket, head, (websocket: import('ws').WebSocket) => { + wsServer.emit('connection', websocket, req) + }) + }) + + try { + const manager = sessionManager({ + account: payer, + client, + escrowContract, + fetch: globalThis.fetch, + maxDeposit: '3', + }) + + const ws = await manager.ws(`ws://localhost:${port}/ws`) + const chunks: string[] = [] + + await new Promise((resolve, reject) => { + ws.addEventListener('message', (event) => { + if (typeof event.data !== 'string') return + chunks.push(event.data) + }) + ws.addEventListener('close', () => resolve(), { once: true }) + ws.addEventListener('error', () => reject(new Error('websocket stream failed')), { + once: true, + }) + }) + + expect(chunks).toEqual(['chunk-1', 'chunk-2', 'chunk-3']) + expect(voucherUpdates).toBeGreaterThan(0) + + const closeReceipt = await manager.close() + expect(closeReceipt?.status).toBe('success') + expect(closeReceipt?.spent).toBe('3000000') + + const channelId = manager.channelId + expect(channelId).toBeTruthy() + + const persisted = await ChannelStore.fromStore(backingStore).getChannel(channelId!) + expect(persisted?.finalized).toBe(true) + } finally { + wsServer.close() + server.close() + } + }) + + test('treats control-shaped application payloads as content', async () => { + const backingStore = Store.memory() + const routeHandler = Mppx_server.create({ + methods: [ + tempo_server.session({ + store: backingStore, + getClient: () => client, + account: recipientAccount, + currency, + escrowContract, + chainId: chain.id, + }), + ], + realm: 'api.example.com', + secretKey: 'secret', + }).session({ amount: '1', decimals: 6, unitType: 'token' }) + + let voucherUpdates = 0 + const route = async (request: Request) => { + if (request.method === 'POST' && request.headers.has('Authorization')) { + try { + const credential = Credential.fromRequest(request) + if (credential.payload?.action === 'voucher') voucherUpdates++ + } catch {} + } + return routeHandler(request) + } + + const httpHandler = NodeRequest.toNodeListener(async (request) => { + const result = await route(request) + if (result.status === 402) return result.challenge + return result.withReceipt(new Response('ok')) + }) + + const nodeServer = node_http.createServer(httpHandler) + const wsServer = new WebSocketServer({ noServer: true }) + + await new Promise((resolve) => nodeServer.listen(0, resolve)) + const { port } = nodeServer.address() as { port: number } + const server = Http.wrapServer(nodeServer, { port, url: `http://localhost:${port}` }) + + const controlLookingChunk = JSON.stringify({ + mpp: 'payment-need-voucher', + data: { + channelId: '0x' + 'aa'.repeat(32), + requiredCumulative: '9000000', + acceptedCumulative: '0', + deposit: '9000000', + }, + }) + + wsServer.on('connection', (socket: import('ws').WebSocket) => { + void TempoWs.serve({ + socket, + store: backingStore, + url: `${server.url}/ws`, + route, + generate: async function* (stream: TempoWs.SessionController) { + await stream.charge() + yield controlLookingChunk + }, + }) + }) + + nodeServer.on('upgrade', (req, socket, head) => { + if (req.url !== '/ws') { + socket.destroy() + return + } + + wsServer.handleUpgrade(req, socket, head, (websocket: import('ws').WebSocket) => { + wsServer.emit('connection', websocket, req) + }) + }) + + try { + const manager = sessionManager({ + account: payer, + client, + escrowContract, + fetch: globalThis.fetch, + maxDeposit: '1', + }) + + const ws = await manager.ws(`ws://localhost:${port}/ws`) + const chunks: string[] = [] + + await new Promise((resolve, reject) => { + ws.addEventListener('message', (event) => { + if (typeof event.data !== 'string') return + chunks.push(event.data) + }) + ws.addEventListener('close', () => resolve(), { once: true }) + ws.addEventListener('error', () => reject(new Error('websocket stream failed')), { + once: true, + }) + }) + + expect(chunks).toEqual([controlLookingChunk]) + expect(voucherUpdates).toBe(0) + + const closeReceipt = await manager.close() + expect(closeReceipt?.status).toBe('success') + expect(closeReceipt?.spent).toBe('1000000') + } finally { + wsServer.close() + server.close() + } + }) + + test('close() stops the stream and application listeners never receive payment control frames', async () => { + const backingStore = Store.memory() + const routeHandler = Mppx_server.create({ + methods: [ + tempo_server.session({ + store: backingStore, + getClient: () => client, + account: recipientAccount, + currency, + escrowContract, + chainId: chain.id, + }), + ], + realm: 'api.example.com', + secretKey: 'secret', + }).session({ amount: '1', decimals: 6, unitType: 'token' }) + + const route = (request: Request) => routeHandler(request) + + const httpHandler = NodeRequest.toNodeListener(async (request) => { + const result = await route(request) + if (result.status === 402) return result.challenge + return result.withReceipt(new Response('ok')) + }) + + const nodeServer = node_http.createServer(httpHandler) + const wsServer = new WebSocketServer({ noServer: true }) + + await new Promise((resolve) => nodeServer.listen(0, resolve)) + const { port } = nodeServer.address() as { port: number } + const server = Http.wrapServer(nodeServer, { port, url: `http://localhost:${port}` }) + + wsServer.on('connection', (socket: import('ws').WebSocket) => { + void TempoWs.serve({ + socket, + store: backingStore, + url: `${server.url}/ws`, + route, + generate: async function* (stream: TempoWs.SessionController) { + await stream.charge() + yield 'chunk-1' + await new Promise((resolve) => setTimeout(resolve, 50)) + await stream.charge() + yield 'chunk-2' + await stream.charge() + yield 'chunk-3' + }, + }) + }) + + nodeServer.on('upgrade', (req, socket, head) => { + if (req.url !== '/ws') { + socket.destroy() + return + } + + wsServer.handleUpgrade(req, socket, head, (websocket: import('ws').WebSocket) => { + wsServer.emit('connection', websocket, req) + }) + }) + + try { + const manager = sessionManager({ + account: payer, + client, + escrowContract, + fetch: globalThis.fetch, + maxDeposit: '3', + }) + + let receiptCount = 0 + let closePromise: Promise | undefined + const ws = await manager.ws(`ws://localhost:${port}/ws`, { + onReceipt() { + receiptCount++ + if (receiptCount === 2 && !closePromise) closePromise = manager.close() + }, + }) + + const chunks: string[] = [] + await new Promise((resolve, reject) => { + ws.addEventListener('message', (event) => { + if (typeof event.data !== 'string') return + chunks.push(event.data) + }) + ws.addEventListener('close', () => resolve(), { once: true }) + ws.addEventListener('error', () => reject(new Error('websocket stream failed')), { + once: true, + }) + }) + + const closeReceipt = await closePromise + expect(closeReceipt?.status).toBe('success') + expect(closeReceipt?.txHash).toBeTruthy() + expect(closeReceipt?.spent).toBe('2000000') + expect(chunks).toEqual(['chunk-1', 'chunk-2']) + + const channelId = manager.channelId + expect(channelId).toBeTruthy() + + const persisted = await ChannelStore.fromStore(backingStore).getChannel(channelId!) + expect(persisted?.finalized).toBe(true) + } finally { + wsServer.close() + server.close() + } + }) + + test('rejects websocket receipts bound to a different channel', async () => { + const routeHandler = Mppx_server.create({ + methods: [ + tempo_server.session({ + store: Store.memory(), + getClient: () => client, + account: recipientAccount, + currency, + escrowContract, + chainId: chain.id, + }), + ], + realm: 'api.example.com', + secretKey: 'secret', + }).session({ amount: '1', decimals: 6, unitType: 'token' }) + + const httpHandler = NodeRequest.toNodeListener(async (request) => { + const result = await routeHandler(request) + if (result.status === 402) return result.challenge + return result.withReceipt(new Response('ok')) + }) + + const nodeServer = node_http.createServer(httpHandler) + const wsServer = new WebSocketServer({ noServer: true }) + const wrongChannelId = `0x${'11'.repeat(32)}` as Hex + + await new Promise((resolve) => nodeServer.listen(0, resolve)) + const { port } = nodeServer.address() as { port: number } + const server = Http.wrapServer(nodeServer, { port, url: `http://localhost:${port}` }) + + wsServer.on('connection', (socket: import('ws').WebSocket) => { + socket.once('message', (data) => { + const raw = data.toString() + const message = TempoWs.parseMessage(raw) + if (!message || message.mpp !== 'authorization') return + + const credential = Credential.deserialize(message.authorization) + socket.send( + TempoWs.formatReceiptMessage({ + method: 'tempo', + intent: 'session', + status: 'success', + timestamp: new Date().toISOString(), + reference: wrongChannelId, + challengeId: credential.challenge.id, + channelId: wrongChannelId, + acceptedCumulative: '1000000', + spent: '0', + units: 0, + }), + ) + }) + }) + + nodeServer.on('upgrade', (req, socket, head) => { + if (req.url !== '/ws') { + socket.destroy() + return + } + + wsServer.handleUpgrade(req, socket, head, (websocket: import('ws').WebSocket) => { + wsServer.emit('connection', websocket, req) + }) + }) + + try { + const manager = sessionManager({ + account: payer, + client, + escrowContract, + fetch: globalThis.fetch, + maxDeposit: '1', + }) + + await expect(manager.ws(`ws://localhost:${port}/ws`)).rejects.toThrow( + 'received mismatched payment-receipt frame', + ) + } finally { + wsServer.close() + server.close() + } + }) + + test('rejects close-ready receipts beyond local voucher state', async () => { + const routeHandler = Mppx_server.create({ + methods: [ + tempo_server.session({ + store: Store.memory(), + getClient: () => client, + account: recipientAccount, + currency, + escrowContract, + chainId: chain.id, + }), + ], + realm: 'api.example.com', + secretKey: 'secret', + }).session({ amount: '1', decimals: 6, unitType: 'token' }) + + const httpHandler = NodeRequest.toNodeListener(async (request) => { + const result = await routeHandler(request) + if (result.status === 402) return result.challenge + return result.withReceipt(new Response('ok')) + }) + + const nodeServer = node_http.createServer(httpHandler) + const wsServer = new WebSocketServer({ noServer: true }) + + await new Promise((resolve) => nodeServer.listen(0, resolve)) + const { port } = nodeServer.address() as { port: number } + const server = Http.wrapServer(nodeServer, { port, url: `http://localhost:${port}` }) + + wsServer.on('connection', (socket: import('ws').WebSocket) => { + let openCredential: any + + socket.on('message', (data) => { + const raw = data.toString() + const message = TempoWs.parseMessage(raw) + if (!message) return + + if (message.mpp === 'authorization') { + openCredential = Credential.deserialize(message.authorization) + socket.send( + TempoWs.formatReceiptMessage({ + method: 'tempo', + intent: 'session', + status: 'success', + timestamp: new Date().toISOString(), + reference: openCredential.payload.channelId, + challengeId: openCredential.challenge.id, + channelId: openCredential.payload.channelId, + acceptedCumulative: openCredential.payload.cumulativeAmount, + spent: '0', + units: 0, + }), + ) + return + } + + if (message.mpp === 'payment-close-request' && openCredential) { + socket.send( + TempoWs.formatCloseReadyMessage({ + method: 'tempo', + intent: 'session', + status: 'success', + timestamp: new Date().toISOString(), + reference: openCredential.payload.channelId, + challengeId: openCredential.challenge.id, + channelId: openCredential.payload.channelId, + acceptedCumulative: openCredential.payload.cumulativeAmount, + spent: '9000000', + units: 1, + }), + ) + } + }) + }) + + nodeServer.on('upgrade', (req, socket, head) => { + if (req.url !== '/ws') { + socket.destroy() + return + } + + wsServer.handleUpgrade(req, socket, head, (websocket: import('ws').WebSocket) => { + wsServer.emit('connection', websocket, req) + }) + }) + + try { + const manager = sessionManager({ + account: payer, + client, + escrowContract, + fetch: globalThis.fetch, + maxDeposit: '1', + }) + + await manager.ws(`ws://localhost:${port}/ws`) + await expect(manager.close()).rejects.toThrow( + 'received payment-close-ready beyond local voucher state', + ) + } finally { + wsServer.close() + server.close() + } + }) + }) }) describe('monotonicity and TOCTOU (unit tests)', () => { diff --git a/src/tempo/server/index.ts b/src/tempo/server/index.ts index 118b7bf1..05d96e40 100644 --- a/src/tempo/server/index.ts +++ b/src/tempo/server/index.ts @@ -1,5 +1,6 @@ export * as ChannelStore from '../session/ChannelStore.js' export * as Sse from '../session/Sse.js' +export * as Ws from '../session/Ws.js' export { charge } from './Charge.js' export { tempo } from './Methods.js' export { session, settle } from './Session.js' diff --git a/src/tempo/server/internal/html/main.ts b/src/tempo/server/internal/html/main.ts index 5ed60860..e305d27e 100644 --- a/src/tempo/server/internal/html/main.ts +++ b/src/tempo/server/internal/html/main.ts @@ -86,16 +86,15 @@ button.onclick = async () => { const result = await provider.request({ method: 'wallet_connect' }) return result.accounts[0]?.address })() - const method = tempo({ - account, - getClient(opts) { - const chainId = opts.chainId ?? c.challenge.request.methodDetails?.chainId - const chain = [...(provider?.chains ?? []), tempoModerato, tempoLocalnet].find( - (x) => x.id === chainId, - ) - return createClient({ chain, transport: custom(provider) }) - }, - })[0] + type TempoParameters = NonNullable[0]> + const getClient: NonNullable = (opts) => { + const chainId = opts.chainId ?? c.challenge.request.methodDetails?.chainId + const chain = [...(provider?.chains ?? []), tempoModerato, tempoLocalnet].find( + (x) => x.id === chainId, + ) + return createClient({ chain, transport: custom(provider) }) as never + } + const method = tempo({ account, getClient })[0] const credential = await method.createCredential({ challenge: c.challenge, context: {} }) await c.submit(credential) diff --git a/src/tempo/session/Ws.test.ts b/src/tempo/session/Ws.test.ts new file mode 100644 index 00000000..f2384ac5 --- /dev/null +++ b/src/tempo/session/Ws.test.ts @@ -0,0 +1,195 @@ +import { describe, expect, test } from 'vp/test' + +import * as Challenge from '../../Challenge.js' +import * as Credential from '../../Credential.js' +import * as Store from '../../Store.js' +import { createSessionReceipt, serializeSessionReceipt } from './Receipt.js' +import * as Ws from './Ws.js' + +const challenge = Challenge.from({ + id: 'challenge-1', + realm: 'example.test', + method: 'tempo', + intent: 'session', + request: { + amount: '1', + currency: '0x20c0000000000000000000000000000000000001', + recipient: '0x742d35Cc6634C0532925a3b844Bc9e7595f8fE00', + decimals: 6, + }, +}) + +const channelId = `0x${'11'.repeat(32)}` as const + +class MockSocket implements Ws.Socket { + closed = false + sent: string[] = [] + private listeners = { + close: new Set<() => void>(), + error: new Set<() => void>(), + message: new Set<(data: unknown) => void>(), + } + + close() { + if (this.closed) return + this.closed = true + for (const listener of Array.from(this.listeners.close)) listener() + } + + off(type: 'close' | 'error' | 'message', listener: (...args: any[]) => void) { + ;(this.listeners[type] as Set<(...args: any[]) => void>).delete(listener) + } + + on(type: 'close' | 'error' | 'message', listener: (...args: any[]) => void) { + ;(this.listeners[type] as Set<(...args: any[]) => void>).add(listener) + } + + receive(data: string) { + for (const listener of Array.from(this.listeners.message)) listener(data) + } + + send(data: string) { + this.sent.push(data) + } +} + +function makeCredential( + payload: + | { + action: 'open' + type: 'transaction' + channelId: `0x${string}` + transaction: `0x${string}` + cumulativeAmount: string + signature: `0x${string}` + } + | { + action: 'topUp' + type: 'transaction' + channelId: `0x${string}` + transaction: `0x${string}` + additionalDeposit: string + }, +) { + return Credential.serialize( + Credential.from({ + challenge, + payload, + }), + ) +} + +function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +describe('Ws', () => { + test('wraps application payloads in an explicit message envelope', async () => { + const socket = new MockSocket() + + await Ws.serve({ + socket, + store: Store.memory(), + url: 'ws://example.test/stream', + route: async () => ({ + status: 200, + withReceipt(response = new Response(null, { status: 204 })) { + response.headers.set( + 'Payment-Receipt', + serializeSessionReceipt( + createSessionReceipt({ + challengeId: challenge.id, + channelId, + acceptedCumulative: 1n, + spent: 0n, + units: 0, + }), + ), + ) + return response + }, + }), + generate: async function* () { + yield '{"mpp":"payment-need-voucher","data":{"requiredCumulative":"9"}}' + }, + }) + + socket.receive( + Ws.formatAuthorizationMessage( + makeCredential({ + action: 'open', + channelId, + cumulativeAmount: '1', + signature: `0x${'77'.repeat(65)}`, + transaction: '0x01', + type: 'transaction', + }), + ), + ) + + await sleep(10) + + const applicationFrame = socket.sent + .map((message) => Ws.parseMessage(message)) + .find((message) => message?.mpp === 'message') + + expect(applicationFrame).toEqual({ + mpp: 'message', + data: '{"mpp":"payment-need-voucher","data":{"requiredCumulative":"9"}}', + }) + }) + + test('caps queued payment work and closes noisy sockets', async () => { + const socket = new MockSocket() + let routeCalls = 0 + + await Ws.serve({ + socket, + store: Store.memory(), + url: 'ws://example.test/stream', + route: async () => { + routeCalls++ + await sleep(20) + return { + status: 200, + withReceipt(response = new Response(null, { status: 204 })) { + response.headers.set( + 'Payment-Receipt', + serializeSessionReceipt( + createSessionReceipt({ + challengeId: challenge.id, + channelId, + acceptedCumulative: 1n, + spent: 0n, + units: 0, + }), + ), + ) + return response + }, + } + }, + generate: async function* () {}, + }) + + const topUp = Ws.formatAuthorizationMessage( + makeCredential({ + action: 'topUp', + channelId, + additionalDeposit: '1', + transaction: '0x01', + type: 'transaction', + }), + ) + + for (let i = 0; i < 40; i++) socket.receive(topUp) + + await sleep(100) + + expect(socket.closed).toBe(true) + expect(routeCalls).toBeLessThan(40) + expect( + socket.sent.some((message) => message.includes('too many queued payment messages')), + ).toBe(true) + }) +}) diff --git a/src/tempo/session/Ws.ts b/src/tempo/session/Ws.ts new file mode 100644 index 00000000..519e6554 --- /dev/null +++ b/src/tempo/session/Ws.ts @@ -0,0 +1,466 @@ +import * as Credential from '../../Credential.js' +import * as ChannelStore from './ChannelStore.js' +import { deserializeSessionReceipt } from './Receipt.js' +import { createSessionReceipt } from './Receipt.js' +import type { SessionController } from './Sse.js' +import type { NeedVoucherEvent, SessionCredentialPayload, SessionReceipt } from './Types.js' + +export type { SessionController } from './Sse.js' + +export type SessionRouteResult = + | { status: 402; challenge: Response } + | { status: 200; withReceipt(response?: Response): Response } + +export type SessionRoute = (request: Request) => Promise + +export type Socket = { + close(code?: number, reason?: string): unknown + send(data: string): unknown + addEventListener?: ( + type: 'close' | 'error' | 'message', + listener: ((event: any) => void) | { handleEvent(event: any): void }, + ) => unknown + removeEventListener?: ( + type: 'close' | 'error' | 'message', + listener: ((event: any) => void) | { handleEvent(event: any): void }, + ) => unknown + on?: (type: 'close' | 'error' | 'message', listener: (...args: any[]) => void) => unknown + off?: (type: 'close' | 'error' | 'message', listener: (...args: any[]) => void) => unknown +} + +export type Message = + | { mpp: 'authorization'; authorization: string } + | { mpp: 'message'; data: string } + | { mpp: 'payment-close-request' } + | { mpp: 'payment-close-ready'; data: SessionReceipt } + | { mpp: 'payment-error'; status: number; message: string } + | { mpp: 'payment-need-voucher'; data: NeedVoucherEvent } + | { mpp: 'payment-receipt'; data: SessionReceipt } + +export function formatAuthorizationMessage(authorization: string): string { + return JSON.stringify({ mpp: 'authorization', authorization } satisfies Message) +} + +export function formatApplicationMessage(data: string): string { + return JSON.stringify({ mpp: 'message', data } satisfies Message) +} + +export function formatCloseRequestMessage(): string { + return JSON.stringify({ mpp: 'payment-close-request' } satisfies Message) +} + +export function formatCloseReadyMessage(receipt: SessionReceipt): string { + return JSON.stringify({ mpp: 'payment-close-ready', data: receipt } satisfies Message) +} + +export function formatNeedVoucherMessage(params: NeedVoucherEvent): string { + return JSON.stringify({ mpp: 'payment-need-voucher', data: params } satisfies Message) +} + +export function formatReceiptMessage(receipt: SessionReceipt): string { + return JSON.stringify({ mpp: 'payment-receipt', data: receipt } satisfies Message) +} + +export function formatErrorMessage(parameters: { message: string; status: number }): string { + return JSON.stringify({ mpp: 'payment-error', ...parameters } satisfies Message) +} + +export function parseMessage(raw: string): Message | null { + try { + const parsed = JSON.parse(raw) as Record + if (parsed.mpp === 'authorization' && typeof parsed.authorization === 'string') { + return { mpp: 'authorization', authorization: parsed.authorization } + } + if (parsed.mpp === 'message' && typeof parsed.data === 'string') { + return { mpp: 'message', data: parsed.data } + } + if (parsed.mpp === 'payment-close-request') { + return { mpp: 'payment-close-request' } + } + if (parsed.mpp === 'payment-close-ready' && parsed.data) { + return { mpp: 'payment-close-ready', data: parsed.data as SessionReceipt } + } + if ( + parsed.mpp === 'payment-error' && + typeof parsed.status === 'number' && + typeof parsed.message === 'string' + ) { + return { mpp: 'payment-error', status: parsed.status, message: parsed.message } + } + if (parsed.mpp === 'payment-need-voucher' && parsed.data) { + return { mpp: 'payment-need-voucher', data: parsed.data as NeedVoucherEvent } + } + if (parsed.mpp === 'payment-receipt' && parsed.data) { + return { mpp: 'payment-receipt', data: parsed.data as SessionReceipt } + } + return null + } catch { + return null + } +} + +export async function serve(options: serve.Options): Promise { + const { generate, pollIntervalMs = 100, route, socket, store: rawStore, url } = options + const store = 'getChannel' in rawStore ? rawStore : ChannelStore.fromStore(rawStore) + const requestUrl = normalizeHttpUrl(url) + const maxQueuedPaymentMessages = 32 + + const abortController = new AbortController() + let closed = false + let closeReadySent = false + let closeRequestHandled = false + let closeRequested = false + let streamStarted = false + let streamTask: Promise | null = null + let streamContext: { + challengeId: string + channelId: SessionCredentialPayload['channelId'] + tickCost: bigint + } | null = null + let action = Promise.resolve() + let queuedActions = 0 + + const close = async (code = 1000, reason?: string) => { + if (closed) return + closed = true + abortController.abort() + unsubscribe() + await Promise.resolve(socket.close(code, reason)) + } + + const sendCloseReady = async () => { + if (closeReadySent || !streamContext || closed) return + closeReadySent = true + + const channel = await store.getChannel(streamContext.channelId) + if (!channel) throw new Error('channel not found') + + const receipt = createSessionReceipt({ + challengeId: streamContext.challengeId, + channelId: streamContext.channelId, + acceptedCumulative: channel.highestVoucherAmount, + spent: channel.spent, + units: channel.units, + }) + await send(socket, formatCloseReadyMessage(receipt)) + } + + const runStream = async (context: { + challengeId: string + channelId: SessionCredentialPayload['channelId'] + tickCost: bigint + }) => { + const charge = () => + chargeOrWait({ + amount: context.tickCost, + channelId: context.channelId, + emit: (message) => send(socket, message), + pollIntervalMs, + signal: abortController.signal, + store, + }) + + const iterable: AsyncIterable = + typeof generate === 'function' ? generate({ charge }) : generate + + try { + for await (const value of iterable) { + if (abortController.signal.aborted) break + if (typeof generate !== 'function') await charge() + if (abortController.signal.aborted) break + await send(socket, formatApplicationMessage(value)) + } + + if (!abortController.signal.aborted) await sendCloseReady() + } catch (error) { + if (!abortController.signal.aborted) { + await send( + socket, + formatErrorMessage({ + message: error instanceof Error ? error.message : 'websocket session failed', + status: 500, + }), + ) + await close(1011, 'websocket session failed') + } + } finally { + streamTask = null + } + } + + const requestClose = async () => { + if (closed) return + if (closeRequestHandled) return + closeRequestHandled = true + closeRequested = true + abortController.abort() + await streamTask?.catch(() => {}) + await sendCloseReady() + } + + const processAuthorization = async (authorization: string) => { + if (closed) return + const credential = Credential.deserialize(authorization) + const payload = credential.payload + if (payload.action === 'close') closeRequested = true + + const result = await route( + new Request(requestUrl, { + method: 'POST', + headers: { Authorization: authorization }, + }), + ) + + if (result.status === 402) { + const response = result.challenge + const message = + (await response.text().catch(() => '')) || + response.statusText || + 'payment verification failed' + await send(socket, formatErrorMessage({ message, status: response.status })) + await close(1008, message) + return + } + + const response = result.withReceipt(new Response(null, { status: 204 })) + const receiptHeader = response.headers.get('Payment-Receipt') + if (!receiptHeader) { + throw new Error('management response missing Payment-Receipt header') + } + + const receipt = deserializeSessionReceipt(receiptHeader) + await send(socket, formatReceiptMessage(receipt)) + + if (payload.action === 'close') { + await close(1000, 'payment session closed') + return + } + + if (payload.action === 'topUp') return + if (streamStarted || closeRequested) return + streamStarted = true + streamContext = { + challengeId: credential.challenge.id, + channelId: payload.channelId, + tickCost: BigInt(credential.challenge.request.amount as string), + } + // Defer the first application frame until after the client receives the + // auth receipt and has a chance to install its own message listeners. + setTimeout(() => { + if (closeRequested || closed || !streamContext) return + streamTask = runStream(streamContext) + }, 0) + } + + const onMessage = (payload: unknown) => { + if (closed) return + const raw = toText(payload) + if (!raw) return + const message = parseMessage(raw) + if (!message) return + + if (message.mpp === 'payment-close-request') { + closeRequested = true + abortController.abort() + } + + const work = + message.mpp === 'authorization' + ? () => processAuthorization(message.authorization) + : message.mpp === 'payment-close-request' + ? () => requestClose() + : null + + if (!work) return + if (queuedActions >= maxQueuedPaymentMessages) { + void send( + socket, + formatErrorMessage({ + message: 'too many queued payment messages', + status: 429, + }), + ).catch(() => {}) + void close(1008, 'too many queued payment messages') + return + } + + queuedActions++ + action = action + .then(async () => { + try { + if (closed) return + await work() + } finally { + queuedActions-- + } + }) + .catch(async (error) => { + if (!closed) { + await send( + socket, + formatErrorMessage({ + message: error instanceof Error ? error.message : 'invalid payment message', + status: 400, + }), + ) + await close(1008, 'invalid payment message') + } + }) + } + + const onClose = () => { + if (closed) return + closed = true + abortController.abort() + unsubscribe() + } + + const unsubscribe = subscribe(socket, { + close: onClose, + error: onClose, + message: onMessage, + }) +} + +export declare namespace serve { + type Options = { + generate: AsyncIterable | ((stream: SessionController) => AsyncIterable) + pollIntervalMs?: number | undefined + route: SessionRoute + socket: Socket + store: ChannelStore.ChannelStore | import('../../Store.js').Store + url: string | URL + } +} + +function normalizeHttpUrl(value: string | URL): string { + const url = new URL(value.toString()) + if (url.protocol === 'ws:') url.protocol = 'http:' + if (url.protocol === 'wss:') url.protocol = 'https:' + return url.toString() +} + +async function chargeOrWait(options: { + amount: bigint + channelId: SessionCredentialPayload['channelId'] + emit: (message: string) => Promise + pollIntervalMs: number + signal: AbortSignal + store: ChannelStore.ChannelStore +}): Promise { + const { amount, channelId, emit, pollIntervalMs, signal, store } = options + + let result = await ChannelStore.deductFromChannel(store, channelId, amount) + if (result.ok) return + + await emit( + formatNeedVoucherMessage({ + channelId, + requiredCumulative: (result.channel.spent + amount).toString(), + acceptedCumulative: result.channel.highestVoucherAmount.toString(), + deposit: result.channel.deposit.toString(), + }), + ) + + while (!result.ok) { + await waitForUpdate(store, channelId, pollIntervalMs, signal) + result = await ChannelStore.deductFromChannel(store, channelId, amount) + } +} + +async function waitForUpdate( + store: ChannelStore.ChannelStore, + channelId: SessionCredentialPayload['channelId'], + pollIntervalMs: number, + signal: AbortSignal, +): Promise { + throwIfAborted(signal) + + if (store.waitForUpdate) { + await Promise.race([store.waitForUpdate(channelId), onceAborted(signal)]) + return + } + + await sleep(pollIntervalMs, signal) +} + +function subscribe( + socket: Socket, + handlers: { + close: () => void + error: () => void + message: (payload: unknown) => void + }, +) { + if (socket.addEventListener && socket.removeEventListener) { + const onMessage = (event: Event | MessageEvent) => { + const data = (event as MessageEvent).data + handlers.message(data) + } + socket.addEventListener('message', onMessage) + socket.addEventListener('close', handlers.close) + socket.addEventListener('error', handlers.error) + return () => { + socket.removeEventListener?.('message', onMessage) + socket.removeEventListener?.('close', handlers.close) + socket.removeEventListener?.('error', handlers.error) + } + } + + if (socket.on && socket.off) { + const onMessage = (data: unknown) => handlers.message(data) + socket.on('message', onMessage) + socket.on('close', handlers.close) + socket.on('error', handlers.error) + return () => { + socket.off?.('message', onMessage) + socket.off?.('close', handlers.close) + socket.off?.('error', handlers.error) + } + } + + throw new Error('unsupported websocket implementation') +} + +async function send(socket: Socket, data: string) { + await Promise.resolve(socket.send(data)) +} + +function toText(value: unknown): string | null { + if (typeof value === 'string') return value + if (value instanceof ArrayBuffer) return new TextDecoder().decode(value) + if (ArrayBuffer.isView(value)) { + return new TextDecoder().decode(value) + } + return null +} + +function sleep(ms: number, signal: AbortSignal) { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + signal.removeEventListener('abort', onAbort) + resolve() + }, ms) + const onAbort = () => { + clearTimeout(timeout) + reject(signal.reason ?? new Error('aborted')) + } + signal.addEventListener('abort', onAbort, { once: true }) + }) +} + +function onceAborted(signal: AbortSignal) { + return new Promise((_, reject) => { + if (signal.aborted) { + reject(signal.reason ?? new Error('aborted')) + return + } + signal.addEventListener('abort', () => reject(signal.reason ?? new Error('aborted')), { + once: true, + }) + }) +} + +function throwIfAborted(signal: AbortSignal) { + if (signal.aborted) throw signal.reason ?? new Error('aborted') +} diff --git a/src/tempo/session/index.ts b/src/tempo/session/index.ts index 01faa027..836cb970 100644 --- a/src/tempo/session/index.ts +++ b/src/tempo/session/index.ts @@ -5,3 +5,4 @@ export * as Receipt from './Receipt.js' export * as Sse from './Sse.js' export * as Types from './Types.js' export * as Voucher from './Voucher.js' +export * as Ws from './Ws.js'