diff --git a/packages/appkit/src/database/introspector/diff.ts b/packages/appkit/src/database/introspector/diff.ts index 2c0f4a12..11f0ee4f 100644 --- a/packages/appkit/src/database/introspector/diff.ts +++ b/packages/appkit/src/database/introspector/diff.ts @@ -1,27 +1,23 @@ import type { IntrospectedTable, IntrospectionResult } from "./types"; -/** Severity of a drift entry. */ export type DriftSeverity = "info" | "warn" | "error"; -/** A single drift entry. */ export interface DriftEntry { - /** The severity of the drift entry. */ severity: DriftSeverity; - /** The kind of drift entry. */ kind: "live-only" | "schema-only" | "type-mismatch"; - /** The message of the drift entry. */ message: string; } -/** A report of drift entries. */ export interface DriftReport { - /** Whether there is any drift. */ hasDrift: boolean; - /** The entries of the drift report. */ entries: DriftEntry[]; } -/** Diff two introspections and return a report of drift entries. */ +/** + * TODO(rls): policies are not compared — `schemaToIntrospection` always + * returns `policies: []`, so any DB-side policy would show as `live-only`. + * Re-enable once the schema-builder declares policies. + */ export function diffIntrospections( live: IntrospectionResult, declared: IntrospectionResult, @@ -56,7 +52,6 @@ export function diffIntrospections( return { hasDrift: entries.length > 0, entries }; } -/** Diff two tables and return a report of drift entries. */ function diffColumns( key: string, live: IntrospectedTable, @@ -98,24 +93,15 @@ function diffColumns( } } -/** Get the key of a table. */ function tableKey(table: Pick): string { return `${table.schema}.${table.name}`; } /** - * Compares the column contract beyond the raw Postgres type. - * - * Runtime writes and migrations depend on nullability, defaults, keys, - * generated columns, and FK actions, so drift detection must compare the - * metadata captured by introspection instead of stopping at `pgType`. 
- * - * Server-generated columns get special treatment: when both sides agree the - * column is server-generated, we skip `hasDefault` and `defaultExpression` - * comparisons because the live DB stores the literal `nextval(...)` / - * `GENERATED AS IDENTITY` expression while the schema models the same fact - * as `serverGenerated: true` metadata. Comparing them would produce noise on - * every introspect → verify roundtrip for serial / bigserial / identity PKs. + * Compare column metadata beyond `pgType` (nullable, default, PK, FK). + * Skip default/hasDefault when both sides are server-generated — the live DB + * stores `nextval(...)` / `GENERATED AS IDENTITY` while the schema flags it + * as `serverGenerated: true`; direct compare would noise-flag every serial PK. */ function diffColumnMetadata( table: string, @@ -184,7 +170,6 @@ function diffColumnMetadata( } } -/** Compare a field of a column and return a report of drift entries. */ function compareField( table: string, column: string, @@ -203,10 +188,7 @@ function compareField( }); } -/** - * Normalizes FK metadata into one comparable value so missing references and - * action changes produce a single readable drift entry. - */ +/** Flatten FK metadata to one comparable string for a single drift entry. */ function normalizeReference( reference: IntrospectedTable["columns"][number]["references"], ): string { @@ -223,17 +205,10 @@ function formatValue(value: unknown): string { } /** - * Strip the trivial `'literal'::type` cast Postgres emits around quoted - * string defaults so that `'member'::text` (live) compares equal to `member` - * (declared). Also unescapes `''` -> `'` inside the literal. - * - * Deliberately conservative: - * - Matches a SINGLE quoted literal followed by a single `::type` cast. 
- * - Does NOT touch expressions that contain `||`, function calls, or - * additional casts — those are kept verbatim and compared as-is so we - * don't claim equality between two non-trivially-different expressions - * and silently miss real drift. Example: `'foo'::text || 'bar'::text` - * and `'foobar'` stay distinct. + * Strip Postgres's `'literal'::type` cast so `'member'::text` (live) compares + * equal to `member` (declared); unescape `''` → `'`. Conservative: only one + * quoted literal + one cast; expressions with `||`, function calls, or extra + * casts pass through verbatim — better a false positive than missed drift. */ function normalizeDefaultExpression( value: string | undefined, @@ -245,10 +220,6 @@ function normalizeDefaultExpression( return trimmed; } -/** - * Matches `'literal'::type` where the literal is a single quoted string with - * `''` escaping and the type is a simple identifier (no parens, no `||`, - * no further casts). - */ +/** `'literal'::type` — single quoted string + simple type identifier only. */ const SIMPLE_CAST_LITERAL = /^'((?:[^']|'')*)'::[a-zA-Z_][\w]*(?:\s*\(\s*\d+\s*\))?$/; diff --git a/packages/appkit/src/plugins/database/defaults.ts b/packages/appkit/src/plugins/database/defaults.ts index 266d2e84..46bff8ab 100644 --- a/packages/appkit/src/plugins/database/defaults.ts +++ b/packages/appkit/src/plugins/database/defaults.ts @@ -14,13 +14,17 @@ export const STATEMENT_TIMEOUT_DEFAULT_MS = 15_000; /** `application_name` per connection — surfaces in `pg_stat_activity`/Lakebase audit. */ export const APPLICATION_NAME = "appkit:database"; +/** GUC name AppKit `SET`s on every OBO connection for RLS policies to read. */ +export const DEFAULT_RLS_SESSION_VARIABLE = "app.user_id"; + /** - * OBO pool defaults — small (one pool per user). Fan-out = `(1 + oboPoolMax) × max`; - * defaults cap at `(1+25)×4 + 10 ≈ 114` conns per instance. + * OBO pool defaults. 
`max=2` because a single user typically serializes HTTP + * requests; 2 conns covers occasional overlap without bloating fan-out. + * Combined with `oboPoolMax=100`, fan-out is `(1+100)×2 + 10 ≈ 212` conns. */ export const OBO_POOL_DEFAULTS = { ...POOL_DEFAULTS, - max: 4, + max: 2, }; /** Default page size when no `?limit=` is given. */ diff --git a/packages/appkit/src/plugins/database/entity-wiring.ts b/packages/appkit/src/plugins/database/entity-wiring.ts index ab5aa821..b46c883d 100644 --- a/packages/appkit/src/plugins/database/entity-wiring.ts +++ b/packages/appkit/src/plugins/database/entity-wiring.ts @@ -13,6 +13,7 @@ import { AuthenticationError, ConfigurationError } from "@/errors"; import { createLogger } from "@/logging/logger"; import { APPLICATION_NAME, + DEFAULT_RLS_SESSION_VARIABLE, OBO_POOL_DEFAULTS, STATEMENT_TIMEOUT_DEFAULT_MS, } from "./defaults"; @@ -160,14 +161,14 @@ function makeUserPoolRegistry( user: identity.email, workspaceClient: createUserWorkspaceClient(identity.token), }); - // Session-local `app.user_id` so `current_user_id()` RLS helpers resolve - // to the OBO user — safe at session scope since identity is invariant in - // this per-user pool. `statement_timeout` set here too so OBO queries get - // the same server-side cap as SP ones. + // Session-local GUC so RLS helpers resolve to the OBO user — safe at + // session scope since identity is invariant in this per-user pool. + // `statement_timeout` set here too so OBO matches SP server-side cap. const statementTimeoutMs = config.statementTimeoutMs ?? STATEMENT_TIMEOUT_DEFAULT_MS; + const sessionVariable = + config.rls?.sessionVariable ?? DEFAULT_RLS_SESSION_VARIABLE; pool.on("connect", (client) => { - // Tag OBO conns in pg_stat_activity so operators can split SP vs OBO traffic. 
client .query(`SET application_name = '${APPLICATION_NAME}:obo'`) .catch((err) => { @@ -178,10 +179,14 @@ function makeUserPoolRegistry( ); }); client - .query("SELECT set_config('app.user_id', $1, false)", [identity.email]) + .query("SELECT set_config($1, $2, false)", [ + sessionVariable, + identity.email, + ]) .catch((err) => { logger.error( - "Failed to set app.user_id on user pool connection for %s: %O", + "Failed to set %s on user pool connection for %s: %O", + sessionVariable, tag, err, ); @@ -249,7 +254,7 @@ function resolveUserPoolIdentity( if (email && token) return { email, token }; if (isDev) { - logger.warn( + logger.debug( "Database OBO requested without x-forwarded-email/x-forwarded-access-token; falling back to service pool in development.", ); return null; @@ -275,9 +280,10 @@ function createUserWorkspaceClient(token: string): WorkspaceClient { } function normalizePoolMax(value: number | undefined): number { - // Default 25 keeps fan-out tractable on Lakebase tiers ((1+25)×4 + SP(10) - // ≈ 114 conns). Hot-OBO apps should raise explicitly after sizing the tier. - if (!Number.isFinite(value) || value === undefined) return 25; + // Default 100 active users per instance before LRU evicts; with + // OBO_POOL_DEFAULTS.max=2, fan-out is (1+100)×2 + SP(10) ≈ 212 conns. + // Sized for 1+ CU Lakebase tiers; tune up for hot OBO, down for 0.5 CU. + if (!Number.isFinite(value) || value === undefined) return 100; return Math.max(1, Math.floor(value)); } diff --git a/packages/appkit/src/plugins/database/manifest.json b/packages/appkit/src/plugins/database/manifest.json index 6896ef80..0267e167 100644 --- a/packages/appkit/src/plugins/database/manifest.json +++ b/packages/appkit/src/plugins/database/manifest.json @@ -82,8 +82,20 @@ }, "oboPoolMax": { "type": "number", - "default": 25, - "description": "Maximum number of per-user OBO pools to keep open. Worst-case fan-out is (1 + oboPoolMax) × OBO_POOL_DEFAULTS.max + POOL_DEFAULTS.max connections per app instance." 
+ "default": 100, + "description": "Maximum number of per-user OBO pools to keep open. Worst-case fan-out is (1 + oboPoolMax) × OBO_POOL_DEFAULTS.max + POOL_DEFAULTS.max connections per app instance (default 212)." + }, + "rls": { + "type": "object", + "additionalProperties": false, + "description": "Row-level security tunables.", + "properties": { + "sessionVariable": { + "type": "string", + "default": "app.user_id", + "description": "GUC name AppKit SETs on every OBO connection. Override to align with existing policies that read another setting." + } + } }, "cache": { "type": "object", diff --git a/packages/appkit/src/plugins/database/tests/plugin.test.ts b/packages/appkit/src/plugins/database/tests/plugin.test.ts index 249fd026..ec03a8ad 100644 --- a/packages/appkit/src/plugins/database/tests/plugin.test.ts +++ b/packages/appkit/src/plugins/database/tests/plugin.test.ts @@ -496,8 +496,67 @@ describe("DatabasePlugin", () => { const client = { query: vi.fn(async () => ({})) }; handler(client); expect(client.query).toHaveBeenCalledWith( - "SELECT set_config('app.user_id', $1, false)", - ["alice@example.com"], + "SELECT set_config($1, $2, false)", + ["app.user_id", "alice@example.com"], + ); + + if (originalHost === undefined) { + delete process.env.DATABRICKS_HOST; + } else { + process.env.DATABRICKS_HOST = originalHost; + } + }); + + test("rls.sessionVariable override flows into the SET on user pool connect", async () => { + const originalHost = process.env.DATABRICKS_HOST; + process.env.DATABRICKS_HOST = "https://example.cloud.databricks.com"; + const servicePool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + const userPool = { + end: vi.fn(async () => undefined), + on: vi.fn(), + } as unknown as Pool; + vi.mocked(createLakebasePool) + .mockReturnValueOnce(servicePool) + .mockReturnValueOnce(userPool); + const schema = defineSchema(({ table }) => ({ + user: table("user", { id: id(), email: text().notNull() }), + })); + 
vi.mocked(loadSchemaByConvention).mockResolvedValue({ + schema, + schemaPath: "/app/config/database/schema.ts", + }); + + const plugin = createPlugin({ + rls: { sessionVariable: "myapp.uid" }, + }); + await plugin.setup(); + const exports = plugin.exports() as unknown as { + user: { asUser: (req: import("express").Request) => unknown }; + }; + const req = { + header: vi.fn((name: string) => { + if (name === "x-forwarded-email") return "alice@example.com"; + if (name === "x-forwarded-access-token") return "tok-alice"; + return undefined; + }), + } as unknown as import("express").Request; + exports.user.asUser(req); + + const handler = vi + .mocked(userPool.on) + .mock.calls.find( + ([event]) => event === "connect", + )?.[1] as unknown as (client: { + query: ReturnType; + }) => void; + const client = { query: vi.fn(async () => ({})) }; + handler(client); + expect(client.query).toHaveBeenCalledWith( + "SELECT set_config($1, $2, false)", + ["myapp.uid", "alice@example.com"], ); if (originalHost === undefined) { diff --git a/packages/appkit/src/plugins/database/types.ts b/packages/appkit/src/plugins/database/types.ts index 1cfbb52f..7535e73c 100644 --- a/packages/appkit/src/plugins/database/types.ts +++ b/packages/appkit/src/plugins/database/types.ts @@ -155,9 +155,9 @@ export interface IDatabaseConfig extends BasePluginConfig { cache?: CacheSettings; /** * Maximum number of distinct per-user (OBO) pools the registry keeps alive - * at once. Each pool defaults to `OBO_POOL_DEFAULTS.max = 4` connections, so - * the worst-case fan-out is `(1 + oboPoolMax) × poolMax`. Defaults to 25 — - * tune up for hot OBO traffic, down for low-tier Lakebase plans. + * at once. Each pool defaults to `OBO_POOL_DEFAULTS.max = 2` connections, so + * worst-case fan-out is `(1 + oboPoolMax) × poolMax + 10`. Defaults to 100 — + * tune up for hot OBO traffic, down for 0.5 CU Lakebase tiers. 
*/ oboPoolMax?: number; /** @@ -166,6 +166,14 @@ export interface IDatabaseConfig extends BasePluginConfig { * timeout interceptor still applies on the client side. */ statementTimeoutMs?: number; + /** Row-level security tunables. */ + rls?: { + /** + * GUC name AppKit `SET`s on every OBO connection. Override to align with + * existing policies that read another setting. Defaults to `app.user_id`. + */ + sessionVariable?: string; + }; /** * When true, schema-load and drift-check failures during `setup()` are * logged but do not throw. Defaults to false (fail closed). Useful in diff --git a/packages/shared/src/cli/commands/db/__tests__/db.test.ts b/packages/shared/src/cli/commands/db/__tests__/db.test.ts index f956981c..907cbee0 100644 --- a/packages/shared/src/cli/commands/db/__tests__/db.test.ts +++ b/packages/shared/src/cli/commands/db/__tests__/db.test.ts @@ -16,6 +16,7 @@ describe("dbCommand", () => { "introspect", "migration", "migrate", + "rls", "seed", "setup:dev", "types", diff --git a/packages/shared/src/cli/commands/db/__tests__/rls.test.ts b/packages/shared/src/cli/commands/db/__tests__/rls.test.ts new file mode 100644 index 00000000..a9cc4eaf --- /dev/null +++ b/packages/shared/src/cli/commands/db/__tests__/rls.test.ts @@ -0,0 +1,343 @@ +import { + existsSync, + mkdtempSync, + readdirSync, + readFileSync, + rmSync, + writeFileSync, +} from "node:fs"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, test } from "vitest"; +import { + buildHelpersMigrationSql, + buildRlsScaffold, + compileRlsExpression, + ensureRlsHelpersMigration, + writeMigration, +} from "../rls"; + +describe("compileRlsExpression", () => { + test("expands owner_email shorthand to schema-qualified current_user_email()", () => { + expect(compileRlsExpression("owner_email:email").sql).toBe( + `email = "app".current_user_email()`, + ); + }); + + test("honours custom schema in the qualified function reference", () => { + 
expect( + compileRlsExpression("owner_email:author", { schemaName: "tenant_a" }) + .sql, + ).toBe(`author = "tenant_a".current_user_email()`); + }); + + test("returns the matched shorthand column for downstream validation", () => { + expect(compileRlsExpression("owner_email:authorEmail")).toEqual({ + sql: expect.any(String), + shorthandColumn: "authorEmail", + }); + }); + + test("rejects the legacy owner: shorthand with a clear error", () => { + expect(() => compileRlsExpression("owner:userId")).toThrow( + /owner: \/ tenant: shorthands were removed/, + ); + }); + + test("rejects the legacy tenant: shorthand", () => { + expect(() => compileRlsExpression("tenant:orgId")).toThrow( + /shorthands were removed/, + ); + }); + + test("passes through safe raw SQL", () => { + expect(compileRlsExpression("status <> 'archived'").sql).toBe( + "status <> 'archived'", + ); + }); + + test("rejects raw SQL containing semicolons", () => { + expect(() => compileRlsExpression("true; drop table users")).toThrow( + /must not contain ';'/, + ); + }); + + test("rejects raw SQL containing comments", () => { + expect(() => compileRlsExpression("true -- bypass")).toThrow( + /SQL comments/, + ); + expect(() => compileRlsExpression("true /* bypass */")).toThrow( + /SQL comments/, + ); + }); + + test("rejects raw SQL with unbalanced parens", () => { + expect(() => compileRlsExpression("(a = 1")).toThrow(/Unbalanced/); + expect(() => compileRlsExpression("a = 1)")).toThrow(/Unbalanced/); + }); + + test("rejects empty spec", () => { + expect(() => compileRlsExpression(" ")).toThrow(); + }); +}); + +describe("buildRlsScaffold", () => { + test("emits ENABLE + FORCE + idempotent CREATE POLICY for FOR ALL", () => { + const scaffold = buildRlsScaffold({ + entity: "user", + tableName: "user", + policyName: "user_self_only", + spec: "owner_email:email", + }); + const sql = scaffold.migrationSql; + expect(sql).toContain('ALTER TABLE "app"."user" ENABLE ROW LEVEL SECURITY'); + expect(sql).toContain('ALTER 
TABLE "app"."user" FORCE ROW LEVEL SECURITY'); + expect(sql).toContain( + 'DROP POLICY IF EXISTS "user_self_only" ON "app"."user";', + ); + expect(sql).toContain(`CREATE POLICY "user_self_only"`); + expect(sql).toContain("FOR ALL"); + expect(sql).toContain("USING (email ="); + expect(sql).toContain("WITH CHECK (email ="); + }); + + test("INSERT emits WITH CHECK only (USING is invalid)", () => { + const scaffold = buildRlsScaffold({ + entity: "post", + tableName: "post", + policyName: "post_owner_insert", + spec: "owner_email:author_email", + actions: ["insert"], + }); + expect(scaffold.migrationSql).toContain("FOR INSERT"); + expect(scaffold.migrationSql).toContain("WITH CHECK (author_email ="); + expect(scaffold.migrationSql).not.toContain("USING (author_email ="); + }); + + test("SELECT and DELETE emit USING only", () => { + const sql = buildRlsScaffold({ + entity: "row", + tableName: "row", + policyName: "row_select", + spec: "owner_email:owner", + actions: ["select"], + }).migrationSql; + expect(sql).toContain("FOR SELECT"); + expect(sql).toContain("USING (owner ="); + expect(sql).not.toContain("WITH CHECK"); + }); + + test("UPDATE emits both USING and WITH CHECK to lock pre- and post-image", () => { + const sql = buildRlsScaffold({ + entity: "row", + tableName: "row", + policyName: "row_update", + spec: "owner_email:owner", + actions: ["update"], + }).migrationSql; + expect(sql).toContain("FOR UPDATE"); + expect(sql).toContain("USING (owner ="); + expect(sql).toContain("WITH CHECK (owner ="); + }); + + test("multi-verb emits one CREATE POLICY per verb with derived names", () => { + const sql = buildRlsScaffold({ + entity: "case", + tableName: "cases", + policyName: "cases_owner", + spec: "owner_email:assigned_to", + actions: ["select", "update"], + }).migrationSql; + expect(sql).toContain('CREATE POLICY "cases_owner_select"'); + expect(sql).toContain("FOR SELECT"); + expect(sql).toContain('CREATE POLICY "cases_owner_update"'); + expect(sql).toContain("FOR 
UPDATE"); + expect(sql).not.toContain("FOR ALL"); + }); + + test("custom schema flows into the policy SQL", () => { + const sql = buildRlsScaffold({ + entity: "case", + tableName: "cases", + schemaName: "public", + policyName: "cases_owner_select", + spec: "owner_email:assigned_to_email", + actions: ["select"], + }).migrationSql; + expect(sql).toContain('ALTER TABLE "public"."cases"'); + expect(sql).toContain( + `USING (assigned_to_email = "public".current_user_email())`, + ); + }); + + test("rejects policy names containing path separators", () => { + expect(() => + buildRlsScaffold({ + entity: "user", + tableName: "user", + policyName: "../../etc/passwd", + spec: "owner_email:email", + }), + ).toThrow(/Policy name must match/); + }); + + test("rejects policy names with spaces or unsafe chars", () => { + expect(() => + buildRlsScaffold({ + entity: "user", + tableName: "user", + policyName: "foo bar", + spec: "owner_email:email", + }), + ).toThrow(/Policy name must match/); + }); +}); + +describe("buildHelpersMigrationSql", () => { + test("schema-qualifies the function definition so search_path can't reroute it", () => { + expect(buildHelpersMigrationSql("app")).toContain( + `CREATE OR REPLACE FUNCTION "app".current_user_email()`, + ); + expect(buildHelpersMigrationSql("tenant_a")).toContain( + `CREATE OR REPLACE FUNCTION "tenant_a".current_user_email()`, + ); + }); + + test("does not emit current_tenant_id() (removed in this layer)", () => { + expect(buildHelpersMigrationSql("app")).not.toContain("current_tenant_id"); + }); +}); + +/* ============================================================ */ +/* Filesystem + journal — tmpdir-backed */ +/* ============================================================ */ + +describe("writeMigration + ensureRlsHelpersMigration", () => { + let migrationsDir: string; + + beforeEach(() => { + migrationsDir = mkdtempSync(path.join(tmpdir(), "appkit-rls-fs-")); + }); + + afterEach(() => { + rmSync(migrationsDir, { recursive: true, 
force: true }); + }); + + test("first run creates helpers migration and registers it in the journal", () => { + const filePath = ensureRlsHelpersMigration(migrationsDir, "app"); + expect(filePath).not.toBeNull(); + expect(existsSync(filePath as string)).toBe(true); + expect(path.basename(filePath as string)).toBe( + "0000_appkit_rls_helpers.sql", + ); + + const journal = readJournal(migrationsDir); + expect(journal.dialect).toBe("postgresql"); + expect(journal.entries).toHaveLength(1); + expect(journal.entries[0]).toMatchObject({ + idx: 0, + tag: "0000_appkit_rls_helpers", + breakpoints: false, + }); + }); + + test("re-running returns null and does not duplicate the journal entry", () => { + ensureRlsHelpersMigration(migrationsDir, "app"); + expect(ensureRlsHelpersMigration(migrationsDir, "app")).toBeNull(); + expect(readJournal(migrationsDir).entries).toHaveLength(1); + }); + + test("writeMigration registers each policy file in the journal", () => { + ensureRlsHelpersMigration(migrationsDir, "app"); + const policySql = "-- placeholder"; + const policyPath = writeMigration( + migrationsDir, + "user", + "user_owner_email", + policySql, + ); + expect(path.basename(policyPath)).toBe( + "0001_rls_user_user_owner_email.sql", + ); + + const journal = readJournal(migrationsDir); + expect(journal.entries.map((e) => e.tag)).toEqual([ + "0000_appkit_rls_helpers", + "0001_rls_user_user_owner_email", + ]); + expect(journal.entries.map((e) => e.idx)).toEqual([0, 1]); + }); + + test("nextMigrationNumber outruns both journal and orphaned on-disk files", () => { + writeFileSync( + path.join(migrationsDir, "0007_orphan.sql"), + "-- not journaled", + "utf8", + ); + const filePath = ensureRlsHelpersMigration(migrationsDir, "app"); + expect(path.basename(filePath as string)).toBe( + "0008_appkit_rls_helpers.sql", + ); + }); + + test("refuses migration tags containing path separators", () => { + expect(() => + writeMigration(migrationsDir, "user", "../escape", "-- placeholder"), + 
).toThrow(/Refusing to write migration tag/); + }); + + test("preserves a pre-existing journal with non-rls entries", () => { + seedJournal(migrationsDir, [ + { idx: 0, version: "7", when: 1, tag: "0000_init", breakpoints: true }, + ]); + const filePath = ensureRlsHelpersMigration(migrationsDir, "app"); + expect(path.basename(filePath as string)).toBe( + "0001_appkit_rls_helpers.sql", + ); + const journal = readJournal(migrationsDir); + expect(journal.entries.map((e) => e.tag)).toEqual([ + "0000_init", + "0001_appkit_rls_helpers", + ]); + }); +}); + +interface JournalShape { + version: string; + dialect: string; + entries: Array<{ + idx: number; + version: string; + when: number; + tag: string; + breakpoints: boolean; + }>; +} + +function readJournal(migrationsDir: string): JournalShape { + return JSON.parse( + readFileSync(path.join(migrationsDir, "meta", "_journal.json"), "utf8"), + ); +} + +function seedJournal( + migrationsDir: string, + entries: JournalShape["entries"], +): void { + const metaDir = path.join(migrationsDir, "meta"); + if (!existsSync(metaDir)) { + require("node:fs").mkdirSync(metaDir, { recursive: true }); + } + writeFileSync( + path.join(metaDir, "_journal.json"), + JSON.stringify({ version: "7", dialect: "postgresql", entries }, null, 2), + "utf8", + ); + for (const entry of entries) { + const sqlPath = path.join(migrationsDir, `${entry.tag}.sql`); + if (!existsSync(sqlPath)) { + writeFileSync(sqlPath, "-- seed", "utf8"); + } + } + void readdirSync(migrationsDir); +} diff --git a/packages/shared/src/cli/commands/db/index.ts b/packages/shared/src/cli/commands/db/index.ts index c2a37672..c5639c99 100644 --- a/packages/shared/src/cli/commands/db/index.ts +++ b/packages/shared/src/cli/commands/db/index.ts @@ -3,6 +3,7 @@ import { initCommand } from "./init"; import { introspectCommand } from "./introspect"; import { migrateCommand } from "./migrate"; import { migrationCommand } from "./migration"; +import { rlsCommand } from "./rls"; import { 
seedCommand } from "./seed"; import { setupDevCommand } from "./setup-dev"; import { typesCommand } from "./types"; @@ -17,6 +18,7 @@ export const dbCommand = new Command("db") .addCommand(introspectCommand) .addCommand(migrationCommand) .addCommand(migrateCommand) + .addCommand(rlsCommand) .addCommand(seedCommand) .addCommand(setupDevCommand) .addCommand(typesCommand) @@ -32,5 +34,6 @@ Examples: $ appkit db seed $ appkit db setup:dev --seed --name init $ appkit db types generate + $ appkit db rls user owner_email:email $ appkit db verify`, ); diff --git a/packages/shared/src/cli/commands/db/rls.ts b/packages/shared/src/cli/commands/db/rls.ts new file mode 100644 index 00000000..d9420abe --- /dev/null +++ b/packages/shared/src/cli/commands/db/rls.ts @@ -0,0 +1,531 @@ +import { createHash } from "node:crypto"; +import { + existsSync, + mkdirSync, + readdirSync, + readFileSync, + writeFileSync, +} from "node:fs"; +import path from "node:path"; +import { Command } from "commander"; +import { + bullet, + check, + databasePaths, + loadSchemaFile, + runCommandAction, + warn, +} from "./shared"; + +/** + * `appkit db rls <entity> <spec>` — scaffold an RLS policy. Emits a numbered + * `.sql` and registers it in `meta/_journal.json` so `drizzle-orm/migrator` + * actually applies it on `appkit db migrate up`. + */ +export const rlsCommand = new Command("rls") + .description("Scaffold a row-level security policy for an entity") + .argument("<entity>", "Logical entity key (matches a defineSchema export)") + .argument( + "<spec>", + "Policy expression. Shorthand: 'owner_email:<column>' compares <column> to current_user_email(). Anything else is treated as raw SQL.", + ) + .option( + "--name <name>", + "Policy name. Defaults to '_'. Must match [A-Za-z_][A-Za-z0-9_]*.", + ) + .option( + "--actions <list>", + "Comma-separated verbs (select,insert,update,delete,all). Multi-verb emits one CREATE POLICY per verb. 
Defaults to 'all'.", + "all", + ) + .option( + "--dry-run", + "Print intended SQL and target paths; do not write or update the journal.", + ) + .addHelpText( + "after", + [ + "", + "RLS threat model & prerequisites:", + " • Generated SQL emits FORCE ROW LEVEL SECURITY so the SP pool", + " (table owner) is also constrained — every server query through", + " appkit.database.<entity> is now subject to the policy.", + " • Identity GUC: AppKit's per-user pool sets app.user_id to the OBO", + " user's email when each connection is opened (entity-wiring.ts). The", + " helper current_user_email() reads that GUC; it returns NULL on", + " SP connections so policies fail-closed (deny every row).", + " • The owner_email: shorthand expects a column that stores the user's", + " email (case-sensitive comparison). Map any other identifier to a", + " raw SQL expression instead.", + " • Re-run is safe: helpers + policies use CREATE OR REPLACE / DROP IF", + " EXISTS. But repeated runs grow the migration count — re-emit only", + " when the predicate genuinely changes.", + "", + ].join("\n"), + ) + .action(async (entity: string, spec: string, options: RlsCliOptions) => { + await runCommandAction(() => runRls({ entity, spec, options })); + }); + +interface RlsCliOptions { + name?: string; + actions?: string; + dryRun?: boolean; +} + +interface RunRlsArgs { + entity: string; + spec: string; + options: RlsCliOptions; +} + +interface RlsScaffoldOptions { + schemaName?: string; + entity: string; + tableName: string; + policyName: string; + spec: string; + actions?: ReadonlyArray; +} + +interface RlsScaffold { + migrationSql: string; +} + +interface RlsSchema { + $schemaName?: string; + $tables?: Record< + string, + { name?: string; $columns?: Record } + >; +} + +const ALLOWED_ACTIONS = new Set([ + "select", + "insert", + "update", + "delete", + "all", +] as const); + +type AllowedAction = "select" | "insert" | "update" | "delete" | "all"; + +/** Journal format `drizzle-orm/migrator` reads. 
*/ +interface DrizzleJournal { + version: string; + dialect: string; + entries: DrizzleJournalEntry[]; +} + +interface DrizzleJournalEntry { + idx: number; + version: string; + when: number; + tag: string; + breakpoints: boolean; +} + +const JOURNAL_VERSION = "7"; +const JOURNAL_DIALECT = "postgresql"; +const FORBIDDEN_RAW_SQL = /;|--|\/\*|\*\//; +const POLICY_NAME_REGEX = /^[A-Za-z_][A-Za-z0-9_]*$/; + +/* ============================================================ */ +/* Pure helpers */ +/* ============================================================ */ + +/** + * Resolve `owner_email:<col>` to `<col> = <schema>.current_user_email()`. + * Anything else is raw SQL, gated by `validateRawSqlPredicate`. + */ +export function compileRlsExpression( + spec: string, + options: { schemaName?: string } = {}, +): { sql: string; shorthandColumn: string | null } { + const trimmed = spec.trim(); + if (!trimmed) throw new Error("RLS expression must not be empty"); + + const schema = escapeIdent(options.schemaName ?? "app"); + const match = /^owner_email:([A-Za-z_][A-Za-z0-9_]*)$/.exec(trimmed); + if (match) { + const [, column] = match; + return { + sql: `${column} = ${schema}.current_user_email()`, + shorthandColumn: column, + }; + } + + if (/^(owner|tenant):/.test(trimmed)) { + throw new Error( + "owner: / tenant: shorthands were removed. Use owner_email: " + + "(compares to current_user_email()) or write raw SQL. The runtime " + + "currently only sets app.user_id (= identity email).", + ); + } + + validateRawSqlPredicate(trimmed); + return { sql: trimmed, shorthandColumn: null }; +} + +/** + * Build migration SQL: ENABLE+FORCE RLS once, then one DROP/CREATE POLICY + * block per action. Multi-verb specs get one policy per verb, suffixed. + */ +export function buildRlsScaffold(options: RlsScaffoldOptions): RlsScaffold { + validatePolicyName(options.policyName); + const schemaName = options.schemaName ?? 
"app"; + const compiled = compileRlsExpression(options.spec, { schemaName }); + const actions = options.actions ?? ["all"]; + const qualified = `${escapeIdent(schemaName)}.${escapeIdent(options.tableName)}`; + + const blocks: string[] = [ + `-- RLS policy ${options.policyName} on ${qualified}`, + `ALTER TABLE ${qualified} ENABLE ROW LEVEL SECURITY;`, + `ALTER TABLE ${qualified} FORCE ROW LEVEL SECURITY;`, + ]; + + const isMultiVerb = actions.length > 1; + for (const action of actions) { + const policyName = isMultiVerb + ? `${options.policyName}_${action}` + : options.policyName; + validatePolicyName(policyName); + blocks.push( + `DROP POLICY IF EXISTS ${escapeIdent(policyName)} ON ${qualified};`, + renderCreatePolicy(qualified, policyName, action, compiled.sql), + ); + } + blocks.push(""); + + return { migrationSql: blocks.join("\n") }; +} + +/** + * Helpers SQL. `current_user_email()` reads the `app.user_id` GUC set by + * the per-user pool; schema-qualified so `search_path` can't rebind it. 
+ */ +export function buildHelpersMigrationSql(schemaName: string): string { + const schema = escapeIdent(schemaName); + return [ + "-- AppKit RLS helpers — see entity-wiring.ts for the GUC contract.", + `CREATE OR REPLACE FUNCTION ${schema}.current_user_email() RETURNS text`, + " LANGUAGE sql STABLE AS", + " $$ SELECT current_setting('app.user_id', true) $$;", + "", + ].join("\n"); +} + +function renderCreatePolicy( + qualified: string, + policyName: string, + action: AllowedAction, + predicate: string, +): string { + const head = `CREATE POLICY ${escapeIdent(policyName)} ON ${qualified}\n FOR ${action.toUpperCase()}`; + switch (action) { + case "select": + case "delete": + return `${head}\n USING (${predicate});`; + case "insert": + return `${head}\n WITH CHECK (${predicate});`; + default: + return `${head}\n USING (${predicate})\n WITH CHECK (${predicate});`; + } +} + +function validateRawSqlPredicate(expr: string): void { + if (FORBIDDEN_RAW_SQL.test(expr)) { + throw new Error( + `Raw RLS predicate must not contain ';', SQL comments ('--', '/*', '*/'). Got: ${expr}`, + ); + } + let depth = 0; + for (const ch of expr) { + if (ch === "(") depth++; + else if (ch === ")") { + depth--; + if (depth < 0) { + throw new Error(`Unbalanced ')' in RLS predicate: ${expr}`); + } + } + } + if (depth !== 0) { + throw new Error(`Unbalanced parens in RLS predicate: ${expr}`); + } +} + +function validatePolicyName(name: string): void { + if (!POLICY_NAME_REGEX.test(name)) { + throw new Error( + `Policy name must match ${POLICY_NAME_REGEX.source} (got '${name}'). 
` + + "Used both as a SQL identifier and migration filename — strict for safety.", + ); + } +} + +/* ============================================================ */ +/* Command runner */ +/* ============================================================ */ + +async function runRls({ entity, spec, options }: RunRlsArgs): Promise { + const paths = databasePaths(); + if (!existsSync(paths.schemaFile)) { + throw new Error( + `${paths.schemaFile} not found. Run 'appkit db init' or 'appkit db introspect' first.`, + ); + } + + const schema = (await loadSchemaFile(paths.schemaFile)) as RlsSchema | null; + if (!schema) { + throw new Error(`Could not load schema from ${paths.schemaFile}.`); + } + + const table = schema.$tables?.[entity]; + if (!table) { + const known = Object.keys(schema.$tables ?? {}).join(", ") || "(none)"; + throw new Error( + `Unknown entity "${entity}" in schema. Available: ${known}`, + ); + } + + const tableName = table.name ?? entity; + const schemaName = schema.$schemaName ?? "app"; + const actions = parseActions(options.actions); + const policyName = + options.name ?? 
defaultPolicyName(entity, spec, options.name === undefined); + validatePolicyName(policyName); + + const compiled = compileRlsExpression(spec, { schemaName }); + if (compiled.shorthandColumn) { + assertColumnExists(table, entity, compiled.shorthandColumn); + } else { + console.log(warn(`Using raw SQL predicate: ${compiled.sql}`)); + } + + const scaffold = buildRlsScaffold({ + schemaName, + entity, + tableName, + policyName, + spec, + actions, + }); + + if (options.dryRun) { + runDryRun(paths, schemaName, scaffold.migrationSql); + return; + } + + const helpersFile = ensureRlsHelpersMigration( + paths.migrationsDir, + schemaName, + ); + if (helpersFile) { + console.log(check(`Wrote ${path.relative(paths.root, helpersFile)}`)); + } + + const migrationFile = writeMigration( + paths.migrationsDir, + entity, + policyName, + scaffold.migrationSql, + ); + console.log(check(`Wrote ${path.relative(paths.root, migrationFile)}`)); + console.log(""); + console.log(bullet(`Apply with: appkit db migrate up`)); + console.log( + bullet( + "Track the policy in the migration file above. The schema-side `policy()` DSL is not yet implemented.", + ), + ); +} + +function runDryRun( + paths: ReturnType, + schemaName: string, + migrationSql: string, +): void { + console.log(bullet(`Dry run: would write into ${paths.migrationsDir}`)); + console.log(bullet(`Helpers (schema-qualified to "${schemaName}"):`)); + console.log(""); + console.log(indent(buildHelpersMigrationSql(schemaName), 2)); + console.log(bullet("Policy migration:")); + console.log(""); + console.log(indent(migrationSql, 2)); + console.log(bullet("No journal entries were written.")); +} + +function assertColumnExists( + table: { $columns?: Record }, + entity: string, + column: string, +): void { + const columns = table.$columns ?? {}; + if (!Object.hasOwn(columns, column)) { + const known = Object.keys(columns).join(", ") || "(none)"; + throw new Error( + `Column "${column}" not found on entity "${entity}". 
Available: ${known}`, + ); + } +} + +function parseActions(raw: string | undefined): AllowedAction[] | undefined { + if (!raw) return undefined; + const parts = raw + .split(",") + .map((p) => p.trim().toLowerCase()) + .filter(Boolean); + for (const part of parts) { + if (!ALLOWED_ACTIONS.has(part as AllowedAction)) { + throw new Error( + `Unknown action "${part}". Allowed: ${[...ALLOWED_ACTIONS].join(", ")}.`, + ); + } + } + if (parts.includes("all") && parts.length > 1) { + throw new Error("'all' cannot be combined with other actions."); + } + return parts as AllowedAction[]; +} + +/** Slug the spec; append a hash on truncation to avoid 30-char collisions. */ +function defaultPolicyName( + entity: string, + spec: string, + appendHash: boolean, +): string { + const slug = spec + .replace(/[^A-Za-z0-9]+/g, "_") + .replace(/^_+|_+$/g, "") + .toLowerCase(); + const truncated = slug.slice(0, 30); + if (appendHash && truncated !== slug) { + const suffix = createHash("sha1").update(spec).digest("hex").slice(0, 6); + return `${entity}_${truncated || "policy"}_${suffix}`; + } + return `${entity}_${truncated || "policy"}`; +} + +/* ============================================================ */ +/* Filesystem + journal */ +/* ============================================================ */ + +export function writeMigration( + migrationsDir: string, + entity: string, + policyName: string, + sql: string, +): string { + ensureDir(migrationsDir); + const tag = `${nextMigrationNumber(migrationsDir)}_rls_${entity}_${policyName}`; + assertSafeFilename(tag); + const filePath = path.join(migrationsDir, `${tag}.sql`); + writeFileSync(filePath, sql, "utf8"); + appendJournalEntry(migrationsDir, tag); + return filePath; +} + +export function ensureRlsHelpersMigration( + migrationsDir: string, + schemaName: string, +): string | null { + ensureDir(migrationsDir); + const journal = readJournal(migrationsDir); + if (journal.entries.some((e) => /_appkit_rls_helpers$/.test(e.tag))) { + return 
null; + } + const tag = `${nextMigrationNumber(migrationsDir)}_appkit_rls_helpers`; + assertSafeFilename(tag); + const filePath = path.join(migrationsDir, `${tag}.sql`); + writeFileSync(filePath, buildHelpersMigrationSql(schemaName), "utf8"); + appendJournalEntry(migrationsDir, tag); + return filePath; +} + +/** Append to `meta/_journal.json` — `drizzle-orm/migrator` skips anything not listed. */ +function appendJournalEntry(migrationsDir: string, tag: string): void { + const journalPath = journalFilePath(migrationsDir); + const journal = readJournal(migrationsDir); + if (journal.entries.some((entry) => entry.tag === tag)) return; + const idx = journal.entries.length; + journal.entries.push({ + idx, + version: JOURNAL_VERSION, + when: Date.now(), + tag, + breakpoints: false, + }); + ensureDir(path.dirname(journalPath)); + writeFileSync(journalPath, `${JSON.stringify(journal, null, 2)}\n`, "utf8"); +} + +function readJournal(migrationsDir: string): DrizzleJournal { + const journalPath = journalFilePath(migrationsDir); + if (!existsSync(journalPath)) { + return { version: JOURNAL_VERSION, dialect: JOURNAL_DIALECT, entries: [] }; + } + const raw = readFileSync(journalPath, "utf8"); + const parsed = JSON.parse(raw) as Partial; + return { + version: parsed.version ?? JOURNAL_VERSION, + dialect: parsed.dialect ?? JOURNAL_DIALECT, + entries: Array.isArray(parsed.entries) ? parsed.entries : [], + }; +} + +function journalFilePath(migrationsDir: string): string { + return path.join(migrationsDir, "meta", "_journal.json"); +} + +/** Max of journal + on-disk so we outrun orphans from prior bad runs. 
*/ +function nextMigrationNumber(migrationsDir: string): string { + let max = -1; + const journal = readJournal(migrationsDir); + for (const entry of journal.entries) { + const match = /^(\d{4})_/.exec(entry.tag); + if (!match) continue; + const n = Number(match[1]); + if (Number.isFinite(n) && n > max) max = n; + } + if (existsSync(migrationsDir)) { + for (const file of readDirSafe(migrationsDir)) { + const match = /^(\d{4})_/.exec(file); + if (!match) continue; + const n = Number(match[1]); + if (Number.isFinite(n) && n > max) max = n; + } + } + return String(max + 1).padStart(4, "0"); +} + +function readDirSafe(dir: string): string[] { + try { + return readdirSync(dir); + } catch { + return []; + } +} + +function ensureDir(dir: string): void { + if (!existsSync(dir)) mkdirSync(dir, { recursive: true }); +} + +function assertSafeFilename(tag: string): void { + if (!/^[A-Za-z0-9_]+$/.test(tag)) { + throw new Error( + `Refusing to write migration tag '${tag}' — only [A-Za-z0-9_] allowed.`, + ); + } +} + +function indent(text: string, spaces: number): string { + const pad = " ".repeat(spaces); + return text + .split("\n") + .map((line) => (line ? pad + line : line)) + .join("\n"); +} + +function escapeIdent(name: string): string { + return `"${name.replace(/"/g, '""')}"`; +}