From c625ac95329d3cde416aba222668f8aae6b922c7 Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 21:33:36 +0100 Subject: [PATCH 01/22] add log_pipelines table --- .../backend/migrations/033_log_pipelines.sql | 28 +++++++++++++++++++ packages/backend/src/database/types.ts | 22 +++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 packages/backend/migrations/033_log_pipelines.sql diff --git a/packages/backend/migrations/033_log_pipelines.sql b/packages/backend/migrations/033_log_pipelines.sql new file mode 100644 index 0000000..67ece42 --- /dev/null +++ b/packages/backend/migrations/033_log_pipelines.sql @@ -0,0 +1,28 @@ +CREATE TABLE IF NOT EXISTS log_pipelines ( + id UUID NOT NULL DEFAULT gen_random_uuid(), + organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE, + project_id UUID REFERENCES projects(id) ON DELETE CASCADE, + name VARCHAR(200) NOT NULL, + description TEXT, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + steps JSONB NOT NULL DEFAULT '[]'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (id) +); + +CREATE INDEX IF NOT EXISTS idx_log_pipelines_org + ON log_pipelines(organization_id); + +CREATE INDEX IF NOT EXISTS idx_log_pipelines_project + ON log_pipelines(project_id) + WHERE project_id IS NOT NULL; + +-- Only one pipeline per project (or one org-wide default when project_id IS NULL) +CREATE UNIQUE INDEX IF NOT EXISTS idx_log_pipelines_org_null_project + ON log_pipelines(organization_id) + WHERE project_id IS NULL; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_log_pipelines_org_project + ON log_pipelines(organization_id, project_id) + WHERE project_id IS NOT NULL; diff --git a/packages/backend/src/database/types.ts b/packages/backend/src/database/types.ts index 198ed78..c69bafa 100644 --- a/packages/backend/src/database/types.ts +++ b/packages/backend/src/database/types.ts @@ -834,6 +834,26 @@ export interface MetricExemplarsTable { attributes: ColumnType | null, Record | null, Record | null>; } +// ============================================================================ +// LOG PIPELINES TABLE +// ============================================================================ + +export interface LogPipelinesTable { + id: Generated; + organization_id: string; + project_id: string | null; + name: string; + description: string | null; + enabled: Generated; + steps: ColumnType< + Record[], + Record[], + Record[] + >; + created_at: Generated; + updated_at: Generated; +} + export interface Database { logs: LogsTable; users: UsersTable; @@ -898,4 +918,6 @@ export interface Database { // Metrics (OTLP) metrics: MetricsTable; metric_exemplars: MetricExemplarsTable; + // Log pipelines + log_pipelines: LogPipelinesTable; } From 56a8ab9e4fc518fbe39a7bc85ed735d593f60d1e Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 21:34:19 +0100 Subject: [PATCH 02/22] add pipeline types --- .../backend/src/modules/log-pipeline/types.ts | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 packages/backend/src/modules/log-pipeline/types.ts diff --git a/packages/backend/src/modules/log-pipeline/types.ts b/packages/backend/src/modules/log-pipeline/types.ts new file mode 100644 index 0000000..a44fbb7 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/types.ts @@ -0,0 +1,70 @@ +export type BuiltinParser = 'nginx' | 'apache' | 'syslog' | 'logfmt' | 'json'; + +export interface ParserStep { + type: 'parser'; + parser: BuiltinParser; +} + +export interface GrokStep { + type: 'grok'; + pattern: string; + source?: string; // field to parse, default 'message' +} + +export interface GeoIpStep { + type: 'geoip'; + field: string; // metadata field containing the IP + target: string; // output metadata key (e.g. 'geo') +} + +export type PipelineStep = ParserStep | GrokStep | GeoIpStep; + +export interface Pipeline { + id: string; + organizationId: string; + projectId: string | null; + name: string; + description: string | null; + enabled: boolean; + steps: PipelineStep[]; + createdAt: Date; + updatedAt: Date; +} + +export interface CreatePipelineInput { + organizationId: string; + projectId?: string | null; + name: string; + description?: string; + enabled?: boolean; + steps: PipelineStep[]; +} + +export interface UpdatePipelineInput { + name?: string; + description?: string; + enabled?: boolean; + steps?: PipelineStep[]; +} + +/** Input to the pipeline executor — one log entry */ +export interface LogForPipeline { + id: string; + time: Date; + message: string; + metadata: Record | null; +} + +/** Output: fields to merge into the log's metadata */ +export type ExtractedFields = Record; + +export interface StepResult { + step: PipelineStep; + extracted: ExtractedFields; + error?: string; +} + +export interface ExecutorResult { + steps: StepResult[]; + merged: ExtractedFields; // union of all extracted fields +} From ebc9f55c0c53113286039ce567d5e4347f39c61f Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 21:40:49 +0100 Subject: [PATCH 03/22] add built-in log parsers --- .../modules/log-pipeline/parsers/apache.ts | 2 + .../src/modules/log-pipeline/parsers/index.ts | 20 +++++++++ .../log-pipeline/parsers/json-message.ts | 11 +++++ .../modules/log-pipeline/parsers/logfmt.ts | 20 +++++++++ .../src/modules/log-pipeline/parsers/nginx.ts | 26 ++++++++++++ .../modules/log-pipeline/parsers/syslog.ts | 35 ++++++++++++++++ .../log-pipeline/parsers/apache.test.ts | 16 ++++++++ .../log-pipeline/parsers/json-message.test.ts | 24 +++++++++++ .../log-pipeline/parsers/logfmt.test.ts | 32 +++++++++++++++ .../log-pipeline/parsers/nginx.test.ts | 41 +++++++++++++++++++ .../log-pipeline/parsers/syslog.test.ts | 33 +++++++++++++++ packages/backend/vitest.unit.config.ts | 18 ++++++++ 12 files changed, 278 insertions(+) create mode 100644 packages/backend/src/modules/log-pipeline/parsers/apache.ts create mode 100644 packages/backend/src/modules/log-pipeline/parsers/index.ts create mode 100644 packages/backend/src/modules/log-pipeline/parsers/json-message.ts create mode 100644 packages/backend/src/modules/log-pipeline/parsers/logfmt.ts create mode 100644 packages/backend/src/modules/log-pipeline/parsers/nginx.ts create mode 100644 packages/backend/src/modules/log-pipeline/parsers/syslog.ts create mode 100644 packages/backend/src/tests/modules/log-pipeline/parsers/apache.test.ts create mode 100644 packages/backend/src/tests/modules/log-pipeline/parsers/json-message.test.ts create mode 100644 packages/backend/src/tests/modules/log-pipeline/parsers/logfmt.test.ts create mode 100644 packages/backend/src/tests/modules/log-pipeline/parsers/nginx.test.ts create mode 100644 packages/backend/src/tests/modules/log-pipeline/parsers/syslog.test.ts create mode 100644 packages/backend/vitest.unit.config.ts diff --git a/packages/backend/src/modules/log-pipeline/parsers/apache.ts b/packages/backend/src/modules/log-pipeline/parsers/apache.ts new file mode 100644 index 0000000..cef31c1 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/apache.ts @@ -0,0 +1,2 @@ +// Apache combined log format is identical to nginx combined format +export { parseNginx as parseApache } from './nginx.js'; diff --git a/packages/backend/src/modules/log-pipeline/parsers/index.ts b/packages/backend/src/modules/log-pipeline/parsers/index.ts new file mode 100644 index 0000000..1683ee9 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/index.ts @@ -0,0 +1,20 @@ +import type { ParserStep } from '../types.js'; +import { parseNginx } from './nginx.js'; +import { parseApache } from './apache.js'; +import { parseSyslog } from './syslog.js'; +import { parseLogfmt } from './logfmt.js'; +import { parseJsonMessage } from './json-message.js'; + +export function runBuiltinParser( + step: ParserStep, + message: string +): Record | null { + switch (step.parser) { + case 'nginx': return parseNginx(message); + case 'apache': return parseApache(message); + case 'syslog': return parseSyslog(message); + case 'logfmt': return parseLogfmt(message); + case 'json': return parseJsonMessage(message); + default: return null; + } +} diff --git a/packages/backend/src/modules/log-pipeline/parsers/json-message.ts b/packages/backend/src/modules/log-pipeline/parsers/json-message.ts new file mode 100644 index 0000000..aef3ac6 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/json-message.ts @@ -0,0 +1,11 @@ +export function parseJsonMessage(message: string): Record | null { + if (!message || !message.trimStart().startsWith('{')) return null; + + try { + const parsed = JSON.parse(message); + if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) return null; + return parsed as Record; + } catch { + return null; + } +} diff --git a/packages/backend/src/modules/log-pipeline/parsers/logfmt.ts b/packages/backend/src/modules/log-pipeline/parsers/logfmt.ts new file mode 100644 index 0000000..85b7038 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/logfmt.ts @@ -0,0 +1,20 @@ +export function parseLogfmt(message: string): Record | null { + if (!message) return null; + + const result: Record = {}; + // Match key=value or key="quoted value" + const regex = /(\w+)=(?:"((?:[^"\\]|\\.)*)"|(\S+))/g; + let match: RegExpExecArray | null; + let count = 0; + + while ((match = regex.exec(message)) !== null) { + const [, key, quoted, unquoted] = match; + result[key] = quoted !== undefined ? quoted : unquoted; + count++; + } + + // Require at least 2 pairs to avoid false positives on messages with URLs etc. + if (count < 2) return null; + + return result; +} diff --git a/packages/backend/src/modules/log-pipeline/parsers/nginx.ts b/packages/backend/src/modules/log-pipeline/parsers/nginx.ts new file mode 100644 index 0000000..6bf5b4d --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/nginx.ts @@ -0,0 +1,26 @@ +// Standard nginx combined log format: +// $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" +const NGINX_REGEX = /^(\S+) \S+ (\S+) \[([^\]]+)\] "(\S+) ([^"]*?) (HTTP\/[\d.]+)" (\d{3}) (\d+) "([^"]*)" "([^"]*)"/; + +export function parseNginx(message: string): Record | null { + const m = NGINX_REGEX.exec(message); + if (!m) return null; + + const [, clientIp, remoteUser, timeLocal, method, rawPath, httpVersion, status, bytes, referer, userAgent] = m; + + const [path, query] = rawPath.split('?'); + + return { + client_ip: clientIp, + remote_user: remoteUser, + timestamp: timeLocal, + http_method: method, + http_path: path, + ...(query ? { http_query: query } : {}), + http_version: httpVersion.replace('HTTP/', ''), + http_status: parseInt(status, 10), + response_bytes: parseInt(bytes, 10), + http_referer: referer, + user_agent: userAgent, + }; +} diff --git a/packages/backend/src/modules/log-pipeline/parsers/syslog.ts b/packages/backend/src/modules/log-pipeline/parsers/syslog.ts new file mode 100644 index 0000000..18566dc --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/syslog.ts @@ -0,0 +1,35 @@ +const RFC3164 = /^(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(\S+)\s+(\S+?)(?:\[(\d+)\])?:\s+(.*)/; +const RFC5424 = /^<(\d+)>(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.*)/; + +export function parseSyslog(message: string): Record | null { + // Try RFC 5424 first (has version number after priority) + const m5424 = RFC5424.exec(message); + if (m5424) { + const [, priority, , timestamp, hostname, appname, pid, , , msg] = m5424; + return { + priority: parseInt(priority, 10), + facility: Math.floor(parseInt(priority, 10) / 8), + severity: parseInt(priority, 10) % 8, + timestamp, + hostname: hostname === '-' ? undefined : hostname, + appname: appname === '-' ? undefined : appname, + pid: pid !== '-' ? parseInt(pid, 10) : undefined, + syslog_message: msg, + }; + } + + // Try RFC 3164 + const m3164 = RFC3164.exec(message); + if (m3164) { + const [, timestamp, hostname, appname, pid, msg] = m3164; + return { + timestamp, + hostname, + appname, + ...(pid ? { pid: parseInt(pid, 10) } : {}), + syslog_message: msg, + }; + } + + return null; +} diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/apache.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/apache.test.ts new file mode 100644 index 0000000..ba7348c --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/apache.test.ts @@ -0,0 +1,16 @@ +import { describe, it, expect } from 'vitest'; +import { parseApache } from '../../../../modules/log-pipeline/parsers/apache.js'; + +describe('parseApache', () => { + it('parses apache combined log format', () => { + const line = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08"'; + const result = parseApache(line); + expect(result?.client_ip).toBe('127.0.0.1'); + expect(result?.http_status).toBe(200); + expect(result?.response_bytes).toBe(2326); + }); + + it('returns null for non-apache lines', () => { + expect(parseApache('garbage')).toBeNull(); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/json-message.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/json-message.test.ts new file mode 100644 index 0000000..a4d80db --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/json-message.test.ts @@ -0,0 +1,24 @@ +import { describe, it, expect } from 'vitest'; +import { parseJsonMessage } from '../../../../modules/log-pipeline/parsers/json-message.js'; + +describe('parseJsonMessage', () => { + it('parses a JSON object in the message', () => { + const result = parseJsonMessage('{"level":"info","user_id":42,"action":"login"}'); + expect(result?.level).toBe('info'); + expect(result?.user_id).toBe(42); + expect(result?.action).toBe('login'); + }); + + it('returns null for non-JSON messages', () => { + expect(parseJsonMessage('plain text message')).toBeNull(); + expect(parseJsonMessage('[1,2,3]')).toBeNull(); + }); + + it('returns null for invalid JSON', () => { + expect(parseJsonMessage('{broken json')).toBeNull(); + }); + + it('returns null for empty message', () => { + expect(parseJsonMessage('')).toBeNull(); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/logfmt.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/logfmt.test.ts new file mode 100644 index 0000000..207ad90 --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/logfmt.test.ts @@ -0,0 +1,32 @@ +import { describe, it, expect } from 'vitest'; +import { parseLogfmt } from '../../../../modules/log-pipeline/parsers/logfmt.js'; + +describe('parseLogfmt', () => { + it('parses simple key=value pairs', () => { + const result = parseLogfmt('level=info msg="user logged in" user_id=42 latency=1.23ms'); + expect(result?.level).toBe('info'); + expect(result?.msg).toBe('user logged in'); + expect(result?.user_id).toBe('42'); + expect(result?.latency).toBe('1.23ms'); + }); + + it('handles quoted values with spaces', () => { + const result = parseLogfmt('error="file not found" path="/var/log/app.log"'); + expect(result?.error).toBe('file not found'); + expect(result?.path).toBe('/var/log/app.log'); + }); + + it('handles boolean-like values', () => { + const result = parseLogfmt('success=true retried=false'); + expect(result?.success).toBe('true'); + }); + + it('returns null if no key=value found', () => { + expect(parseLogfmt('this is a plain message')).toBeNull(); + expect(parseLogfmt('')).toBeNull(); + }); + + it('requires at least 2 pairs to avoid false positives', () => { + expect(parseLogfmt('id=123')).toBeNull(); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/nginx.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/nginx.test.ts new file mode 100644 index 0000000..cdb7901 --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/nginx.test.ts @@ -0,0 +1,41 @@ +import { describe, it, expect } from 'vitest'; +import { parseNginx } from '../../../../modules/log-pipeline/parsers/nginx.js'; + +describe('parseNginx', () => { + it('parses a standard nginx access log line', () => { + const line = '192.168.1.1 - john [01/Jan/2024:12:00:00 +0000] "GET /api/v1/logs?limit=10 HTTP/1.1" 200 1234 "-" "curl/7.68.0"'; + const result = parseNginx(line); + expect(result).toMatchObject({ + client_ip: '192.168.1.1', + remote_user: 'john', + http_method: 'GET', + http_path: '/api/v1/logs', + http_query: 'limit=10', + http_version: '1.1', + http_status: 200, + response_bytes: 1234, + http_referer: '-', + user_agent: 'curl/7.68.0', + }); + expect(result?.timestamp).toBeDefined(); + }); + + it('handles anonymous user (-)', () => { + const line = '10.0.0.1 - - [15/Mar/2024:08:30:00 +0100] "POST /ingest HTTP/1.1" 201 0 "-" "sdk/1.0"'; + const result = parseNginx(line); + expect(result?.remote_user).toBe('-'); + expect(result?.http_status).toBe(201); + expect(result?.http_method).toBe('POST'); + }); + + it('returns null for non-nginx lines', () => { + expect(parseNginx('this is not a log line')).toBeNull(); + expect(parseNginx('')).toBeNull(); + }); + + it('parses response bytes as number', () => { + const line = '1.2.3.4 - - [01/Jan/2024:00:00:00 +0000] "GET / HTTP/2.0" 304 0 "-" "-"'; + expect(parseNginx(line)?.response_bytes).toBe(0); + expect(typeof parseNginx(line)?.response_bytes).toBe('number'); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/syslog.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/syslog.test.ts new file mode 100644 index 0000000..8313ee9 --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/syslog.test.ts @@ -0,0 +1,33 @@ +import { describe, it, expect } from 'vitest'; +import { parseSyslog } from '../../../../modules/log-pipeline/parsers/syslog.js'; + +describe('parseSyslog', () => { + it('parses RFC 3164 format', () => { + const line = 'Jan 15 12:00:00 myhost myapp[1234]: Connection established'; + const result = parseSyslog(line); + expect(result?.hostname).toBe('myhost'); + expect(result?.appname).toBe('myapp'); + expect(result?.pid).toBe(1234); + expect(result?.syslog_message).toBe('Connection established'); + }); + + it('parses RFC 3164 without PID', () => { + const line = 'Mar 20 08:15:30 server nginx: worker process 123 exited'; + const result = parseSyslog(line); + expect(result?.hostname).toBe('server'); + expect(result?.appname).toBe('nginx'); + expect(result?.pid).toBeUndefined(); + }); + + it('parses RFC 5424 format', () => { + const line = '<34>1 2024-01-15T12:00:00.000Z myhost myapp 1234 - - Connection established'; + const result = parseSyslog(line); + expect(result?.priority).toBe(34); + expect(result?.hostname).toBe('myhost'); + expect(result?.syslog_message).toBe('Connection established'); + }); + + it('returns null for non-syslog', () => { + expect(parseSyslog('hello world')).toBeNull(); + }); +}); diff --git a/packages/backend/vitest.unit.config.ts b/packages/backend/vitest.unit.config.ts new file mode 100644 index 0000000..72f08c1 --- /dev/null +++ b/packages/backend/vitest.unit.config.ts @@ -0,0 +1,18 @@ +import { defineConfig } from 'vitest/config'; +import path from 'path'; + +// Unit test config - no DB required, no globalSetup +export default defineConfig({ + test: { + globals: true, + environment: 'node', + // No globalSetup, no setupFiles - pure unit tests + include: ['src/**/*.test.ts'], + testTimeout: 10000, + }, + resolve: { + alias: { + '@': path.resolve(__dirname, './src'), + }, + }, +}); From 0b3ff659badb088cecfdd897d72308cb98372fbe Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 21:51:44 +0100 Subject: [PATCH 04/22] add grok engine --- packages/backend/src/database/migrator.ts | 2 +- .../log-pipeline/parsers/grok-engine.ts | 89 +++++++++++++++++++ packages/backend/src/tests/globalSetup.ts | 20 ++++- .../log-pipeline/parsers/grok-engine.test.ts | 49 ++++++++++ packages/backend/src/tests/setup.ts | 72 +++++++++------ 5 files changed, 204 insertions(+), 28 deletions(-) create mode 100644 packages/backend/src/modules/log-pipeline/parsers/grok-engine.ts create mode 100644 packages/backend/src/tests/modules/log-pipeline/parsers/grok-engine.test.ts diff --git a/packages/backend/src/database/migrator.ts b/packages/backend/src/database/migrator.ts index 91ae84b..83dfd22 100644 --- a/packages/backend/src/database/migrator.ts +++ b/packages/backend/src/database/migrator.ts @@ -66,7 +66,7 @@ export async function migrateToLatest() { if (error) { console.error('Migration failed'); console.error(error); - process.exit(1); + throw error; } console.log('All migrations completed'); diff --git a/packages/backend/src/modules/log-pipeline/parsers/grok-engine.ts b/packages/backend/src/modules/log-pipeline/parsers/grok-engine.ts new file mode 100644 index 0000000..2938c6b --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/grok-engine.ts @@ -0,0 +1,89 @@ +const IPV4 = '(?:(?:25[0-5]|2[0-4]\\d|[01]?\\d\\d?)\\.){3}(?:25[0-5]|2[0-4]\\d|[01]?\\d\\d?)'; +const IPV6 = '(?:[0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4}|::1|::'; + +export const BUILTIN_PATTERNS: Record = { + INT: '[+-]?(?:0|[1-9]\\d*)', + POSINT: '[1-9]\\d*|0', + NUMBER: '[+-]?(?:\\d+(?:\\.\\d*)?|\\.\\d+)(?:[eE][+-]?\\d+)?', + WORD: '\\w+', + NOTSPACE: '\\S+', + SPACE: '\\s+', + DATA: '[\\s\\S]*?', + GREEDYDATA: '[\\s\\S]*', + IPV4, + IPV6, + IP: `(?:${IPV6}|${IPV4})`, + HOSTNAME: '[a-zA-Z0-9][a-zA-Z0-9\\-\\.]*', + URIPATH: '/[^\\s?#]*', + URIPARAM: '\\?[^\\s#]*', + URI: '[a-zA-Z][a-zA-Z0-9+\\-.]*://\\S+', + QUOTEDSTRING: '"(?:[^"\\\\]|\\\\.)*"', + QS: '"(?:[^"\\\\]|\\\\.)*"', + STATUS_CODE: '[1-5]\\d{2}', + HTTPDATE: '\\d{2}/\\w+/\\d{4}:\\d{2}:\\d{2}:\\d{2} [+-]\\d{4}', + TIMESTAMP_ISO8601: '\\d{4}-\\d{2}-\\d{2}[T ]\\d{2}:\\d{2}:\\d{2}(?:\\.\\d+)?(?:Z|[+-]\\d{2}:?\\d{2})?', + USER: '[a-zA-Z0-9._-]+', + METHOD: '(?:GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS|CONNECT|TRACE)', +}; + +interface FieldMeta { + name: string; + type: 'string' | 'int' | 'float'; +} + +/** + * Compile a grok-like pattern string into a RegExp. + * Syntax: %{PATTERN_NAME:field_name} or %{PATTERN_NAME:field_name:type} + */ +export function compileGrok( + pattern: string, + customPatterns: Record = {} +): { regex: RegExp; fields: FieldMeta[] } { + const all = { ...BUILTIN_PATTERNS, ...customPatterns }; + const fields: FieldMeta[] = []; + + const re = pattern.replace( + /%\{(\w+)(?::(\w+))?(?::(int|float|string))?\}/g, + (_, patternName, fieldName, typeName) => { + const pat = all[patternName]; + if (!pat) throw new Error(`Unknown grok pattern: ${patternName}`); + + if (fieldName) { + fields.push({ name: fieldName, type: (typeName as FieldMeta['type']) || 'string' }); + return `(?<${fieldName}>${pat})`; + } + return `(?:${pat})`; + } + ); + + return { regex: new RegExp(re), fields }; +} + +/** + * Match a grok pattern against input. Returns extracted fields or null. + */ +export function matchGrok( + pattern: string, + input: string, + customPatterns: Record = {} +): Record | null { + const { regex, fields } = compileGrok(pattern, customPatterns); + const m = regex.exec(input); + if (!m || !m.groups) return null; + + const result: Record = {}; + for (const field of fields) { + const raw = m.groups[field.name]; + if (raw === undefined) continue; + + if (field.type === 'int') { + result[field.name] = parseInt(raw, 10); + } else if (field.type === 'float') { + result[field.name] = parseFloat(raw); + } else { + result[field.name] = raw; + } + } + + return result; +} diff --git a/packages/backend/src/tests/globalSetup.ts b/packages/backend/src/tests/globalSetup.ts index a301c15..089b05f 100644 --- a/packages/backend/src/tests/globalSetup.ts +++ b/packages/backend/src/tests/globalSetup.ts @@ -24,8 +24,24 @@ export default async function globalSetup() { await migrateToLatest(); console.log('[Global Setup] Migrations completed'); } catch (error) { - console.error('[Global Setup] Failed to run migrations:', error); - throw error; + const isConnRefused = (function checkConnRefused(err: unknown): boolean { + if (err instanceof AggregateError) { + return err.errors.some(checkConnRefused); + } + if (err instanceof Error) { + return ( + err.message.includes('ECONNREFUSED') || + (err as NodeJS.ErrnoException).code === 'ECONNREFUSED' + ); + } + return false; + })(error); + if (isConnRefused) { + console.warn('[Global Setup] DB not reachable — skipping migrations (unit-test mode)'); + } else { + console.error('[Global Setup] Failed to run migrations:', error); + throw error; + } } console.log('[Global Setup] Test environment ready!'); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/grok-engine.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/grok-engine.test.ts new file mode 100644 index 0000000..3c5d45b --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/grok-engine.test.ts @@ -0,0 +1,49 @@ +import { describe, it, expect } from 'vitest'; +import { compileGrok, matchGrok } from '../../../../modules/log-pipeline/parsers/grok-engine.js'; + +describe('compileGrok', () => { + it('compiles a pattern with built-in WORD', () => { + const { regex } = compileGrok('%{WORD:method} %{URIPATH:path}'); + expect(regex).toBeInstanceOf(RegExp); + }); + + it('throws for unknown pattern name', () => { + expect(() => compileGrok('%{UNKNOWN_PATTERN:field}')).toThrow('Unknown grok pattern'); + }); +}); + +describe('matchGrok', () => { + it('extracts named fields from a line', () => { + const result = matchGrok('%{IPV4:client_ip} %{WORD:method}', '10.0.0.1 GET'); + expect(result).toEqual({ client_ip: '10.0.0.1', method: 'GET' }); + }); + + it('coerces :int type to number', () => { + const result = matchGrok('%{POSINT:status:int} %{WORD:method}', '200 GET'); + expect(result?.status).toBe(200); + expect(typeof result?.status).toBe('number'); + }); + + it('coerces :float type to number', () => { + const result = matchGrok('%{NUMBER:latency:float}ms', '1.23ms'); + expect(result?.latency).toBeCloseTo(1.23); + }); + + it('returns null when pattern does not match', () => { + expect(matchGrok('%{IPV4:ip}', 'not-an-ip')).toBeNull(); + }); + + it('supports non-capturing %{PATTERN} without field name', () => { + const result = matchGrok('%{IPV4} %{WORD:method}', '10.0.0.1 POST'); + expect(result).toEqual({ method: 'POST' }); + }); + + it('handles a realistic nginx-like grok pattern', () => { + const pattern = '%{IPV4:client_ip} - %{NOTSPACE:user} \\[%{DATA:timestamp}\\] "%{WORD:method} %{NOTSPACE:path} HTTP/%{NUMBER:http_version}" %{POSINT:status:int}'; + const line = '192.168.0.1 - alice [01/Jan/2024:00:00:00 +0000] "GET /api HTTP/1.1" 200'; + const result = matchGrok(pattern, line); + expect(result?.client_ip).toBe('192.168.0.1'); + expect(result?.method).toBe('GET'); + expect(result?.status).toBe(200); + }); +}); diff --git a/packages/backend/src/tests/setup.ts b/packages/backend/src/tests/setup.ts index 28fc012..28dcd33 100644 --- a/packages/backend/src/tests/setup.ts +++ b/packages/backend/src/tests/setup.ts @@ -18,9 +18,23 @@ beforeAll(async () => { await db.selectFrom('users').selectAll().execute(); console.log('Database connection established'); } catch (error) { - console.error('Failed to connect to test database:', error); - console.error('Make sure the test database is running (docker-compose.test.yml)'); - throw error; + const isConnRefused = (function check(err: unknown): boolean { + if (err instanceof AggregateError) return err.errors.some(check); + if (err instanceof Error) { + return ( + err.message.includes('ECONNREFUSED') || + (err as NodeJS.ErrnoException).code === 'ECONNREFUSED' + ); + } + return false; + })(error); + if (isConnRefused) { + console.warn('DB not reachable — running in unit-test mode (no DB cleanup)'); + } else { + console.error('Failed to connect to test database:', error); + console.error('Make sure the test database is running (docker-compose.test.yml)'); + throw error; + } } }); @@ -42,32 +56,40 @@ beforeEach(async () => { // Clear Redis rate limit keys to prevent 429 errors in tests // @fastify/rate-limit uses keys starting with 'rl:' const redis = getConnection(); - if (redis) { - const rateLimitKeys = await redis.keys('rl:*'); - if (rateLimitKeys.length > 0) { - await redis.del(...rateLimitKeys); + if (redis && redis.status === 'ready') { + try { + const rateLimitKeys = await redis.keys('rl:*'); + if (rateLimitKeys.length > 0) { + await redis.del(...rateLimitKeys); + } + } catch { + // Redis not available — unit-test mode } } // Delete all data from tables in reverse dependency order - await db.deleteFrom('logs').execute(); - await db.deleteFrom('alert_history').execute(); - // SIEM tables (must delete before incidents and sigma_rules) - await db.deleteFrom('incident_comments').execute(); - await db.deleteFrom('incident_history').execute(); - await db.deleteFrom('detection_events').execute(); - await db.deleteFrom('incidents').execute(); - await db.deleteFrom('organization_invitations').execute(); - await db.deleteFrom('sigma_rules').execute(); - await db.deleteFrom('alert_rules').execute(); - await db.deleteFrom('api_keys').execute(); - await db.deleteFrom('notifications').execute(); - await db.deleteFrom('organization_members').execute(); - await db.deleteFrom('projects').execute(); - await db.deleteFrom('organizations').execute(); - await db.deleteFrom('audit_log').execute(); - await db.deleteFrom('sessions').execute(); - await db.deleteFrom('users').execute(); + try { + await db.deleteFrom('logs').execute(); + await db.deleteFrom('alert_history').execute(); + // SIEM tables (must delete before incidents and sigma_rules) + await db.deleteFrom('incident_comments').execute(); + await db.deleteFrom('incident_history').execute(); + await db.deleteFrom('detection_events').execute(); + await db.deleteFrom('incidents').execute(); + await db.deleteFrom('organization_invitations').execute(); + await db.deleteFrom('sigma_rules').execute(); + await db.deleteFrom('alert_rules').execute(); + await db.deleteFrom('api_keys').execute(); + await db.deleteFrom('notifications').execute(); + await db.deleteFrom('organization_members').execute(); + await db.deleteFrom('projects').execute(); + await db.deleteFrom('organizations').execute(); + await db.deleteFrom('audit_log').execute(); + await db.deleteFrom('sessions').execute(); + await db.deleteFrom('users').execute(); + } catch { + // DB not available — unit-test mode, skip cleanup + } }); /** From 74e7cbd926a7bf6982c4837c897bfed5fcb23435 Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 21:55:30 +0100 Subject: [PATCH 05/22] add pipeline executor and geoip step --- .../modules/log-pipeline/pipeline-executor.ts | 53 ++++++++++++++++++ .../src/modules/log-pipeline/steps/geoip.ts | 29 ++++++++++ .../log-pipeline/pipeline-executor.test.ts | 55 +++++++++++++++++++ 3 files changed, 137 insertions(+) create mode 100644 packages/backend/src/modules/log-pipeline/pipeline-executor.ts create mode 100644 packages/backend/src/modules/log-pipeline/steps/geoip.ts create mode 100644 packages/backend/src/tests/modules/log-pipeline/pipeline-executor.test.ts diff --git a/packages/backend/src/modules/log-pipeline/pipeline-executor.ts b/packages/backend/src/modules/log-pipeline/pipeline-executor.ts new file mode 100644 index 0000000..51710c4 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/pipeline-executor.ts @@ -0,0 +1,53 @@ +import type { LogForPipeline, PipelineStep, ExecutorResult, StepResult, ExtractedFields } from './types.js'; +import { runBuiltinParser } from './parsers/index.js'; +import { matchGrok } from './parsers/grok-engine.js'; +import { runGeoIpStep } from './steps/geoip.js'; + +export class PipelineExecutor { + static async execute(log: LogForPipeline, steps: PipelineStep[]): Promise { + const stepResults: StepResult[] = []; + // Accumulated extracted fields — earlier steps take priority (no overwrite) + const merged: ExtractedFields = {}; + + // Build a running metadata view that includes extracted fields so later steps can use them + let currentMeta: Record = { ...(log.metadata ?? {}) }; + + for (const step of steps) { + const stepResult: StepResult = { step, extracted: {} }; + + try { + let extracted: Record | null = null; + + if (step.type === 'parser') { + extracted = runBuiltinParser(step, log.message); + } else if (step.type === 'grok') { + const source = step.source + ? (currentMeta[step.source] as string | undefined) ?? log.message + : log.message; + extracted = matchGrok(step.pattern, source); + } else if (step.type === 'geoip') { + // Pass log with accumulated metadata so geoip can read previously extracted IP + extracted = await runGeoIpStep(step, { ...log, metadata: currentMeta }); + } + + if (extracted) { + stepResult.extracted = extracted; + // Merge into accumulated: don't overwrite keys set by earlier steps + for (const [k, v] of Object.entries(extracted)) { + if (!(k in merged)) { + merged[k] = v; + } + } + // Update running metadata view + currentMeta = { ...extracted, ...currentMeta }; + } + } catch (err) { + stepResult.error = err instanceof Error ? err.message : String(err); + } + + stepResults.push(stepResult); + } + + return { steps: stepResults, merged }; + } +} diff --git a/packages/backend/src/modules/log-pipeline/steps/geoip.ts b/packages/backend/src/modules/log-pipeline/steps/geoip.ts new file mode 100644 index 0000000..e239158 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/steps/geoip.ts @@ -0,0 +1,29 @@ +import type { GeoIpStep, LogForPipeline } from '../types.js'; + +// Lazy import to avoid crashing when GeoLite2 DB is not present +async function tryGeoLookup(ip: string): Promise | null> { + try { + const { geoLite2Service } = await import('../../siem/geolite2-service.js'); + const geo = geoLite2Service.lookup(ip); + return geo ?? null; + } catch { + return null; + } +} + +export async function runGeoIpStep( + step: GeoIpStep, + log: LogForPipeline +): Promise> { + const meta = log.metadata ?? {}; + const ip = meta[step.field]; + if (typeof ip !== 'string' || !ip) return {}; + + try { + const geo = await tryGeoLookup(ip); + if (!geo) return {}; + return { [step.target]: geo }; + } catch { + return {}; + } +} diff --git a/packages/backend/src/tests/modules/log-pipeline/pipeline-executor.test.ts b/packages/backend/src/tests/modules/log-pipeline/pipeline-executor.test.ts new file mode 100644 index 0000000..9588f8e --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/pipeline-executor.test.ts @@ -0,0 +1,55 @@ +import { describe, it, expect } from 'vitest'; +import { PipelineExecutor } from '../../../modules/log-pipeline/pipeline-executor.js'; +import type { LogForPipeline, PipelineStep } from '../../../modules/log-pipeline/types.js'; + +const sampleLog: LogForPipeline = { + id: 'test-id', + time: new Date(), + message: '192.168.1.1 - - [01/Jan/2024:00:00:00 +0000] "GET /api HTTP/1.1" 200 512 "-" "curl/7.68.0"', + metadata: null, +}; + +describe('PipelineExecutor', () => { + it('runs a parser step and extracts fields', async () => { + const steps: PipelineStep[] = [{ type: 'parser', parser: 'nginx' }]; + const result = await PipelineExecutor.execute(sampleLog, steps); + expect(result.merged.client_ip).toBe('192.168.1.1'); + expect(result.merged.http_status).toBe(200); + expect(result.steps[0].error).toBeUndefined(); + }); + + it('runs a grok step', async () => { + const log: LogForPipeline = { ...sampleLog, message: 'user=alice action=login' }; + const steps: PipelineStep[] = [{ type: 'grok', pattern: 'user=%{WORD:user} action=%{WORD:action}' }]; + const result = await PipelineExecutor.execute(log, steps); + expect(result.merged.user).toBe('alice'); + expect(result.merged.action).toBe('login'); + }); + + it('continues executing remaining steps when one step fails', async () => { + const steps: PipelineStep[] = [ + { type: 'grok', pattern: '%{UNKNOWN_PATTERN:x}' }, // will error + { type: 'parser', parser: 'json' }, + ]; + const log: LogForPipeline = { ...sampleLog, message: '{"key":"val"}' }; + const result = await PipelineExecutor.execute(log, steps); + expect(result.steps[0].error).toBeDefined(); + expect(result.merged.key).toBe('val'); // second step still ran + }); + + it('merges fields from multiple steps, earlier steps take priority', async () => { + const steps: PipelineStep[] = [ + { type: 'parser', parser: 'nginx' }, + { type: 'grok', pattern: '%{IPV4:client_ip}' }, + ]; + const result = await PipelineExecutor.execute(sampleLog, steps); + expect(result.merged.client_ip).toBe('192.168.1.1'); + }); + + it('returns empty merged when no steps match', async () => { + const steps: PipelineStep[] = [{ type: 'parser', parser: 'json' }]; + const log: LogForPipeline = { ...sampleLog, message: 'plain text' }; + const result = await PipelineExecutor.execute(log, steps); + expect(result.merged).toEqual({}); + }); +}); From 421774ddfeb54b3d04819dad38afee1af4a2a1d9 Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 21:59:45 +0100 Subject: [PATCH 06/22] add pipeline service with CRUD and yaml import --- .../src/modules/log-pipeline/service.ts | 153 ++++++++++++ .../modules/log-pipeline/service.test.ts | 233 ++++++++++++++++++ 2 files changed, 386 insertions(+) create mode 100644 packages/backend/src/modules/log-pipeline/service.ts create mode 100644 packages/backend/src/tests/modules/log-pipeline/service.test.ts diff --git a/packages/backend/src/modules/log-pipeline/service.ts b/packages/backend/src/modules/log-pipeline/service.ts new file mode 100644 index 0000000..8443f26 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/service.ts @@ -0,0 +1,153 @@ +import yaml from 'js-yaml'; +import { db } from '../../database/index.js'; +import type { Pipeline, CreatePipelineInput, UpdatePipelineInput, PipelineStep } from './types.js'; + +interface CacheEntry { pipeline: Pipeline | null; expiresAt: number } + +export class PipelineService { + private cache = new Map(); + private cacheTTL = 5 * 60 * 1000; + + private mapRow(row: Record): Pipeline { + return { + id: row.id as string, + organizationId: row.organization_id as string, + projectId: row.project_id as string | null, + name: row.name as string, + description: row.description as string | null, + enabled: row.enabled as boolean, + steps: (row.steps as PipelineStep[]) ?? [], + createdAt: row.created_at as Date, + updatedAt: row.updated_at as Date, + }; + } + + async create(input: CreatePipelineInput): Promise { + const row = await db + .insertInto('log_pipelines') + .values({ + organization_id: input.organizationId, + project_id: input.projectId ?? null, + name: input.name, + description: input.description ?? null, + enabled: input.enabled ?? true, + steps: input.steps as unknown as Record[], + }) + .returningAll() + .executeTakeFirstOrThrow(); + this.invalidateCache(input.organizationId); + return this.mapRow(row as unknown as Record); + } + + async update(id: string, organizationId: string, input: UpdatePipelineInput): Promise { + const updates: Record = { updated_at: new Date() }; + if (input.name !== undefined) updates.name = input.name; + if (input.description !== undefined) updates.description = input.description; + if (input.enabled !== undefined) updates.enabled = input.enabled; + if (input.steps !== undefined) updates.steps = input.steps; + + const row = await db + .updateTable('log_pipelines') + .set(updates) + .where('id', '=', id) + .where('organization_id', '=', organizationId) + .returningAll() + .executeTakeFirstOrThrow(); + this.invalidateCache(organizationId); + return this.mapRow(row as unknown as Record); + } + + async delete(id: string, organizationId: string): Promise { + await db + .deleteFrom('log_pipelines') + .where('id', '=', id) + .where('organization_id', '=', organizationId) + .execute(); + this.invalidateCache(organizationId); + } + + async listForOrg(organizationId: string): Promise { + const rows = await db + .selectFrom('log_pipelines') + .selectAll() + .where('organization_id', '=', organizationId) + .orderBy('created_at', 'asc') + .execute(); + return rows.map((r) => this.mapRow(r as unknown as Record)); + } + + async getById(id: string, organizationId: string): Promise { + const row = await db + .selectFrom('log_pipelines') + .selectAll() + .where('id', '=', id) + .where('organization_id', '=', organizationId) + .executeTakeFirst(); + return row ? this.mapRow(row as unknown as Record) : null; + } + + /** Used by the BullMQ job — cached. Project pipeline takes priority over org-wide. */ + async getForProject(projectId: string, organizationId: string): Promise { + const cacheKey = `${organizationId}:${projectId}`; + const cached = this.cache.get(cacheKey); + if (cached && Date.now() < cached.expiresAt) return cached.pipeline; + + // Project-specific first, then org-wide fallback + const row = await db + .selectFrom('log_pipelines') + .selectAll() + .where('organization_id', '=', organizationId) + .where('enabled', '=', true) + .where((eb) => + eb.or([ + eb('project_id', '=', projectId), + eb('project_id', 'is', null), + ]) + ) + .orderBy('project_id', 'desc') // non-null (project-specific) sorts first + .executeTakeFirst(); + + const pipeline = row ? this.mapRow(row as unknown as Record) : null; + this.cache.set(cacheKey, { pipeline, expiresAt: Date.now() + this.cacheTTL }); + return pipeline; + } + + async importFromYaml(yamlText: string, organizationId: string, projectId: string | null): Promise { + let doc: unknown; + try { + doc = yaml.load(yamlText); + } catch (e: unknown) { + throw new Error(`Invalid YAML: ${e instanceof Error ? e.message : String(e)}`); + } + if (!doc || typeof doc !== 'object') throw new Error('YAML must be a mapping object'); + const docObj = doc as Record; + if (!docObj.name) throw new Error('Pipeline YAML must have a "name" field'); + if (!Array.isArray(docObj.steps)) throw new Error('Pipeline YAML must have a "steps" array'); + + // Upsert: delete existing if any, then create + await db + .deleteFrom('log_pipelines') + .where('organization_id', '=', organizationId) + .where((eb) => + projectId ? eb('project_id', '=', projectId) : eb('project_id', 'is', null) + ) + .execute(); + + return this.create({ + organizationId, + projectId, + name: docObj.name as string, + description: typeof docObj.description === 'string' ? docObj.description : undefined, + enabled: typeof docObj.enabled === 'boolean' ? docObj.enabled : true, + steps: docObj.steps as PipelineStep[], + }); + } + + invalidateCache(organizationId: string): void { + for (const key of this.cache.keys()) { + if (key.startsWith(organizationId + ':')) this.cache.delete(key); + } + } +} + +export const pipelineService = new PipelineService(); diff --git a/packages/backend/src/tests/modules/log-pipeline/service.test.ts b/packages/backend/src/tests/modules/log-pipeline/service.test.ts new file mode 100644 index 0000000..4eb2ef2 --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/service.test.ts @@ -0,0 +1,233 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { pipelineService } from '../../../modules/log-pipeline/service.js'; +import { createTestContext } from '../../helpers/index.js'; +import { db } from '../../../database/index.js'; + +let ctx: Awaited>; + +beforeEach(async () => { + await db.deleteFrom('log_pipelines').execute(); + await db.deleteFrom('logs').execute(); + await db.deleteFrom('api_keys').execute(); + await db.deleteFrom('organization_members').execute(); + await db.deleteFrom('projects').execute(); + await db.deleteFrom('organizations').execute(); + await db.deleteFrom('sessions').execute(); + await db.deleteFrom('users').execute(); + + ctx = await createTestContext(); + pipelineService.invalidateCache(ctx.organization.id); +}); + +describe('pipelineService.create', () => { + it('creates a pipeline with steps', async () => { + const pipeline = await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Test Pipeline', + steps: [{ type: 'parser', parser: 'nginx' }], + }); + expect(pipeline.id).toBeDefined(); + expect(pipeline.steps).toHaveLength(1); + expect(pipeline.enabled).toBe(true); + expect(pipeline.organizationId).toBe(ctx.organization.id); + expect(pipeline.projectId).toBe(ctx.project.id); + }); + + it('creates an org-wide pipeline when projectId is null', async () => { + const pipeline = await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Org Pipeline', + steps: [], + }); + expect(pipeline.projectId).toBeNull(); + expect(pipeline.name).toBe('Org Pipeline'); + }); + + it('enforces unique pipeline per project', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'First', + steps: [], + }); + await expect(pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Second', + steps: [], + })).rejects.toThrow(); + }); +}); + +describe('pipelineService.update', () => { + it('updates name and steps', async () => { + const created = await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Original', + steps: [], + }); + const updated = await pipelineService.update(created.id, ctx.organization.id, { + name: 'Updated', + steps: [{ type: 'parser', parser: 'json' }], + }); + expect(updated.name).toBe('Updated'); + expect(updated.steps).toHaveLength(1); + }); + + it('can disable a pipeline', async () => { + const created = await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Enabled', + steps: [], + }); + const updated = await pipelineService.update(created.id, ctx.organization.id, { enabled: false }); + expect(updated.enabled).toBe(false); + }); +}); + +describe('pipelineService.delete', () => { + it('deletes a pipeline', async () => { + const created = await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'To Delete', + steps: [], + }); + await pipelineService.delete(created.id, ctx.organization.id); + const found = await pipelineService.getById(created.id, ctx.organization.id); + expect(found).toBeNull(); + }); +}); + +describe('pipelineService.listForOrg', () => { + it('lists all pipelines for org', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Org-wide', + steps: [], + }); + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Project-specific', + steps: [], + }); + const list = await pipelineService.listForOrg(ctx.organization.id); + expect(list).toHaveLength(2); + }); +}); + +describe('pipelineService.getForProject', () => { + it('returns project pipeline', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'P', + steps: [], + }); + pipelineService.invalidateCache(ctx.organization.id); + const p = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(p?.name).toBe('P'); + }); + + it('falls back to org-wide pipeline when no project pipeline exists', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Org-wide', + steps: [], + }); + pipelineService.invalidateCache(ctx.organization.id); + const p = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(p?.name).toBe('Org-wide'); + }); + + it('prefers project pipeline over org-wide', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Org-wide', + steps: [], + }); + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Project-specific', + steps: [], + }); + pipelineService.invalidateCache(ctx.organization.id); + const p = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(p?.name).toBe('Project-specific'); + }); + + it('returns null when no pipeline configured', async () => { + const p = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(p).toBeNull(); + }); + + it('caches result for subsequent calls', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Cached', + steps: [], + }); + pipelineService.invalidateCache(ctx.organization.id); + const first = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + // Delete from DB — cache should still return value + await db.deleteFrom('log_pipelines').execute(); + const second = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(first?.name).toBe(second?.name); + }); +}); + +describe('pipelineService.importFromYaml', () => { + it('parses valid YAML and creates pipeline', async () => { + const yamlText = ` +name: nginx-pipeline +description: Parse nginx logs +enabled: true +steps: + - type: parser + parser: nginx + - type: geoip + field: client_ip + target: geo +`; + const pipeline = await pipelineService.importFromYaml(yamlText, ctx.organization.id, ctx.project.id); + expect(pipeline.name).toBe('nginx-pipeline'); + expect(pipeline.description).toBe('Parse nginx logs'); + expect(pipeline.steps).toHaveLength(2); + expect(pipeline.enabled).toBe(true); + }); + + it('replaces existing pipeline on re-import', async () => { + const yaml1 = `name: first\nsteps: []`; + const yaml2 = `name: second\nsteps:\n - type: parser\n parser: json`; + await pipelineService.importFromYaml(yaml1, ctx.organization.id, ctx.project.id); + const pipeline = await pipelineService.importFromYaml(yaml2, ctx.organization.id, ctx.project.id); + expect(pipeline.name).toBe('second'); + const list = await pipelineService.listForOrg(ctx.organization.id); + expect(list).toHaveLength(1); + }); + + it('throws on invalid YAML syntax', async () => { + await expect(pipelineService.importFromYaml('invalid: [yaml: bad', ctx.organization.id, null)) + .rejects.toThrow(); + }); + + it('throws when name is missing', async () => { + await expect(pipelineService.importFromYaml('steps: []', ctx.organization.id, null)) + .rejects.toThrow('name'); + }); + + it('throws when steps is not an array', async () => { + await expect(pipelineService.importFromYaml('name: test\nsteps: not-an-array', ctx.organization.id, null)) + .rejects.toThrow('steps'); + }); +}); From f8eec43fbf2c4f9f90952fc4c0115c8880866990 Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 22:04:21 +0100 Subject: [PATCH 07/22] add pipeline API routes --- .../backend/src/modules/log-pipeline/index.ts | 2 + .../src/modules/log-pipeline/routes.ts | 148 +++++++++ packages/backend/src/server.ts | 2 + .../tests/modules/log-pipeline/routes.test.ts | 289 ++++++++++++++++++ 4 files changed, 441 insertions(+) create mode 100644 packages/backend/src/modules/log-pipeline/index.ts create mode 100644 packages/backend/src/modules/log-pipeline/routes.ts create mode 100644 packages/backend/src/tests/modules/log-pipeline/routes.test.ts diff --git a/packages/backend/src/modules/log-pipeline/index.ts b/packages/backend/src/modules/log-pipeline/index.ts new file mode 100644 index 0000000..8e4c16d --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/index.ts @@ -0,0 +1,2 @@ +export { pipelineRoutes } from './routes.js'; +export { pipelineService } from './service.js'; diff --git a/packages/backend/src/modules/log-pipeline/routes.ts b/packages/backend/src/modules/log-pipeline/routes.ts new file mode 100644 index 0000000..5594d4b --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/routes.ts @@ -0,0 +1,148 @@ +import type { FastifyInstance } from 'fastify'; +import { z } from 'zod'; +import { authenticate } from '../auth/middleware.js'; +import { OrganizationsService } from '../organizations/service.js'; +import { pipelineService } from './service.js'; +import { PipelineExecutor } from './pipeline-executor.js'; + +const organizationsService = new OrganizationsService(); + +const stepSchema = z.discriminatedUnion('type', [ + z.object({ + type: z.literal('parser'), + parser: z.enum(['nginx', 'apache', 'syslog', 'logfmt', 'json']), + }), + z.object({ + type: z.literal('grok'), + pattern: z.string().min(1), + source: z.string().optional(), + }), + z.object({ + type: z.literal('geoip'), + field: z.string().min(1), + target: z.string().min(1), + }), +]); + +const createSchema = z.object({ + organizationId: z.string().uuid(), + projectId: z.string().uuid().optional().nullable(), + name: z.string().min(1).max(200), + description: z.string().optional(), + enabled: z.boolean().optional(), + steps: z.array(stepSchema), +}); + +const updateSchema = z.object({ + name: z.string().min(1).max(200).optional(), + description: z.string().optional(), + enabled: z.boolean().optional(), + steps: z.array(stepSchema).optional(), +}); + +const previewSchema = z.object({ + organizationId: z.string().uuid(), + steps: z.array(stepSchema), + message: z.string(), + metadata: z.record(z.unknown()).optional(), +}); + +const importYamlSchema = z.object({ + organizationId: z.string().uuid(), + projectId: z.string().uuid().optional().nullable(), + yaml: z.string().min(1), +}); + +async function checkMembership(userId: string, orgId: string): Promise { + const orgs = await organizationsService.getUserOrganizations(userId); + return orgs.some((o) => o.id === orgId); +} + +export async function pipelineRoutes(fastify: FastifyInstance) { + fastify.addHook('onRequest', authenticate); + + // List pipelines for org + fastify.get('/', async (request: any, reply) => { + const orgId = (request.query as any).organizationId as string; + if (!orgId) return reply.status(400).send({ error: 'organizationId required' }); + if (!await checkMembership(request.user.id, orgId)) return reply.status(403).send({ error: 'Forbidden' }); + const pipelines = await pipelineService.listForOrg(orgId); + return reply.send({ pipelines }); + }); + + // Preview (before /:id routes to avoid conflict) + fastify.post('/preview', async (request: any, reply) => { + try { + const body = previewSchema.parse(request.body); + if (!await checkMembership(request.user.id, body.organizationId)) return reply.status(403).send({ error: 'Forbidden' }); + const logEntry = { id: '', time: new Date(), message: body.message, metadata: body.metadata ?? null }; + const result = await PipelineExecutor.execute(logEntry, body.steps as any); + return reply.send(result); + } catch (e) { + if (e instanceof z.ZodError) return reply.status(400).send({ error: 'Validation error', details: e.errors }); + throw e; + } + }); + + // YAML import + fastify.post('/import-yaml', async (request: any, reply) => { + try { + const body = importYamlSchema.parse(request.body); + if (!await checkMembership(request.user.id, body.organizationId)) return reply.status(403).send({ error: 'Forbidden' }); + const pipeline = await pipelineService.importFromYaml(body.yaml, body.organizationId, body.projectId ?? null); + return reply.status(201).send({ pipeline }); + } catch (e) { + if (e instanceof z.ZodError) return reply.status(400).send({ error: 'Validation error', details: e.errors }); + if (e instanceof Error) return reply.status(400).send({ error: e.message }); + throw e; + } + }); + + // Create pipeline + fastify.post('/', async (request: any, reply) => { + try { + const body = createSchema.parse(request.body); + if (!await checkMembership(request.user.id, body.organizationId)) return reply.status(403).send({ error: 'Forbidden' }); + const pipeline = await pipelineService.create(body as any); + return reply.status(201).send({ pipeline }); + } catch (e) { + if (e instanceof z.ZodError) return reply.status(400).send({ error: 'Validation error', details: e.errors }); + if (e instanceof Error && e.message.includes('unique')) return reply.status(409).send({ error: 'A pipeline already exists for this project/org scope.' }); + throw e; + } + }); + + // Get pipeline by ID + fastify.get('/:id', async (request: any, reply) => { + const orgId = (request.query as any).organizationId as string; + if (!orgId) return reply.status(400).send({ error: 'organizationId required' }); + if (!await checkMembership(request.user.id, orgId)) return reply.status(403).send({ error: 'Forbidden' }); + const pipeline = await pipelineService.getById((request.params as any).id, orgId); + if (!pipeline) return reply.status(404).send({ error: 'Not found' }); + return reply.send({ pipeline }); + }); + + // Update pipeline + fastify.put('/:id', async (request: any, reply) => { + try { + const orgId = (request.query as any).organizationId as string; + if (!orgId) return reply.status(400).send({ error: 'organizationId required' }); + if (!await checkMembership(request.user.id, orgId)) return reply.status(403).send({ error: 'Forbidden' }); + const body = updateSchema.parse(request.body); + const pipeline = await pipelineService.update((request.params as any).id, orgId, body as any); + return reply.send({ pipeline }); + } catch (e) { + if (e instanceof z.ZodError) return reply.status(400).send({ error: 'Validation error', details: e.errors }); + throw e; + } + }); + + // Delete pipeline + fastify.delete('/:id', async (request: any, reply) => { + const orgId = (request.query as any).organizationId as string; + if (!orgId) return reply.status(400).send({ error: 'organizationId required' }); + if (!await checkMembership(request.user.id, orgId)) return reply.status(403).send({ error: 'Forbidden' }); + await pipelineService.delete((request.params as any).id, orgId); + return reply.status(204).send(); + }); +} diff --git a/packages/backend/src/server.ts b/packages/backend/src/server.ts index bb22dc7..4f35dc8 100644 --- a/packages/backend/src/server.ts +++ b/packages/backend/src/server.ts @@ -31,6 +31,7 @@ import { settingsRoutes, publicSettingsRoutes, settingsService } from './modules import { retentionRoutes } from './modules/retention/index.js'; import { correlationRoutes, patternRoutes } from './modules/correlation/index.js'; import { piiMaskingRoutes } from './modules/pii-masking/index.js'; +import { pipelineRoutes } from './modules/log-pipeline/index.js'; import { sessionsRoutes } from './modules/sessions/routes.js'; import { sourcemapsRoutes } from './modules/sourcemaps/index.js'; import { auditLogRoutes, auditLogService } from './modules/audit-log/index.js'; @@ -180,6 +181,7 @@ export async function build(opts = {}) { await fastify.register(correlationRoutes, { prefix: '/api' }); await fastify.register(patternRoutes, { prefix: '/api' }); await fastify.register(piiMaskingRoutes, { prefix: '/api' }); + await fastify.register(pipelineRoutes, { prefix: '/api/v1/pipelines' }); await fastify.register(otlpRoutes); await fastify.register(otlpTraceRoutes); await fastify.register(otlpMetricRoutes); diff --git a/packages/backend/src/tests/modules/log-pipeline/routes.test.ts b/packages/backend/src/tests/modules/log-pipeline/routes.test.ts new file mode 100644 index 0000000..a84ffaf --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/routes.test.ts @@ -0,0 +1,289 @@ +import { describe, it, expect, beforeEach, beforeAll, afterAll } from 'vitest'; +import Fastify, { FastifyInstance } from 'fastify'; +import { db } from '../../../database/index.js'; +import { pipelineRoutes } from '../../../modules/log-pipeline/routes.js'; +import { createTestContext, createTestSession, getSessionHeaders } from '../../helpers/index.js'; + +let app: FastifyInstance; +let ctx: Awaited>; +let authToken: string; + +beforeAll(async () => { + app = Fastify(); + await app.register(pipelineRoutes, { prefix: '/api/v1/pipelines' }); + await app.ready(); +}); + +afterAll(async () => { + await app.close(); +}); + +beforeEach(async () => { + await db.deleteFrom('log_pipelines').execute(); + await db.deleteFrom('logs').execute(); + await db.deleteFrom('api_keys').execute(); + await db.deleteFrom('organization_members').execute(); + await db.deleteFrom('projects').execute(); + await db.deleteFrom('organizations').execute(); + await db.deleteFrom('sessions').execute(); + await db.deleteFrom('users').execute(); + + ctx = await createTestContext(); + const session = await createTestSession(ctx.user.id); + authToken = session.token; +}); + +describe('POST /api/v1/pipelines', () => { + it('creates a pipeline', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'My Pipeline', + steps: [{ type: 'parser', parser: 'nginx' }], + }, + }); + expect(res.statusCode).toBe(201); + const body = JSON.parse(res.payload); + expect(body.pipeline.name).toBe('My Pipeline'); + expect(body.pipeline.steps).toHaveLength(1); + }); + + it('returns 400 for invalid step type', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'Bad Pipeline', + steps: [{ type: 'unknown_step' }], + }, + }); + expect(res.statusCode).toBe(400); + }); + + it('returns 401 without auth', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + payload: { + organizationId: ctx.organization.id, + name: 'Unauthenticated', + steps: [], + }, + }); + expect(res.statusCode).toBe(401); + }); + + it('returns 403 for non-member', async () => { + // Create a separate context (different org) + const other = await createTestContext(); + const otherSession = await createTestSession(other.user.id); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: getSessionHeaders(otherSession.token), + payload: { + organizationId: ctx.organization.id, + name: 'Hack Attempt', + steps: [], + }, + }); + expect(res.statusCode).toBe(403); + }); +}); + +describe('POST /api/v1/pipelines/preview', () => { + it('returns parsed fields for nginx log line', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines/preview', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + steps: [{ type: 'parser', parser: 'nginx' }], + message: '10.0.0.1 - - [01/Jan/2024:00:00:00 +0000] "GET /api HTTP/1.1" 200 512 "-" "-"', + }, + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.payload); + expect(body.merged.client_ip).toBe('10.0.0.1'); + expect(body.merged.http_status).toBe(200); + }); + + it('returns 400 for invalid step in preview', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines/preview', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + steps: [{ type: 'bad_type' }], + message: 'some log', + }, + }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('POST /api/v1/pipelines/import-yaml', () => { + it('imports a pipeline from YAML', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines/import-yaml', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + projectId: ctx.project.id, + yaml: 'name: yaml-pipeline\nsteps:\n - type: parser\n parser: nginx\n', + }, + }); + expect(res.statusCode).toBe(201); + const body = JSON.parse(res.payload); + expect(body.pipeline.name).toBe('yaml-pipeline'); + expect(body.pipeline.steps).toHaveLength(1); + }); + + it('returns 400 for invalid YAML', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines/import-yaml', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + yaml: ':::not valid yaml:::', + }, + }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('GET /api/v1/pipelines', () => { + it('lists pipelines for org', async () => { + // Create a pipeline first + await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'Listed Pipeline', + steps: [], + }, + }); + + const res = await app.inject({ + method: 'GET', + url: `/api/v1/pipelines?organizationId=${ctx.organization.id}`, + headers: getSessionHeaders(authToken), + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.payload); + expect(body.pipelines.length).toBeGreaterThanOrEqual(1); + expect(body.pipelines[0].name).toBe('Listed Pipeline'); + }); + + it('returns 400 if organizationId is missing', async () => { + const res = await app.inject({ + method: 'GET', + url: '/api/v1/pipelines', + headers: getSessionHeaders(authToken), + }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('GET /api/v1/pipelines/:id', () => { + it('returns a pipeline by id', async () => { + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'Fetch By ID', + steps: [], + }, + }); + const created = JSON.parse(createRes.payload).pipeline; + + const res = await app.inject({ + method: 'GET', + url: `/api/v1/pipelines/${created.id}?organizationId=${ctx.organization.id}`, + headers: getSessionHeaders(authToken), + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.payload); + expect(body.pipeline.id).toBe(created.id); + }); + + it('returns 404 for unknown id', async () => { + const res = await app.inject({ + method: 'GET', + url: `/api/v1/pipelines/00000000-0000-0000-0000-000000000000?organizationId=${ctx.organization.id}`, + headers: getSessionHeaders(authToken), + }); + expect(res.statusCode).toBe(404); + }); +}); + +describe('PUT /api/v1/pipelines/:id', () => { + it('updates a pipeline', async () => { + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'Original Name', + steps: [], + }, + }); + const created = JSON.parse(createRes.payload).pipeline; + + const res = await app.inject({ + method: 'PUT', + url: `/api/v1/pipelines/${created.id}?organizationId=${ctx.organization.id}`, + headers: getSessionHeaders(authToken), + payload: { name: 'Updated Name' }, + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.payload); + expect(body.pipeline.name).toBe('Updated Name'); + }); +}); + +describe('DELETE /api/v1/pipelines/:id', () => { + it('deletes a pipeline', async () => { + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: getSessionHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'To Delete', + steps: [], + }, + }); + const created = JSON.parse(createRes.payload).pipeline; + + const deleteRes = await app.inject({ + method: 'DELETE', + url: `/api/v1/pipelines/${created.id}?organizationId=${ctx.organization.id}`, + headers: getSessionHeaders(authToken), + }); + expect(deleteRes.statusCode).toBe(204); + + // Verify it's gone + const getRes = await app.inject({ + method: 'GET', + url: `/api/v1/pipelines/${created.id}?organizationId=${ctx.organization.id}`, + headers: getSessionHeaders(authToken), + }); + expect(getRes.statusCode).toBe(404); + }); +}); From 995e872dcdc6ce4d036f2866c6ccf36e8bf5b3ec Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 22:10:05 +0100 Subject: [PATCH 08/22] add pipeline BullMQ job and worker --- .../backend/src/modules/ingestion/service.ts | 37 +++++ .../backend/src/queue/jobs/log-pipeline.ts | 62 +++++++++ .../src/tests/queue/jobs/log-pipeline.test.ts | 131 ++++++++++++++++++ packages/backend/src/worker.ts | 28 ++++ 4 files changed, 258 insertions(+) create mode 100644 packages/backend/src/queue/jobs/log-pipeline.ts create mode 100644 packages/backend/src/tests/queue/jobs/log-pipeline.test.ts diff --git a/packages/backend/src/modules/ingestion/service.ts b/packages/backend/src/modules/ingestion/service.ts index 005a11e..d3bec81 100644 --- a/packages/backend/src/modules/ingestion/service.ts +++ b/packages/backend/src/modules/ingestion/service.ts @@ -134,6 +134,13 @@ export class IngestionService { console.error('[Ingestion] Failed to trigger Exception parsing:', err); }); + // Trigger pipeline processing (async, non-blocking) + if (organizationId) { + this.triggerPipelineProcessing(logs, insertedLogs, projectId, organizationId).catch((err) => { + console.error('[Ingestion] Failed to trigger pipeline processing:', err); + }); + } + // Invalidate query caches for this project (async, non-blocking) CacheManager.invalidateProjectQueries(projectId).catch((err) => { console.error('[Ingestion] Failed to invalidate cache:', err); @@ -283,6 +290,36 @@ export class IngestionService { } } + /** + * Trigger log pipeline processing for ingested logs + */ + private async triggerPipelineProcessing( + logs: LogInput[], + insertedLogs: any[], + projectId: string, + organizationId: string + ): Promise { + try { + const payload = logs.map((log: LogInput, i: number) => ({ + id: insertedLogs[i]?.id ?? '', + time: + insertedLogs[i]?.time instanceof Date + ? insertedLogs[i].time.toISOString() + : String(insertedLogs[i]?.time ?? new Date().toISOString()), + message: log.message, + metadata: (log.metadata as Record | null | undefined) ?? null, + })); + + const pipelineQueue = createQueue('log-pipeline'); + await pipelineQueue.add('process-pipeline', { logs: payload, projectId, organizationId }); + + console.log(`[Ingestion] Queued pipeline processing for ${logs.length} logs`); + } catch (error) { + console.error('[Ingestion] Error queuing pipeline job:', error); + // Don't throw - ingestion should succeed even if pipeline queueing fails + } + } + /** * Get log statistics (reservoir: works with any engine) */ diff --git a/packages/backend/src/queue/jobs/log-pipeline.ts b/packages/backend/src/queue/jobs/log-pipeline.ts new file mode 100644 index 0000000..293c490 --- /dev/null +++ b/packages/backend/src/queue/jobs/log-pipeline.ts @@ -0,0 +1,62 @@ +import type { Job } from 'bullmq'; +import { sql } from 'kysely'; +import { db } from '../../database/connection.js'; +import { pipelineService } from '../../modules/log-pipeline/service.js'; +import { PipelineExecutor } from '../../modules/log-pipeline/pipeline-executor.js'; + +export interface LogPipelineJobData { + logs: Array<{ + id: string; + time: string; // ISO string (serialized for BullMQ) + message: string; + metadata: Record | null; + }>; + projectId: string; + organizationId: string; +} + +export async function processLogPipeline(job: Job): Promise { + const { logs, projectId, organizationId } = job.data; + + const pipeline = await pipelineService.getForProject(projectId, organizationId); + if (!pipeline || !pipeline.enabled || pipeline.steps.length === 0) return; + + console.log(`[Pipeline] Processing ${logs.length} logs for project ${projectId}`); + + const updates: Array<{ id: string; time: Date; fields: Record }> = []; + + for (const log of logs) { + try { + const result = await PipelineExecutor.execute( + { id: log.id, time: new Date(log.time), message: log.message, metadata: log.metadata }, + pipeline.steps + ); + if (Object.keys(result.merged).length > 0) { + updates.push({ id: log.id, time: new Date(log.time), fields: result.merged }); + } + } catch (err) { + console.error(`[Pipeline] Failed to process log ${log.id}:`, err); + } + } + + if (updates.length === 0) return; + + // Batch update: extracted fields do NOT overwrite existing metadata keys. + // TimescaleDB hypertable requires time in the WHERE clause. + for (const update of updates) { + try { + await db + .updateTable('logs') + .set({ + metadata: sql`${JSON.stringify(update.fields)}::jsonb || COALESCE(metadata, '{}'::jsonb)`, + }) + .where('id', '=', update.id) + .where('time', '=', update.time) + .execute(); + } catch (err) { + console.error(`[Pipeline] Failed to update metadata for log ${update.id}:`, err); + } + } + + console.log(`[Pipeline] Updated metadata for ${updates.length}/${logs.length} logs`); +} diff --git a/packages/backend/src/tests/queue/jobs/log-pipeline.test.ts b/packages/backend/src/tests/queue/jobs/log-pipeline.test.ts new file mode 100644 index 0000000..345a6d7 --- /dev/null +++ b/packages/backend/src/tests/queue/jobs/log-pipeline.test.ts @@ -0,0 +1,131 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { db } from '../../../database/index.js'; +import { createTestContext, createTestLog } from '../../helpers/index.js'; +import { pipelineService } from '../../../modules/log-pipeline/service.js'; + +// Mock queue connection BEFORE importing anything that uses it +vi.mock('../../../queue/connection.js', () => { + return { + createQueue: vi.fn(() => ({ + add: vi.fn().mockResolvedValue({ id: 'job-id' }), + close: vi.fn(), + })), + createWorker: vi.fn(() => ({ + on: vi.fn(), + close: vi.fn(), + })), + getConnection: () => null, + }; +}); + +// Mock config module +vi.mock('../../../config/index.js', () => ({ + config: { + SMTP_HOST: 'localhost', + SMTP_PORT: 1025, + SMTP_SECURE: false, + SMTP_USER: 'test@example.com', + SMTP_PASS: 'password', + SMTP_FROM: 'noreply@test.com', + REDIS_URL: 'redis://localhost:6379', + }, + isSmtpConfigured: vi.fn(() => false), +})); + +// Import after mocks +import { processLogPipeline } from '../../../queue/jobs/log-pipeline.js'; +import type { Job } from 'bullmq'; +import type { LogPipelineJobData } from '../../../queue/jobs/log-pipeline.js'; + +let ctx: Awaited>; + +beforeEach(async () => { + vi.clearAllMocks(); + await db.deleteFrom('log_pipelines').execute(); + ctx = await createTestContext(); + pipelineService.invalidateCache(ctx.organization.id); +}); + +describe('processLogPipeline', () => { + it('is a no-op when no pipeline is configured for the project', async () => { + const log = await createTestLog({ projectId: ctx.project.id, message: 'plain text' }); + const timeStr = log.time instanceof Date ? log.time.toISOString() : String(log.time); + const job = { + data: { + logs: [{ id: log.id, time: timeStr, message: log.message, metadata: null }], + projectId: ctx.project.id, + organizationId: ctx.organization.id, + } as LogPipelineJobData, + } as Job; + + await expect(processLogPipeline(job)).resolves.toBeUndefined(); + + const updated = await db + .selectFrom('logs') + .select('metadata') + .where('id', '=', log.id) + .executeTakeFirst(); + expect(updated?.metadata).toBeNull(); + }); + + it('updates log metadata when pipeline matches', async () => { + const nginxLine = + '10.0.0.1 - - [01/Jan/2024:00:00:00 +0000] "GET /api HTTP/1.1" 200 512 "-" "-"'; + + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'nginx', + steps: [{ type: 'parser', parser: 'nginx' }], + }); + + const log = await createTestLog({ projectId: ctx.project.id, message: nginxLine }); + const timeStr = log.time instanceof Date ? log.time.toISOString() : String(log.time); + const job = { + data: { + logs: [{ id: log.id, time: timeStr, message: nginxLine, metadata: null }], + projectId: ctx.project.id, + organizationId: ctx.organization.id, + } as LogPipelineJobData, + } as Job; + + await processLogPipeline(job); + + const updated = await db + .selectFrom('logs') + .select('metadata') + .where('id', '=', log.id) + .executeTakeFirst(); + + expect((updated?.metadata as Record)?.client_ip).toBe('10.0.0.1'); + expect((updated?.metadata as Record)?.http_status).toBe(200); + }); + + it('skips logs that produce no extracted fields', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'nginx', + steps: [{ type: 'parser', parser: 'nginx' }], + }); + + const log = await createTestLog({ projectId: ctx.project.id, message: 'not a log line' }); + const timeStr = log.time instanceof Date ? log.time.toISOString() : String(log.time); + const job = { + data: { + logs: [{ id: log.id, time: timeStr, message: 'not a log line', metadata: null }], + projectId: ctx.project.id, + organizationId: ctx.organization.id, + } as LogPipelineJobData, + } as Job; + + await processLogPipeline(job); + + const updated = await db + .selectFrom('logs') + .select('metadata') + .where('id', '=', log.id) + .executeTakeFirst(); + expect(updated?.metadata).toBeNull(); + }); +}); diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index c8c981f..1c0ddc8 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -9,6 +9,7 @@ import { processInvitationEmail, type InvitationEmailData } from './queue/jobs/i import { processIncidentNotification, type IncidentNotificationJob } from './queue/jobs/incident-notification.js'; import { processExceptionParsing, type ExceptionParsingJobData } from './queue/jobs/exception-parsing.js'; import { processErrorNotification, type ErrorNotificationJobData } from './queue/jobs/error-notification.js'; +import { processLogPipeline, type LogPipelineJobData } from './queue/jobs/log-pipeline.js'; import { alertsService } from './modules/alerts/index.js'; import { enrichmentService } from './modules/siem/enrichment-service.js'; import { retentionService } from './modules/retention/index.js'; @@ -62,6 +63,11 @@ const errorNotificationWorker = createWorker('error-no await processErrorNotification(job); }); +// Create worker for log pipeline processing +const pipelineWorker = createWorker('log-pipeline', async (job) => { + await processLogPipeline(job); +}); + // Start workers (required for graphile-worker backend, no-op for BullMQ) console.log(`[Worker] Using queue backend: ${getQueueBackend()}`); await startQueueWorkers(); @@ -224,6 +230,27 @@ errorNotificationWorker.on('failed', (job, err) => { } }); +pipelineWorker.on('completed', (job) => { + if (isInternalLoggingEnabled()) { + hub.captureLog('info', `Log pipeline job completed`, { + jobId: job.id, + logCount: job.data?.logs?.length, + }); + } +}); + +pipelineWorker.on('failed', (job, err) => { + console.error(`Log pipeline job ${job?.id} failed:`, err); + + if (isInternalLoggingEnabled()) { + hub.captureLog('error', `Log pipeline job failed: ${err.message}`, { + error: { name: err.name, message: err.message, stack: err.stack }, + jobId: job?.id, + logCount: job?.data?.logs?.length, + }); + } +}); + // Lock to prevent overlapping alert checks (race condition protection) let isCheckingAlerts = false; @@ -562,6 +589,7 @@ async function gracefulShutdown(signal: string) { await incidentNotificationWorker.close(); await exceptionWorker.close(); await errorNotificationWorker.close(); + await pipelineWorker.close(); console.log('[Worker] Workers closed'); // Close queue system (Redis/PostgreSQL connections) From 8c54992327bf17b28d4c3196678fab5a9a91d115 Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 22:11:27 +0100 Subject: [PATCH 09/22] add pipeline frontend API client and store --- packages/frontend/src/lib/api/log-pipeline.ts | 196 ++++++++++++++++++ .../frontend/src/lib/stores/log-pipeline.ts | 137 ++++++++++++ 2 files changed, 333 insertions(+) create mode 100644 packages/frontend/src/lib/api/log-pipeline.ts create mode 100644 packages/frontend/src/lib/stores/log-pipeline.ts diff --git a/packages/frontend/src/lib/api/log-pipeline.ts b/packages/frontend/src/lib/api/log-pipeline.ts new file mode 100644 index 0000000..a5df002 --- /dev/null +++ b/packages/frontend/src/lib/api/log-pipeline.ts @@ -0,0 +1,196 @@ +import { getApiUrl } from '$lib/config'; +import { getAuthToken } from '$lib/utils/auth'; + +export interface PipelineStep { + type: 'parser' | 'grok' | 'geoip'; + parser?: 'nginx' | 'apache' | 'syslog' | 'logfmt' | 'json'; + pattern?: string; + source?: string; + field?: string; + target?: string; +} + +export interface Pipeline { + id: string; + organizationId: string; + projectId: string | null; + name: string; + description: string | null; + enabled: boolean; + steps: PipelineStep[]; + createdAt: string; + updatedAt: string; +} + +export interface PreviewResult { + steps: Array<{ step: PipelineStep; extracted: Record; error?: string }>; + merged: Record; +} + +export interface CreatePipelineInput { + name: string; + description?: string; + steps: PipelineStep[]; + enabled?: boolean; + projectId?: string; +} + +export interface UpdatePipelineInput { + name?: string; + description?: string; + steps?: PipelineStep[]; + enabled?: boolean; +} + +async function fetchWithAuth(url: string, options: RequestInit = {}): Promise { + const token = getAuthToken(); + const headers: HeadersInit = { + ...(options.body ? { 'Content-Type': 'application/json' } : {}), + ...(token ? { Authorization: `Bearer ${token}` } : {}), + ...(options.headers || {}), + }; + + return fetch(url, { + ...options, + headers, + credentials: 'include', + }); +} + +export const logPipelineAPI = { + /** + * List all pipelines for an organization + */ + async list(organizationId: string, projectId?: string): Promise { + const params = new URLSearchParams({ organizationId }); + if (projectId) params.set('projectId', projectId); + + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines?${params}`); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to fetch pipelines' })); + throw new Error(error.error || 'Failed to fetch pipelines'); + } + + const data = await response.json(); + return data.pipelines; + }, + + /** + * Get a single pipeline + */ + async get(id: string, organizationId: string): Promise { + const params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/${id}?${params}`); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to fetch pipeline' })); + throw new Error(error.error || 'Failed to fetch pipeline'); + } + + const data = await response.json(); + return data.pipeline; + }, + + /** + * Create a new pipeline + */ + async create(organizationId: string, input: CreatePipelineInput): Promise { + const params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines?${params}`, { + method: 'POST', + body: JSON.stringify(input), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to create pipeline' })); + throw new Error(error.error || 'Failed to create pipeline'); + } + + const data = await response.json(); + return data.pipeline; + }, + + /** + * Update a pipeline + */ + async update( + id: string, + organizationId: string, + input: UpdatePipelineInput + ): Promise { + const params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/${id}?${params}`, { + method: 'PUT', + body: JSON.stringify(input), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to update pipeline' })); + throw new Error(error.error || 'Failed to update pipeline'); + } + + const data = await response.json(); + return data.pipeline; + }, + + /** + * Delete a pipeline + */ + async delete(id: string, organizationId: string): Promise { + const params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/${id}?${params}`, { + method: 'DELETE', + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to delete pipeline' })); + throw new Error(error.error || 'Failed to delete pipeline'); + } + }, + + /** + * Preview pipeline execution on a sample log + */ + async preview( + organizationId: string, + steps: PipelineStep[], + message: string, + metadata?: Record + ): Promise { + const params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/preview?${params}`, { + method: 'POST', + body: JSON.stringify({ steps, message, metadata }), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to preview pipeline' })); + throw new Error(error.error || 'Failed to preview pipeline'); + } + + const data = await response.json(); + return data.result; + }, + + /** + * Import pipeline from YAML + */ + async importYaml(organizationId: string, projectId: string | null, yaml: string): Promise { + const params = new URLSearchParams({ organizationId }); + if (projectId) params.set('projectId', projectId); + + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/import?${params}`, { + method: 'POST', + body: JSON.stringify({ yaml }), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to import pipeline' })); + throw new Error(error.error || 'Failed to import pipeline'); + } + + const data = await response.json(); + return data.pipeline; + }, +}; diff --git a/packages/frontend/src/lib/stores/log-pipeline.ts b/packages/frontend/src/lib/stores/log-pipeline.ts new file mode 100644 index 0000000..088d999 --- /dev/null +++ b/packages/frontend/src/lib/stores/log-pipeline.ts @@ -0,0 +1,137 @@ +import { writable, derived } from 'svelte/store'; +import { + logPipelineAPI, + type Pipeline, + type CreatePipelineInput, + type UpdatePipelineInput, +} from '$lib/api/log-pipeline'; + +// ============================================================================ +// TYPES +// ============================================================================ + +export interface PipelineStoreState { + pipelines: Pipeline[]; + loading: boolean; + error: string | null; +} + +const initialState: PipelineStoreState = { + pipelines: [], + loading: false, + error: null, +}; + +// ============================================================================ +// STORE +// ============================================================================ + +function createPipelineStore() { + const { subscribe, set, update } = writable(initialState); + + return { + subscribe, + + /** + * Load all pipelines for an organization + */ + async load(organizationId: string, projectId?: string) { + update((s) => ({ ...s, loading: true, error: null })); + + try { + const pipelines = await logPipelineAPI.list(organizationId, projectId); + update((s) => ({ ...s, pipelines, loading: false })); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Failed to load pipelines'; + update((s) => ({ ...s, pipelines: [], loading: false, error: errorMessage })); + } + }, + + /** + * Get a single pipeline + */ + async get(id: string, organizationId: string): Promise { + return logPipelineAPI.get(id, organizationId); + }, + + /** + * Create a new pipeline + */ + async create(organizationId: string, input: CreatePipelineInput): Promise { + const pipeline = await logPipelineAPI.create(organizationId, input); + update((s) => ({ ...s, pipelines: [...s.pipelines, pipeline] })); + return pipeline; + }, + + /** + * Update a pipeline + */ + async update( + id: string, + organizationId: string, + input: UpdatePipelineInput + ): Promise { + const pipeline = await logPipelineAPI.update(id, organizationId, input); + update((s) => ({ + ...s, + pipelines: s.pipelines.map((p) => (p.id === id ? pipeline : p)), + })); + return pipeline; + }, + + /** + * Delete a pipeline + */ + async delete(id: string, organizationId: string): Promise { + await logPipelineAPI.delete(id, organizationId); + update((s) => ({ + ...s, + pipelines: s.pipelines.filter((p) => p.id !== id), + })); + }, + + /** + * Toggle pipeline enabled state + */ + async toggleEnabled(pipeline: Pipeline, organizationId: string): Promise { + const updated = await logPipelineAPI.update(pipeline.id, organizationId, { + enabled: !pipeline.enabled, + }); + update((s) => ({ + ...s, + pipelines: s.pipelines.map((p) => (p.id === updated.id ? updated : p)), + })); + return updated; + }, + + /** + * Reset store + */ + reset() { + set(initialState); + }, + }; +} + +export const pipelineStore = createPipelineStore(); + +// ============================================================================ +// DERIVED STORES +// ============================================================================ + +/** + * Only enabled pipelines + */ +export const enabledPipelines = derived(pipelineStore, ($store) => + $store.pipelines.filter((p) => p.enabled) +); + +/** + * Pipeline count + */ +export const pipelineCount = derived(pipelineStore, ($store) => $store.pipelines.length); + +/** + * Enabled pipeline count + */ +export const enabledPipelineCount = derived(enabledPipelines, ($enabled) => $enabled.length); From 41c62d6f9b4422b8b6d22708a76d661d7cc76bd0 Mon Sep 17 00:00:00 2001 From: Polliog Date: Fri, 20 Mar 2026 22:15:28 +0100 Subject: [PATCH 10/22] add pipeline settings pages and components --- .../pipelines/PipelinePreview.svelte | 122 ++++++++++ .../components/pipelines/StepBuilder.svelte | 125 ++++++++++ .../components/pipelines/StepConfig.svelte | 81 +++++++ .../routes/dashboard/settings/+layout.svelte | 2 + .../dashboard/settings/pipelines/+page.svelte | 205 +++++++++++++++++ .../settings/pipelines/[id]/+page.svelte | 188 +++++++++++++++ .../settings/pipelines/new/+page.svelte | 217 ++++++++++++++++++ 7 files changed, 940 insertions(+) create mode 100644 packages/frontend/src/lib/components/pipelines/PipelinePreview.svelte create mode 100644 packages/frontend/src/lib/components/pipelines/StepBuilder.svelte create mode 100644 packages/frontend/src/lib/components/pipelines/StepConfig.svelte create mode 100644 packages/frontend/src/routes/dashboard/settings/pipelines/+page.svelte create mode 100644 packages/frontend/src/routes/dashboard/settings/pipelines/[id]/+page.svelte create mode 100644 packages/frontend/src/routes/dashboard/settings/pipelines/new/+page.svelte diff --git a/packages/frontend/src/lib/components/pipelines/PipelinePreview.svelte b/packages/frontend/src/lib/components/pipelines/PipelinePreview.svelte new file mode 100644 index 0000000..a30160c --- /dev/null +++ b/packages/frontend/src/lib/components/pipelines/PipelinePreview.svelte @@ -0,0 +1,122 @@ + + + + + + + Preview + + Test your pipeline steps against a sample log message + + +
+ +