From 7061c5dccd931dbc218bcaf29fc02d21061b7a8c Mon Sep 17 00:00:00 2001 From: dzucconi Date: Fri, 13 Mar 2026 11:54:22 -0400 Subject: [PATCH] Harden CLI for non-interactive use and network resilience --- README.md | 32 ++++++- src/api/client.ts | 10 +++ src/cli.contract.test.ts | 72 +++++++++++++++ src/cli.tsx | 176 +++++++++++++++++++++++++++++++++---- src/commands/import.tsx | 3 + src/components/Spinner.tsx | 6 +- src/lib/args.test.ts | 30 +++++++ src/lib/args.ts | 21 +++++ src/lib/import.test.ts | 100 +++++++++++++++++++++ src/lib/import.ts | 124 ++++++++++++++++++++++++-- src/lib/network.ts | 75 ++++++++++++++++ src/lib/oauth.ts | 3 +- src/lib/registry.test.ts | 17 +--- src/lib/upload.ts | 3 +- src/lib/vcr.ts | 10 ++- 15 files changed, 639 insertions(+), 43 deletions(-) create mode 100644 src/cli.contract.test.ts create mode 100644 src/lib/network.ts diff --git a/README.md b/README.md index 9c940dc..3359848 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,34 @@ arena channel contents --help | `--yes` | Bypass destructive confirmation prompts | | `--help` | Show help | +## Output & Errors + +- `stdout`: + - Successful command output. + - `--json` returns JSON objects; `import --json` returns NDJSON progress events. + - `--quiet` only changes JSON formatting (compact, single-line); it does **not** change response fields. +- `stderr`: + - Human-readable failures in non-interactive mode. + - Structured JSON errors in `--json` mode. +- JSON error shape: + - `{"error": string, "code": number | null, "type": string, "hint"?: string}` + - Common `type` values include: `unknown_command`, `unknown_subcommand`, `json_not_supported`, API-derived types like `not_found`. +- Exit codes: + - `0` success + - `1` client/usage errors and unknown command/subcommand + - `2` unauthorized (`401`) + - `3` not found (`404`) + - `4` validation/bad request (`400`, `422`) + - `5` rate limited (`429`) + - `6` forbidden (`403`) + +### Non-Interactive Behavior + +- Interactive session mode only starts when **both** `stdin` and `stdout` are TTYs. +- In non-interactive contexts (pipes/CI), output is deterministic and unknown commands fail fast with non-zero exits. +- `batch` is JSON-only; use `--json`. +- For stdin-driven automation (for example piping content into `add`), use `--json`. + ## Command Reference Examples are shown first, then options. @@ -337,7 +365,7 @@ arena add my-channel "Hello world" arena add my-channel "Hello" --title "Greeting" --description "Pinned note" arena add my-channel https://example.com --alt-text "Cover image" --insert-at 1 arena add my-channel https://example.com --original-source-url https://source.com --original-source-title "Original" -echo "piped text" | arena add my-channel +echo "piped text" | arena --json add my-channel ``` Options: @@ -370,6 +398,8 @@ Options: Create many blocks asynchronously. +`batch` is available in `--json` mode. + Examples: ```bash diff --git a/src/api/client.ts b/src/api/client.ts index fd0cb6a..a935390 100644 --- a/src/api/client.ts +++ b/src/api/client.ts @@ -2,6 +2,7 @@ import createClient, { type Middleware } from "openapi-fetch"; import { loadEnv } from "../lib/env"; import { config } from "../lib/config"; import { vcrFetch } from "../lib/vcr"; +import { withRequestSignal } from "../lib/network"; import type { paths } from "./schema"; // Ensure env is populated before reading API URL at module init time. @@ -59,6 +60,14 @@ const authMiddleware: Middleware = { }, }; +const timeoutMiddleware: Middleware = { + async onRequest({ request }) { + return new Request(request, { + signal: withRequestSignal(request.signal), + }); + }, +}; + const errorMiddleware: Middleware = { async onResponse({ response }) { if (!response.ok) { @@ -85,4 +94,5 @@ export const client = createClient({ client.use(baseUrlMiddleware); client.use(authMiddleware); +client.use(timeoutMiddleware); client.use(errorMiddleware); diff --git a/src/cli.contract.test.ts b/src/cli.contract.test.ts new file mode 100644 index 0000000..60b8b5f --- /dev/null +++ b/src/cli.contract.test.ts @@ -0,0 +1,72 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { spawnSync } from "node:child_process"; +import { fileURLToPath } from "node:url"; +import { dirname, resolve } from "node:path"; + +const __dirname = dirname(fileURLToPath(import.meta.url)); +const projectRoot = resolve(__dirname, ".."); + +function runCli(args: string[]) { + return spawnSync( + process.execPath, + ["--import", "tsx", "src/cli.tsx", ...args], + { + cwd: projectRoot, + encoding: "utf8", + env: process.env, + }, + ); +} + +test("json unknown command returns typed error and non-zero exit", () => { + const result = runCli(["--json", "pign"]); + assert.equal(result.status, 1); + const payload = JSON.parse(result.stdout || result.stderr) as { + type: string; + hint?: string; + }; + assert.equal(payload.type, "unknown_command"); + assert.ok(payload.hint?.includes("arena ping")); +}); + +test("json unknown subcommand returns typed error and non-zero exit", () => { + const result = runCli(["--json", "channel", "contnts", "slug"]); + assert.equal(result.status, 1); + const payload = JSON.parse(result.stdout || result.stderr) as { + type: string; + hint?: string; + }; + assert.equal(payload.type, "unknown_subcommand"); + assert.ok(payload.hint?.includes("arena channel contents")); +}); + +test("json unsupported command returns json_not_supported type", () => { + const result = runCli(["--json", "login"]); + assert.equal(result.status, 1); + const payload = JSON.parse(result.stdout || result.stderr) as { + type: string; + }; + assert.equal(payload.type, "json_not_supported"); +}); + +test("plain unknown command fails non-interactive with stderr error", () => { + const result = runCli(["pign"]); + assert.equal(result.status, 1); + assert.ok(result.stderr.includes("Unknown command: pign")); +}); + +test("--quiet keeps schema while compacting JSON", () => { + const quiet = runCli(["--json", "--quiet", "version"]); + const pretty = runCli(["--json", "version"]); + + assert.equal(quiet.status, 0); + assert.equal(pretty.status, 0); + + const quietPayload = JSON.parse(quiet.stdout) as Record; + const prettyPayload = JSON.parse(pretty.stdout) as Record; + + assert.deepEqual(quietPayload, prettyPayload); + assert.ok(!quiet.stdout.includes("\n "), "expected compact JSON output"); + assert.ok(pretty.stdout.includes("\n "), "expected pretty JSON output"); +}); diff --git a/src/cli.tsx b/src/cli.tsx index d43747d..5b65b60 100644 --- a/src/cli.tsx +++ b/src/cli.tsx @@ -6,6 +6,7 @@ import { render, Box, Text, useApp } from "ink"; import { SWRConfig } from "swr"; import { parseArgs, type Flags } from "./lib/args"; import { + commands, commandMap, commandHelpDocs, type CommandHelpDoc, @@ -14,6 +15,9 @@ import { exitCodeFromError, formatJsonError } from "./lib/exit-codes"; import { CLI_PACKAGE_NAME, getCliVersion } from "./lib/version"; import { confirmDestructiveIfNeeded } from "./lib/destructive-confirmation"; import { SessionMode } from "./commands/session"; +import { initCancellationHandling } from "./lib/network"; + +initCancellationHandling(); // ── Help ── @@ -185,13 +189,94 @@ function RenderError({ message }: { message: string }) { return ✕ {message}; } -function quietResult(result: unknown): unknown { - if (!result || typeof result !== "object") return result; - if (Array.isArray(result)) return result.map(quietResult); - const obj = result as Record; - if ("id" in obj) return { id: obj.id }; - if ("slug" in obj) return { slug: obj.slug }; - return result; +function levenshtein(a: string, b: string): number { + const rows = a.length + 1; + const cols = b.length + 1; + const dist = Array.from({ length: rows }, () => Array(cols).fill(0)); + + for (let i = 0; i < rows; i++) dist[i]![0] = i; + for (let j = 0; j < cols; j++) dist[0]![j] = j; + + for (let i = 1; i < rows; i++) { + for (let j = 1; j < cols; j++) { + const cost = a[i - 1] === b[j - 1] ? 0 : 1; + dist[i]![j] = Math.min( + dist[i - 1]![j]! + 1, + dist[i]![j - 1]! + 1, + dist[i - 1]![j - 1]! + cost, + ); + } + } + + return dist[rows - 1]![cols - 1]!; +} + +function nearest(input: string, candidates: string[]): string | undefined { + if (!input || candidates.length === 0) return undefined; + const normalized = input.toLowerCase(); + let best: { candidate: string; score: number } | undefined; + + for (const candidate of candidates) { + const score = levenshtein(normalized, candidate.toLowerCase()); + if (!best || score < best.score) { + best = { candidate, score }; + } + } + + if (!best) return undefined; + const threshold = Math.max(2, Math.floor(normalized.length / 3)); + return best.score <= threshold ? best.candidate : undefined; +} + +function allCanonicalCommands(): string[] { + return [...new Set(commands.map((command) => command.name))]; +} + +function knownSubcommands(commandName: string): string[] { + return Object.keys(commandHelpDocs[commandName]?.subcommands ?? {}); +} + +function validateSubcommand( + commandName: string, + args: string[], +): { + badSubcommand?: string; + suggestion?: string; +} { + const subcommands = knownSubcommands(commandName); + if (subcommands.length === 0 || args.length <= 1) return {}; + const provided = args[0]; + if (!provided || subcommands.includes(provided) || provided.startsWith("-")) { + return {}; + } + return { + badSubcommand: provided, + suggestion: nearest(provided, subcommands), + }; +} + +function unknownCommandPayload(command: string) { + const suggestion = nearest(command, allCanonicalCommands()); + return { + error: `Unknown command: ${command}`, + code: null, + type: "unknown_command", + hint: suggestion + ? `Did you mean "arena ${suggestion}"?` + : "Run: arena --help", + }; +} + +function unknownSubcommandPayload(command: string, subcommand: string) { + const suggestion = nearest(subcommand, knownSubcommands(command)); + return { + error: `Unknown subcommand: ${command} ${subcommand}`, + code: null, + type: "unknown_subcommand", + hint: suggestion + ? `Did you mean "arena ${command} ${suggestion}"?` + : `Run: arena help ${command}`, + }; } // ── JSON handler ── @@ -199,12 +284,29 @@ function quietResult(result: unknown): unknown { async function handleJson(command: string, args: string[], flags: Flags) { const def = commandMap.get(command); - if (!def || (!def.json && !def.jsonStream)) { + if (!def) { + process.stderr.write(JSON.stringify(unknownCommandPayload(command)) + "\n"); + process.exit(1); + } + + const canonicalCommand = def.name; + const { badSubcommand } = validateSubcommand(canonicalCommand, args); + if (badSubcommand) { + process.stderr.write( + JSON.stringify( + unknownSubcommandPayload(canonicalCommand, badSubcommand), + ) + "\n", + ); + process.exit(1); + } + + if (!def.json && !def.jsonStream) { process.stderr.write( JSON.stringify({ - error: `Unknown command: ${command}`, + error: `Command does not support --json: ${canonicalCommand}`, code: null, - type: "unknown_command", + type: "json_not_supported", + hint: `Run without --json or use: arena help ${canonicalCommand}`, }) + "\n", ); process.exit(1); @@ -223,13 +325,12 @@ async function handleJson(command: string, args: string[], flags: Flags) { } if (!def.json) { - throw new Error(`Unknown command: ${command}`); + throw new Error(`Command does not support --json: ${canonicalCommand}`); } const result = await def.json(args, flags); - const output = flags.quiet ? quietResult(result) : result; const indent = flags.quiet ? undefined : 2; - process.stdout.write(JSON.stringify(output, null, indent) + "\n"); + process.stdout.write(JSON.stringify(result, null, indent) + "\n"); } catch (err: unknown) { process.stderr.write(JSON.stringify(formatJsonError(err)) + "\n"); process.exit(exitCodeFromError(err)); @@ -246,7 +347,24 @@ function routeCommand( const def = commandMap.get(command); if (!def) { - return ; + const payload = unknownCommandPayload(command); + return ; + } + + const canonicalCommand = def.name; + const { badSubcommand, suggestion } = validateSubcommand( + canonicalCommand, + rest, + ); + if (badSubcommand) { + const hint = suggestion + ? `Did you mean "arena ${canonicalCommand} ${suggestion}"?` + : `Run: arena help ${canonicalCommand}`; + return ( + + ); } try { @@ -261,6 +379,9 @@ function routeCommand( const { args, flags } = parseArgs(process.argv.slice(2)); const [command, ...rest] = args; +const isInteractiveTerminal = Boolean( + process.stdin.isTTY && process.stdout.isTTY, +); const SWR_OPTIONS = { revalidateOnFocus: false, @@ -327,11 +448,34 @@ if (flags.json && command) { ); } } else if (!command) { - const element = process.stdin.isTTY ? : ; + const element = isInteractiveTerminal ? : ; await runInk({element}, { - fullscreen: Boolean(process.stdin.isTTY && process.stdout.isTTY), + fullscreen: isInteractiveTerminal, }); } else { + if (!isInteractiveTerminal) { + const maybeDef = commandMap.get(command); + if (!maybeDef) { + const payload = unknownCommandPayload(command); + process.stderr.write(`${payload.error}. ${payload.hint}\n`); + process.exit(1); + } + + const { badSubcommand, suggestion } = validateSubcommand( + maybeDef.name, + rest, + ); + if (badSubcommand) { + const hint = suggestion + ? `Did you mean "arena ${maybeDef.name} ${suggestion}"?` + : `Run: arena help ${maybeDef.name}`; + process.stderr.write( + `Unknown subcommand: ${maybeDef.name} ${badSubcommand}. ${hint}\n`, + ); + process.exit(1); + } + } + const def = commandMap.get(command); let element: React.JSX.Element; try { diff --git a/src/commands/import.tsx b/src/commands/import.tsx index 098d874..aba308d 100644 --- a/src/commands/import.tsx +++ b/src/commands/import.tsx @@ -13,6 +13,7 @@ import { type ImportSummary, } from "../lib/import"; import { Spinner } from "../components/Spinner"; +import { cancellationSignal } from "../lib/network"; export interface ImportCommandOptions { channel: string; @@ -354,6 +355,7 @@ export function ImportCommand({ uploadConcurrency, pollIntervalMs, onEvent, + signal: cancellationSignal(), }; const result = await executeImport(runOptions); @@ -599,6 +601,7 @@ export async function runImportJsonStream( uploadConcurrency: options.uploadConcurrency, pollIntervalMs: options.pollIntervalMs, onEvent: write, + signal: cancellationSignal(), }); return importExitCode(summary); diff --git a/src/components/Spinner.tsx b/src/components/Spinner.tsx index d92801c..3fe5a43 100644 --- a/src/components/Spinner.tsx +++ b/src/components/Spinner.tsx @@ -4,18 +4,20 @@ import { Text } from "ink"; const frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; export function Spinner({ label }: { label?: string }) { + const animate = Boolean(process.stdout.isTTY); const [i, setI] = useState(0); useEffect(() => { + if (!animate) return; const timer = setInterval(() => { setI((prev) => (prev + 1) % frames.length); }, 80); return () => clearInterval(timer); - }, []); + }, [animate]); return ( - {frames[i]} + {animate ? frames[i] : "…"} {label && {label}} ); diff --git a/src/lib/args.test.ts b/src/lib/args.test.ts index 8613c13..b3bd070 100644 --- a/src/lib/args.test.ts +++ b/src/lib/args.test.ts @@ -26,6 +26,36 @@ test("parseArgs handles long, short, and -- separator", () => { assert.equal(parsed.flags.h, true); }); +test("parseArgs keeps command tokens after boolean globals", () => { + const helpFirst = parseArgs(["--help", "channel"]); + assert.deepEqual(helpFirst.args, ["channel"]); + assert.equal(helpFirst.flags.help, true); + + const jsonFirst = parseArgs(["--json", "ping"]); + assert.deepEqual(jsonFirst.args, ["ping"]); + assert.equal(jsonFirst.flags.json, true); + + const mixedOrder = parseArgs(["channel", "--help"]); + assert.deepEqual(mixedOrder.args, ["channel"]); + assert.equal(mixedOrder.flags.help, true); +}); + +test("parseArgs preserves value semantics for value flags", () => { + const parsed = parseArgs([ + "channel", + "contents", + "worldmaking", + "--sort", + "updated_at_desc", + "--page", + "2", + ]); + + assert.deepEqual(parsed.args, ["channel", "contents", "worldmaking"]); + assert.equal(parsed.flags.sort, "updated_at_desc"); + assert.equal(parsed.flags.page, "2"); +}); + test("requireArg enforces required values", () => { assert.equal(requireArg(["abc"], 0, "slug"), "abc"); assert.throws( diff --git a/src/lib/args.ts b/src/lib/args.ts index dd875b3..ccd7143 100644 --- a/src/lib/args.ts +++ b/src/lib/args.ts @@ -5,6 +5,18 @@ interface ParsedArgs { flags: Flags; } +const KNOWN_BOOLEAN_LONG_FLAGS = new Set([ + "help", + "json", + "quiet", + "version", + "yes", + "recursive", + "interactive", +]); + +const KNOWN_BOOLEAN_SHORT_FLAGS = new Set(["h", "j", "q", "v", "y"]); + export function parseArgs(argv: string[]): ParsedArgs { const args: string[] = []; const flags: Flags = {}; @@ -33,6 +45,11 @@ export function parseArgs(argv: string[]): ParsedArgs { continue; } + if (KNOWN_BOOLEAN_LONG_FLAGS.has(key)) { + flags[key] = true; + continue; + } + const next = argv[i + 1]; if (next && !next.startsWith("-")) { flags[key] = next; @@ -49,6 +66,10 @@ export function parseArgs(argv: string[]): ParsedArgs { for (const shortFlag of shorts) flags[shortFlag] = true; } else { const key = shorts; + if (KNOWN_BOOLEAN_SHORT_FLAGS.has(key)) { + flags[key] = true; + continue; + } const next = argv[i + 1]; if (next && !next.startsWith("-")) { flags[key] = next; diff --git a/src/lib/import.test.ts b/src/lib/import.test.ts index 7253da8..b5eef4b 100644 --- a/src/lib/import.test.ts +++ b/src/lib/import.test.ts @@ -182,3 +182,103 @@ test("executeImport captures upload and batch failures", async () => { assert.equal(summary.batch_failures[0]?.file, "c.txt"); assert.equal(importExitCode(summary), 1); }); + +test("executeImport retries transient upload failures", async () => { + const files: ImportFile[] = [ + { absolutePath: "/tmp/retry.txt", relativePath: "retry.txt" }, + ]; + let uploadAttempts = 0; + let sleepCalls = 0; + + const adapter: ImportAdapter = { + async uploadFile(file) { + uploadAttempts += 1; + if (uploadAttempts < 3) { + throw new Error("fetch failed"); + } + return { s3Url: `https://s3/${file.relativePath}` }; + }, + async submitBatch() { + return { batch_id: "batch-1" }; + }, + async getBatchStatus(batchId) { + return { + batch_id: batchId, + status: "completed", + total: 1, + successful_count: 1, + failed_count: 0, + }; + }, + async sleep() { + sleepCalls += 1; + }, + }; + + const summary = await executeImport({ + channel: "test-channel", + directory: ".", + recursive: false, + batchSize: 100, + uploadConcurrency: 1, + pollIntervalMs: 1, + adapter, + discover: async () => files, + }); + + assert.equal(uploadAttempts, 3); + assert.ok(sleepCalls >= 2); + assert.equal(summary.uploaded, 1); + assert.equal(summary.upload_failed, 0); + assert.equal(importExitCode(summary), 0); +}); + +test("executeImport tolerates transient batch status errors", async () => { + const files: ImportFile[] = [ + { absolutePath: "/tmp/a.txt", relativePath: "a.txt" }, + ]; + let statusAttempts = 0; + let sleepCalls = 0; + + const adapter: ImportAdapter = { + async uploadFile(file) { + return { s3Url: `https://s3/${file.relativePath}` }; + }, + async submitBatch() { + return { batch_id: "batch-1" }; + }, + async getBatchStatus(batchId) { + statusAttempts += 1; + if (statusAttempts < 3) { + throw new Error("network timeout"); + } + return { + batch_id: batchId, + status: "completed", + total: 1, + successful_count: 1, + failed_count: 0, + }; + }, + async sleep() { + sleepCalls += 1; + }, + }; + + const summary = await executeImport({ + channel: "test-channel", + directory: ".", + recursive: false, + batchSize: 100, + uploadConcurrency: 1, + pollIntervalMs: 1, + adapter, + discover: async () => files, + }); + + assert.equal(statusAttempts, 3); + assert.ok(sleepCalls >= 2); + assert.equal(summary.batch_successful, 1); + assert.equal(summary.batch_failed, 0); + assert.equal(importExitCode(summary), 0); +}); diff --git a/src/lib/import.ts b/src/lib/import.ts index e0cab00..5950e34 100644 --- a/src/lib/import.ts +++ b/src/lib/import.ts @@ -1,6 +1,6 @@ import { readdir } from "node:fs/promises"; import { join, resolve } from "node:path"; -import { client, getData } from "../api/client"; +import { ArenaError, client, getData } from "../api/client"; import { uploadLocalFile } from "./upload"; export interface ImportFile { @@ -106,6 +106,12 @@ interface ImportBatch { status?: BatchStatus; } +const MAX_UPLOAD_ATTEMPTS = 3; +const MAX_BATCH_SUBMIT_ATTEMPTS = 3; +const MAX_BATCH_STATUS_ATTEMPTS = 4; +const MAX_STATUS_FAILURE_STREAK = 5; +const RETRY_BASE_DELAY_MS = 300; + export interface ImportAdapter { uploadFile: (file: ImportFile) => Promise<{ s3Url: string }>; submitBatch: ( @@ -128,6 +134,7 @@ export interface ExecuteImportOptions { adapter?: ImportAdapter; discover?: (directory: string, recursive: boolean) => Promise; onEvent?: (event: ImportEvent) => void | Promise; + signal?: AbortSignal; } export function importExitCode(summary: ImportSummary): number { @@ -219,6 +226,13 @@ async function emit( await onEvent(event); } +function throwIfAborted(signal?: AbortSignal): void { + if (!signal?.aborted) return; + const reason = signal.reason; + if (reason instanceof Error) throw reason; + throw new Error("Operation cancelled by user"); +} + function terminal(status: BatchStatus["status"]): boolean { return status === "completed" || status === "failed"; } @@ -281,6 +295,47 @@ function mapBatchFailures(batches: ImportBatch[]): BatchFailure[] { return failures; } +function isTransientError(err: unknown): boolean { + if (err instanceof ArenaError) { + return err.status >= 500 || err.status === 429; + } + const message = + err instanceof Error ? err.message.toLowerCase() : String(err); + return ( + message.includes("timed out") || + message.includes("fetch failed") || + message.includes("network") || + message.includes("econnreset") || + message.includes("socket") + ); +} + +async function retryWithBackoff(options: { + attempts: number; + baseDelayMs: number; + sleep: (ms: number) => Promise; + run: () => Promise; + shouldRetry: (err: unknown) => boolean; +}): Promise { + const { attempts, baseDelayMs, sleep, run, shouldRetry } = options; + let lastError: unknown; + + for (let attempt = 1; attempt <= attempts; attempt++) { + try { + return await run(); + } catch (err: unknown) { + lastError = err; + if (attempt >= attempts || !shouldRetry(err)) { + throw err; + } + const waitMs = Math.min(baseDelayMs * 2 ** (attempt - 1), 5_000); + await sleep(waitMs); + } + } + + throw lastError instanceof Error ? lastError : new Error(String(lastError)); +} + async function runImportPipeline(options: { channel: string; directory: string; @@ -292,6 +347,7 @@ async function runImportPipeline(options: { pollIntervalMs: number; adapter: ImportAdapter; onEvent?: (event: ImportEvent) => void | Promise; + signal?: AbortSignal; }): Promise { const { channel, @@ -304,8 +360,11 @@ async function runImportPipeline(options: { pollIntervalMs, adapter, onEvent, + signal, } = options; + throwIfAborted(signal); + await emit(onEvent, { type: "selection_completed", selected: selectedFiles.length, @@ -325,6 +384,7 @@ async function runImportPipeline(options: { const workers = Array.from({ length: workerCount }, async () => { while (true) { + throwIfAborted(signal); const index = cursor; cursor += 1; if (index >= selectedFiles.length) return; @@ -332,7 +392,13 @@ async function runImportPipeline(options: { const file = selectedFiles[index]!; try { - const uploaded = await adapter.uploadFile(file); + const uploaded = await retryWithBackoff({ + attempts: MAX_UPLOAD_ATTEMPTS, + baseDelayMs: RETRY_BASE_DELAY_MS, + sleep: adapter.sleep, + run: () => adapter.uploadFile(file), + shouldRetry: isTransientError, + }); uploadedByIndex[index] = { index, file, s3Url: uploaded.s3Url }; successfulUploads += 1; } catch (err: unknown) { @@ -376,11 +442,19 @@ async function runImportPipeline(options: { const batches: ImportBatch[] = []; for (let i = 0; i < chunks.length; i++) { + throwIfAborted(signal); const chunk = chunks[i]!; - const response = await adapter.submitBatch( - channel, - chunk.map((entry) => ({ value: entry.s3Url })), - ); + const response = await retryWithBackoff({ + attempts: MAX_BATCH_SUBMIT_ATTEMPTS, + baseDelayMs: RETRY_BASE_DELAY_MS, + sleep: adapter.sleep, + run: () => + adapter.submitBatch( + channel, + chunk.map((entry) => ({ value: entry.s3Url })), + ), + shouldRetry: isTransientError, + }); batches.push({ batchId: response.batch_id, @@ -397,11 +471,41 @@ async function runImportPipeline(options: { } const pending = new Set(batches.map((batch) => batch.batchId)); + const statusFailureStreak = new Map(); while (pending.size > 0) { + throwIfAborted(signal); for (const batch of batches) { if (!pending.has(batch.batchId)) continue; + throwIfAborted(signal); + + let status: BatchStatus; + try { + status = await retryWithBackoff({ + attempts: MAX_BATCH_STATUS_ATTEMPTS, + baseDelayMs: RETRY_BASE_DELAY_MS, + sleep: adapter.sleep, + run: () => adapter.getBatchStatus(batch.batchId), + shouldRetry: isTransientError, + }); + statusFailureStreak.delete(batch.batchId); + } catch (err: unknown) { + const streak = (statusFailureStreak.get(batch.batchId) ?? 0) + 1; + statusFailureStreak.set(batch.batchId, streak); + if (streak < MAX_STATUS_FAILURE_STREAK) { + continue; + } + + const error = err instanceof Error ? err.message : String(err); + status = { + batch_id: batch.batchId, + status: "failed", + total: batch.entries.length, + successful_count: 0, + failed_count: batch.entries.length, + error: `Status polling failed repeatedly: ${error}`, + }; + } - const status = await adapter.getBatchStatus(batch.batchId); batch.status = status; await emit(onEvent, { @@ -428,6 +532,7 @@ async function runImportPipeline(options: { }); if (pending.size > 0) { + throwIfAborted(signal); await adapter.sleep(pollIntervalMs); } } @@ -468,8 +573,11 @@ export async function executeImport( discoveredCount, discover, onEvent, + signal, } = options; + throwIfAborted(signal); + let files = selectedFiles; let totalDiscovered = discoveredCount; @@ -482,6 +590,7 @@ export async function executeImport( const discoverFiles = discover ?? discoverImportFiles; files = await discoverFiles(directory, recursive); + throwIfAborted(signal); totalDiscovered = files.length; await emit(onEvent, { @@ -502,6 +611,7 @@ export async function executeImport( pollIntervalMs: options.pollIntervalMs, adapter, onEvent, + signal, }); await emit(onEvent, { type: "completed", summary }); diff --git a/src/lib/network.ts b/src/lib/network.ts new file mode 100644 index 0000000..b6afe11 --- /dev/null +++ b/src/lib/network.ts @@ -0,0 +1,75 @@ +const DEFAULT_REQUEST_TIMEOUT_MS = 30_000; + +const cancellationController = new AbortController(); +let cancellationInitialized = false; +let forcedExitTimer: NodeJS.Timeout | undefined; + +function configuredTimeoutMs(): number { + const raw = process.env["ARENA_REQUEST_TIMEOUT_MS"]; + if (!raw) return DEFAULT_REQUEST_TIMEOUT_MS; + const parsed = Number(raw); + if (!Number.isFinite(parsed) || parsed <= 0) + return DEFAULT_REQUEST_TIMEOUT_MS; + return Math.floor(parsed); +} + +export function initCancellationHandling(): void { + if (cancellationInitialized) return; + cancellationInitialized = true; + + process.on("SIGINT", () => { + if (cancellationController.signal.aborted) { + process.exit(130); + return; + } + + process.exitCode = 130; + cancellationController.abort(new Error("Operation cancelled by user")); + forcedExitTimer = setTimeout(() => process.exit(130), 500); + forcedExitTimer.unref(); + }); +} + +export function cancellationSignal(): AbortSignal { + return cancellationController.signal; +} + +export function withRequestSignal( + inputSignal?: AbortSignal, + timeoutMs = configuredTimeoutMs(), +): AbortSignal { + const signals: AbortSignal[] = [cancellationSignal()]; + if (inputSignal) signals.push(inputSignal); + if (timeoutMs > 0) signals.push(AbortSignal.timeout(timeoutMs)); + return AbortSignal.any(signals); +} + +function timeoutError(timeoutMs: number): Error { + return new Error(`Request timed out after ${timeoutMs}ms`); +} + +export async function fetchWithTimeout( + input: RequestInfo | URL, + init?: RequestInit, + timeoutMs = configuredTimeoutMs(), +): Promise { + const timeoutSignal = + timeoutMs > 0 ? AbortSignal.timeout(timeoutMs) : undefined; + const signal = AbortSignal.any( + [cancellationSignal(), init?.signal, timeoutSignal].filter( + (candidate): candidate is AbortSignal => Boolean(candidate), + ), + ); + + try { + return await fetch(input, { ...init, signal }); + } catch (err: unknown) { + if (!signal.aborted) throw err; + if (timeoutSignal?.aborted) { + throw timeoutError(timeoutMs); + } + const reason = signal.reason; + if (reason instanceof Error) throw reason; + throw new Error("Operation cancelled by user"); + } +} diff --git a/src/lib/oauth.ts b/src/lib/oauth.ts index c934f32..c4c64d8 100644 --- a/src/lib/oauth.ts +++ b/src/lib/oauth.ts @@ -2,6 +2,7 @@ import { createServer, type IncomingMessage, type ServerResponse } from "http"; import { randomBytes, createHash } from "crypto"; import { openUrl } from "./open"; import { loadEnv } from "./env"; +import { fetchWithTimeout } from "./network"; // Ensure env is populated before reading OAuth URLs. loadEnv(); @@ -127,7 +128,7 @@ export async function performOAuthFlow( }); // Exchange code for token - const response = await fetch(tokenUrl(), { + const response = await fetchWithTimeout(tokenUrl(), { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ diff --git a/src/lib/registry.test.ts b/src/lib/registry.test.ts index 896b6f3..d459abe 100644 --- a/src/lib/registry.test.ts +++ b/src/lib/registry.test.ts @@ -38,22 +38,13 @@ function assertNonProductionApiBase(): void { assertNonProductionApiBase(); -function quietResult(result: unknown): unknown { - if (!result || typeof result !== "object") return result; - if (Array.isArray(result)) return result.map(quietResult); - const obj = result as Record; - if ("id" in obj) return { id: obj.id }; - if ("slug" in obj) return { slug: obj.slug }; - return result; -} - async function json(...rawArgs: string[]): Promise { const { args, flags } = parseArgs(rawArgs); const [command, ...rest] = args; const def = commandMap.get(command!); if (!def?.json) throw new Error(`Unknown command: ${command}`); const result = await def.json(rest, flags); - return flags.quiet ? quietResult(result) : result; + return result; } async function sleep(ms: number): Promise { @@ -665,11 +656,11 @@ describe("aliases", () => { // ── Quiet mode ── describe("quiet mode", () => { - test("--quiet reduces output to just id", async () => { + test("--quiet preserves schema shape", async () => { const data = (await json("whoami", "--quiet")) as Record; assert.ok("id" in data); - assert.ok(!("name" in data)); - assert.ok(!("slug" in data)); + assert.ok("name" in data); + assert.ok("slug" in data); }); test("--quiet passes through objects without id or slug", async () => { diff --git a/src/lib/upload.ts b/src/lib/upload.ts index 8e6cbe9..26e8da0 100644 --- a/src/lib/upload.ts +++ b/src/lib/upload.ts @@ -1,6 +1,7 @@ import { readFileSync } from "fs"; import { basename } from "path"; import { ArenaError, client, getData } from "../api/client"; +import { fetchWithTimeout } from "./network"; const EXT_TO_MIME: Record = { jpg: "image/jpeg", @@ -76,7 +77,7 @@ export async function uploadLocalFile(filePath: string): Promise<{ if (!uploadTarget) throw new Error("Upload target was not returned by Are.na"); - const putResponse = await fetch(uploadTarget.upload_url, { + const putResponse = await fetchWithTimeout(uploadTarget.upload_url, { method: "PUT", headers: { "Content-Type": uploadTarget.content_type }, body: fileBuffer, diff --git a/src/lib/vcr.ts b/src/lib/vcr.ts index 1aef854..7348fd0 100644 --- a/src/lib/vcr.ts +++ b/src/lib/vcr.ts @@ -1,5 +1,6 @@ import { join } from "node:path"; import { mkdirSync, readFileSync, writeFileSync, existsSync } from "node:fs"; +import { fetchWithTimeout } from "./network"; type VcrMode = "off" | "record" | "replay" | "auto"; @@ -89,7 +90,7 @@ export async function vcrFetch( init?: RequestInit, ): Promise { const mode = modeFromEnv(); - if (mode === "off") return fetch(input, init); + if (mode === "off") return fetchWithTimeout(input, init); const normalized = new Request(input, init); const method = normalized.method.toUpperCase(); @@ -118,7 +119,12 @@ export async function vcrFetch( } const headers = Object.fromEntries(normalized.headers.entries()); - const response = await fetch(url, { method, headers, body: requestBody }); + const response = await fetchWithTimeout(url, { + method, + headers, + body: requestBody, + signal: normalized.signal, + }); const responseBody = await response.text(); tape.entries.push({