Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion apps/server/src/diagnostics/ProcessDiagnostics.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, expect, it } from "@effect/vitest";
import { assert, describe, expect, it } from "@effect/vitest";
import * as DateTime from "effect/DateTime";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
Expand Down Expand Up @@ -66,6 +66,43 @@ describe("ProcessDiagnostics", () => {
}),
);

it.effect("parses Windows process JSON with the schema decoder", () =>
Effect.sync(() => {
const rows = ProcessDiagnostics.parseWindowsProcessRows(
[
"[",
'{"ProcessId":10,"ParentProcessId":1,"Name":"node.exe","CommandLine":"node server.js","Status":null,"WorkingSetSize":2048,"PercentProcessorTime":1.25},',
'{"ProcessId":11,"ParentProcessId":10,"Name":"agent.exe","CommandLine":null,"Status":"Running","WorkingSetSize":4096,"PercentProcessorTime":0}',
"]",
].join(""),
);

assert.deepEqual(rows, [
{
pid: 10,
ppid: 1,
pgid: null,
status: "Live",
cpuPercent: 1.25,
rssBytes: 2048,
elapsed: "",
command: "node server.js",
},
{
pid: 11,
ppid: 10,
pgid: null,
status: "Running",
cpuPercent: 0,
rssBytes: 4096,
elapsed: "",
command: "agent.exe",
},
]);
assert.deepEqual(ProcessDiagnostics.parseWindowsProcessRows("{"), []);
}),
);

it.effect("aggregates only descendants of the server process", () =>
Effect.sync(() => {
const diagnostics = ProcessDiagnostics.aggregateProcessDiagnostics({
Expand Down
113 changes: 68 additions & 45 deletions apps/server/src/diagnostics/ProcessDiagnostics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ interface ProcessRow {
readonly command: string;
}

const PROCESS_QUERY_TIMEOUT_MS = 1_000;
const PROCESS_QUERY_TIMEOUT = Duration.seconds(1);
const POSIX_PROCESS_QUERY_COMMAND = "pid=,ppid=,pgid=,stat=,pcpu=,rss=,etime=,command=";
const PROCESS_QUERY_MAX_OUTPUT_BYTES = 2 * 1024 * 1024;

Expand All @@ -52,6 +52,20 @@ class ProcessDiagnosticsError extends Schema.TaggedErrorClass<ProcessDiagnostics
) {}
const isProcessDiagnosticsError = Schema.is(ProcessDiagnosticsError);

const WindowsProcessRecord = Schema.Struct({
ProcessId: Schema.Number,
ParentProcessId: Schema.Number,
CommandLine: Schema.optional(Schema.NullOr(Schema.String)),
Name: Schema.optional(Schema.NullOr(Schema.String)),
PercentProcessorTime: Schema.optional(Schema.NullOr(Schema.Number)),
Status: Schema.optional(Schema.NullOr(Schema.String)),
WorkingSetSize: Schema.optional(Schema.NullOr(Schema.Number)),
});
type WindowsProcessRecord = typeof WindowsProcessRecord.Type;

const decodeWindowsProcessJson = Schema.decodeOption(Schema.fromJsonString(Schema.Unknown));
const decodeWindowsProcessRecord = Schema.decodeUnknownOption(WindowsProcessRecord);

function toProcessDiagnosticsError(message: string, cause?: unknown): ProcessDiagnosticsError {
return new ProcessDiagnosticsError({
message,
Expand All @@ -74,6 +88,12 @@ function parseNumber(value: string): number | null {
return Number.isFinite(parsed) ? parsed : null;
}

const trimNonEmptyOption = (value: string | null | undefined): Option.Option<string> =>
Option.fromNullishOr(value).pipe(
Option.map((entry) => entry.trim()),
Option.filter((entry) => entry.length > 0),
);

export function parsePosixProcessRows(output: string): ReadonlyArray<ProcessRow> {
const rows: ProcessRow[] = [];
const rowPattern =
Expand Down Expand Up @@ -139,51 +159,54 @@ export function parsePosixProcessRows(output: string): ReadonlyArray<ProcessRow>
return rows;
}

function normalizeWindowsProcessRow(value: unknown): ProcessRow | null {
if (typeof value !== "object" || value === null) return null;
const record = value as Record<string, unknown>;
const pid = typeof record.ProcessId === "number" ? record.ProcessId : null;
const ppid = typeof record.ParentProcessId === "number" ? record.ParentProcessId : null;
const commandLine =
typeof record.CommandLine === "string" && record.CommandLine.trim().length > 0
? record.CommandLine
: typeof record.Name === "string"
? record.Name
: null;
const workingSet =
typeof record.WorkingSetSize === "number" && Number.isFinite(record.WorkingSetSize)
? Math.max(0, Math.round(record.WorkingSetSize))
: 0;
const cpuPercent =
typeof record.PercentProcessorTime === "number" && Number.isFinite(record.PercentProcessorTime)
? Math.max(0, record.PercentProcessorTime)
: 0;

if (!pid || pid <= 0 || ppid === null || ppid < 0 || !commandLine) return null;
return {
pid,
ppid,
pgid: null,
status: typeof record.Status === "string" && record.Status.length > 0 ? record.Status : "Live",
cpuPercent,
rssBytes: workingSet,
elapsed: "",
command: commandLine,
};
function normalizeWindowsProcessRow(value: unknown): Option.Option<ProcessRow> {
return decodeWindowsProcessRecord(value).pipe(
Option.flatMap((record: WindowsProcessRecord) => {
const commandLine = trimNonEmptyOption(record.CommandLine).pipe(
Option.orElse(() => trimNonEmptyOption(record.Name)),
);
if (
!Number.isInteger(record.ProcessId) ||
record.ProcessId <= 0 ||
!Number.isInteger(record.ParentProcessId) ||
record.ParentProcessId < 0 ||
Option.isNone(commandLine)
) {
return Option.none();
}

return Option.some({
pid: record.ProcessId,
ppid: record.ParentProcessId,
pgid: null,
status: Option.getOrElse(trimNonEmptyOption(record.Status), () => "Live"),
cpuPercent:
typeof record.PercentProcessorTime === "number" &&
Number.isFinite(record.PercentProcessorTime)
? Math.max(0, record.PercentProcessorTime)
: 0,
rssBytes:
typeof record.WorkingSetSize === "number" && Number.isFinite(record.WorkingSetSize)
? Math.max(0, Math.round(record.WorkingSetSize))
: 0,
elapsed: "",
command: commandLine.value,
});
}),
);
}

function parseWindowsProcessRows(output: string): ReadonlyArray<ProcessRow> {
if (output.trim().length === 0) return [];
try {
const parsed = JSON.parse(output) as unknown;
const records = Array.isArray(parsed) ? parsed : [parsed];
return records.flatMap((record) => {
const row = normalizeWindowsProcessRow(record);
return row ? [row] : [];
});
} catch {
return [];
}
export function parseWindowsProcessRows(output: string): ReadonlyArray<ProcessRow> {
const parsed = decodeWindowsProcessJson(output);
if (Option.isNone(parsed)) return [];

const records = Array.isArray(parsed.value) ? parsed.value : [parsed.value];
return records.flatMap((record) =>
Option.match(normalizeWindowsProcessRow(record), {
onNone: () => [],
onSome: (row) => [row],
}),
);
}

function buildDescendantEntries(
Expand Down Expand Up @@ -309,7 +332,7 @@ const runProcess = Effect.fn("runProcess")(
(effect, input) =>
effect.pipe(
Effect.scoped,
Effect.timeoutOption(Duration.millis(PROCESS_QUERY_TIMEOUT_MS)),
Effect.timeoutOption(PROCESS_QUERY_TIMEOUT),
Effect.flatMap((result) =>
Option.match(result, {
onNone: () => Effect.fail(toProcessDiagnosticsError(`${input.errorMessage} timed out.`)),
Expand Down
50 changes: 50 additions & 0 deletions packages/tailscale/src/tailscale.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { assert, describe, it } from "@effect/vitest";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Fiber from "effect/Fiber";
import * as Layer from "effect/Layer";
import * as Result from "effect/Result";
import * as Sink from "effect/Sink";
import * as Stream from "effect/Stream";
import * as TestClock from "effect/testing/TestClock";
import { ChildProcessSpawner } from "effect/unstable/process";

import {
Expand Down Expand Up @@ -35,6 +39,31 @@ function mockHandle(result: { stdout?: string; stderr?: string; code?: number })
});
}

function mockNeverFinishingHandle() {
let finish: ((exitCode: ChildProcessSpawner.ExitCode) => void) | null = null;
return ChildProcessSpawner.makeHandle({
pid: ChildProcessSpawner.ProcessId(1),
exitCode: Effect.callback<ChildProcessSpawner.ExitCode>((resume) => {
finish = (exitCode) => resume(Effect.succeed(exitCode));
return Effect.sync(() => {
finish = null;
});
}),
isRunning: Effect.succeed(true),
kill: () =>
Effect.sync(() => {
finish?.(ChildProcessSpawner.ExitCode(143));
}),
unref: Effect.succeed(Effect.void),
stdin: Sink.drain,
stdout: Stream.empty,
stderr: Stream.empty,
all: Stream.empty,
getInputFd: () => Sink.drain,
getOutputFd: () => Stream.empty,
});
}

function mockSpawnerLayer(
handler: (
command: string,
Expand Down Expand Up @@ -112,6 +141,27 @@ describe("tailscale", () => {
});
});

it.effect("times out status reads using the test clock", () => {
const spawnerLayer = Layer.succeed(
ChildProcessSpawner.ChildProcessSpawner,
ChildProcessSpawner.make(() => Effect.succeed(mockNeverFinishingHandle())),
);
const layer = Layer.mergeAll(spawnerLayer, TestClock.layer());

return Effect.gen(function* () {
const fiber = yield* Effect.forkChild(Effect.result(readTailscaleStatus));
yield* Effect.yieldNow;
yield* TestClock.adjust(Duration.seconds(2));

const result = yield* Fiber.join(fiber);
assert.isTrue(Result.isFailure(result));
if (Result.isFailure(result)) {
assert.equal(result.failure._tag, "TailscaleCommandError");
assert.equal(result.failure.message, "Tailscale status timed out.");
}
}).pipe(Effect.provide(layer));
});

it.effect("configures tailscale serve through the process spawner service", () => {
const layer = mockSpawnerLayer((command, args) => {
assert.equal(command, "tailscale");
Expand Down
55 changes: 32 additions & 23 deletions packages/tailscale/src/tailscale.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as Data from "effect/Data";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Option from "effect/Option";
import * as Schema from "effect/Schema";
Expand All @@ -7,24 +7,33 @@ import { HttpClient, HttpClientRequest } from "effect/unstable/http";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";

export const DEFAULT_TAILSCALE_SERVE_PORT = 443;
export const TAILSCALE_STATUS_TIMEOUT_MS = 1_500;
export const TAILSCALE_SERVE_TIMEOUT_MS = 10_000;
export const TAILSCALE_PROBE_TIMEOUT_MS = 2_500;
export const TAILSCALE_STATUS_TIMEOUT = Duration.millis(1_500);
export const TAILSCALE_SERVE_TIMEOUT = Duration.seconds(10);
export const TAILSCALE_PROBE_TIMEOUT = Duration.millis(2_500);

export class TailscaleCommandError extends Data.TaggedError("TailscaleCommandError")<{
readonly command: readonly string[];
readonly message: string;
readonly exitCode: number | null;
readonly stderr: string;
}> {}
export class TailscaleCommandError extends Schema.TaggedErrorClass<TailscaleCommandError>()(
"TailscaleCommandError",
{
command: Schema.Array(Schema.String),
message: Schema.String,
exitCode: Schema.NullOr(Schema.Int),
stderr: Schema.String,
},
) {}

export class TailscaleStatusParseError extends Data.TaggedError("TailscaleStatusParseError")<{
readonly cause: unknown;
}> {}
export class TailscaleStatusParseError extends Schema.TaggedErrorClass<TailscaleStatusParseError>()(
"TailscaleStatusParseError",
{
cause: Schema.Defect,
},
) {}

export class TailscaleUnavailableError extends Data.TaggedError("TailscaleUnavailableError")<{
readonly reason: string;
}> {}
export class TailscaleUnavailableError extends Schema.TaggedErrorClass<TailscaleUnavailableError>()(
"TailscaleUnavailableError",
{
reason: Schema.String,
},
) {}

const TailscaleStatusSelf = Schema.Struct({
DNSName: Schema.optional(Schema.Unknown),
Expand Down Expand Up @@ -174,7 +183,7 @@ export const readTailscaleStatus: Effect.Effect<
return yield* parseTailscaleStatus(stdout);
}).pipe(
Effect.scoped,
Effect.timeoutOption(TAILSCALE_STATUS_TIMEOUT_MS),
Effect.timeoutOption(TAILSCALE_STATUS_TIMEOUT),
Effect.flatMap((result) =>
Option.match(result, {
onNone: () =>
Expand Down Expand Up @@ -206,7 +215,7 @@ const runTailscaleCommand = (
readonly runMessage: string;
readonly exitMessage: (exitCode: number) => string;
readonly timeoutMessage: string;
readonly timeoutMs: number;
readonly timeout: Duration.Duration;
},
): Effect.Effect<void, TailscaleCommandError, ChildProcessSpawner.ChildProcessSpawner> =>
Effect.gen(function* () {
Expand Down Expand Up @@ -243,7 +252,7 @@ const runTailscaleCommand = (
}
}).pipe(
Effect.scoped,
Effect.timeoutOption(input.timeoutMs),
Effect.timeoutOption(input.timeout),
Effect.flatMap((result) =>
Option.match(result, {
onNone: () => Effect.fail(tailscaleCommandError(args, input.timeoutMessage, null)),
Expand All @@ -265,7 +274,7 @@ export const ensureTailscaleServe = (input: {
runMessage: "Failed to run tailscale serve.",
exitMessage: (exitCode) => `Tailscale serve exited with code ${exitCode}.`,
timeoutMessage: "Tailscale serve timed out.",
timeoutMs: TAILSCALE_SERVE_TIMEOUT_MS,
timeout: TAILSCALE_SERVE_TIMEOUT,
});
};

Expand All @@ -281,21 +290,21 @@ export const disableTailscaleServe = (
runMessage: "Failed to run tailscale serve off.",
exitMessage: (exitCode) => `Tailscale serve off exited with code ${exitCode}.`,
timeoutMessage: "Tailscale serve off timed out.",
timeoutMs: TAILSCALE_SERVE_TIMEOUT_MS,
timeout: TAILSCALE_SERVE_TIMEOUT,
});
});

export const probeTailscaleHttpsEndpoint = (input: {
readonly baseUrl: string;
readonly timeoutMs?: number;
readonly timeout?: Duration.Duration;
}): Effect.Effect<boolean, never, HttpClient.HttpClient> =>
Effect.gen(function* () {
const client = yield* HttpClient.HttpClient;
const response = yield* Effect.gen(function* () {
const url = new URL("/.well-known/t3/environment", input.baseUrl);
const request = HttpClientRequest.get(url.toString());
return yield* client.execute(request);
}).pipe(Effect.timeoutOption(input.timeoutMs ?? TAILSCALE_PROBE_TIMEOUT_MS));
}).pipe(Effect.timeoutOption(input.timeout ?? TAILSCALE_PROBE_TIMEOUT));

return Option.match(response, {
onNone: () => false,
Expand Down
Loading