diff --git a/CHANGELOG.md b/CHANGELOG.md index ccda9aa9..66fcd617 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,22 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.0] - unreleased + +### Added +- **Log parsing and enrichment pipelines**: define multi-step processing rules that automatically parse and enrich incoming log messages before they are stored + - **5 built-in parsers**: nginx (combined log format), apache (identical to nginx), syslog (RFC 3164 and RFC 5424), logfmt, and JSON message body + - **Custom grok patterns**: `%{PATTERN:field}` and `%{PATTERN:field:type}` syntax with 22 built-in patterns (IPV4, WORD, NOTSPACE, NUMBER, POSINT, DATA, GREEDYDATA, QUOTEDSTRING, METHOD, URIPATH, HTTPDATE, etc.) and optional type coercion (`:int`, `:float`) + - **GeoIP enrichment**: extract country, city, coordinates, timezone, and ISP data from any IP field using the embedded MaxMind GeoLite2 database + - **Async processing via BullMQ**: pipelines run as background jobs after ingestion — zero impact on ingestion latency + - **Project-scoped vs org-wide**: pipelines can target a specific project or apply to all projects in the organization; project-specific pipelines take priority over org-wide ones + - **Pipeline preview**: test any combination of steps against a sample log message and inspect per-step extracted fields and the final merged result before saving + - **YAML import/export**: import pipeline definitions from YAML with `name`, `description`, `enabled`, and `steps` fields; upserts (replace existing pipeline for the same scope) + - **In-memory cache**: `getForProject` caches the resolved pipeline per project for 5 minutes, automatically invalidated on create/update/delete + - **Settings UI** (`/dashboard/settings/pipelines`): list, enable/disable toggle, create, edit, and delete pipelines with live org-switch reactivity 
(`$effect` instead of `onMount`) + - **Step builder**: interactive UI for adding, reordering, and configuring parser, grok, and geoip steps with per-type configuration forms + - **Pipeline edit page** redirects to the list when the active organization is switched, preventing stale-ID errors + ## [0.8.4] - 2026-03-19 ### Added diff --git a/package.json b/package.json index 35c9c333..33ee638f 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,8 @@ "dev:frontend": "pnpm --filter \"@logtide/frontend\" dev", "dev:worker": "pnpm --filter \"@logtide/backend\" dev:worker", "dev:all": "concurrently -n fe,be,wk -c blue,green,yellow \"pnpm dev:frontend\" \"pnpm dev:backend\" \"pnpm dev:worker\"", + "infra:up": "docker compose -f docker/docker-compose.dev.yml up -d", + "infra:down": "docker compose -f docker/docker-compose.dev.yml down", "build": "pnpm --recursive --filter \"./packages/**\" build", "build:shared": "pnpm --filter \"@logtide/shared\" build", "test": "pnpm --recursive --filter \"./packages/**\" test", diff --git a/packages/backend/migrations/033_log_pipelines.sql b/packages/backend/migrations/033_log_pipelines.sql new file mode 100644 index 00000000..67ece42a --- /dev/null +++ b/packages/backend/migrations/033_log_pipelines.sql @@ -0,0 +1,28 @@ +CREATE TABLE IF NOT EXISTS log_pipelines ( + id UUID NOT NULL DEFAULT gen_random_uuid(), + organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE, + project_id UUID REFERENCES projects(id) ON DELETE CASCADE, + name VARCHAR(200) NOT NULL, + description TEXT, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + steps JSONB NOT NULL DEFAULT '[]'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (id) +); + +CREATE INDEX IF NOT EXISTS idx_log_pipelines_org + ON log_pipelines(organization_id); + +CREATE INDEX IF NOT EXISTS idx_log_pipelines_project + ON log_pipelines(project_id) + WHERE project_id IS NOT NULL; + +-- Only one pipeline per 
project (or one org-wide default when project_id IS NULL) +CREATE UNIQUE INDEX IF NOT EXISTS idx_log_pipelines_org_null_project + ON log_pipelines(organization_id) + WHERE project_id IS NULL; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_log_pipelines_org_project + ON log_pipelines(organization_id, project_id) + WHERE project_id IS NOT NULL; diff --git a/packages/backend/migrations/034_service_health_monitoring.sql b/packages/backend/migrations/034_service_health_monitoring.sql new file mode 100644 index 00000000..e4c314c0 --- /dev/null +++ b/packages/backend/migrations/034_service_health_monitoring.sql @@ -0,0 +1,158 @@ +-- ============================================================================ +-- Migration 034: Service Health Monitoring +-- ============================================================================ + +-- 1. Add slug to projects +ALTER TABLE projects ADD COLUMN IF NOT EXISTS slug VARCHAR(255); + +-- Generate slugs for existing projects (handles duplicate base slugs per org) +WITH ranked AS ( + SELECT + id, + organization_id, + BTRIM(LOWER(REGEXP_REPLACE(TRIM(name), '[^a-zA-Z0-9]+', '-', 'g')), '-') AS base_slug, + ROW_NUMBER() OVER ( + PARTITION BY + organization_id, + BTRIM(LOWER(REGEXP_REPLACE(TRIM(name), '[^a-zA-Z0-9]+', '-', 'g')), '-') + ORDER BY created_at + ) AS rn + FROM projects +) +UPDATE projects p +SET slug = CASE + WHEN r.rn = 1 THEN r.base_slug + ELSE r.base_slug || '-' || r.rn::text +END +FROM ranked r +WHERE p.id = r.id; + +-- Fallback for names that produce empty slugs (all special chars) +UPDATE projects +SET slug = 'project-' || SUBSTRING(id::text, 1, 8) +WHERE slug IS NULL OR slug = '' OR slug = '-'; + +ALTER TABLE projects ALTER COLUMN slug SET NOT NULL; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_projects_org_slug ON projects (organization_id, slug); + +-- ============================================================================ +-- 2. 
Monitors table +-- ============================================================================ + +CREATE TABLE IF NOT EXISTS monitors ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + organization_id UUID NOT NULL REFERENCES organizations(id) ON DELETE CASCADE, + project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE, + name VARCHAR(255) NOT NULL, + type VARCHAR(20) NOT NULL CHECK (type IN ('http', 'tcp', 'heartbeat')), + -- target: URL for HTTP, host:port for TCP, null for heartbeat + target TEXT, + interval_seconds INTEGER NOT NULL DEFAULT 60 CHECK (interval_seconds >= 30), + timeout_seconds INTEGER NOT NULL DEFAULT 10 CHECK (timeout_seconds >= 1 AND timeout_seconds <= 60), + failure_threshold INTEGER NOT NULL DEFAULT 2 CHECK (failure_threshold >= 1), + auto_resolve BOOLEAN NOT NULL DEFAULT true, + enabled BOOLEAN NOT NULL DEFAULT true, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_monitors_project ON monitors (project_id); +CREATE INDEX IF NOT EXISTS idx_monitors_org ON monitors (organization_id); +CREATE INDEX IF NOT EXISTS idx_monitors_enabled ON monitors (enabled) WHERE enabled = true; + +-- ============================================================================ +-- 3. 
Monitor status table (current state, one row per monitor) +-- ============================================================================ + +CREATE TABLE IF NOT EXISTS monitor_status ( + monitor_id UUID PRIMARY KEY REFERENCES monitors(id) ON DELETE CASCADE, + status VARCHAR(20) NOT NULL DEFAULT 'unknown' CHECK (status IN ('up', 'down', 'unknown')), + consecutive_failures INTEGER NOT NULL DEFAULT 0, + consecutive_successes INTEGER NOT NULL DEFAULT 0, + last_checked_at TIMESTAMPTZ, + last_status_change_at TIMESTAMPTZ, + response_time_ms INTEGER, + last_error_code VARCHAR(50), + incident_id UUID REFERENCES incidents(id) ON DELETE SET NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- ============================================================================ +-- 4. Monitor results hypertable (time-series of all check results) +-- ============================================================================ +-- Note: No FK on monitor_id for hypertable performance + +CREATE TABLE IF NOT EXISTS monitor_results ( + time TIMESTAMPTZ NOT NULL, + id UUID NOT NULL DEFAULT gen_random_uuid(), + monitor_id UUID NOT NULL, + organization_id UUID NOT NULL, + project_id UUID NOT NULL, + status VARCHAR(20) NOT NULL CHECK (status IN ('up', 'down')), + response_time_ms INTEGER, + status_code INTEGER, + -- sanitized error code (never raw OS/network error messages) + error_code VARCHAR(50), + -- true when written by POST /monitors/:id/heartbeat, false for worker-initiated checks + is_heartbeat BOOLEAN NOT NULL DEFAULT false, + PRIMARY KEY (time, id) +); + +SELECT create_hypertable('monitor_results', 'time', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS idx_monitor_results_monitor ON monitor_results (monitor_id, time DESC); +CREATE INDEX IF NOT EXISTS idx_monitor_results_org ON monitor_results (organization_id, time DESC); + +ALTER TABLE monitor_results SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'monitor_id', + timescaledb.compress_orderby = 
'time DESC' +); + +SELECT add_compression_policy('monitor_results', INTERVAL '7 days', if_not_exists => TRUE); +SELECT add_retention_policy('monitor_results', INTERVAL '30 days', if_not_exists => TRUE); + +-- ============================================================================ +-- 5. Continuous aggregate: daily uptime percentage per monitor +-- ============================================================================ + +CREATE MATERIALIZED VIEW IF NOT EXISTS monitor_uptime_daily +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT + time_bucket('1 day', time) AS bucket, + monitor_id, + organization_id, + project_id, + COUNT(*) AS total_checks, + COUNT(*) FILTER (WHERE status = 'up') AS successful_checks, + ROUND( + 100.0 * COUNT(*) FILTER (WHERE status = 'up') / NULLIF(COUNT(*), 0), + 2 + ) AS uptime_pct +FROM monitor_results +GROUP BY bucket, monitor_id, organization_id, project_id +WITH NO DATA; + +SELECT add_continuous_aggregate_policy('monitor_uptime_daily', + start_offset => INTERVAL '3 days', + end_offset => INTERVAL '1 minute', + schedule_interval => INTERVAL '1 hour', + if_not_exists => TRUE +); + +-- ============================================================================ +-- 6. 
Extend incidents table with source tracking +-- ============================================================================ + +ALTER TABLE incidents + ADD COLUMN IF NOT EXISTS source VARCHAR(50) NOT NULL DEFAULT 'sigma', + ADD COLUMN IF NOT EXISTS monitor_id UUID REFERENCES monitors(id) ON DELETE SET NULL; + +ALTER TABLE incidents + ADD CONSTRAINT incidents_source_check + CHECK (source IN ('sigma', 'monitor', 'manual')) + NOT VALID; + +CREATE INDEX IF NOT EXISTS idx_incidents_source ON incidents (source); +CREATE INDEX IF NOT EXISTS idx_incidents_monitor ON incidents (monitor_id) WHERE monitor_id IS NOT NULL; diff --git a/packages/backend/migrations/035_monitoring_enhancements.sql b/packages/backend/migrations/035_monitoring_enhancements.sql new file mode 100644 index 00000000..0a34bad5 --- /dev/null +++ b/packages/backend/migrations/035_monitoring_enhancements.sql @@ -0,0 +1,15 @@ +-- ============================================================================ +-- Migration 035: Monitoring enhancements +-- - Add http_config JSONB and severity to monitors +-- - Add status_page_public to projects +-- ============================================================================ + +-- 1. Add HTTP config column to monitors (for method, expectedStatus, headers, bodyAssertion) +ALTER TABLE monitors ADD COLUMN IF NOT EXISTS http_config JSONB; + +-- 2. Add per-monitor incident severity (default 'high' matches previous hardcoded behavior) +ALTER TABLE monitors ADD COLUMN IF NOT EXISTS severity VARCHAR(20) NOT NULL DEFAULT 'high' + CHECK (severity IN ('critical', 'high', 'medium', 'low', 'informational')); + +-- 3. 
Add status page visibility to projects (default false = private) +ALTER TABLE projects ADD COLUMN IF NOT EXISTS status_page_public BOOLEAN NOT NULL DEFAULT false; diff --git a/packages/backend/src/database/migrator.ts b/packages/backend/src/database/migrator.ts index 91ae84bd..83dfd222 100644 --- a/packages/backend/src/database/migrator.ts +++ b/packages/backend/src/database/migrator.ts @@ -66,7 +66,7 @@ export async function migrateToLatest() { if (error) { console.error('Migration failed'); console.error(error); - process.exit(1); + throw error; } console.log('All migrations completed'); diff --git a/packages/backend/src/database/types.ts b/packages/backend/src/database/types.ts index 198ed789..b66e9505 100644 --- a/packages/backend/src/database/types.ts +++ b/packages/backend/src/database/types.ts @@ -118,7 +118,9 @@ export interface ProjectsTable { organization_id: string; user_id: string; // Keep for tracking who created the project name: string; + slug: string; description: string | null; + status_page_public: Generated; created_at: Generated; updated_at: Generated; } @@ -420,11 +422,81 @@ export interface IncidentsTable { mitre_techniques: string[] | null; ip_reputation: ColumnType | null, Record | null, Record | null>; geo_data: ColumnType | null, Record | null, Record | null>; + source: Generated; + monitor_id: string | null; created_at: Generated; updated_at: Generated; resolved_at: Timestamp | null; } +// ============================================================================ +// SERVICE HEALTH MONITORING TABLES +// ============================================================================ + +export type MonitorType = 'http' | 'tcp' | 'heartbeat'; +export type MonitorStatusValue = 'up' | 'down' | 'unknown'; + +export interface MonitorHttpConfig { + method?: string; + expectedStatus?: number; + headers?: Record; + bodyAssertion?: { type: 'contains'; value: string } | { type: 'regex'; pattern: string }; +} + +export interface MonitorsTable { + id: 
Generated; + organization_id: string; + project_id: string; + name: string; + type: MonitorType; + target: string | null; + interval_seconds: Generated; + timeout_seconds: Generated; + failure_threshold: Generated; + auto_resolve: Generated; + enabled: Generated; + http_config: MonitorHttpConfig | null; + severity: Generated; + created_at: Generated; + updated_at: Generated; +} + +export interface MonitorStatusTable { + monitor_id: string; + status: Generated; + consecutive_failures: Generated; + consecutive_successes: Generated; + last_checked_at: Timestamp | null; + last_status_change_at: Timestamp | null; + response_time_ms: number | null; + last_error_code: string | null; + incident_id: string | null; + updated_at: Generated; +} + +export interface MonitorResultsTable { + time: Timestamp; + id: Generated; + monitor_id: string; + organization_id: string; + project_id: string; + status: 'up' | 'down'; + response_time_ms: number | null; + status_code: number | null; + error_code: string | null; + is_heartbeat: Generated; +} + +export interface MonitorUptimeDailyTable { + bucket: Timestamp; + monitor_id: string; + organization_id: string; + project_id: string; + total_checks: number; + successful_checks: number; + uptime_pct: number | null; +} + export interface IncidentAlertsTable { id: Generated; incident_id: string; @@ -834,6 +906,26 @@ export interface MetricExemplarsTable { attributes: ColumnType | null, Record | null, Record | null>; } +// ============================================================================ +// LOG PIPELINES TABLE +// ============================================================================ + +export interface LogPipelinesTable { + id: Generated; + organization_id: string; + project_id: string | null; + name: string; + description: string | null; + enabled: Generated; + steps: ColumnType< + Record[], + Record[], + Record[] + >; + created_at: Generated; + updated_at: Generated; +} + export interface Database { logs: LogsTable; 
users: UsersTable; @@ -898,4 +990,11 @@ export interface Database { // Metrics (OTLP) metrics: MetricsTable; metric_exemplars: MetricExemplarsTable; + // Log pipelines + log_pipelines: LogPipelinesTable; + // Service health monitoring + monitors: MonitorsTable; + monitor_status: MonitorStatusTable; + monitor_results: MonitorResultsTable; + monitor_uptime_daily: MonitorUptimeDailyTable; } diff --git a/packages/backend/src/modules/auth/plugin.ts b/packages/backend/src/modules/auth/plugin.ts index 137f36af..5c587e02 100644 --- a/packages/backend/src/modules/auth/plugin.ts +++ b/packages/backend/src/modules/auth/plugin.ts @@ -83,7 +83,8 @@ const authPlugin: FastifyPluginAsync = async (fastify) => { request.url.startsWith('/api/v1/projects') || request.url.startsWith('/api/v1/alerts') || request.url.startsWith('/api/v1/notifications') || - request.url.startsWith('/api/v1/invitations') + request.url.startsWith('/api/v1/invitations') || + request.url.startsWith('/api/v1/status') ) { return; } diff --git a/packages/backend/src/modules/ingestion/service.ts b/packages/backend/src/modules/ingestion/service.ts index 005a11e6..d3bec814 100644 --- a/packages/backend/src/modules/ingestion/service.ts +++ b/packages/backend/src/modules/ingestion/service.ts @@ -134,6 +134,13 @@ export class IngestionService { console.error('[Ingestion] Failed to trigger Exception parsing:', err); }); + // Trigger pipeline processing (async, non-blocking) + if (organizationId) { + this.triggerPipelineProcessing(logs, insertedLogs, projectId, organizationId).catch((err) => { + console.error('[Ingestion] Failed to trigger pipeline processing:', err); + }); + } + // Invalidate query caches for this project (async, non-blocking) CacheManager.invalidateProjectQueries(projectId).catch((err) => { console.error('[Ingestion] Failed to invalidate cache:', err); @@ -283,6 +290,36 @@ export class IngestionService { } } + /** + * Trigger log pipeline processing for ingested logs + */ + private async 
triggerPipelineProcessing( + logs: LogInput[], + insertedLogs: any[], + projectId: string, + organizationId: string + ): Promise { + try { + const payload = logs.map((log: LogInput, i: number) => ({ + id: insertedLogs[i]?.id ?? '', + time: + insertedLogs[i]?.time instanceof Date + ? insertedLogs[i].time.toISOString() + : String(insertedLogs[i]?.time ?? new Date().toISOString()), + message: log.message, + metadata: (log.metadata as Record | null | undefined) ?? null, + })); + + const pipelineQueue = createQueue('log-pipeline'); + await pipelineQueue.add('process-pipeline', { logs: payload, projectId, organizationId }); + + console.log(`[Ingestion] Queued pipeline processing for ${logs.length} logs`); + } catch (error) { + console.error('[Ingestion] Error queuing pipeline job:', error); + // Don't throw - ingestion should succeed even if pipeline queueing fails + } + } + /** * Get log statistics (reservoir: works with any engine) */ diff --git a/packages/backend/src/modules/log-pipeline/index.ts b/packages/backend/src/modules/log-pipeline/index.ts new file mode 100644 index 00000000..8e4c16d9 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/index.ts @@ -0,0 +1,2 @@ +export { pipelineRoutes } from './routes.js'; +export { pipelineService } from './service.js'; diff --git a/packages/backend/src/modules/log-pipeline/parsers/apache.ts b/packages/backend/src/modules/log-pipeline/parsers/apache.ts new file mode 100644 index 00000000..cef31c1d --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/apache.ts @@ -0,0 +1,2 @@ +// Apache combined log format is identical to nginx combined format +export { parseNginx as parseApache } from './nginx.js'; diff --git a/packages/backend/src/modules/log-pipeline/parsers/grok-engine.ts b/packages/backend/src/modules/log-pipeline/parsers/grok-engine.ts new file mode 100644 index 00000000..2938c6bd --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/grok-engine.ts @@ -0,0 +1,89 @@ +const IPV4 
= '(?:(?:25[0-5]|2[0-4]\\d|[01]?\\d\\d?)\\.){3}(?:25[0-5]|2[0-4]\\d|[01]?\\d\\d?)'; +const IPV6 = '(?:[0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4}|::1|::'; + +export const BUILTIN_PATTERNS: Record = { + INT: '[+-]?(?:0|[1-9]\\d*)', + POSINT: '[1-9]\\d*|0', + NUMBER: '[+-]?(?:\\d+(?:\\.\\d*)?|\\.\\d+)(?:[eE][+-]?\\d+)?', + WORD: '\\w+', + NOTSPACE: '\\S+', + SPACE: '\\s+', + DATA: '[\\s\\S]*?', + GREEDYDATA: '[\\s\\S]*', + IPV4, + IPV6, + IP: `(?:${IPV6}|${IPV4})`, + HOSTNAME: '[a-zA-Z0-9][a-zA-Z0-9\\-\\.]*', + URIPATH: '/[^\\s?#]*', + URIPARAM: '\\?[^\\s#]*', + URI: '[a-zA-Z][a-zA-Z0-9+\\-.]*://\\S+', + QUOTEDSTRING: '"(?:[^"\\\\]|\\\\.)*"', + QS: '"(?:[^"\\\\]|\\\\.)*"', + STATUS_CODE: '[1-5]\\d{2}', + HTTPDATE: '\\d{2}/\\w+/\\d{4}:\\d{2}:\\d{2}:\\d{2} [+-]\\d{4}', + TIMESTAMP_ISO8601: '\\d{4}-\\d{2}-\\d{2}[T ]\\d{2}:\\d{2}:\\d{2}(?:\\.\\d+)?(?:Z|[+-]\\d{2}:?\\d{2})?', + USER: '[a-zA-Z0-9._-]+', + METHOD: '(?:GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS|CONNECT|TRACE)', +}; + +interface FieldMeta { + name: string; + type: 'string' | 'int' | 'float'; +} + +/** + * Compile a grok-like pattern string into a RegExp. + * Syntax: %{PATTERN_NAME:field_name} or %{PATTERN_NAME:field_name:type} + */ +export function compileGrok( + pattern: string, + customPatterns: Record = {} +): { regex: RegExp; fields: FieldMeta[] } { + const all = { ...BUILTIN_PATTERNS, ...customPatterns }; + const fields: FieldMeta[] = []; + + const re = pattern.replace( + /%\{(\w+)(?::(\w+))?(?::(int|float|string))?\}/g, + (_, patternName, fieldName, typeName) => { + const pat = all[patternName]; + if (!pat) throw new Error(`Unknown grok pattern: ${patternName}`); + + if (fieldName) { + fields.push({ name: fieldName, type: (typeName as FieldMeta['type']) || 'string' }); + return `(?<${fieldName}>${pat})`; + } + return `(?:${pat})`; + } + ); + + return { regex: new RegExp(re), fields }; +} + +/** + * Match a grok pattern against input. Returns extracted fields or null. 
+ */ +export function matchGrok( + pattern: string, + input: string, + customPatterns: Record = {} +): Record | null { + const { regex, fields } = compileGrok(pattern, customPatterns); + const m = regex.exec(input); + if (!m || !m.groups) return null; + + const result: Record = {}; + for (const field of fields) { + const raw = m.groups[field.name]; + if (raw === undefined) continue; + + if (field.type === 'int') { + result[field.name] = parseInt(raw, 10); + } else if (field.type === 'float') { + result[field.name] = parseFloat(raw); + } else { + result[field.name] = raw; + } + } + + return result; +} diff --git a/packages/backend/src/modules/log-pipeline/parsers/index.ts b/packages/backend/src/modules/log-pipeline/parsers/index.ts new file mode 100644 index 00000000..1683ee9f --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/index.ts @@ -0,0 +1,20 @@ +import type { ParserStep } from '../types.js'; +import { parseNginx } from './nginx.js'; +import { parseApache } from './apache.js'; +import { parseSyslog } from './syslog.js'; +import { parseLogfmt } from './logfmt.js'; +import { parseJsonMessage } from './json-message.js'; + +export function runBuiltinParser( + step: ParserStep, + message: string +): Record | null { + switch (step.parser) { + case 'nginx': return parseNginx(message); + case 'apache': return parseApache(message); + case 'syslog': return parseSyslog(message); + case 'logfmt': return parseLogfmt(message); + case 'json': return parseJsonMessage(message); + default: return null; + } +} diff --git a/packages/backend/src/modules/log-pipeline/parsers/json-message.ts b/packages/backend/src/modules/log-pipeline/parsers/json-message.ts new file mode 100644 index 00000000..aef3ac6c --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/json-message.ts @@ -0,0 +1,11 @@ +export function parseJsonMessage(message: string): Record | null { + if (!message || !message.trimStart().startsWith('{')) return null; + + try { + const parsed 
= JSON.parse(message); + if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) return null; + return parsed as Record; + } catch { + return null; + } +} diff --git a/packages/backend/src/modules/log-pipeline/parsers/logfmt.ts b/packages/backend/src/modules/log-pipeline/parsers/logfmt.ts new file mode 100644 index 00000000..85b7038f --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/logfmt.ts @@ -0,0 +1,20 @@ +export function parseLogfmt(message: string): Record | null { + if (!message) return null; + + const result: Record = {}; + // Match key=value or key="quoted value" + const regex = /(\w+)=(?:"((?:[^"\\]|\\.)*)"|(\S+))/g; + let match: RegExpExecArray | null; + let count = 0; + + while ((match = regex.exec(message)) !== null) { + const [, key, quoted, unquoted] = match; + result[key] = quoted !== undefined ? quoted : unquoted; + count++; + } + + // Require at least 2 pairs to avoid false positives on messages with URLs etc. + if (count < 2) return null; + + return result; +} diff --git a/packages/backend/src/modules/log-pipeline/parsers/nginx.ts b/packages/backend/src/modules/log-pipeline/parsers/nginx.ts new file mode 100644 index 00000000..6bf5b4d4 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/nginx.ts @@ -0,0 +1,26 @@ +// Standard nginx combined log format: +// $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" +const NGINX_REGEX = /^(\S+) \S+ (\S+) \[([^\]]+)\] "(\S+) ([^"]*?) 
(HTTP\/[\d.]+)" (\d{3}) (\d+) "([^"]*)" "([^"]*)"/; + +export function parseNginx(message: string): Record | null { + const m = NGINX_REGEX.exec(message); + if (!m) return null; + + const [, clientIp, remoteUser, timeLocal, method, rawPath, httpVersion, status, bytes, referer, userAgent] = m; + + const [path, query] = rawPath.split('?'); + + return { + client_ip: clientIp, + remote_user: remoteUser, + timestamp: timeLocal, + http_method: method, + http_path: path, + ...(query ? { http_query: query } : {}), + http_version: httpVersion.replace('HTTP/', ''), + http_status: parseInt(status, 10), + response_bytes: parseInt(bytes, 10), + http_referer: referer, + user_agent: userAgent, + }; +} diff --git a/packages/backend/src/modules/log-pipeline/parsers/syslog.ts b/packages/backend/src/modules/log-pipeline/parsers/syslog.ts new file mode 100644 index 00000000..18566dca --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/parsers/syslog.ts @@ -0,0 +1,35 @@ +const RFC3164 = /^(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(\S+)\s+(\S+?)(?:\[(\d+)\])?:\s+(.*)/; +const RFC5424 = /^<(\d+)>(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.*)/; + +export function parseSyslog(message: string): Record | null { + // Try RFC 5424 first (has version number after priority) + const m5424 = RFC5424.exec(message); + if (m5424) { + const [, priority, , timestamp, hostname, appname, pid, , , msg] = m5424; + return { + priority: parseInt(priority, 10), + facility: Math.floor(parseInt(priority, 10) / 8), + severity: parseInt(priority, 10) % 8, + timestamp, + hostname: hostname === '-' ? undefined : hostname, + appname: appname === '-' ? undefined : appname, + pid: pid !== '-' ? parseInt(pid, 10) : undefined, + syslog_message: msg, + }; + } + + // Try RFC 3164 + const m3164 = RFC3164.exec(message); + if (m3164) { + const [, timestamp, hostname, appname, pid, msg] = m3164; + return { + timestamp, + hostname, + appname, + ...(pid ? 
{ pid: parseInt(pid, 10) } : {}), + syslog_message: msg, + }; + } + + return null; +} diff --git a/packages/backend/src/modules/log-pipeline/pipeline-executor.ts b/packages/backend/src/modules/log-pipeline/pipeline-executor.ts new file mode 100644 index 00000000..51710c41 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/pipeline-executor.ts @@ -0,0 +1,53 @@ +import type { LogForPipeline, PipelineStep, ExecutorResult, StepResult, ExtractedFields } from './types.js'; +import { runBuiltinParser } from './parsers/index.js'; +import { matchGrok } from './parsers/grok-engine.js'; +import { runGeoIpStep } from './steps/geoip.js'; + +export class PipelineExecutor { + static async execute(log: LogForPipeline, steps: PipelineStep[]): Promise { + const stepResults: StepResult[] = []; + // Accumulated extracted fields — earlier steps take priority (no overwrite) + const merged: ExtractedFields = {}; + + // Build a running metadata view that includes extracted fields so later steps can use them + let currentMeta: Record = { ...(log.metadata ?? {}) }; + + for (const step of steps) { + const stepResult: StepResult = { step, extracted: {} }; + + try { + let extracted: Record | null = null; + + if (step.type === 'parser') { + extracted = runBuiltinParser(step, log.message); + } else if (step.type === 'grok') { + const source = step.source + ? (currentMeta[step.source] as string | undefined) ?? 
log.message + : log.message; + extracted = matchGrok(step.pattern, source); + } else if (step.type === 'geoip') { + // Pass log with accumulated metadata so geoip can read previously extracted IP + extracted = await runGeoIpStep(step, { ...log, metadata: currentMeta }); + } + + if (extracted) { + stepResult.extracted = extracted; + // Merge into accumulated: don't overwrite keys set by earlier steps + for (const [k, v] of Object.entries(extracted)) { + if (!(k in merged)) { + merged[k] = v; + } + } + // Update running metadata view + currentMeta = { ...extracted, ...currentMeta }; + } + } catch (err) { + stepResult.error = err instanceof Error ? err.message : String(err); + } + + stepResults.push(stepResult); + } + + return { steps: stepResults, merged }; + } +} diff --git a/packages/backend/src/modules/log-pipeline/routes.ts b/packages/backend/src/modules/log-pipeline/routes.ts new file mode 100644 index 00000000..5594d4b2 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/routes.ts @@ -0,0 +1,148 @@ +import type { FastifyInstance } from 'fastify'; +import { z } from 'zod'; +import { authenticate } from '../auth/middleware.js'; +import { OrganizationsService } from '../organizations/service.js'; +import { pipelineService } from './service.js'; +import { PipelineExecutor } from './pipeline-executor.js'; + +const organizationsService = new OrganizationsService(); + +const stepSchema = z.discriminatedUnion('type', [ + z.object({ + type: z.literal('parser'), + parser: z.enum(['nginx', 'apache', 'syslog', 'logfmt', 'json']), + }), + z.object({ + type: z.literal('grok'), + pattern: z.string().min(1), + source: z.string().optional(), + }), + z.object({ + type: z.literal('geoip'), + field: z.string().min(1), + target: z.string().min(1), + }), +]); + +const createSchema = z.object({ + organizationId: z.string().uuid(), + projectId: z.string().uuid().optional().nullable(), + name: z.string().min(1).max(200), + description: z.string().optional(), + enabled: 
z.boolean().optional(), + steps: z.array(stepSchema), +}); + +const updateSchema = z.object({ + name: z.string().min(1).max(200).optional(), + description: z.string().optional(), + enabled: z.boolean().optional(), + steps: z.array(stepSchema).optional(), +}); + +const previewSchema = z.object({ + organizationId: z.string().uuid(), + steps: z.array(stepSchema), + message: z.string(), + metadata: z.record(z.unknown()).optional(), +}); + +const importYamlSchema = z.object({ + organizationId: z.string().uuid(), + projectId: z.string().uuid().optional().nullable(), + yaml: z.string().min(1), +}); + +async function checkMembership(userId: string, orgId: string): Promise { + const orgs = await organizationsService.getUserOrganizations(userId); + return orgs.some((o) => o.id === orgId); +} + +export async function pipelineRoutes(fastify: FastifyInstance) { + fastify.addHook('onRequest', authenticate); + + // List pipelines for org + fastify.get('/', async (request: any, reply) => { + const orgId = (request.query as any).organizationId as string; + if (!orgId) return reply.status(400).send({ error: 'organizationId required' }); + if (!await checkMembership(request.user.id, orgId)) return reply.status(403).send({ error: 'Forbidden' }); + const pipelines = await pipelineService.listForOrg(orgId); + return reply.send({ pipelines }); + }); + + // Preview (before /:id routes to avoid conflict) + fastify.post('/preview', async (request: any, reply) => { + try { + const body = previewSchema.parse(request.body); + if (!await checkMembership(request.user.id, body.organizationId)) return reply.status(403).send({ error: 'Forbidden' }); + const logEntry = { id: '', time: new Date(), message: body.message, metadata: body.metadata ?? 
null }; + const result = await PipelineExecutor.execute(logEntry, body.steps as any); + return reply.send(result); + } catch (e) { + if (e instanceof z.ZodError) return reply.status(400).send({ error: 'Validation error', details: e.errors }); + throw e; + } + }); + + // YAML import + fastify.post('/import-yaml', async (request: any, reply) => { + try { + const body = importYamlSchema.parse(request.body); + if (!await checkMembership(request.user.id, body.organizationId)) return reply.status(403).send({ error: 'Forbidden' }); + const pipeline = await pipelineService.importFromYaml(body.yaml, body.organizationId, body.projectId ?? null); + return reply.status(201).send({ pipeline }); + } catch (e) { + if (e instanceof z.ZodError) return reply.status(400).send({ error: 'Validation error', details: e.errors }); + if (e instanceof Error) return reply.status(400).send({ error: e.message }); + throw e; + } + }); + + // Create pipeline + fastify.post('/', async (request: any, reply) => { + try { + const body = createSchema.parse(request.body); + if (!await checkMembership(request.user.id, body.organizationId)) return reply.status(403).send({ error: 'Forbidden' }); + const pipeline = await pipelineService.create(body as any); + return reply.status(201).send({ pipeline }); + } catch (e) { + if (e instanceof z.ZodError) return reply.status(400).send({ error: 'Validation error', details: e.errors }); + if (e instanceof Error && e.message.includes('unique')) return reply.status(409).send({ error: 'A pipeline already exists for this project/org scope.' 
}); + throw e; + } + }); + + // Get pipeline by ID + fastify.get('/:id', async (request: any, reply) => { + const orgId = (request.query as any).organizationId as string; + if (!orgId) return reply.status(400).send({ error: 'organizationId required' }); + if (!await checkMembership(request.user.id, orgId)) return reply.status(403).send({ error: 'Forbidden' }); + const pipeline = await pipelineService.getById((request.params as any).id, orgId); + if (!pipeline) return reply.status(404).send({ error: 'Not found' }); + return reply.send({ pipeline }); + }); + + // Update pipeline + fastify.put('/:id', async (request: any, reply) => { + try { + const orgId = (request.query as any).organizationId as string; + if (!orgId) return reply.status(400).send({ error: 'organizationId required' }); + if (!await checkMembership(request.user.id, orgId)) return reply.status(403).send({ error: 'Forbidden' }); + const body = updateSchema.parse(request.body); + const pipeline = await pipelineService.update((request.params as any).id, orgId, body as any); + return reply.send({ pipeline }); + } catch (e) { + if (e instanceof z.ZodError) return reply.status(400).send({ error: 'Validation error', details: e.errors }); + throw e; + } + }); + + // Delete pipeline + fastify.delete('/:id', async (request: any, reply) => { + const orgId = (request.query as any).organizationId as string; + if (!orgId) return reply.status(400).send({ error: 'organizationId required' }); + if (!await checkMembership(request.user.id, orgId)) return reply.status(403).send({ error: 'Forbidden' }); + await pipelineService.delete((request.params as any).id, orgId); + return reply.status(204).send(); + }); +} diff --git a/packages/backend/src/modules/log-pipeline/service.ts b/packages/backend/src/modules/log-pipeline/service.ts new file mode 100644 index 00000000..b63c5f32 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/service.ts @@ -0,0 +1,154 @@ +import yaml from 'js-yaml'; +import { sql } from 
'kysely'; +import { db } from '../../database/index.js'; +import type { Pipeline, CreatePipelineInput, UpdatePipelineInput, PipelineStep } from './types.js'; + +interface CacheEntry { pipeline: Pipeline | null; expiresAt: number } + +export class PipelineService { + private cache = new Map(); + private cacheTTL = 5 * 60 * 1000; + + private mapRow(row: Record): Pipeline { + return { + id: row.id as string, + organizationId: row.organization_id as string, + projectId: row.project_id as string | null, + name: row.name as string, + description: row.description as string | null, + enabled: row.enabled as boolean, + steps: (row.steps as PipelineStep[]) ?? [], + createdAt: row.created_at as Date, + updatedAt: row.updated_at as Date, + }; + } + + async create(input: CreatePipelineInput): Promise { + const row = await db + .insertInto('log_pipelines') + .values({ + organization_id: input.organizationId, + project_id: input.projectId ?? null, + name: input.name, + description: input.description ?? null, + enabled: input.enabled ?? 
true, + steps: JSON.stringify(input.steps) as unknown as Record[], + }) + .returningAll() + .executeTakeFirstOrThrow(); + this.invalidateCache(input.organizationId); + return this.mapRow(row as unknown as Record); + } + + async update(id: string, organizationId: string, input: UpdatePipelineInput): Promise { + const updates: Record = { updated_at: new Date() }; + if (input.name !== undefined) updates.name = input.name; + if (input.description !== undefined) updates.description = input.description; + if (input.enabled !== undefined) updates.enabled = input.enabled; + if (input.steps !== undefined) updates.steps = JSON.stringify(input.steps); + + const row = await db + .updateTable('log_pipelines') + .set(updates) + .where('id', '=', id) + .where('organization_id', '=', organizationId) + .returningAll() + .executeTakeFirstOrThrow(); + this.invalidateCache(organizationId); + return this.mapRow(row as unknown as Record); + } + + async delete(id: string, organizationId: string): Promise { + await db + .deleteFrom('log_pipelines') + .where('id', '=', id) + .where('organization_id', '=', organizationId) + .execute(); + this.invalidateCache(organizationId); + } + + async listForOrg(organizationId: string): Promise { + const rows = await db + .selectFrom('log_pipelines') + .selectAll() + .where('organization_id', '=', organizationId) + .orderBy('created_at', 'asc') + .execute(); + return rows.map((r) => this.mapRow(r as unknown as Record)); + } + + async getById(id: string, organizationId: string): Promise { + const row = await db + .selectFrom('log_pipelines') + .selectAll() + .where('id', '=', id) + .where('organization_id', '=', organizationId) + .executeTakeFirst(); + return row ? this.mapRow(row as unknown as Record) : null; + } + + /** Used by the BullMQ job — cached. Project pipeline takes priority over org-wide. 
*/ + async getForProject(projectId: string, organizationId: string): Promise { + const cacheKey = `${organizationId}:${projectId}`; + const cached = this.cache.get(cacheKey); + if (cached && Date.now() < cached.expiresAt) return cached.pipeline; + + // Project-specific first, then org-wide fallback + const row = await db + .selectFrom('log_pipelines') + .selectAll() + .where('organization_id', '=', organizationId) + .where('enabled', '=', true) + .where((eb) => + eb.or([ + eb('project_id', '=', projectId), + eb('project_id', 'is', null), + ]) + ) + .orderBy(sql`project_id IS NULL`, 'asc') // false (not null = project-specific) sorts before true (null = org-wide) + .executeTakeFirst(); + + const pipeline = row ? this.mapRow(row as unknown as Record) : null; + this.cache.set(cacheKey, { pipeline, expiresAt: Date.now() + this.cacheTTL }); + return pipeline; + } + + async importFromYaml(yamlText: string, organizationId: string, projectId: string | null): Promise { + let doc: unknown; + try { + doc = yaml.load(yamlText); + } catch (e: unknown) { + throw new Error(`Invalid YAML: ${e instanceof Error ? e.message : String(e)}`); + } + if (!doc || typeof doc !== 'object') throw new Error('YAML must be a mapping object'); + const docObj = doc as Record; + if (!docObj.name) throw new Error('Pipeline YAML must have a "name" field'); + if (!Array.isArray(docObj.steps)) throw new Error('Pipeline YAML must have a "steps" array'); + + // Upsert: delete existing if any, then create + await db + .deleteFrom('log_pipelines') + .where('organization_id', '=', organizationId) + .where((eb) => + projectId ? eb('project_id', '=', projectId) : eb('project_id', 'is', null) + ) + .execute(); + + return this.create({ + organizationId, + projectId, + name: docObj.name as string, + description: typeof docObj.description === 'string' ? docObj.description : undefined, + enabled: typeof docObj.enabled === 'boolean' ? 
docObj.enabled : true, + steps: docObj.steps as PipelineStep[], + }); + } + + invalidateCache(organizationId: string): void { + for (const key of this.cache.keys()) { + if (key.startsWith(organizationId + ':')) this.cache.delete(key); + } + } +} + +export const pipelineService = new PipelineService(); diff --git a/packages/backend/src/modules/log-pipeline/steps/geoip.ts b/packages/backend/src/modules/log-pipeline/steps/geoip.ts new file mode 100644 index 00000000..40815073 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/steps/geoip.ts @@ -0,0 +1,29 @@ +import type { GeoIpStep, LogForPipeline } from '../types.js'; + +// Lazy import to avoid crashing when GeoLite2 DB is not present +async function tryGeoLookup(ip: string): Promise | null> { + try { + const { geoLite2Service } = await import('../../siem/geolite2-service.js'); + const geo = geoLite2Service.lookup(ip); + return geo ? (geo as unknown as Record) : null; + } catch { + return null; + } +} + +export async function runGeoIpStep( + step: GeoIpStep, + log: LogForPipeline +): Promise> { + const meta = log.metadata ?? 
{}; + const ip = meta[step.field]; + if (typeof ip !== 'string' || !ip) return {}; + + try { + const geo = await tryGeoLookup(ip); + if (!geo) return {}; + return { [step.target]: geo }; + } catch { + return {}; + } +} diff --git a/packages/backend/src/modules/log-pipeline/types.ts b/packages/backend/src/modules/log-pipeline/types.ts new file mode 100644 index 00000000..a44fbb74 --- /dev/null +++ b/packages/backend/src/modules/log-pipeline/types.ts @@ -0,0 +1,70 @@ +export type BuiltinParser = 'nginx' | 'apache' | 'syslog' | 'logfmt' | 'json'; + +export interface ParserStep { + type: 'parser'; + parser: BuiltinParser; +} + +export interface GrokStep { + type: 'grok'; + pattern: string; + source?: string; // field to parse, default 'message' +} + +export interface GeoIpStep { + type: 'geoip'; + field: string; // metadata field containing the IP + target: string; // output metadata key (e.g. 'geo') +} + +export type PipelineStep = ParserStep | GrokStep | GeoIpStep; + +export interface Pipeline { + id: string; + organizationId: string; + projectId: string | null; + name: string; + description: string | null; + enabled: boolean; + steps: PipelineStep[]; + createdAt: Date; + updatedAt: Date; +} + +export interface CreatePipelineInput { + organizationId: string; + projectId?: string | null; + name: string; + description?: string; + enabled?: boolean; + steps: PipelineStep[]; +} + +export interface UpdatePipelineInput { + name?: string; + description?: string; + enabled?: boolean; + steps?: PipelineStep[]; +} + +/** Input to the pipeline executor — one log entry */ +export interface LogForPipeline { + id: string; + time: Date; + message: string; + metadata: Record | null; +} + +/** Output: fields to merge into the log's metadata */ +export type ExtractedFields = Record; + +export interface StepResult { + step: PipelineStep; + extracted: ExtractedFields; + error?: string; +} + +export interface ExecutorResult { + steps: StepResult[]; + merged: ExtractedFields; // union of 
all extracted fields +} diff --git a/packages/backend/src/modules/monitoring/checker.ts b/packages/backend/src/modules/monitoring/checker.ts new file mode 100644 index 00000000..7b77f177 --- /dev/null +++ b/packages/backend/src/modules/monitoring/checker.ts @@ -0,0 +1,159 @@ +import { createConnection } from 'net'; +import type { Kysely } from 'kysely'; +import type { Database } from '../../database/types.js'; +import type { CheckResult, HttpConfig, ErrorCode } from './types.js'; + +/** + * HTTP/HTTPS health check. + * Never surfaces raw error messages — maps all failures to sanitized error codes. + */ +export async function runHttpCheck( + target: string, + timeoutSeconds: number, + config: HttpConfig = {} +): Promise { + const { method = 'GET', expectedStatus = 200, headers = {}, bodyAssertion } = config; + const start = Date.now(); + + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutSeconds * 1000); + + try { + const response = await fetch(target, { + method, + headers: { 'User-Agent': 'LogTide-Monitor/1.0', ...headers }, + signal: controller.signal, + redirect: 'follow', + }); + + const responseTimeMs = Date.now() - start; + const statusCode = response.status; + + if (statusCode !== expectedStatus) { + return { status: 'down', responseTimeMs, statusCode, errorCode: 'http_error' }; + } + + if (bodyAssertion) { + const body = await response.text(); + let passes: boolean; + if (bodyAssertion.type === 'contains') { + passes = body.includes(bodyAssertion.value); + } else { + // Limit body size and pattern length to mitigate ReDoS + const safeBody = body.slice(0, 10000); + const safePattern = bodyAssertion.pattern.slice(0, 256); + passes = new RegExp(safePattern).test(safeBody); + } + + if (!passes) { + return { status: 'down', responseTimeMs, statusCode, errorCode: 'http_error' }; + } + } + + + return { status: 'up', responseTimeMs, statusCode, errorCode: null }; + } catch (err: unknown) { + const responseTimeMs 
= Date.now() - start; + const msg = err instanceof Error ? err.message : ''; + let errorCode: ErrorCode; + + if (err instanceof Error && err.name === 'AbortError') { + errorCode = 'timeout'; + } else if (msg.includes('ECONNREFUSED')) { + errorCode = 'connection_refused'; + } else if (msg.includes('ENOTFOUND') || msg.includes('EAI_')) { + errorCode = 'dns_error'; + } else if (msg.includes('SSL') || msg.includes('certificate') || msg.includes('CERT_')) { + errorCode = 'ssl_error'; + } else { + errorCode = 'unexpected'; + } + + return { status: 'down', responseTimeMs, statusCode: null, errorCode }; + } finally { + clearTimeout(timer); + } +} + +/** + * TCP connectivity check — measures time to establish connection. + */ +export function runTcpCheck( + host: string, + port: number, + timeoutSeconds: number +): Promise { + return new Promise((resolve) => { + const start = Date.now(); + const socket = createConnection({ host, port }); + + const timer = setTimeout(() => { + socket.destroy(); + resolve({ status: 'down', responseTimeMs: Date.now() - start, statusCode: null, errorCode: 'timeout' }); + }, timeoutSeconds * 1000); + + socket.on('connect', () => { + clearTimeout(timer); + socket.destroy(); + resolve({ status: 'up', responseTimeMs: Date.now() - start, statusCode: null, errorCode: null }); + }); + + socket.on('error', (err: Error) => { + clearTimeout(timer); + const msg = err.message; + let errorCode: ErrorCode = 'unexpected'; + if (msg.includes('ECONNREFUSED')) errorCode = 'connection_refused'; + else if (msg.includes('ENOTFOUND') || msg.includes('EAI_')) errorCode = 'dns_error'; + resolve({ status: 'down', responseTimeMs: Date.now() - start, statusCode: null, errorCode }); + }); + }); +} + +/** + * Heartbeat check — looks for a recent heartbeat ping in monitor_results. + * Returns 'up' if a heartbeat was received within the grace window (interval * 1.5). 
+ */ +export async function runHeartbeatCheck( + monitorId: string, + intervalSeconds: number, + db: Kysely +): Promise { + const graceMs = intervalSeconds * 1.5 * 1000; + const since = new Date(Date.now() - graceMs); + + const recent = await db + .selectFrom('monitor_results') + .select('time') + .where('monitor_id', '=', monitorId) + .where('is_heartbeat', '=', true) + .where('status', '=', 'up') + .where('time', '>=', since) + .orderBy('time', 'desc') + .limit(1) + .executeTakeFirst(); + + if (recent) { + return { status: 'up', responseTimeMs: null, statusCode: null, errorCode: null }; + } + + return { status: 'down', responseTimeMs: null, statusCode: null, errorCode: 'no_heartbeat' }; +} + +/** + * Parse "host:port" string for TCP monitors. + * Handles IPv6 addresses like "[::1]:5432". + */ +export function parseTcpTarget(target: string): { host: string; port: number } { + // IPv6 with brackets: [::1]:5432 + const ipv6Match = target.match(/^\[(.+)\]:(\d+)$/); + if (ipv6Match) { + return { host: ipv6Match[1], port: parseInt(ipv6Match[2], 10) }; + } + // Standard host:port + const lastColon = target.lastIndexOf(':'); + if (lastColon === -1) throw new Error('TCP target must be host:port'); + return { + host: target.slice(0, lastColon), + port: parseInt(target.slice(lastColon + 1), 10), + }; +} diff --git a/packages/backend/src/modules/monitoring/index.ts b/packages/backend/src/modules/monitoring/index.ts new file mode 100644 index 00000000..3896dbd3 --- /dev/null +++ b/packages/backend/src/modules/monitoring/index.ts @@ -0,0 +1,4 @@ +export { MonitorService } from './service.js'; +export { monitorService } from './routes.js'; +export { monitoringRoutes, heartbeatRoutes, publicStatusRoutes } from './routes.js'; +export type { Monitor, CreateMonitorInput, UpdateMonitorInput, PublicStatusPage } from './types.js'; diff --git a/packages/backend/src/modules/monitoring/routes.ts b/packages/backend/src/modules/monitoring/routes.ts new file mode 100644 index 
00000000..440ee12c --- /dev/null +++ b/packages/backend/src/modules/monitoring/routes.ts @@ -0,0 +1,201 @@ +import type { FastifyInstance } from 'fastify'; +import { z } from 'zod'; +import { MONITOR_TYPES } from '@logtide/shared'; +import { MonitorService } from './service.js'; +import { SiemService } from '../siem/service.js'; +import { authenticate } from '../auth/middleware.js'; +import { db } from '../../database/index.js'; + +const siemService = new SiemService(db); +export const monitorService = new MonitorService(db, siemService); + +async function checkOrgMembership(userId: string, organizationId: string): Promise { + const member = await db + .selectFrom('organization_members') + .select('id') + .where('user_id', '=', userId) + .where('organization_id', '=', organizationId) + .executeTakeFirst(); + return !!member; +} + +const httpConfigSchema = z.object({ + method: z.string().optional(), + expectedStatus: z.number().int().min(100).max(599).optional(), + headers: z.record(z.string()).optional(), + bodyAssertion: z.union([ + z.object({ type: z.literal('contains'), value: z.string().min(1).max(10000) }), + z.object({ type: z.literal('regex'), pattern: z.string().min(1).max(256) }), + ]).optional(), +}).optional().nullable(); + +const createMonitorSchema = z.object({ + organizationId: z.string().uuid(), + projectId: z.string().uuid(), + name: z.string().min(1).max(255), + type: z.enum(MONITOR_TYPES), + target: z.string().optional().nullable(), + intervalSeconds: z.number().int().min(30).max(86400).optional(), + timeoutSeconds: z.number().int().min(1).max(60).optional(), + failureThreshold: z.number().int().min(1).max(20).optional(), + autoResolve: z.boolean().optional(), + enabled: z.boolean().optional(), + httpConfig: httpConfigSchema, + severity: z.enum(['critical', 'high', 'medium', 'low', 'informational']).optional(), +}).refine( + (d) => { + if (d.type === 'http') return !!d.target && (d.target.startsWith('http://') || d.target.startsWith('https://')); 
+ if (d.type === 'tcp') return !!d.target && d.target.includes(':'); + return true; + }, + { message: 'Invalid target for monitor type' } +); + +const updateMonitorSchema = z.object({ + name: z.string().min(1).max(255).optional(), + target: z.string().nullable().optional(), + intervalSeconds: z.number().int().min(30).max(86400).optional(), + timeoutSeconds: z.number().int().min(1).max(60).optional(), + failureThreshold: z.number().int().min(1).max(20).optional(), + autoResolve: z.boolean().optional(), + enabled: z.boolean().optional(), + httpConfig: httpConfigSchema, + severity: z.enum(['critical', 'high', 'medium', 'low', 'informational']).optional(), +}); + +// ============================================================================ +// Authenticated management routes (session required) +// ============================================================================ + +export async function monitoringRoutes(fastify: FastifyInstance) { + fastify.addHook('onRequest', authenticate); + + fastify.get('/', async (request: any, reply) => { + const { organizationId, projectId } = request.query as any; + if (!organizationId) return reply.status(400).send({ error: 'organizationId required' }); + if (!(await checkOrgMembership(request.user.id, organizationId))) return reply.status(403).send({ error: 'Forbidden' }); + + const monitors = await monitorService.listMonitors(organizationId, projectId); + return reply.send({ monitors }); + }); + + fastify.get('/:id', async (request: any, reply) => { + const { organizationId } = request.query as any; + if (!organizationId) return reply.status(400).send({ error: 'organizationId required' }); + if (!(await checkOrgMembership(request.user.id, organizationId))) return reply.status(403).send({ error: 'Forbidden' }); + + const monitor = await monitorService.getMonitor(request.params.id, organizationId); + if (!monitor) return reply.status(404).send({ error: 'Not found' }); + return reply.send({ monitor }); + }); + + fastify.post('/', 
async (request: any, reply) => { + const parse = createMonitorSchema.safeParse(request.body); + if (!parse.success) return reply.status(400).send({ error: parse.error.errors[0].message }); + const input = parse.data; + if (!(await checkOrgMembership(request.user.id, input.organizationId))) return reply.status(403).send({ error: 'Forbidden' }); + + const monitor = await monitorService.createMonitor(input); + return reply.status(201).send({ monitor }); + }); + + fastify.put('/:id', async (request: any, reply) => { + const { organizationId } = request.query as any; + if (!organizationId) return reply.status(400).send({ error: 'organizationId required' }); + if (!(await checkOrgMembership(request.user.id, organizationId))) return reply.status(403).send({ error: 'Forbidden' }); + + const parse = updateMonitorSchema.safeParse(request.body); + if (!parse.success) return reply.status(400).send({ error: parse.error.errors[0].message }); + + // Validate target format against monitor type if target is being changed + if (parse.data.target) { + const existing = await monitorService.getMonitor(request.params.id, organizationId); + if (!existing) return reply.status(404).send({ error: 'Not found' }); + if (existing.type === 'http' && !(parse.data.target.startsWith('http://') || parse.data.target.startsWith('https://'))) { + return reply.status(400).send({ error: 'HTTP target must start with http:// or https://' }); + } + if (existing.type === 'tcp' && !parse.data.target.includes(':')) { + return reply.status(400).send({ error: 'TCP target must be in host:port format' }); + } + } + + const monitor = await monitorService.updateMonitor(request.params.id, organizationId, parse.data); + return reply.send({ monitor }); + }); + + fastify.delete('/:id', async (request: any, reply) => { + const { organizationId } = request.query as any; + if (!organizationId) return reply.status(400).send({ error: 'organizationId required' }); + if (!(await checkOrgMembership(request.user.id, 
organizationId))) return reply.status(403).send({ error: 'Forbidden' }); + + await monitorService.deleteMonitor(request.params.id, organizationId); + return reply.status(204).send(); + }); + + fastify.get('/:id/results', async (request: any, reply) => { + const { organizationId, limit } = request.query as any; + if (!organizationId) return reply.status(400).send({ error: 'organizationId required' }); + if (!(await checkOrgMembership(request.user.id, organizationId))) return reply.status(403).send({ error: 'Forbidden' }); + + const results = await monitorService.getRecentResults( + request.params.id, organizationId, Math.min(Number(limit) || 50, 200) + ); + return reply.send({ results }); + }); + + fastify.get('/:id/uptime', async (request: any, reply) => { + const { organizationId, days } = request.query as any; + if (!organizationId) return reply.status(400).send({ error: 'organizationId required' }); + if (!(await checkOrgMembership(request.user.id, organizationId))) return reply.status(403).send({ error: 'Forbidden' }); + + const history = await monitorService.getUptimeHistory( + request.params.id, organizationId, Math.min(Number(days) || 90, 365) + ); + return reply.send({ history }); + }); +} + +// ============================================================================ +// Heartbeat endpoint — accepts API key auth OR session auth +// The global auth plugin already validates API keys and sets request.organizationId. +// No additional authenticate hook needed here. 
+// ============================================================================ + +export async function heartbeatRoutes(fastify: FastifyInstance) { + fastify.post('/:id/heartbeat', { + config: { rateLimit: { max: 600, timeWindow: '1 minute' } }, + }, async (request: any, reply) => { + const monitorId = request.params.id; + + // API key path: global auth plugin set organizationId + if (request.organizationId) { + await monitorService.recordHeartbeat(monitorId, request.organizationId); + return reply.status(204).send(); + } + + // Session path: organizationId from query + if (request.user) { + const { organizationId } = request.query as any; + if (!organizationId) return reply.status(400).send({ error: 'organizationId required' }); + if (!(await checkOrgMembership(request.user.id, organizationId))) return reply.status(403).send({ error: 'Forbidden' }); + await monitorService.recordHeartbeat(monitorId, organizationId); + return reply.status(204).send(); + } + + return reply.status(401).send({ error: 'Unauthorized' }); + }); +} + +// ============================================================================ +// Public status page — no auth, scrubbed data +// ============================================================================ + +export async function publicStatusRoutes(fastify: FastifyInstance) { + fastify.get('/project/:slug', { + config: { rateLimit: { max: 60, timeWindow: '1 minute' } }, + }, async (request: any, reply) => { + const status = await monitorService.getPublicStatus(request.params.slug); + if (!status) return reply.status(404).send({ error: 'Not found' }); + return reply.send(status); + }); +} diff --git a/packages/backend/src/modules/monitoring/service.ts b/packages/backend/src/modules/monitoring/service.ts new file mode 100644 index 00000000..bbbc983e --- /dev/null +++ b/packages/backend/src/modules/monitoring/service.ts @@ -0,0 +1,598 @@ +import type { Kysely } from 'kysely'; +import { sql } from 'kysely'; +import type { Database } from 
'../../database/types.js'; +import type { Severity } from '@logtide/shared'; +import type { SiemService } from '../siem/service.js'; +import type { + Monitor, + MonitorResult, + UptimeBucket, + CreateMonitorInput, + UpdateMonitorInput, + CheckResult, + HttpConfig, + PublicStatusPage, + PublicMonitorStatus, + MonitorCurrentStatus, +} from './types.js'; +import { runHttpCheck, runTcpCheck, runHeartbeatCheck, parseTcpTarget } from './checker.js'; + +const MAX_CONCURRENT_CHECKS = 20; + +// Row type returned by the monitors LEFT JOIN monitor_status query +interface MonitorWithStatusRow { + id: string; + organization_id: string; + project_id: string; + name: string; + type: string; + target: string | null; + interval_seconds: number; + timeout_seconds: number; + failure_threshold: number; + auto_resolve: boolean; + enabled: boolean; + http_config: unknown; + severity: Severity; + created_at: Date; + updated_at: Date; + // Joined from monitor_status (aliased or direct) + status?: string | null; + consecutive_failures?: number | null; + consecutive_successes?: number | null; + last_checked_at?: Date | null; + last_status_change_at?: Date | null; + ms_response_time_ms?: number | null; + last_error_code?: string | null; + incident_id?: string | null; + ms_updated_at?: Date | null; +} + +export class MonitorService { + constructor( + private db: Kysely, + private siemService: SiemService + ) {} + + // ============================================================================ + // CRUD + // ============================================================================ + + async listMonitors(organizationId: string, projectId?: string): Promise { + let query = this.db + .selectFrom('monitors') + .leftJoin('monitor_status', 'monitor_status.monitor_id', 'monitors.id') + .selectAll('monitors') + .select([ + 'monitor_status.status', + 'monitor_status.consecutive_failures', + 'monitor_status.consecutive_successes', + 'monitor_status.last_checked_at', + 
'monitor_status.last_status_change_at', + 'monitor_status.response_time_ms as ms_response_time_ms', + 'monitor_status.last_error_code', + 'monitor_status.incident_id', + 'monitor_status.updated_at as ms_updated_at', + ]) + .where('monitors.organization_id', '=', organizationId); + + if (projectId) { + query = query.where('monitors.project_id', '=', projectId); + } + + const rows = await query.orderBy('monitors.created_at', 'asc').execute(); + return rows.map((row) => this.mapMonitor(row as MonitorWithStatusRow)); + } + + async getMonitor(id: string, organizationId: string): Promise { + const row = await this.db + .selectFrom('monitors') + .leftJoin('monitor_status', 'monitor_status.monitor_id', 'monitors.id') + .selectAll('monitors') + .select([ + 'monitor_status.status', + 'monitor_status.consecutive_failures', + 'monitor_status.consecutive_successes', + 'monitor_status.last_checked_at', + 'monitor_status.last_status_change_at', + 'monitor_status.response_time_ms as ms_response_time_ms', + 'monitor_status.last_error_code', + 'monitor_status.incident_id', + 'monitor_status.updated_at as ms_updated_at', + ]) + .where('monitors.id', '=', id) + .where('monitors.organization_id', '=', organizationId) + .executeTakeFirst(); + + return row ? this.mapMonitor(row as MonitorWithStatusRow) : null; + } + + async createMonitor(input: CreateMonitorInput): Promise { + return this.db.transaction().execute(async (trx) => { + const row = await trx + .insertInto('monitors') + .values({ + organization_id: input.organizationId, + project_id: input.projectId, + name: input.name, + type: input.type, + target: input.target ?? null, + interval_seconds: input.intervalSeconds ?? 60, + timeout_seconds: input.timeoutSeconds ?? 10, + failure_threshold: input.failureThreshold ?? 2, + auto_resolve: input.autoResolve ?? true, + enabled: input.enabled ?? true, + http_config: input.httpConfig ? (JSON.stringify(input.httpConfig) as unknown as null) : null, + severity: input.severity ?? 
'high', + }) + .returningAll() + .executeTakeFirstOrThrow(); + + // Initialize status row in the same transaction + await trx + .insertInto('monitor_status') + .values({ monitor_id: row.id }) + .execute(); + + return this.mapMonitor(row as unknown as MonitorWithStatusRow); + }); + } + + async updateMonitor( + id: string, + organizationId: string, + input: UpdateMonitorInput + ): Promise { + const row = await this.db + .updateTable('monitors') + .set({ + ...(input.name !== undefined && { name: input.name }), + ...(input.target !== undefined && { target: input.target }), + ...(input.intervalSeconds !== undefined && { interval_seconds: input.intervalSeconds }), + ...(input.timeoutSeconds !== undefined && { timeout_seconds: input.timeoutSeconds }), + ...(input.failureThreshold !== undefined && { failure_threshold: input.failureThreshold }), + ...(input.autoResolve !== undefined && { auto_resolve: input.autoResolve }), + ...(input.enabled !== undefined && { enabled: input.enabled }), + ...(input.httpConfig !== undefined && { http_config: input.httpConfig ? 
(JSON.stringify(input.httpConfig) as unknown as null) : null }), + ...(input.severity !== undefined && { severity: input.severity }), + updated_at: new Date(), + }) + .where('id', '=', id) + .where('organization_id', '=', organizationId) + .returningAll() + .executeTakeFirstOrThrow(); + + return this.mapMonitor(row as unknown as MonitorWithStatusRow); + } + + async deleteMonitor(id: string, organizationId: string): Promise { + await this.db + .deleteFrom('monitors') + .where('id', '=', id) + .where('organization_id', '=', organizationId) + .execute(); + } + + // ============================================================================ + // HEARTBEAT + // ============================================================================ + + async recordHeartbeat(monitorId: string, organizationId: string): Promise { + const monitor = await this.db + .selectFrom('monitors') + .select(['id', 'type', 'project_id']) + .where('id', '=', monitorId) + .where('organization_id', '=', organizationId) + .where('type', '=', 'heartbeat') + .where('enabled', '=', true) + .executeTakeFirst(); + + if (!monitor) { + throw new Error('Heartbeat monitor not found or not enabled'); + } + + await this.db + .insertInto('monitor_results') + .values({ + time: new Date(), + monitor_id: monitorId, + organization_id: organizationId, + project_id: monitor.project_id, + status: 'up', + is_heartbeat: true, + }) + .execute(); + } + + // ============================================================================ + // RESULTS & UPTIME + // ============================================================================ + + async getRecentResults(monitorId: string, organizationId: string, limit = 50): Promise { + const rows = await this.db + .selectFrom('monitor_results') + .select(['time', 'id', 'monitor_id', 'status', 'response_time_ms', 'status_code', 'error_code', 'is_heartbeat']) + .where('monitor_id', '=', monitorId) + .where('organization_id', '=', organizationId) + .orderBy('time', 'desc') + 
.limit(limit) + .execute(); + + return rows.map((r) => ({ + time: r.time as Date, + id: r.id, + monitorId: r.monitor_id, + status: r.status as 'up' | 'down', + responseTimeMs: r.response_time_ms, + statusCode: r.status_code, + errorCode: r.error_code, + isHeartbeat: r.is_heartbeat, + })); + } + + async getUptimeHistory(monitorId: string, organizationId: string, days = 90): Promise { + const since = new Date(Date.now() - days * 24 * 60 * 60 * 1000); + + const rows = await this.db + .selectFrom('monitor_uptime_daily') + .select(['bucket', 'monitor_id', 'total_checks', 'successful_checks', 'uptime_pct']) + .where('monitor_id', '=', monitorId) + .where('organization_id', '=', organizationId) + .where('bucket', '>=', since) + .orderBy('bucket', 'asc') + .execute(); + + return rows.map((r) => ({ + bucket: r.bucket as Date, + monitorId: r.monitor_id, + totalChecks: r.total_checks, + successfulChecks: r.successful_checks, + uptimePct: r.uptime_pct ?? 0, + })); + } + + // ============================================================================ + // PUBLIC STATUS PAGE (no auth — scrubbed data) + // ============================================================================ + + async getPublicStatus(projectSlug: string): Promise { + // Query by slug — slugs are unique per org but not globally. + // We fetch all matching projects and pick the one with status_page_public = true. + const projects = await this.db + .selectFrom('projects') + .select(['id', 'name', 'slug', 'status_page_public']) + .where('slug', '=', projectSlug) + .execute(); + + // Find the first project that has its status page enabled + const project = projects.find((p) => p.status_page_public) ?? 
null; + if (!project) return null; + + const monitorRows = await this.db + .selectFrom('monitors') + .leftJoin('monitor_status', 'monitor_status.monitor_id', 'monitors.id') + .select([ + 'monitors.id', + 'monitors.name', + 'monitors.type', + 'monitor_status.status', + 'monitor_status.last_checked_at', + ]) + .where('monitors.project_id', '=', project.id) + .where('monitors.enabled', '=', true) + .orderBy('monitors.created_at', 'asc') + .execute(); + + if (monitorRows.length === 0) { + return { + projectName: project.name, + projectSlug: project.slug, + overallStatus: 'operational', + monitors: [], + lastUpdated: new Date().toISOString(), + }; + } + + const monitorIds = monitorRows.map((m) => m.id); + const since90d = new Date(Date.now() - 90 * 24 * 60 * 60 * 1000); + + const uptimeRows = await this.db + .selectFrom('monitor_uptime_daily') + .select(['bucket', 'monitor_id', 'uptime_pct']) + .where('monitor_id', 'in', monitorIds) + .where('bucket', '>=', since90d) + .orderBy('bucket', 'asc') + .execute(); + + // Group uptime by monitor + const uptimeByMonitor = new Map(); + for (const row of uptimeRows) { + const id = row.monitor_id; + if (!uptimeByMonitor.has(id)) uptimeByMonitor.set(id, []); + uptimeByMonitor.get(id)!.push({ + bucket: (row.bucket as Date).toISOString(), + uptimePct: row.uptime_pct ?? 0, + }); + } + + const monitors: PublicMonitorStatus[] = monitorRows.map((m) => ({ + name: m.name, + type: m.type, + status: (m.status ?? 'unknown') as 'up' | 'down' | 'unknown', + uptimeHistory: uptimeByMonitor.get(m.id) ?? [], + })); + + const downCount = monitors.filter((m) => m.status === 'down').length; + const overallStatus = + downCount === 0 + ? 'operational' + : downCount === monitors.length + ? 
'outage' + : 'degraded'; + + return { + projectName: project.name, + projectSlug: project.slug, + overallStatus, + monitors, + lastUpdated: new Date().toISOString(), + }; + } + + // ============================================================================ + // WORKER: run all due checks + // ============================================================================ + + async runAllDueChecks(): Promise { + const now = new Date(); + + // Find enabled monitors where next check is due + const due = await this.db + .selectFrom('monitors') + .leftJoin('monitor_status', 'monitor_status.monitor_id', 'monitors.id') + .selectAll('monitors') + .select([ + 'monitor_status.status', + 'monitor_status.consecutive_failures', + 'monitor_status.consecutive_successes', + 'monitor_status.last_checked_at', + 'monitor_status.last_status_change_at', + 'monitor_status.response_time_ms as ms_response_time_ms', + 'monitor_status.last_error_code', + 'monitor_status.incident_id', + 'monitor_status.updated_at as ms_updated_at', + ]) + .where('monitors.enabled', '=', true) + .where((eb) => + eb.or([ + eb('monitor_status.last_checked_at', 'is', null), + eb( + sql`monitor_status.last_checked_at + monitors.interval_seconds * interval '1 second'`, + '<=', + now + ), + ]) + ) + .execute(); + + if (due.length === 0) return; + + // Process in batches of MAX_CONCURRENT_CHECKS + for (let i = 0; i < due.length; i += MAX_CONCURRENT_CHECKS) { + const batch = due.slice(i, i + MAX_CONCURRENT_CHECKS); + await Promise.allSettled( + batch.map((row) => { + const monitor = this.mapMonitor(row as MonitorWithStatusRow); + return this.runCheck(monitor); + }) + ); + } + } + + async runCheck(monitor: Monitor): Promise { + let result: CheckResult; + const httpConfig: HttpConfig = (monitor.httpConfig as HttpConfig) ?? 
{}; + + try { + if (monitor.type === 'http') { + result = await runHttpCheck(monitor.target!, monitor.timeoutSeconds, httpConfig); + } else if (monitor.type === 'tcp') { + const { host, port } = parseTcpTarget(monitor.target!); + result = await runTcpCheck(host, port, monitor.timeoutSeconds); + } else { + result = await runHeartbeatCheck(monitor.id, monitor.intervalSeconds, this.db); + } + } catch { + result = { status: 'down', responseTimeMs: null, statusCode: null, errorCode: 'unexpected' }; + } + + // Heartbeat 'up' results are recorded by the endpoint, not the worker + const skipWrite = monitor.type === 'heartbeat' && result.status === 'up'; + + if (!skipWrite) { + await this.db + .insertInto('monitor_results') + .values({ + time: new Date(), + monitor_id: monitor.id, + organization_id: monitor.organizationId, + project_id: monitor.projectId, + status: result.status, + response_time_ms: result.responseTimeMs, + status_code: result.statusCode, + error_code: result.errorCode, + is_heartbeat: false, + }) + .execute(); + } + + // Use the status data we already fetched (avoids redundant DB read) + await this.processCheckResult(monitor, result, monitor.status ?? 
null); + } + + // ============================================================================ + // STATE MACHINE + // ============================================================================ + + private async processCheckResult( + monitor: Monitor, + result: CheckResult, + currentStatusData: MonitorCurrentStatus | null + ): Promise { + if (!currentStatusData) return; + + const prevConsecutiveFailures = currentStatusData.consecutiveFailures; + const prevStatus = currentStatusData.status as 'up' | 'down' | 'unknown'; + const now = new Date(); + + if (result.status === 'down') { + const newFailures = prevConsecutiveFailures + 1; + const statusChanged = prevStatus !== 'down'; + + await this.db + .updateTable('monitor_status') + .set({ + status: 'down', + consecutive_failures: newFailures, + consecutive_successes: 0, + last_checked_at: now, + last_status_change_at: statusChanged ? now : currentStatusData.lastStatusChangeAt, + last_error_code: result.errorCode, + response_time_ms: result.responseTimeMs, + updated_at: now, + }) + .where('monitor_id', '=', monitor.id) + .execute(); + + // Open incident when threshold is first reached and no active incident exists. + // Use atomic WHERE guard to prevent duplicate incidents under concurrent checks. + if ( + newFailures >= monitor.failureThreshold && + prevConsecutiveFailures < monitor.failureThreshold + ) { + // Atomically claim incident slot: only proceed if incident_id is still null + const claimed = await this.db + .updateTable('monitor_status') + .set({ updated_at: new Date() }) + .where('monitor_id', '=', monitor.id) + .where('incident_id', 'is', null) + .executeTakeFirst(); + + if (Number(claimed?.numUpdatedRows ?? 0) > 0) { + await this.openIncident(monitor); + } + } + + if (statusChanged) { + console.log(`[MonitorService] Monitor "${monitor.name}" (${monitor.id}) is DOWN — ${result.errorCode ?? 'unknown error'}`); + } + } else { + const newSuccesses = (currentStatusData.consecutiveSuccesses ?? 
0) + 1; + const statusChanged = prevStatus !== 'up'; + + await this.db + .updateTable('monitor_status') + .set({ + status: 'up', + consecutive_failures: 0, + consecutive_successes: newSuccesses, + last_checked_at: now, + last_status_change_at: statusChanged ? now : currentStatusData.lastStatusChangeAt, + last_error_code: null, + response_time_ms: result.responseTimeMs, + updated_at: now, + }) + .where('monitor_id', '=', monitor.id) + .execute(); + + // Auto-resolve linked incident on recovery + if (monitor.autoResolve && currentStatusData.incidentId && prevStatus === 'down') { + await this.resolveIncident(currentStatusData.incidentId, monitor.organizationId); + await this.db + .updateTable('monitor_status') + .set({ incident_id: null, updated_at: now }) + .where('monitor_id', '=', monitor.id) + .execute(); + } + + if (statusChanged) { + console.log(`[MonitorService] Monitor "${monitor.name}" (${monitor.id}) is UP — recovered after ${prevConsecutiveFailures} failures`); + } + } + } + + private async openIncident(monitor: Monitor): Promise { + try { + const incident = await this.siemService.createIncident({ + organizationId: monitor.organizationId, + projectId: monitor.projectId, + title: `Monitor down: ${monitor.name}`, + severity: monitor.severity, + status: 'open', + affectedServices: [monitor.name], + source: 'monitor', + monitorId: monitor.id, + }); + + await this.db + .updateTable('monitor_status') + .set({ incident_id: incident.id, updated_at: new Date() }) + .where('monitor_id', '=', monitor.id) + .execute(); + + console.log(`[MonitorService] Opened incident ${incident.id} for monitor "${monitor.name}"`); + } catch (err) { + console.error(`[MonitorService] Failed to open incident for monitor ${monitor.id}:`, err); + } + } + + private async resolveIncident(incidentId: string, organizationId: string): Promise { + try { + await this.siemService.updateIncident(incidentId, organizationId, { status: 'resolved' }); + // Queue a recovery notification via a new 
incident notification + // The SIEM incident notification system handles email/webhook delivery + console.log(`[MonitorService] Resolved incident ${incidentId}`); + } catch (err) { + console.error(`[MonitorService] Failed to resolve incident ${incidentId}:`, err); + } + } + + // ============================================================================ + // MAPPERS + // ============================================================================ + + private mapMonitor(row: MonitorWithStatusRow): Monitor { + const hasStatus = row.status !== undefined || row.consecutive_failures !== undefined; + return { + id: row.id, + organizationId: row.organization_id, + projectId: row.project_id, + name: row.name, + type: row.type as Monitor['type'], + target: row.target, + intervalSeconds: row.interval_seconds, + timeoutSeconds: row.timeout_seconds, + failureThreshold: row.failure_threshold, + autoResolve: row.auto_resolve, + enabled: row.enabled, + httpConfig: row.http_config ? (typeof row.http_config === 'string' ? JSON.parse(row.http_config) : row.http_config) : null, + severity: (row.severity ?? 'high') as Severity, + createdAt: row.created_at, + updatedAt: row.updated_at, + status: hasStatus + ? { + monitorId: row.id, + status: (row.status ?? 'unknown') as MonitorCurrentStatus['status'], + consecutiveFailures: row.consecutive_failures ?? 0, + consecutiveSuccesses: row.consecutive_successes ?? 0, + lastCheckedAt: row.last_checked_at ?? null, + lastStatusChangeAt: row.last_status_change_at ?? null, + responseTimeMs: row.ms_response_time_ms ?? null, + lastErrorCode: row.last_error_code ?? null, + incidentId: row.incident_id ?? null, + updatedAt: row.ms_updated_at ?? 
row.updated_at, + } + : undefined, + }; + } +} diff --git a/packages/backend/src/modules/monitoring/types.ts b/packages/backend/src/modules/monitoring/types.ts new file mode 100644 index 00000000..d313c8c6 --- /dev/null +++ b/packages/backend/src/modules/monitoring/types.ts @@ -0,0 +1,116 @@ +import type { MonitorType, MonitorStatusValue, MonitorHttpConfig } from '../../database/types.js'; +import type { Severity } from '@logtide/shared'; + +export type { MonitorType, MonitorStatusValue, MonitorHttpConfig }; + +export type ErrorCode = + | 'timeout' + | 'dns_error' + | 'connection_refused' + | 'ssl_error' + | 'http_error' + | 'no_heartbeat' + | 'unexpected'; + +// Alias for the database JSONB shape — single source of truth in database/types.ts +export type HttpConfig = MonitorHttpConfig; + +export interface CheckResult { + status: 'up' | 'down'; + responseTimeMs: number | null; + statusCode: number | null; + errorCode: ErrorCode | null; +} + +export interface Monitor { + id: string; + organizationId: string; + projectId: string; + name: string; + type: MonitorType; + target: string | null; + intervalSeconds: number; + timeoutSeconds: number; + failureThreshold: number; + autoResolve: boolean; + enabled: boolean; + httpConfig: HttpConfig | null; + severity: Severity; + status?: MonitorCurrentStatus; + createdAt: Date; + updatedAt: Date; +} + +export interface MonitorCurrentStatus { + monitorId: string; + status: MonitorStatusValue; + consecutiveFailures: number; + consecutiveSuccesses: number; + lastCheckedAt: Date | null; + lastStatusChangeAt: Date | null; + responseTimeMs: number | null; + lastErrorCode: string | null; + incidentId: string | null; + updatedAt: Date; +} + +export interface MonitorResult { + time: Date; + id: string; + monitorId: string; + status: 'up' | 'down'; + responseTimeMs: number | null; + statusCode: number | null; + errorCode: string | null; + isHeartbeat: boolean; +} + +export interface UptimeBucket { + bucket: Date; + monitorId: string; + 
totalChecks: number; + successfulChecks: number; + uptimePct: number; +} + +export interface CreateMonitorInput { + organizationId: string; + projectId: string; + name: string; + type: MonitorType; + target?: string | null; + intervalSeconds?: number; + timeoutSeconds?: number; + failureThreshold?: number; + autoResolve?: boolean; + enabled?: boolean; + httpConfig?: HttpConfig | null; + severity?: string; +} + +export interface UpdateMonitorInput { + name?: string; + target?: string | null; + intervalSeconds?: number; + timeoutSeconds?: number; + failureThreshold?: number; + autoResolve?: boolean; + enabled?: boolean; + httpConfig?: HttpConfig | null; + severity?: string; +} + +export interface PublicMonitorStatus { + name: string; + type: MonitorType; + status: MonitorStatusValue; + uptimeHistory: { bucket: string; uptimePct: number }[]; +} + +export interface PublicStatusPage { + projectName: string; + projectSlug: string; + overallStatus: 'operational' | 'degraded' | 'outage'; + monitors: PublicMonitorStatus[]; + lastUpdated: string; +} diff --git a/packages/backend/src/modules/projects/routes.ts b/packages/backend/src/modules/projects/routes.ts index c7321ae3..e7499bc7 100644 --- a/packages/backend/src/modules/projects/routes.ts +++ b/packages/backend/src/modules/projects/routes.ts @@ -16,6 +16,7 @@ const createProjectSchema = z.object({ const updateProjectSchema = z.object({ name: z.string().min(1).max(100).optional(), description: z.string().optional(), + statusPagePublic: z.boolean().optional(), }); const projectIdSchema = z.object({ diff --git a/packages/backend/src/modules/projects/service.ts b/packages/backend/src/modules/projects/service.ts index 6828e348..8de8a866 100644 --- a/packages/backend/src/modules/projects/service.ts +++ b/packages/backend/src/modules/projects/service.ts @@ -2,6 +2,11 @@ import { db } from '../../database/connection.js'; import { reservoir } from '../../database/reservoir.js'; import type { Project } from '@logtide/shared'; 
+function generateProjectSlug(name: string): string { + const base = name.toLowerCase().replace(/[^a-z0-9]+/g, '-').replace(/^-+|-+$/g, ''); + return base || 'project'; +} + export interface CreateProjectInput { organizationId: string; userId: string; @@ -12,6 +17,7 @@ export interface CreateProjectInput { export interface UpdateProjectInput { name?: string; description?: string; + statusPagePublic?: boolean; } export class ProjectsService { @@ -50,6 +56,21 @@ export class ProjectsService { throw new Error('A project with this name already exists in this organization'); } + // Generate a unique slug within the organization + const baseSlug = generateProjectSlug(input.name); + let slug = baseSlug; + let suffix = 2; + while (true) { + const conflict = await db + .selectFrom('projects') + .select('id') + .where('organization_id', '=', input.organizationId) + .where('slug', '=', slug) + .executeTakeFirst(); + if (!conflict) break; + slug = `${baseSlug}-${suffix++}`; + } + const project = await db .insertInto('projects') .values({ @@ -57,8 +78,9 @@ export class ProjectsService { user_id: input.userId, name: input.name, description: input.description || null, + slug, }) - .returning(['id', 'organization_id', 'name', 'description', 'created_at', 'updated_at']) + .returning(['id', 'organization_id', 'name', 'description', 'slug', 'status_page_public', 'created_at', 'updated_at']) .executeTakeFirstOrThrow(); return { @@ -66,6 +88,8 @@ export class ProjectsService { organizationId: project.organization_id, name: project.name, description: project.description || undefined, + slug: project.slug, + statusPagePublic: project.status_page_public, createdAt: new Date(project.created_at), updatedAt: new Date(project.updated_at), }; @@ -80,7 +104,7 @@ export class ProjectsService { const projects = await db .selectFrom('projects') - .select(['id', 'organization_id', 'name', 'description', 'created_at', 'updated_at']) + .select(['id', 'organization_id', 'name', 'description', 'slug', 
'status_page_public', 'created_at', 'updated_at']) .where('organization_id', '=', organizationId) .orderBy('created_at', 'desc') .execute(); @@ -90,6 +114,8 @@ export class ProjectsService { organizationId: p.organization_id, name: p.name, description: p.description || undefined, + slug: p.slug, + statusPagePublic: p.status_page_public, createdAt: new Date(p.created_at), updatedAt: new Date(p.updated_at), })); @@ -102,7 +128,7 @@ export class ProjectsService { const project = await db .selectFrom('projects') .innerJoin('organization_members', 'projects.organization_id', 'organization_members.organization_id') - .select(['projects.id', 'projects.organization_id', 'projects.name', 'projects.description', 'projects.created_at', 'projects.updated_at']) + .select(['projects.id', 'projects.organization_id', 'projects.name', 'projects.description', 'projects.slug', 'projects.status_page_public', 'projects.created_at', 'projects.updated_at']) .where('projects.id', '=', projectId) .where('organization_members.user_id', '=', userId) .executeTakeFirst(); @@ -116,6 +142,8 @@ export class ProjectsService { organizationId: project.organization_id, name: project.name, description: project.description || undefined, + slug: project.slug, + statusPagePublic: project.status_page_public, createdAt: new Date(project.created_at), updatedAt: new Date(project.updated_at), }; @@ -155,10 +183,11 @@ export class ProjectsService { .set({ ...(input.name && { name: input.name }), ...(input.description !== undefined && { description: input.description || null }), + ...(input.statusPagePublic !== undefined && { status_page_public: input.statusPagePublic }), updated_at: new Date(), }) .where('id', '=', projectId) - .returning(['id', 'organization_id', 'name', 'description', 'created_at', 'updated_at']) + .returning(['id', 'organization_id', 'name', 'description', 'slug', 'status_page_public', 'created_at', 'updated_at']) .executeTakeFirst(); if (!project) { @@ -170,6 +199,8 @@ export class 
ProjectsService { organizationId: project.organization_id, name: project.name, description: project.description || undefined, + slug: project.slug, + statusPagePublic: project.status_page_public, createdAt: new Date(project.created_at), updatedAt: new Date(project.updated_at), }; diff --git a/packages/backend/src/modules/siem/service.ts b/packages/backend/src/modules/siem/service.ts index adb7f7af..5c9e23a5 100644 --- a/packages/backend/src/modules/siem/service.ts +++ b/packages/backend/src/modules/siem/service.ts @@ -153,6 +153,8 @@ export class SiemService { affected_services: input.affectedServices ?? null, mitre_tactics: input.mitreTactics ?? null, mitre_techniques: input.mitreTechniques ?? null, + source: input.source ?? 'sigma', + monitor_id: input.monitorId ?? null, }) .returningAll() .executeTakeFirstOrThrow(); @@ -591,6 +593,8 @@ export class SiemService { mitreTechniques: row.mitre_techniques, ipReputation: row.ip_reputation, geoData: row.geo_data, + source: row.source ?? 'sigma', + monitorId: row.monitor_id ?? 
null, createdAt: row.created_at, updatedAt: row.updated_at, resolvedAt: row.resolved_at, diff --git a/packages/backend/src/queue/jobs/log-pipeline.ts b/packages/backend/src/queue/jobs/log-pipeline.ts new file mode 100644 index 00000000..2a90f022 --- /dev/null +++ b/packages/backend/src/queue/jobs/log-pipeline.ts @@ -0,0 +1,62 @@ +import type { IJob } from '../abstractions/types.js'; +import { sql } from 'kysely'; +import { db } from '../../database/connection.js'; +import { pipelineService } from '../../modules/log-pipeline/service.js'; +import { PipelineExecutor } from '../../modules/log-pipeline/pipeline-executor.js'; + +export interface LogPipelineJobData { + logs: Array<{ + id: string; + time: string; // ISO string (serialized for BullMQ) + message: string; + metadata: Record | null; + }>; + projectId: string; + organizationId: string; +} + +export async function processLogPipeline(job: IJob): Promise { + const { logs, projectId, organizationId } = job.data; + + const pipeline = await pipelineService.getForProject(projectId, organizationId); + if (!pipeline || !pipeline.enabled || pipeline.steps.length === 0) return; + + console.log(`[Pipeline] Processing ${logs.length} logs for project ${projectId}`); + + const updates: Array<{ id: string; time: Date; fields: Record }> = []; + + for (const log of logs) { + try { + const result = await PipelineExecutor.execute( + { id: log.id, time: new Date(log.time), message: log.message, metadata: log.metadata }, + pipeline.steps + ); + if (Object.keys(result.merged).length > 0) { + updates.push({ id: log.id, time: new Date(log.time), fields: result.merged }); + } + } catch (err) { + console.error(`[Pipeline] Failed to process log ${log.id}:`, err); + } + } + + if (updates.length === 0) return; + + // Batch update: extracted fields do NOT overwrite existing metadata keys. + // TimescaleDB hypertable requires time in the WHERE clause. 
+ for (const update of updates) { + try { + await db + .updateTable('logs') + .set({ + metadata: sql`${JSON.stringify(update.fields)}::jsonb || COALESCE(metadata, '{}'::jsonb)`, + }) + .where('id', '=', update.id) + .where('time', '=', update.time) + .execute(); + } catch (err) { + console.error(`[Pipeline] Failed to update metadata for log ${update.id}:`, err); + } + } + + console.log(`[Pipeline] Updated metadata for ${updates.length}/${logs.length} logs`); +} diff --git a/packages/backend/src/scripts/seed-load-test.ts b/packages/backend/src/scripts/seed-load-test.ts index 4af68566..06ae8d10 100644 --- a/packages/backend/src/scripts/seed-load-test.ts +++ b/packages/backend/src/scripts/seed-load-test.ts @@ -103,6 +103,7 @@ async function seedLoadTestData() { name: 'Load Test Project', organization_id: organization.id, user_id: user.id, + slug: 'load-test-project', }) .returningAll() .executeTakeFirstOrThrow(); diff --git a/packages/backend/src/server.ts b/packages/backend/src/server.ts index bb22dc73..f788abae 100644 --- a/packages/backend/src/server.ts +++ b/packages/backend/src/server.ts @@ -31,6 +31,8 @@ import { settingsRoutes, publicSettingsRoutes, settingsService } from './modules import { retentionRoutes } from './modules/retention/index.js'; import { correlationRoutes, patternRoutes } from './modules/correlation/index.js'; import { piiMaskingRoutes } from './modules/pii-masking/index.js'; +import { pipelineRoutes } from './modules/log-pipeline/index.js'; +import { monitoringRoutes, heartbeatRoutes, publicStatusRoutes } from './modules/monitoring/index.js'; import { sessionsRoutes } from './modules/sessions/routes.js'; import { sourcemapsRoutes } from './modules/sourcemaps/index.js'; import { auditLogRoutes, auditLogService } from './modules/audit-log/index.js'; @@ -180,6 +182,7 @@ export async function build(opts = {}) { await fastify.register(correlationRoutes, { prefix: '/api' }); await fastify.register(patternRoutes, { prefix: '/api' }); await 
fastify.register(piiMaskingRoutes, { prefix: '/api' }); + await fastify.register(pipelineRoutes, { prefix: '/api/v1/log-pipelines' }); await fastify.register(otlpRoutes); await fastify.register(otlpTraceRoutes); await fastify.register(otlpMetricRoutes); @@ -189,6 +192,9 @@ export async function build(opts = {}) { await fastify.register(sourcemapsRoutes); await fastify.register(websocketPlugin); await fastify.register(websocketRoutes); + await fastify.register(monitoringRoutes, { prefix: '/api/v1/monitors' }); + await fastify.register(heartbeatRoutes, { prefix: '/api/v1/monitors' }); + await fastify.register(publicStatusRoutes, { prefix: '/api/v1/status' }); return fastify; } diff --git a/packages/backend/src/tests/globalSetup.ts b/packages/backend/src/tests/globalSetup.ts index a301c151..089b05fb 100644 --- a/packages/backend/src/tests/globalSetup.ts +++ b/packages/backend/src/tests/globalSetup.ts @@ -24,8 +24,24 @@ export default async function globalSetup() { await migrateToLatest(); console.log('[Global Setup] Migrations completed'); } catch (error) { - console.error('[Global Setup] Failed to run migrations:', error); - throw error; + const isConnRefused = (function checkConnRefused(err: unknown): boolean { + if (err instanceof AggregateError) { + return err.errors.some(checkConnRefused); + } + if (err instanceof Error) { + return ( + err.message.includes('ECONNREFUSED') || + (err as NodeJS.ErrnoException).code === 'ECONNREFUSED' + ); + } + return false; + })(error); + if (isConnRefused) { + console.warn('[Global Setup] DB not reachable — skipping migrations (unit-test mode)'); + } else { + console.error('[Global Setup] Failed to run migrations:', error); + throw error; + } } console.log('[Global Setup] Test environment ready!'); diff --git a/packages/backend/src/tests/helpers/auth.ts b/packages/backend/src/tests/helpers/auth.ts index caba4e7f..6ffb8420 100644 --- a/packages/backend/src/tests/helpers/auth.ts +++ b/packages/backend/src/tests/helpers/auth.ts @@ 
-5,14 +5,12 @@ import crypto from 'crypto'; * Create a test session for a user */ export async function createTestSession(userId: string) { - const sessionId = crypto.randomBytes(32).toString('hex'); const token = crypto.randomBytes(32).toString('hex'); const expiresAt = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000); // 7 days const session = await db .insertInto('sessions') .values({ - id: sessionId, user_id: userId, token, expires_at: expiresAt, diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/apache.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/apache.test.ts new file mode 100644 index 00000000..ba7348cc --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/apache.test.ts @@ -0,0 +1,16 @@ +import { describe, it, expect } from 'vitest'; +import { parseApache } from '../../../../modules/log-pipeline/parsers/apache.js'; + +describe('parseApache', () => { + it('parses apache combined log format', () => { + const line = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08"'; + const result = parseApache(line); + expect(result?.client_ip).toBe('127.0.0.1'); + expect(result?.http_status).toBe(200); + expect(result?.response_bytes).toBe(2326); + }); + + it('returns null for non-apache lines', () => { + expect(parseApache('garbage')).toBeNull(); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/grok-engine.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/grok-engine.test.ts new file mode 100644 index 00000000..3c5d45bd --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/grok-engine.test.ts @@ -0,0 +1,49 @@ +import { describe, it, expect } from 'vitest'; +import { compileGrok, matchGrok } from '../../../../modules/log-pipeline/parsers/grok-engine.js'; + +describe('compileGrok', () => { + it('compiles a pattern with built-in WORD', () => { + const { regex } = 
compileGrok('%{WORD:method} %{URIPATH:path}'); + expect(regex).toBeInstanceOf(RegExp); + }); + + it('throws for unknown pattern name', () => { + expect(() => compileGrok('%{UNKNOWN_PATTERN:field}')).toThrow('Unknown grok pattern'); + }); +}); + +describe('matchGrok', () => { + it('extracts named fields from a line', () => { + const result = matchGrok('%{IPV4:client_ip} %{WORD:method}', '10.0.0.1 GET'); + expect(result).toEqual({ client_ip: '10.0.0.1', method: 'GET' }); + }); + + it('coerces :int type to number', () => { + const result = matchGrok('%{POSINT:status:int} %{WORD:method}', '200 GET'); + expect(result?.status).toBe(200); + expect(typeof result?.status).toBe('number'); + }); + + it('coerces :float type to number', () => { + const result = matchGrok('%{NUMBER:latency:float}ms', '1.23ms'); + expect(result?.latency).toBeCloseTo(1.23); + }); + + it('returns null when pattern does not match', () => { + expect(matchGrok('%{IPV4:ip}', 'not-an-ip')).toBeNull(); + }); + + it('supports non-capturing %{PATTERN} without field name', () => { + const result = matchGrok('%{IPV4} %{WORD:method}', '10.0.0.1 POST'); + expect(result).toEqual({ method: 'POST' }); + }); + + it('handles a realistic nginx-like grok pattern', () => { + const pattern = '%{IPV4:client_ip} - %{NOTSPACE:user} \\[%{DATA:timestamp}\\] "%{WORD:method} %{NOTSPACE:path} HTTP/%{NUMBER:http_version}" %{POSINT:status:int}'; + const line = '192.168.0.1 - alice [01/Jan/2024:00:00:00 +0000] "GET /api HTTP/1.1" 200'; + const result = matchGrok(pattern, line); + expect(result?.client_ip).toBe('192.168.0.1'); + expect(result?.method).toBe('GET'); + expect(result?.status).toBe(200); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/json-message.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/json-message.test.ts new file mode 100644 index 00000000..a4d80db0 --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/json-message.test.ts @@ -0,0 +1,24 
@@ +import { describe, it, expect } from 'vitest'; +import { parseJsonMessage } from '../../../../modules/log-pipeline/parsers/json-message.js'; + +describe('parseJsonMessage', () => { + it('parses a JSON object in the message', () => { + const result = parseJsonMessage('{"level":"info","user_id":42,"action":"login"}'); + expect(result?.level).toBe('info'); + expect(result?.user_id).toBe(42); + expect(result?.action).toBe('login'); + }); + + it('returns null for non-JSON messages', () => { + expect(parseJsonMessage('plain text message')).toBeNull(); + expect(parseJsonMessage('[1,2,3]')).toBeNull(); + }); + + it('returns null for invalid JSON', () => { + expect(parseJsonMessage('{broken json')).toBeNull(); + }); + + it('returns null for empty message', () => { + expect(parseJsonMessage('')).toBeNull(); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/logfmt.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/logfmt.test.ts new file mode 100644 index 00000000..207ad903 --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/logfmt.test.ts @@ -0,0 +1,32 @@ +import { describe, it, expect } from 'vitest'; +import { parseLogfmt } from '../../../../modules/log-pipeline/parsers/logfmt.js'; + +describe('parseLogfmt', () => { + it('parses simple key=value pairs', () => { + const result = parseLogfmt('level=info msg="user logged in" user_id=42 latency=1.23ms'); + expect(result?.level).toBe('info'); + expect(result?.msg).toBe('user logged in'); + expect(result?.user_id).toBe('42'); + expect(result?.latency).toBe('1.23ms'); + }); + + it('handles quoted values with spaces', () => { + const result = parseLogfmt('error="file not found" path="/var/log/app.log"'); + expect(result?.error).toBe('file not found'); + expect(result?.path).toBe('/var/log/app.log'); + }); + + it('handles boolean-like values', () => { + const result = parseLogfmt('success=true retried=false'); + expect(result?.success).toBe('true'); + }); + + 
it('returns null if no key=value found', () => { + expect(parseLogfmt('this is a plain message')).toBeNull(); + expect(parseLogfmt('')).toBeNull(); + }); + + it('requires at least 2 pairs to avoid false positives', () => { + expect(parseLogfmt('id=123')).toBeNull(); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/nginx.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/nginx.test.ts new file mode 100644 index 00000000..cdb7901f --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/nginx.test.ts @@ -0,0 +1,41 @@ +import { describe, it, expect } from 'vitest'; +import { parseNginx } from '../../../../modules/log-pipeline/parsers/nginx.js'; + +describe('parseNginx', () => { + it('parses a standard nginx access log line', () => { + const line = '192.168.1.1 - john [01/Jan/2024:12:00:00 +0000] "GET /api/v1/logs?limit=10 HTTP/1.1" 200 1234 "-" "curl/7.68.0"'; + const result = parseNginx(line); + expect(result).toMatchObject({ + client_ip: '192.168.1.1', + remote_user: 'john', + http_method: 'GET', + http_path: '/api/v1/logs', + http_query: 'limit=10', + http_version: '1.1', + http_status: 200, + response_bytes: 1234, + http_referer: '-', + user_agent: 'curl/7.68.0', + }); + expect(result?.timestamp).toBeDefined(); + }); + + it('handles anonymous user (-)', () => { + const line = '10.0.0.1 - - [15/Mar/2024:08:30:00 +0100] "POST /ingest HTTP/1.1" 201 0 "-" "sdk/1.0"'; + const result = parseNginx(line); + expect(result?.remote_user).toBe('-'); + expect(result?.http_status).toBe(201); + expect(result?.http_method).toBe('POST'); + }); + + it('returns null for non-nginx lines', () => { + expect(parseNginx('this is not a log line')).toBeNull(); + expect(parseNginx('')).toBeNull(); + }); + + it('parses response bytes as number', () => { + const line = '1.2.3.4 - - [01/Jan/2024:00:00:00 +0000] "GET / HTTP/2.0" 304 0 "-" "-"'; + expect(parseNginx(line)?.response_bytes).toBe(0); + expect(typeof 
parseNginx(line)?.response_bytes).toBe('number'); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/parsers/syslog.test.ts b/packages/backend/src/tests/modules/log-pipeline/parsers/syslog.test.ts new file mode 100644 index 00000000..8313ee9e --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/parsers/syslog.test.ts @@ -0,0 +1,33 @@ +import { describe, it, expect } from 'vitest'; +import { parseSyslog } from '../../../../modules/log-pipeline/parsers/syslog.js'; + +describe('parseSyslog', () => { + it('parses RFC 3164 format', () => { + const line = 'Jan 15 12:00:00 myhost myapp[1234]: Connection established'; + const result = parseSyslog(line); + expect(result?.hostname).toBe('myhost'); + expect(result?.appname).toBe('myapp'); + expect(result?.pid).toBe(1234); + expect(result?.syslog_message).toBe('Connection established'); + }); + + it('parses RFC 3164 without PID', () => { + const line = 'Mar 20 08:15:30 server nginx: worker process 123 exited'; + const result = parseSyslog(line); + expect(result?.hostname).toBe('server'); + expect(result?.appname).toBe('nginx'); + expect(result?.pid).toBeUndefined(); + }); + + it('parses RFC 5424 format', () => { + const line = '<34>1 2024-01-15T12:00:00.000Z myhost myapp 1234 - - Connection established'; + const result = parseSyslog(line); + expect(result?.priority).toBe(34); + expect(result?.hostname).toBe('myhost'); + expect(result?.syslog_message).toBe('Connection established'); + }); + + it('returns null for non-syslog', () => { + expect(parseSyslog('hello world')).toBeNull(); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/pipeline-executor.test.ts b/packages/backend/src/tests/modules/log-pipeline/pipeline-executor.test.ts new file mode 100644 index 00000000..9588f8e7 --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/pipeline-executor.test.ts @@ -0,0 +1,55 @@ +import { describe, it, expect } from 'vitest'; +import { PipelineExecutor } from 
'../../../modules/log-pipeline/pipeline-executor.js'; +import type { LogForPipeline, PipelineStep } from '../../../modules/log-pipeline/types.js'; + +const sampleLog: LogForPipeline = { + id: 'test-id', + time: new Date(), + message: '192.168.1.1 - - [01/Jan/2024:00:00:00 +0000] "GET /api HTTP/1.1" 200 512 "-" "curl/7.68.0"', + metadata: null, +}; + +describe('PipelineExecutor', () => { + it('runs a parser step and extracts fields', async () => { + const steps: PipelineStep[] = [{ type: 'parser', parser: 'nginx' }]; + const result = await PipelineExecutor.execute(sampleLog, steps); + expect(result.merged.client_ip).toBe('192.168.1.1'); + expect(result.merged.http_status).toBe(200); + expect(result.steps[0].error).toBeUndefined(); + }); + + it('runs a grok step', async () => { + const log: LogForPipeline = { ...sampleLog, message: 'user=alice action=login' }; + const steps: PipelineStep[] = [{ type: 'grok', pattern: 'user=%{WORD:user} action=%{WORD:action}' }]; + const result = await PipelineExecutor.execute(log, steps); + expect(result.merged.user).toBe('alice'); + expect(result.merged.action).toBe('login'); + }); + + it('continues executing remaining steps when one step fails', async () => { + const steps: PipelineStep[] = [ + { type: 'grok', pattern: '%{UNKNOWN_PATTERN:x}' }, // will error + { type: 'parser', parser: 'json' }, + ]; + const log: LogForPipeline = { ...sampleLog, message: '{"key":"val"}' }; + const result = await PipelineExecutor.execute(log, steps); + expect(result.steps[0].error).toBeDefined(); + expect(result.merged.key).toBe('val'); // second step still ran + }); + + it('merges fields from multiple steps, earlier steps take priority', async () => { + const steps: PipelineStep[] = [ + { type: 'parser', parser: 'nginx' }, + { type: 'grok', pattern: '%{IPV4:client_ip}' }, + ]; + const result = await PipelineExecutor.execute(sampleLog, steps); + expect(result.merged.client_ip).toBe('192.168.1.1'); + }); + + it('returns empty merged when no steps 
match', async () => { + const steps: PipelineStep[] = [{ type: 'parser', parser: 'json' }]; + const log: LogForPipeline = { ...sampleLog, message: 'plain text' }; + const result = await PipelineExecutor.execute(log, steps); + expect(result.merged).toEqual({}); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/routes.test.ts b/packages/backend/src/tests/modules/log-pipeline/routes.test.ts new file mode 100644 index 00000000..55af21bc --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/routes.test.ts @@ -0,0 +1,293 @@ +import { describe, it, expect, beforeEach, beforeAll, afterAll } from 'vitest'; +import Fastify, { FastifyInstance } from 'fastify'; +import { db } from '../../../database/index.js'; +import { pipelineRoutes } from '../../../modules/log-pipeline/routes.js'; +import { createTestContext, createTestSession } from '../../helpers/index.js'; + +function authHeaders(token: string) { + return { Authorization: `Bearer ${token}` }; +} + +let app: FastifyInstance; +let ctx: Awaited<ReturnType<typeof createTestContext>>; +let authToken: string; + +beforeAll(async () => { + app = Fastify(); + await app.register(pipelineRoutes, { prefix: '/api/v1/pipelines' }); + await app.ready(); +}); + +afterAll(async () => { + await app.close(); +}); + +beforeEach(async () => { + await db.deleteFrom('log_pipelines').execute(); + await db.deleteFrom('logs').execute(); + await db.deleteFrom('api_keys').execute(); + await db.deleteFrom('organization_members').execute(); + await db.deleteFrom('projects').execute(); + await db.deleteFrom('organizations').execute(); + await db.deleteFrom('sessions').execute(); + await db.deleteFrom('users').execute(); + + ctx = await createTestContext(); + const session = await createTestSession(ctx.user.id); + authToken = session.token; +}); + +describe('POST /api/v1/pipelines', () => { + it('creates a pipeline', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: authHeaders(authToken), + payload: 
{ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'My Pipeline', + steps: [{ type: 'parser', parser: 'nginx' }], + }, + }); + expect(res.statusCode).toBe(201); + const body = JSON.parse(res.payload); + expect(body.pipeline.name).toBe('My Pipeline'); + expect(body.pipeline.steps).toHaveLength(1); + }); + + it('returns 400 for invalid step type', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: authHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'Bad Pipeline', + steps: [{ type: 'unknown_step' }], + }, + }); + expect(res.statusCode).toBe(400); + }); + + it('returns 401 without auth', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + payload: { + organizationId: ctx.organization.id, + name: 'Unauthenticated', + steps: [], + }, + }); + expect(res.statusCode).toBe(401); + }); + + it('returns 403 for non-member', async () => { + // Create a separate context (different org) + const other = await createTestContext(); + const otherSession = await createTestSession(other.user.id); + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: authHeaders(otherSession.token), + payload: { + organizationId: ctx.organization.id, + name: 'Hack Attempt', + steps: [], + }, + }); + expect(res.statusCode).toBe(403); + }); +}); + +describe('POST /api/v1/pipelines/preview', () => { + it('returns parsed fields for nginx log line', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines/preview', + headers: authHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + steps: [{ type: 'parser', parser: 'nginx' }], + message: '10.0.0.1 - - [01/Jan/2024:00:00:00 +0000] "GET /api HTTP/1.1" 200 512 "-" "-"', + }, + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.payload); + expect(body.merged.client_ip).toBe('10.0.0.1'); + 
expect(body.merged.http_status).toBe(200); + }); + + it('returns 400 for invalid step in preview', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines/preview', + headers: authHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + steps: [{ type: 'bad_type' }], + message: 'some log', + }, + }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('POST /api/v1/pipelines/import-yaml', () => { + it('imports a pipeline from YAML', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines/import-yaml', + headers: authHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + projectId: ctx.project.id, + yaml: 'name: yaml-pipeline\nsteps:\n - type: parser\n parser: nginx\n', + }, + }); + expect(res.statusCode).toBe(201); + const body = JSON.parse(res.payload); + expect(body.pipeline.name).toBe('yaml-pipeline'); + expect(body.pipeline.steps).toHaveLength(1); + }); + + it('returns 400 for invalid YAML', async () => { + const res = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines/import-yaml', + headers: authHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + yaml: ':::not valid yaml:::', + }, + }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('GET /api/v1/pipelines', () => { + it('lists pipelines for org', async () => { + // Create a pipeline first + await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: authHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'Listed Pipeline', + steps: [], + }, + }); + + const res = await app.inject({ + method: 'GET', + url: `/api/v1/pipelines?organizationId=${ctx.organization.id}`, + headers: authHeaders(authToken), + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.payload); + expect(body.pipelines.length).toBeGreaterThanOrEqual(1); + expect(body.pipelines[0].name).toBe('Listed Pipeline'); + }); + + 
it('returns 400 if organizationId is missing', async () => { + const res = await app.inject({ + method: 'GET', + url: '/api/v1/pipelines', + headers: authHeaders(authToken), + }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('GET /api/v1/pipelines/:id', () => { + it('returns a pipeline by id', async () => { + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: authHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'Fetch By ID', + steps: [], + }, + }); + const created = JSON.parse(createRes.payload).pipeline; + + const res = await app.inject({ + method: 'GET', + url: `/api/v1/pipelines/${created.id}?organizationId=${ctx.organization.id}`, + headers: authHeaders(authToken), + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.payload); + expect(body.pipeline.id).toBe(created.id); + }); + + it('returns 404 for unknown id', async () => { + const res = await app.inject({ + method: 'GET', + url: `/api/v1/pipelines/00000000-0000-0000-0000-000000000000?organizationId=${ctx.organization.id}`, + headers: authHeaders(authToken), + }); + expect(res.statusCode).toBe(404); + }); +}); + +describe('PUT /api/v1/pipelines/:id', () => { + it('updates a pipeline', async () => { + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: authHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'Original Name', + steps: [], + }, + }); + const created = JSON.parse(createRes.payload).pipeline; + + const res = await app.inject({ + method: 'PUT', + url: `/api/v1/pipelines/${created.id}?organizationId=${ctx.organization.id}`, + headers: authHeaders(authToken), + payload: { name: 'Updated Name' }, + }); + expect(res.statusCode).toBe(200); + const body = JSON.parse(res.payload); + expect(body.pipeline.name).toBe('Updated Name'); + }); +}); + +describe('DELETE /api/v1/pipelines/:id', () => { + it('deletes a pipeline', async () => 
{ + const createRes = await app.inject({ + method: 'POST', + url: '/api/v1/pipelines', + headers: authHeaders(authToken), + payload: { + organizationId: ctx.organization.id, + name: 'To Delete', + steps: [], + }, + }); + const created = JSON.parse(createRes.payload).pipeline; + + const deleteRes = await app.inject({ + method: 'DELETE', + url: `/api/v1/pipelines/${created.id}?organizationId=${ctx.organization.id}`, + headers: authHeaders(authToken), + }); + expect(deleteRes.statusCode).toBe(204); + + // Verify it's gone + const getRes = await app.inject({ + method: 'GET', + url: `/api/v1/pipelines/${created.id}?organizationId=${ctx.organization.id}`, + headers: authHeaders(authToken), + }); + expect(getRes.statusCode).toBe(404); + }); +}); diff --git a/packages/backend/src/tests/modules/log-pipeline/service.test.ts b/packages/backend/src/tests/modules/log-pipeline/service.test.ts new file mode 100644 index 00000000..4eb2ef29 --- /dev/null +++ b/packages/backend/src/tests/modules/log-pipeline/service.test.ts @@ -0,0 +1,233 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { pipelineService } from '../../../modules/log-pipeline/service.js'; +import { createTestContext } from '../../helpers/index.js'; +import { db } from '../../../database/index.js'; + +let ctx: Awaited<ReturnType<typeof createTestContext>>; + +beforeEach(async () => { + await db.deleteFrom('log_pipelines').execute(); + await db.deleteFrom('logs').execute(); + await db.deleteFrom('api_keys').execute(); + await db.deleteFrom('organization_members').execute(); + await db.deleteFrom('projects').execute(); + await db.deleteFrom('organizations').execute(); + await db.deleteFrom('sessions').execute(); + await db.deleteFrom('users').execute(); + + ctx = await createTestContext(); + pipelineService.invalidateCache(ctx.organization.id); +}); + +describe('pipelineService.create', () => { + it('creates a pipeline with steps', async () => { + const pipeline = await pipelineService.create({ + organizationId: ctx.organization.id, + 
projectId: ctx.project.id, + name: 'Test Pipeline', + steps: [{ type: 'parser', parser: 'nginx' }], + }); + expect(pipeline.id).toBeDefined(); + expect(pipeline.steps).toHaveLength(1); + expect(pipeline.enabled).toBe(true); + expect(pipeline.organizationId).toBe(ctx.organization.id); + expect(pipeline.projectId).toBe(ctx.project.id); + }); + + it('creates an org-wide pipeline when projectId is null', async () => { + const pipeline = await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Org Pipeline', + steps: [], + }); + expect(pipeline.projectId).toBeNull(); + expect(pipeline.name).toBe('Org Pipeline'); + }); + + it('enforces unique pipeline per project', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'First', + steps: [], + }); + await expect(pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Second', + steps: [], + })).rejects.toThrow(); + }); +}); + +describe('pipelineService.update', () => { + it('updates name and steps', async () => { + const created = await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Original', + steps: [], + }); + const updated = await pipelineService.update(created.id, ctx.organization.id, { + name: 'Updated', + steps: [{ type: 'parser', parser: 'json' }], + }); + expect(updated.name).toBe('Updated'); + expect(updated.steps).toHaveLength(1); + }); + + it('can disable a pipeline', async () => { + const created = await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Enabled', + steps: [], + }); + const updated = await pipelineService.update(created.id, ctx.organization.id, { enabled: false }); + expect(updated.enabled).toBe(false); + }); +}); + +describe('pipelineService.delete', () => { + it('deletes a pipeline', async () => { + const created = await 
pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'To Delete', + steps: [], + }); + await pipelineService.delete(created.id, ctx.organization.id); + const found = await pipelineService.getById(created.id, ctx.organization.id); + expect(found).toBeNull(); + }); +}); + +describe('pipelineService.listForOrg', () => { + it('lists all pipelines for org', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Org-wide', + steps: [], + }); + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Project-specific', + steps: [], + }); + const list = await pipelineService.listForOrg(ctx.organization.id); + expect(list).toHaveLength(2); + }); +}); + +describe('pipelineService.getForProject', () => { + it('returns project pipeline', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'P', + steps: [], + }); + pipelineService.invalidateCache(ctx.organization.id); + const p = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(p?.name).toBe('P'); + }); + + it('falls back to org-wide pipeline when no project pipeline exists', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Org-wide', + steps: [], + }); + pipelineService.invalidateCache(ctx.organization.id); + const p = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(p?.name).toBe('Org-wide'); + }); + + it('prefers project pipeline over org-wide', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: null, + name: 'Org-wide', + steps: [], + }); + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Project-specific', + steps: [], + }); + 
pipelineService.invalidateCache(ctx.organization.id); + const p = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(p?.name).toBe('Project-specific'); + }); + + it('returns null when no pipeline configured', async () => { + const p = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(p).toBeNull(); + }); + + it('caches result for subsequent calls', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'Cached', + steps: [], + }); + pipelineService.invalidateCache(ctx.organization.id); + const first = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + // Delete from DB — cache should still return value + await db.deleteFrom('log_pipelines').execute(); + const second = await pipelineService.getForProject(ctx.project.id, ctx.organization.id); + expect(first?.name).toBe(second?.name); + }); +}); + +describe('pipelineService.importFromYaml', () => { + it('parses valid YAML and creates pipeline', async () => { + const yamlText = ` +name: nginx-pipeline +description: Parse nginx logs +enabled: true +steps: + - type: parser + parser: nginx + - type: geoip + field: client_ip + target: geo +`; + const pipeline = await pipelineService.importFromYaml(yamlText, ctx.organization.id, ctx.project.id); + expect(pipeline.name).toBe('nginx-pipeline'); + expect(pipeline.description).toBe('Parse nginx logs'); + expect(pipeline.steps).toHaveLength(2); + expect(pipeline.enabled).toBe(true); + }); + + it('replaces existing pipeline on re-import', async () => { + const yaml1 = `name: first\nsteps: []`; + const yaml2 = `name: second\nsteps:\n - type: parser\n parser: json`; + await pipelineService.importFromYaml(yaml1, ctx.organization.id, ctx.project.id); + const pipeline = await pipelineService.importFromYaml(yaml2, ctx.organization.id, ctx.project.id); + expect(pipeline.name).toBe('second'); + const list = await 
pipelineService.listForOrg(ctx.organization.id); + expect(list).toHaveLength(1); + }); + + it('throws on invalid YAML syntax', async () => { + await expect(pipelineService.importFromYaml('invalid: [yaml: bad', ctx.organization.id, null)) + .rejects.toThrow(); + }); + + it('throws when name is missing', async () => { + await expect(pipelineService.importFromYaml('steps: []', ctx.organization.id, null)) + .rejects.toThrow('name'); + }); + + it('throws when steps is not an array', async () => { + await expect(pipelineService.importFromYaml('name: test\nsteps: not-an-array', ctx.organization.id, null)) + .rejects.toThrow('steps'); + }); +}); diff --git a/packages/backend/src/tests/queue/jobs/log-pipeline.test.ts b/packages/backend/src/tests/queue/jobs/log-pipeline.test.ts new file mode 100644 index 00000000..345a6d79 --- /dev/null +++ b/packages/backend/src/tests/queue/jobs/log-pipeline.test.ts @@ -0,0 +1,131 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { db } from '../../../database/index.js'; +import { createTestContext, createTestLog } from '../../helpers/index.js'; +import { pipelineService } from '../../../modules/log-pipeline/service.js'; + +// Mock queue connection BEFORE importing anything that uses it +vi.mock('../../../queue/connection.js', () => { + return { + createQueue: vi.fn(() => ({ + add: vi.fn().mockResolvedValue({ id: 'job-id' }), + close: vi.fn(), + })), + createWorker: vi.fn(() => ({ + on: vi.fn(), + close: vi.fn(), + })), + getConnection: () => null, + }; +}); + +// Mock config module +vi.mock('../../../config/index.js', () => ({ + config: { + SMTP_HOST: 'localhost', + SMTP_PORT: 1025, + SMTP_SECURE: false, + SMTP_USER: 'test@example.com', + SMTP_PASS: 'password', + SMTP_FROM: 'noreply@test.com', + REDIS_URL: 'redis://localhost:6379', + }, + isSmtpConfigured: vi.fn(() => false), +})); + +// Import after mocks +import { processLogPipeline } from '../../../queue/jobs/log-pipeline.js'; +import type { Job } from 'bullmq'; 
+import type { LogPipelineJobData } from '../../../queue/jobs/log-pipeline.js'; + +let ctx: Awaited<ReturnType<typeof createTestContext>>; + +beforeEach(async () => { + vi.clearAllMocks(); + await db.deleteFrom('log_pipelines').execute(); + ctx = await createTestContext(); + pipelineService.invalidateCache(ctx.organization.id); +}); + +describe('processLogPipeline', () => { + it('is a no-op when no pipeline is configured for the project', async () => { + const log = await createTestLog({ projectId: ctx.project.id, message: 'plain text' }); + const timeStr = log.time instanceof Date ? log.time.toISOString() : String(log.time); + const job = { + data: { + logs: [{ id: log.id, time: timeStr, message: log.message, metadata: null }], + projectId: ctx.project.id, + organizationId: ctx.organization.id, + } as LogPipelineJobData, + } as Job<LogPipelineJobData>; + + await expect(processLogPipeline(job)).resolves.toBeUndefined(); + + const updated = await db + .selectFrom('logs') + .select('metadata') + .where('id', '=', log.id) + .executeTakeFirst(); + expect(updated?.metadata).toBeNull(); + }); + + it('updates log metadata when pipeline matches', async () => { + const nginxLine = + '10.0.0.1 - - [01/Jan/2024:00:00:00 +0000] "GET /api HTTP/1.1" 200 512 "-" "-"'; + + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'nginx', + steps: [{ type: 'parser', parser: 'nginx' }], + }); + + const log = await createTestLog({ projectId: ctx.project.id, message: nginxLine }); + const timeStr = log.time instanceof Date ? 
log.time.toISOString() : String(log.time); + const job = { + data: { + logs: [{ id: log.id, time: timeStr, message: nginxLine, metadata: null }], + projectId: ctx.project.id, + organizationId: ctx.organization.id, + } as LogPipelineJobData, + } as Job<LogPipelineJobData>; + + await processLogPipeline(job); + + const updated = await db + .selectFrom('logs') + .select('metadata') + .where('id', '=', log.id) + .executeTakeFirst(); + + expect((updated?.metadata as Record<string, unknown>)?.client_ip).toBe('10.0.0.1'); + expect((updated?.metadata as Record<string, unknown>)?.http_status).toBe(200); + }); + + it('skips logs that produce no extracted fields', async () => { + await pipelineService.create({ + organizationId: ctx.organization.id, + projectId: ctx.project.id, + name: 'nginx', + steps: [{ type: 'parser', parser: 'nginx' }], + }); + + const log = await createTestLog({ projectId: ctx.project.id, message: 'not a log line' }); + const timeStr = log.time instanceof Date ? log.time.toISOString() : String(log.time); + const job = { + data: { + logs: [{ id: log.id, time: timeStr, message: 'not a log line', metadata: null }], + projectId: ctx.project.id, + organizationId: ctx.organization.id, + } as LogPipelineJobData, + } as Job<LogPipelineJobData>; + + await processLogPipeline(job); + + const updated = await db + .selectFrom('logs') + .select('metadata') + .where('id', '=', log.id) + .executeTakeFirst(); + expect(updated?.metadata).toBeNull(); + }); +}); diff --git a/packages/backend/src/tests/setup.ts b/packages/backend/src/tests/setup.ts index 28fc0126..0c9d026e 100644 --- a/packages/backend/src/tests/setup.ts +++ b/packages/backend/src/tests/setup.ts @@ -18,9 +18,23 @@ beforeAll(async () => { await db.selectFrom('users').selectAll().execute(); console.log('Database connection established'); } catch (error) { - console.error('Failed to connect to test database:', error); - console.error('Make sure the test database is running (docker-compose.test.yml)'); - throw error; + const isConnRefused = (function check(err: unknown): boolean { + if 
(err instanceof AggregateError) return err.errors.some(check); + if (err instanceof Error) { + return ( + err.message.includes('ECONNREFUSED') || + (err as NodeJS.ErrnoException).code === 'ECONNREFUSED' + ); + } + return false; + })(error); + if (isConnRefused) { + console.warn('DB not reachable — running in unit-test mode (no DB cleanup)'); + } else { + console.error('Failed to connect to test database:', error); + console.error('Make sure the test database is running (docker-compose.test.yml)'); + throw error; + } } }); @@ -42,32 +56,45 @@ beforeEach(async () => { // Clear Redis rate limit keys to prevent 429 errors in tests // @fastify/rate-limit uses keys starting with 'rl:' const redis = getConnection(); - if (redis) { - const rateLimitKeys = await redis.keys('rl:*'); - if (rateLimitKeys.length > 0) { - await redis.del(...rateLimitKeys); + if (redis && redis.status === 'ready') { + try { + const rateLimitKeys = await redis.keys('rl:*'); + if (rateLimitKeys.length > 0) { + await redis.del(...rateLimitKeys); + } + } catch { + // Redis not available — unit-test mode } } // Delete all data from tables in reverse dependency order - await db.deleteFrom('logs').execute(); - await db.deleteFrom('alert_history').execute(); - // SIEM tables (must delete before incidents and sigma_rules) - await db.deleteFrom('incident_comments').execute(); - await db.deleteFrom('incident_history').execute(); - await db.deleteFrom('detection_events').execute(); - await db.deleteFrom('incidents').execute(); - await db.deleteFrom('organization_invitations').execute(); - await db.deleteFrom('sigma_rules').execute(); - await db.deleteFrom('alert_rules').execute(); - await db.deleteFrom('api_keys').execute(); - await db.deleteFrom('notifications').execute(); - await db.deleteFrom('organization_members').execute(); - await db.deleteFrom('projects').execute(); - await db.deleteFrom('organizations').execute(); - await db.deleteFrom('audit_log').execute(); - await 
db.deleteFrom('sessions').execute(); - await db.deleteFrom('users').execute(); + try { + await db.deleteFrom('logs').execute(); + await db.deleteFrom('alert_history').execute(); + // Monitoring tables (must delete before monitors and incidents) + await db.deleteFrom('monitor_results').execute(); + await db.deleteFrom('monitor_status').execute(); + await db.deleteFrom('monitors').execute(); + // SIEM tables (must delete before incidents and sigma_rules) + await db.deleteFrom('incident_alerts').execute(); + await db.deleteFrom('incident_comments').execute(); + await db.deleteFrom('incident_history').execute(); + await db.deleteFrom('detection_events').execute(); + await db.deleteFrom('incidents').execute(); + await db.deleteFrom('organization_invitations').execute(); + await db.deleteFrom('sigma_rules').execute(); + await db.deleteFrom('alert_rules').execute(); + await db.deleteFrom('api_keys').execute(); + await db.deleteFrom('notifications').execute(); + await db.deleteFrom('organization_members').execute(); + await db.deleteFrom('projects').execute(); + await db.deleteFrom('organizations').execute(); + await db.deleteFrom('audit_log').execute(); + await db.deleteFrom('sessions').execute(); + await db.deleteFrom('users').execute(); + } catch { + // DB not available — unit-test mode, skip cleanup + } }); /** diff --git a/packages/backend/src/utils/internal-logging-bootstrap.ts b/packages/backend/src/utils/internal-logging-bootstrap.ts index 93625bde..aab12831 100644 --- a/packages/backend/src/utils/internal-logging-bootstrap.ts +++ b/packages/backend/src/utils/internal-logging-bootstrap.ts @@ -127,6 +127,7 @@ export async function bootstrapInternalLogging(): Promise { user_id: organization.owner_id, name: 'Internal Monitoring', description: 'Self-monitoring logs for LogTide backend and worker', + slug: 'internal-monitoring', }) .returningAll() .executeTakeFirstOrThrow(); diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index 
c8c981f7..7c7f4d16 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -9,7 +9,9 @@ import { processInvitationEmail, type InvitationEmailData } from './queue/jobs/i import { processIncidentNotification, type IncidentNotificationJob } from './queue/jobs/incident-notification.js'; import { processExceptionParsing, type ExceptionParsingJobData } from './queue/jobs/exception-parsing.js'; import { processErrorNotification, type ErrorNotificationJobData } from './queue/jobs/error-notification.js'; +import { processLogPipeline, type LogPipelineJobData } from './queue/jobs/log-pipeline.js'; import { alertsService } from './modules/alerts/index.js'; +import { monitorService } from './modules/monitoring/index.js'; import { enrichmentService } from './modules/siem/enrichment-service.js'; import { retentionService } from './modules/retention/index.js'; import { sigmaSyncService } from './modules/sigma/sync-service.js'; @@ -62,6 +64,11 @@ const errorNotificationWorker = createWorker('error-no await processErrorNotification(job); }); +// Create worker for log pipeline processing +const pipelineWorker = createWorker('log-pipeline', async (job) => { + await processLogPipeline(job); +}); + // Start workers (required for graphile-worker backend, no-op for BullMQ) console.log(`[Worker] Using queue backend: ${getQueueBackend()}`); await startQueueWorkers(); @@ -224,6 +231,27 @@ errorNotificationWorker.on('failed', (job, err) => { } }); +pipelineWorker.on('completed', (job) => { + if (isInternalLoggingEnabled()) { + hub.captureLog('info', `Log pipeline job completed`, { + jobId: job.id, + logCount: job.data?.logs?.length, + }); + } +}); + +pipelineWorker.on('failed', (job, err) => { + console.error(`Log pipeline job ${job?.id} failed:`, err); + + if (isInternalLoggingEnabled()) { + hub.captureLog('error', `Log pipeline job failed: ${err.message}`, { + error: { name: err.name, message: err.message, stack: err.stack }, + jobId: job?.id, + logCount: 
job?.data?.logs?.length, + }); + } +}); + // Lock to prevent overlapping alert checks (race condition protection) let isCheckingAlerts = false; @@ -549,6 +577,34 @@ function scheduleNextSigmaSync() { scheduleNextSigmaSync(); +// ============================================================================ +// Service Health Monitor Checks (every 30 seconds) +// ============================================================================ + +let isRunningMonitorChecks = false; + +async function runMonitorChecks() { + if (isRunningMonitorChecks) return; + isRunningMonitorChecks = true; + try { + await monitorService.runAllDueChecks(); + } catch (error) { + console.error('[Worker] Monitor check error:', error); + if (isInternalLoggingEnabled()) { + hub.captureLog('error', `Monitor check failed: ${(error as Error).message}`, { + error: error instanceof Error ? { name: error.name, message: error.message, stack: error.stack } : { message: String(error) }, + }); + } + } finally { + isRunningMonitorChecks = false; + } +} + +// Run checks every 30 seconds +setInterval(runMonitorChecks, 30000); +// Run immediately on start +runMonitorChecks(); + // Graceful shutdown async function gracefulShutdown(signal: string) { console.log(`Received ${signal}, shutting down gracefully...`); @@ -562,6 +618,7 @@ async function gracefulShutdown(signal: string) { await incidentNotificationWorker.close(); await exceptionWorker.close(); await errorNotificationWorker.close(); + await pipelineWorker.close(); console.log('[Worker] Workers closed'); // Close queue system (Redis/PostgreSQL connections) diff --git a/packages/backend/vitest.unit.config.ts b/packages/backend/vitest.unit.config.ts new file mode 100644 index 00000000..72f08c19 --- /dev/null +++ b/packages/backend/vitest.unit.config.ts @@ -0,0 +1,18 @@ +import { defineConfig } from 'vitest/config'; +import path from 'path'; + +// Unit test config - no DB required, no globalSetup +export default defineConfig({ + test: { + globals: true, + 
environment: 'node', + // No globalSetup, no setupFiles - pure unit tests + include: ['src/**/*.test.ts'], + testTimeout: 10000, + }, + resolve: { + alias: { + '@': path.resolve(__dirname, './src'), + }, + }, +}); diff --git a/packages/frontend/src/lib/api/log-pipeline.ts b/packages/frontend/src/lib/api/log-pipeline.ts new file mode 100644 index 00000000..4bd8ff77 --- /dev/null +++ b/packages/frontend/src/lib/api/log-pipeline.ts @@ -0,0 +1,196 @@ +import { getApiUrl } from '$lib/config'; +import { getAuthToken } from '$lib/utils/auth'; + +export interface PipelineStep { + type: 'parser' | 'grok' | 'geoip'; + parser?: 'nginx' | 'apache' | 'syslog' | 'logfmt' | 'json'; + pattern?: string; + source?: string; + field?: string; + target?: string; +} + +export interface Pipeline { + id: string; + organizationId: string; + projectId: string | null; + name: string; + description: string | null; + enabled: boolean; + steps: PipelineStep[]; + createdAt: string; + updatedAt: string; +} + +export interface PreviewResult { + steps: Array<{ step: PipelineStep; extracted: Record; error?: string }>; + merged: Record; +} + +export interface CreatePipelineInput { + name: string; + description?: string; + steps: PipelineStep[]; + enabled?: boolean; + projectId?: string; +} + +export interface UpdatePipelineInput { + name?: string; + description?: string; + steps?: PipelineStep[]; + enabled?: boolean; +} + +async function fetchWithAuth(url: string, options: RequestInit = {}): Promise { + const token = getAuthToken(); + const headers: HeadersInit = { + ...(options.body ? { 'Content-Type': 'application/json' } : {}), + ...(token ? 
{ Authorization: `Bearer ${token}` } : {}), + ...(options.headers || {}), + }; + + return fetch(url, { + ...options, + headers, + credentials: 'include', + }); +} + +export const logPipelineAPI = { + /** + * List all pipelines for an organization + */ + async list(organizationId: string, projectId?: string): Promise { + const params = new URLSearchParams({ organizationId }); + if (projectId) params.set('projectId', projectId); + + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines?${params}`); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to fetch pipelines' })); + throw new Error(error.error || 'Failed to fetch pipelines'); + } + + const data = await response.json(); + return data.pipelines; + }, + + /** + * Get a single pipeline + */ + async get(id: string, organizationId: string): Promise { + const params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/${id}?${params}`); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to fetch pipeline' })); + throw new Error(error.error || 'Failed to fetch pipeline'); + } + + const data = await response.json(); + return data.pipeline; + }, + + /** + * Create a new pipeline + */ + async create(organizationId: string, input: CreatePipelineInput): Promise { + const params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines?${params}`, { + method: 'POST', + body: JSON.stringify(input), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to create pipeline' })); + throw new Error(error.error || 'Failed to create pipeline'); + } + + const data = await response.json(); + return data.pipeline; + }, + + /** + * Update a pipeline + */ + async update( + id: string, + organizationId: string, + input: UpdatePipelineInput + ): Promise { + const 
params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/${id}?${params}`, { + method: 'PUT', + body: JSON.stringify(input), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to update pipeline' })); + throw new Error(error.error || 'Failed to update pipeline'); + } + + const data = await response.json(); + return data.pipeline; + }, + + /** + * Delete a pipeline + */ + async delete(id: string, organizationId: string): Promise { + const params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/${id}?${params}`, { + method: 'DELETE', + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to delete pipeline' })); + throw new Error(error.error || 'Failed to delete pipeline'); + } + }, + + /** + * Preview pipeline execution on a sample log + */ + async preview( + organizationId: string, + steps: PipelineStep[], + message: string, + metadata?: Record + ): Promise { + const params = new URLSearchParams({ organizationId }); + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/preview?${params}`, { + method: 'POST', + body: JSON.stringify({ steps, message, metadata }), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to preview pipeline' })); + throw new Error(error.error || 'Failed to preview pipeline'); + } + + const data = await response.json(); + return data.result; + }, + + /** + * Import pipeline from YAML + */ + async importYaml(organizationId: string, projectId: string | null, yaml: string): Promise { + const params = new URLSearchParams({ organizationId }); + if (projectId) params.set('projectId', projectId); + + const response = await fetchWithAuth(`${getApiUrl()}/api/v1/log-pipelines/import-yaml?${params}`, { + method: 'POST', + body: JSON.stringify({ yaml }), + }); + 
+ if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Failed to import pipeline' })); + throw new Error(error.error || 'Failed to import pipeline'); + } + + const data = await response.json(); + return data.pipeline; + }, +}; diff --git a/packages/frontend/src/lib/api/monitoring.ts b/packages/frontend/src/lib/api/monitoring.ts new file mode 100644 index 00000000..3e235eae --- /dev/null +++ b/packages/frontend/src/lib/api/monitoring.ts @@ -0,0 +1,167 @@ +import { getApiUrl } from '$lib/config'; +import { getAuthToken } from '$lib/utils/auth'; + +export type MonitorType = 'http' | 'tcp' | 'heartbeat'; +export type MonitorStatus = 'up' | 'down' | 'unknown'; + +export interface HttpConfig { + method?: string; + expectedStatus?: number; + headers?: Record; + bodyAssertion?: { type: 'contains'; value: string } | { type: 'regex'; pattern: string }; +} + +export interface Monitor { + id: string; + organizationId: string; + projectId: string; + name: string; + type: MonitorType; + target: string | null; + intervalSeconds: number; + timeoutSeconds: number; + failureThreshold: number; + autoResolve: boolean; + enabled: boolean; + httpConfig: HttpConfig | null; + severity: 'critical' | 'high' | 'medium' | 'low' | 'informational'; + createdAt: string; + updatedAt: string; + status?: { + monitorId: string; + status: MonitorStatus; + consecutiveFailures: number; + consecutiveSuccesses: number; + lastCheckedAt: string | null; + lastStatusChangeAt: string | null; + responseTimeMs: number | null; + lastErrorCode: string | null; + incidentId: string | null; + updatedAt: string; + }; +} + +export interface MonitorResult { + time: string; + id: string; + monitorId: string; + status: 'up' | 'down'; + responseTimeMs: number | null; + statusCode: number | null; + errorCode: string | null; + isHeartbeat: boolean; +} + +export interface UptimeBucket { + bucket: string; + monitorId: string; + totalChecks: number; + successfulChecks: number; + uptimePct: number; 
+} + +export interface CreateMonitorInput { + organizationId: string; + projectId: string; + name: string; + type: MonitorType; + target?: string | null; + intervalSeconds?: number; + timeoutSeconds?: number; + failureThreshold?: number; + autoResolve?: boolean; + enabled?: boolean; + httpConfig?: HttpConfig | null; + severity?: string; +} + +export interface UpdateMonitorInput { + name?: string; + target?: string | null; + intervalSeconds?: number; + timeoutSeconds?: number; + failureThreshold?: number; + autoResolve?: boolean; + enabled?: boolean; + httpConfig?: HttpConfig | null; + severity?: string; +} + +async function request(path: string, options: RequestInit = {}): Promise { + const token = getAuthToken(); + const response = await fetch(`${getApiUrl()}${path}`, { + ...options, + headers: { + ...(options.body ? { 'Content-Type': 'application/json' } : {}), + ...(token ? { Authorization: `Bearer ${token}` } : {}), + ...options.headers, + }, + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ error: 'Request failed' })); + throw new Error(error.error || `HTTP ${response.status}`); + } + + if (response.status === 204) return null as T; + return response.json(); +} + +export async function listMonitors( + organizationId: string, + projectId?: string +): Promise<{ monitors: Monitor[] }> { + const params = new URLSearchParams({ organizationId }); + if (projectId) params.append('projectId', projectId); + return request(`/api/v1/monitors?${params}`); +} + +export async function getMonitor( + id: string, + organizationId: string +): Promise<{ monitor: Monitor }> { + const params = new URLSearchParams({ organizationId }); + return request(`/api/v1/monitors/${id}?${params}`); +} + +export async function createMonitor(input: CreateMonitorInput): Promise<{ monitor: Monitor }> { + return request('/api/v1/monitors', { + method: 'POST', + body: JSON.stringify(input), + }); +} + +export async function updateMonitor( + id: string, + 
organizationId: string, + input: UpdateMonitorInput +): Promise<{ monitor: Monitor }> { + const params = new URLSearchParams({ organizationId }); + return request(`/api/v1/monitors/${id}?${params}`, { + method: 'PUT', + body: JSON.stringify(input), + }); +} + +export async function deleteMonitor(id: string, organizationId: string): Promise { + const params = new URLSearchParams({ organizationId }); + return request(`/api/v1/monitors/${id}?${params}`, { method: 'DELETE' }); +} + +export async function getMonitorResults( + id: string, + organizationId: string, + limit = 50 +): Promise<{ results: MonitorResult[] }> { + const params = new URLSearchParams({ organizationId, limit: String(limit) }); + return request(`/api/v1/monitors/${id}/results?${params}`); +} + +export async function getMonitorUptime( + id: string, + organizationId: string, + days = 90 +): Promise<{ history: UptimeBucket[] }> { + const params = new URLSearchParams({ organizationId, days: String(days) }); + return request(`/api/v1/monitors/${id}/uptime?${params}`); +} diff --git a/packages/frontend/src/lib/components/AppLayout.svelte b/packages/frontend/src/lib/components/AppLayout.svelte index 7b2ab79f..7584d91b 100644 --- a/packages/frontend/src/lib/components/AppLayout.svelte +++ b/packages/frontend/src/lib/components/AppLayout.svelte @@ -47,6 +47,7 @@ import Check from "@lucide/svelte/icons/check"; import SearchIcon from "@lucide/svelte/icons/search"; import BarChart3 from "@lucide/svelte/icons/bar-chart-3"; + import Activity from "@lucide/svelte/icons/activity"; import { formatTimeAgo } from "$lib/utils/datetime"; import Footer from "$lib/components/Footer.svelte"; import OnboardingChecklist from "$lib/components/OnboardingChecklist.svelte"; @@ -255,6 +256,7 @@ items: [ { label: "Alerts", href: "/dashboard/alerts", icon: AlertTriangle }, { label: "Security", href: "/dashboard/security", icon: Shield }, + { label: "Monitoring", href: "/dashboard/monitoring", icon: Activity }, ], }, { diff --git 
a/packages/frontend/src/lib/components/pipelines/PipelinePreview.svelte b/packages/frontend/src/lib/components/pipelines/PipelinePreview.svelte new file mode 100644 index 00000000..a30160c7 --- /dev/null +++ b/packages/frontend/src/lib/components/pipelines/PipelinePreview.svelte @@ -0,0 +1,122 @@ + + + + + + + Preview + + Test your pipeline steps against a sample log message + + +
+ +