diff --git a/apps/server/src/diagnostics/ProcessDiagnostics.test.ts b/apps/server/src/diagnostics/ProcessDiagnostics.test.ts index 18a54326de1..a468746a081 100644 --- a/apps/server/src/diagnostics/ProcessDiagnostics.test.ts +++ b/apps/server/src/diagnostics/ProcessDiagnostics.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "@effect/vitest"; +import { assert, describe, expect, it } from "@effect/vitest"; import * as DateTime from "effect/DateTime"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; @@ -66,6 +66,43 @@ describe("ProcessDiagnostics", () => { }), ); + it.effect("parses Windows process JSON with the schema decoder", () => + Effect.sync(() => { + const rows = ProcessDiagnostics.parseWindowsProcessRows( + [ + "[", + '{"ProcessId":10,"ParentProcessId":1,"Name":"node.exe","CommandLine":"node server.js","Status":null,"WorkingSetSize":2048,"PercentProcessorTime":1.25},', + '{"ProcessId":11,"ParentProcessId":10,"Name":"agent.exe","CommandLine":null,"Status":"Running","WorkingSetSize":4096,"PercentProcessorTime":0}', + "]", + ].join(""), + ); + + assert.deepEqual(rows, [ + { + pid: 10, + ppid: 1, + pgid: null, + status: "Live", + cpuPercent: 1.25, + rssBytes: 2048, + elapsed: "", + command: "node server.js", + }, + { + pid: 11, + ppid: 10, + pgid: null, + status: "Running", + cpuPercent: 0, + rssBytes: 4096, + elapsed: "", + command: "agent.exe", + }, + ]); + assert.deepEqual(ProcessDiagnostics.parseWindowsProcessRows("{"), []); + }), + ); + it.effect("aggregates only descendants of the server process", () => Effect.sync(() => { const diagnostics = ProcessDiagnostics.aggregateProcessDiagnostics({ diff --git a/apps/server/src/diagnostics/ProcessDiagnostics.ts b/apps/server/src/diagnostics/ProcessDiagnostics.ts index 7730b8c7d6b..b3eb6f10c1b 100644 --- a/apps/server/src/diagnostics/ProcessDiagnostics.ts +++ b/apps/server/src/diagnostics/ProcessDiagnostics.ts @@ -26,7 +26,7 @@ interface ProcessRow { readonly command: string; } -const PROCESS_QUERY_TIMEOUT_MS = 1_000; +const PROCESS_QUERY_TIMEOUT = Duration.seconds(1); const POSIX_PROCESS_QUERY_COMMAND = "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command="; const PROCESS_QUERY_MAX_OUTPUT_BYTES = 2 * 1024 * 1024; @@ -52,6 +52,20 @@ class ProcessDiagnosticsError extends Schema.TaggedErrorClass => + Option.fromNullishOr(value).pipe( + Option.map((entry) => entry.trim()), + Option.filter((entry) => entry.length > 0), + ); + export function parsePosixProcessRows(output: string): ReadonlyArray { const rows: ProcessRow[] = []; const rowPattern = @@ -139,51 +159,54 @@ export function parsePosixProcessRows(output: string): ReadonlyArray return rows; } -function normalizeWindowsProcessRow(value: unknown): ProcessRow | null { - if (typeof value !== "object" || value === null) return null; - const record = value as Record; - const pid = typeof record.ProcessId === "number" ? record.ProcessId : null; - const ppid = typeof record.ParentProcessId === "number" ? record.ParentProcessId : null; - const commandLine = - typeof record.CommandLine === "string" && record.CommandLine.trim().length > 0 - ? record.CommandLine - : typeof record.Name === "string" - ? record.Name - : null; - const workingSet = - typeof record.WorkingSetSize === "number" && Number.isFinite(record.WorkingSetSize) - ? Math.max(0, Math.round(record.WorkingSetSize)) - : 0; - const cpuPercent = - typeof record.PercentProcessorTime === "number" && Number.isFinite(record.PercentProcessorTime) - ? Math.max(0, record.PercentProcessorTime) - : 0; - - if (!pid || pid <= 0 || ppid === null || ppid < 0 || !commandLine) return null; - return { - pid, - ppid, - pgid: null, - status: typeof record.Status === "string" && record.Status.length > 0 ? record.Status : "Live", - cpuPercent, - rssBytes: workingSet, - elapsed: "", - command: commandLine, - }; +function normalizeWindowsProcessRow(value: unknown): Option.Option { + return decodeWindowsProcessRecord(value).pipe( + Option.flatMap((record: WindowsProcessRecord) => { + const commandLine = trimNonEmptyOption(record.CommandLine).pipe( + Option.orElse(() => trimNonEmptyOption(record.Name)), + ); + if ( + !Number.isInteger(record.ProcessId) || + record.ProcessId <= 0 || + !Number.isInteger(record.ParentProcessId) || + record.ParentProcessId < 0 || + Option.isNone(commandLine) + ) { + return Option.none(); + } + + return Option.some({ + pid: record.ProcessId, + ppid: record.ParentProcessId, + pgid: null, + status: Option.getOrElse(trimNonEmptyOption(record.Status), () => "Live"), + cpuPercent: + typeof record.PercentProcessorTime === "number" && + Number.isFinite(record.PercentProcessorTime) + ? Math.max(0, record.PercentProcessorTime) + : 0, + rssBytes: + typeof record.WorkingSetSize === "number" && Number.isFinite(record.WorkingSetSize) + ? Math.max(0, Math.round(record.WorkingSetSize)) + : 0, + elapsed: "", + command: commandLine.value, + }); + }), + ); } -function parseWindowsProcessRows(output: string): ReadonlyArray { - if (output.trim().length === 0) return []; - try { - const parsed = JSON.parse(output) as unknown; - const records = Array.isArray(parsed) ? parsed : [parsed]; - return records.flatMap((record) => { - const row = normalizeWindowsProcessRow(record); - return row ? [row] : []; - }); - } catch { - return []; - } +export function parseWindowsProcessRows(output: string): ReadonlyArray { + const parsed = decodeWindowsProcessJson(output); + if (Option.isNone(parsed)) return []; + + const records = Array.isArray(parsed.value) ? parsed.value : [parsed.value]; + return records.flatMap((record) => + Option.match(normalizeWindowsProcessRow(record), { + onNone: () => [], + onSome: (row) => [row], + }), + ); } function buildDescendantEntries( @@ -309,7 +332,7 @@ const runProcess = Effect.fn("runProcess")( (effect, input) => effect.pipe( Effect.scoped, - Effect.timeoutOption(Duration.millis(PROCESS_QUERY_TIMEOUT_MS)), + Effect.timeoutOption(PROCESS_QUERY_TIMEOUT), Effect.flatMap((result) => Option.match(result, { onNone: () => Effect.fail(toProcessDiagnosticsError(`${input.errorMessage} timed out.`)), diff --git a/packages/tailscale/src/tailscale.test.ts b/packages/tailscale/src/tailscale.test.ts index dd2b1772fd6..1113db04f2b 100644 --- a/packages/tailscale/src/tailscale.test.ts +++ b/packages/tailscale/src/tailscale.test.ts @@ -1,8 +1,12 @@ import { assert, describe, it } from "@effect/vitest"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; +import * as Fiber from "effect/Fiber"; import * as Layer from "effect/Layer"; +import * as Result from "effect/Result"; import * as Sink from "effect/Sink"; import * as Stream from "effect/Stream"; +import * as TestClock from "effect/testing/TestClock"; import { ChildProcessSpawner } from "effect/unstable/process"; import { @@ -35,6 +39,31 @@ function mockHandle(result: { stdout?: string; stderr?: string; code?: number }) }); } +function mockNeverFinishingHandle() { + let finish: ((exitCode: ChildProcessSpawner.ExitCode) => void) | null = null; + return ChildProcessSpawner.makeHandle({ + pid: ChildProcessSpawner.ProcessId(1), + exitCode: Effect.callback((resume) => { + finish = (exitCode) => resume(Effect.succeed(exitCode)); + return Effect.sync(() => { + finish = null; + }); + }), + isRunning: Effect.succeed(true), + kill: () => + Effect.sync(() => { + finish?.(ChildProcessSpawner.ExitCode(143)); + }), + unref: Effect.succeed(Effect.void), + stdin: Sink.drain, + stdout: Stream.empty, + stderr: Stream.empty, + all: Stream.empty, + getInputFd: () => Sink.drain, + getOutputFd: () => Stream.empty, + }); +} + function mockSpawnerLayer( handler: ( command: string, @@ -112,6 +141,27 @@ describe("tailscale", () => { }); }); + it.effect("times out status reads using the test clock", () => { + const spawnerLayer = Layer.succeed( + ChildProcessSpawner.ChildProcessSpawner, + ChildProcessSpawner.make(() => Effect.succeed(mockNeverFinishingHandle())), + ); + const layer = Layer.mergeAll(spawnerLayer, TestClock.layer()); + + return Effect.gen(function* () { + const fiber = yield* Effect.forkChild(Effect.result(readTailscaleStatus)); + yield* Effect.yieldNow; + yield* TestClock.adjust(Duration.seconds(2)); + + const result = yield* Fiber.join(fiber); + assert.isTrue(Result.isFailure(result)); + if (Result.isFailure(result)) { + assert.equal(result.failure._tag, "TailscaleCommandError"); + assert.equal(result.failure.message, "Tailscale status timed out."); + } + }).pipe(Effect.provide(layer)); + }); + it.effect("configures tailscale serve through the process spawner service", () => { const layer = mockSpawnerLayer((command, args) => { assert.equal(command, "tailscale"); diff --git a/packages/tailscale/src/tailscale.ts b/packages/tailscale/src/tailscale.ts index c40cd54fc44..319ab4430ab 100644 --- a/packages/tailscale/src/tailscale.ts +++ b/packages/tailscale/src/tailscale.ts @@ -1,4 +1,4 @@ -import * as Data from "effect/Data"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Option from "effect/Option"; import * as Schema from "effect/Schema"; @@ -7,24 +7,33 @@ import { HttpClient, HttpClientRequest } from "effect/unstable/http"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; export const DEFAULT_TAILSCALE_SERVE_PORT = 443; -export const TAILSCALE_STATUS_TIMEOUT_MS = 1_500; -export const TAILSCALE_SERVE_TIMEOUT_MS = 10_000; -export const TAILSCALE_PROBE_TIMEOUT_MS = 2_500; +export const TAILSCALE_STATUS_TIMEOUT = Duration.millis(1_500); +export const TAILSCALE_SERVE_TIMEOUT = Duration.seconds(10); +export const TAILSCALE_PROBE_TIMEOUT = Duration.millis(2_500); -export class TailscaleCommandError extends Data.TaggedError("TailscaleCommandError")<{ - readonly command: readonly string[]; - readonly message: string; - readonly exitCode: number | null; - readonly stderr: string; -}> {} +export class TailscaleCommandError extends Schema.TaggedErrorClass()( + "TailscaleCommandError", + { + command: Schema.Array(Schema.String), + message: Schema.String, + exitCode: Schema.NullOr(Schema.Int), + stderr: Schema.String, + }, +) {} -export class TailscaleStatusParseError extends Data.TaggedError("TailscaleStatusParseError")<{ - readonly cause: unknown; -}> {} +export class TailscaleStatusParseError extends Schema.TaggedErrorClass()( + "TailscaleStatusParseError", + { + cause: Schema.Defect, + }, +) {} -export class TailscaleUnavailableError extends Data.TaggedError("TailscaleUnavailableError")<{ - readonly reason: string; -}> {} +export class TailscaleUnavailableError extends Schema.TaggedErrorClass()( + "TailscaleUnavailableError", + { + reason: Schema.String, + }, +) {} const TailscaleStatusSelf = Schema.Struct({ DNSName: Schema.optional(Schema.Unknown), @@ -174,7 +183,7 @@ export const readTailscaleStatus: Effect.Effect< return yield* parseTailscaleStatus(stdout); }).pipe( Effect.scoped, - Effect.timeoutOption(TAILSCALE_STATUS_TIMEOUT_MS), + Effect.timeoutOption(TAILSCALE_STATUS_TIMEOUT), Effect.flatMap((result) => Option.match(result, { onNone: () => @@ -206,7 +215,7 @@ const runTailscaleCommand = ( readonly runMessage: string; readonly exitMessage: (exitCode: number) => string; readonly timeoutMessage: string; - readonly timeoutMs: number; + readonly timeout: Duration.Duration; }, ): Effect.Effect => Effect.gen(function* () { @@ -243,7 +252,7 @@ const runTailscaleCommand = ( } }).pipe( Effect.scoped, - Effect.timeoutOption(input.timeoutMs), + Effect.timeoutOption(input.timeout), Effect.flatMap((result) => Option.match(result, { onNone: () => Effect.fail(tailscaleCommandError(args, input.timeoutMessage, null)), @@ -265,7 +274,7 @@ export const ensureTailscaleServe = (input: { runMessage: "Failed to run tailscale serve.", exitMessage: (exitCode) => `Tailscale serve exited with code ${exitCode}.`, timeoutMessage: "Tailscale serve timed out.", - timeoutMs: TAILSCALE_SERVE_TIMEOUT_MS, + timeout: TAILSCALE_SERVE_TIMEOUT, }); }; @@ -281,13 +290,13 @@ export const disableTailscaleServe = ( runMessage: "Failed to run tailscale serve off.", exitMessage: (exitCode) => `Tailscale serve off exited with code ${exitCode}.`, timeoutMessage: "Tailscale serve off timed out.", - timeoutMs: TAILSCALE_SERVE_TIMEOUT_MS, + timeout: TAILSCALE_SERVE_TIMEOUT, }); }); export const probeTailscaleHttpsEndpoint = (input: { readonly baseUrl: string; - readonly timeoutMs?: number; + readonly timeout?: Duration.Duration; }): Effect.Effect => Effect.gen(function* () { const client = yield* HttpClient.HttpClient; @@ -295,7 +304,7 @@ export const probeTailscaleHttpsEndpoint = (input: { const url = new URL("/.well-known/t3/environment", input.baseUrl); const request = HttpClientRequest.get(url.toString()); return yield* client.execute(request); - }).pipe(Effect.timeoutOption(input.timeoutMs ?? TAILSCALE_PROBE_TIMEOUT_MS)); + }).pipe(Effect.timeoutOption(input.timeout ?? TAILSCALE_PROBE_TIMEOUT)); return Option.match(response, { onNone: () => false,