
Commit dc87f6b

docs: add BigQuery errors table schema and update README
Add BigQuery schema:
- Create errors table with partitioning and clustering
- Metadata table for tracking loaded files
- Example queries for debugging

Update README:
- Error Store (Dead Letter Queue) section
- Storage structure and usage examples
- BigQuery setup and query examples
- Updated roadmap (error store complete)

Features documented:
- Two error types (validation vs processing)
- 30-day retention with GCS lifecycle
- Full event context for debugging
- BigQuery integration for analysis
1 parent 6f7df6d commit dc87f6b

2 files changed

Lines changed: 131 additions & 1 deletion

File tree

README.md
scripts/bigquery/create_errors_table.sql

README.md

Lines changed: 57 additions & 1 deletion
@@ -340,6 +340,62 @@ python -m scripts.run_bigquery_loader

See `scripts/bigquery/README.md` and `specs/gcs-bigquery-storage/` for full details.
### Error Store (Dead Letter Queue)
All failed events are stored in a GCS-based dead letter queue for debugging and retry:

**Two Error Types:**
- **Validation Errors**: Missing required fields, invalid schema
- **Processing Errors**: Storage failures, unexpected exceptions
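For illustration, a stored error record might look like the following Python dict. The field names follow the `errors` table schema in `create_errors_table.sql` below; all values here are hypothetical:

```python
# Hypothetical error record; field names match the BigQuery errors table
# schema in this commit, values are illustrative only.
error_record = {
    "error_id": "abc123",
    "timestamp": "2026-01-15T10:00:00Z",
    "error_type": "validation_error",  # or "processing_error"
    "error_message": "missing required field: user_id",
    "error_details": {"field": "user_id"},
    "stream": "page_views",
    "original_payload": {"event": "page_view", "url": "/home"},
    "retry_count": 0,
    "retry_after": None,
    "date": "2026-01-15",
}
```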
**Storage Structure:**
```
gs://bucket/errors/
  date=2026-01-15/
    error_type=validation/
      error-20260115-100000-abc123.parquet
    error_type=processing/
      error-20260115-100500-def456.parquet
```
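A minimal sketch of how such an object path can be derived. The helper name is hypothetical; only the Hive-style `date=`/`error_type=` layout and the filename pattern come from the structure above:

```python
import uuid
from datetime import datetime, timezone

def error_blob_path(error_type: str) -> str:
    """Build a Hive-partitioned object path matching the layout above."""
    now = datetime.now(timezone.utc)
    return (
        f"errors/date={now:%Y-%m-%d}/error_type={error_type}/"
        f"error-{now:%Y%m%d-%H%M%S}-{uuid.uuid4().hex[:6]}.parquet"
    )

print(error_blob_path("validation"))
# e.g. errors/date=2026-01-15/error_type=validation/error-20260115-100000-abc123.parquet
```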
**Create BigQuery Errors Table:**
```bash
cd scripts/bigquery
export PROJECT_ID=my-project DATASET=events
cat create_errors_table.sql | sed "s/{PROJECT_ID}/$PROJECT_ID/g" | sed "s/{DATASET}/$DATASET/g" | bq query --use_legacy_sql=false
```
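To confirm the table was created, one option (reusing the `$PROJECT_ID`/`$DATASET` placeholders exported above) is the standard `bq show` check:

```bash
# Inspect the created table's schema; placeholders as exported above.
bq show --schema --format=prettyjson $PROJECT_ID:$DATASET.errors
```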
**Query Errors:**
```sql
-- Find validation errors in last 24 hours
SELECT
  error_message,
  stream,
  COUNT(*) as count
FROM `project.dataset.errors`
WHERE date >= CURRENT_DATE() - 1
  AND error_type = 'validation_error'
GROUP BY error_message, stream
ORDER BY count DESC;

-- Get processing errors with stack traces
SELECT
  timestamp,
  error_message,
  JSON_EXTRACT_SCALAR(error_details, '$.exception_type') as exception,
  JSON_EXTRACT_SCALAR(error_details, '$.stack_trace') as stack_trace
FROM `project.dataset.errors`
WHERE error_type = 'processing_error'
ORDER BY timestamp DESC
LIMIT 10;
```
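The same queries can be run programmatically. A minimal sketch using the `google-cloud-bigquery` client (`pip install google-cloud-bigquery`); project and dataset names are placeholders:

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
query = """
    SELECT error_message, stream, COUNT(*) AS error_count
    FROM `my-project.events.errors`
    WHERE date >= CURRENT_DATE() - 1
      AND error_type = 'validation_error'
    GROUP BY error_message, stream
    ORDER BY error_count DESC
"""
# Run the query and iterate over result rows.
for row in client.query(query).result():
    print(row.error_message, row.stream, row.error_count)
```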
**Key Features:**
- Never loses events: all failures are stored for debugging
- Automatic 30-day retention (GCS lifecycle rules; see the sketch after this list)
- Full event context (payload, error, timestamp, stream)
- Queryable via BigQuery for pattern analysis
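The 30-day retention can be enforced with a GCS lifecycle rule. A sketch of one way to apply it; the bucket name is a placeholder, and the `matchesPrefix` condition assumes the `errors/` layout shown above:

```bash
# Delete objects under errors/ once they are 30 days old.
cat > lifecycle.json <<'EOF'
{
  "rule": [
    {
      "action": {"type": "Delete"},
      "condition": {"age": 30, "matchesPrefix": ["errors/"]}
    }
  ]
}
EOF
gsutil lifecycle set lifecycle.json gs://bucket
```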
### Custom Storage

Implement the `EventStore` protocol for any backend:
@@ -486,7 +542,7 @@ uv run ruff format src/
 - [x] Prometheus metrics
 - [x] EventSubscriptionCoordinator (dual-path architecture)
 - [x] Hash-based sequencer for consistent ordering
-- [ ] Error handling and dead letter queue (ErrorStore protocol exists, needs implementation)
+- [x] Error store with dead letter queue (GCS-based)
 - [ ] Performance benchmarks (10k+ events/sec)

 ### v1.0
scripts/bigquery/create_errors_table.sql

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
-- Create errors table for dead letter queue
--
-- Usage:
--   export PROJECT_ID=your-project
--   export DATASET=events
--   cat create_errors_table.sql | sed "s/{PROJECT_ID}/$PROJECT_ID/g" | sed "s/{DATASET}/$DATASET/g" | bq query --use_legacy_sql=false

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET}.errors` (
  error_id STRING NOT NULL,
  timestamp TIMESTAMP NOT NULL,
  error_type STRING NOT NULL,  -- validation_error | processing_error
  error_message STRING NOT NULL,
  error_details JSON,
  stream STRING NOT NULL,
  original_payload JSON NOT NULL,
  retry_count INT64 DEFAULT 0,
  retry_after TIMESTAMP,
  date DATE NOT NULL
)
PARTITION BY date
CLUSTER BY error_type, stream
OPTIONS(
  description="Dead letter queue for failed events. All events that fail validation or processing are stored here with full context for debugging.",
  partition_expiration_days=30
);

-- Create metadata table for tracking loaded error files
CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET}._loaded_error_files` (
  file_path STRING NOT NULL,
  loaded_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),
  row_count INT64,
  error_type STRING
)
PARTITION BY DATE(loaded_at)
OPTIONS(
  description="Metadata tracking which error files have been loaded to prevent duplicates"
);

-- Example queries for debugging

-- Find validation errors in last 24 hours
-- SELECT
--   error_type,
--   error_message,
--   stream,
--   COUNT(*) as count
-- FROM `{PROJECT_ID}.{DATASET}.errors`
-- WHERE date >= CURRENT_DATE() - 1
--   AND error_type = 'validation_error'
-- GROUP BY error_type, error_message, stream
-- ORDER BY count DESC;

-- Find processing errors with stack traces
-- SELECT
--   timestamp,
--   error_message,
--   JSON_EXTRACT_SCALAR(error_details, '$.exception_type') as exception,
--   JSON_EXTRACT_SCALAR(error_details, '$.stack_trace') as stack_trace,
--   original_payload
-- FROM `{PROJECT_ID}.{DATASET}.errors`
-- WHERE error_type = 'processing_error'
-- ORDER BY timestamp DESC
-- LIMIT 10;

-- Error rate by stream
-- SELECT
--   stream,
--   error_type,
--   COUNT(*) as error_count,
--   ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (PARTITION BY stream), 2) as pct
-- FROM `{PROJECT_ID}.{DATASET}.errors`
-- WHERE date >= CURRENT_DATE() - 7
-- GROUP BY stream, error_type
-- ORDER BY stream, error_count DESC;
