This document describes the testing strategy, conventions, and tools for eventkit.
Test-Driven Development (TDD): Write tests before implementation. See tasks.md for the TDD ordering.
Test Pyramid:
        /\
       /  \        E2E Tests (few, slow, high confidence)
      /____\
     /      \      Integration Tests (some, medium speed)
    /________\
   /          \    Unit Tests (many, fast, low-level)
  /____________\
Coverage Target: >80% line coverage, 100% for critical paths (adapters, processor).
tests/
├── unit/                  # Fast, isolated tests
│   ├── schema/
│   ├── adapters/
│   ├── processing/
│   ├── stores/
│   └── api/
├── integration/           # Multi-component tests
│   ├── test_full_pipeline.py
│   ├── test_firestore_integration.py
│   └── test_api_integration.py
├── performance/           # Throughput and latency tests
│   ├── test_throughput.py
│   └── test_latency.py
└── conftest.py            # Shared fixtures
# Run all tests
pytest
# Run with coverage
pytest --cov=src/eventkit --cov-report=html
# Run specific test file
pytest tests/unit/schema/test_raw.py
# Run tests matching pattern
pytest -k "test_raw_event"
# Run with verbose output
pytest -v
# Unit tests only (fast)
pytest tests/unit/
# Integration tests (requires Firestore emulator)
pytest tests/integration/
# Performance tests
pytest tests/performance/ --benchmark-only
# Re-run tests on file changes
pytest-watch
# Or use pytest-xdist for parallel execution
pytest -n auto --looponfail
Principle: Test one component in isolation. Mock all dependencies.
Example: Testing RawEvent
# tests/unit/schema/test_raw.py
from datetime import datetime, timezone
from eventkit.schema.raw import RawEvent
def test_raw_event_accepts_arbitrary_fields():
"""Story 1: Accept any JSON"""
event = RawEvent(
payload={
"type": "identify",
"custom_field": "value",
"nested": {"field": 123}
},
stream="users"
)
assert event.get("custom_field") == "value"
assert event.get("nested")["field"] == 123
assert event.stream == "users"
assert isinstance(event.received_at, datetime)
def test_raw_event_get_with_default():
"""Test get() helper with missing key"""
event = RawEvent(payload={"type": "track"}, stream="events")
assert event.get("missing_key") is None
assert event.get("missing_key", "default") == "default"
Example: Testing Adapter with Mocks
# tests/unit/adapters/test_segment.py
import pytest
from eventkit.schema.raw import RawEvent
from eventkit.adapters.segment import SegmentAdapter
from eventkit.adapters.base import AdapterResult
@pytest.fixture
def adapter():
return SegmentAdapter()
def test_adapt_identify_success(adapter):
"""Story 3: Validate and normalize identify events"""
raw = RawEvent(
payload={
"type": "identify",
"userId": "user_123",
"traits": {"email": "user@example.com"}
},
stream="users"
)
result = adapter.adapt(raw)
assert result.success is True
assert result.event.type == "identify"
assert result.event.userId == "user_123"
assert result.event.traits["email"] == "user@example.com"
assert result.error is None
def test_adapt_missing_user_id_returns_error(adapter):
"""Story 6: Invalid events return error (not exception)"""
raw = RawEvent(
payload={"type": "identify", "traits": {}},
stream="users"
)
result = adapter.adapt(raw)
assert result.success is False
assert "Missing required field" in result.error
assert result.event is None
Principle: Test multiple components working together. Use real implementations where feasible, but run against emulators/test backends.
Setup: Firestore Emulator
# Start Firestore emulator
gcloud emulators firestore start --host-port=localhost:8080
# In another terminal, set environment variable
export FIRESTORE_EMULATOR_HOST=localhost:8080
Example: End-to-End Pipeline Test
# tests/integration/test_full_pipeline.py
import pytest
from eventkit.schema.raw import RawEvent
from eventkit.adapters.segment import SegmentAdapter
from eventkit.processing.sequencer import Sequencer
from eventkit.processing.buffer import Buffer
from eventkit.processing.processor import Processor
from eventkit.stores.firestore import FirestoreEventStore, FirestoreErrorStore
@pytest.fixture
async def processor():
"""Create processor with real Firestore stores (emulator)"""
event_store = FirestoreEventStore(
project_id="test-project",
collection="events"
)
error_store = FirestoreErrorStore(
project_id="test-project",
collection="errors"
)
adapter = SegmentAdapter()
sequencer = Sequencer(num_partitions=4) # Small for tests
buffer = Buffer(event_store, batch_size=10, flush_interval=1.0) # Small/fast for tests
processor = Processor(adapter, sequencer, buffer, error_store)
yield processor
# Cleanup: flush buffers
for partition_id in buffer.buffers.keys():
await buffer._flush_partition(partition_id)
@pytest.mark.asyncio
async def test_valid_event_reaches_event_store(processor):
"""Story 1 + Story 2 + Story 3: Full pipeline"""
# Arrange
raw_event = RawEvent(
payload={
"type": "identify",
"userId": "user_123",
"traits": {"email": "user@example.com"}
},
stream="users"
)
# Act
await processor.enqueue(raw_event)
await processor.buffer._flush_partition(0) # Force flush for test
# Assert
events = await processor.buffer.event_store.read(stream="users", limit=1)
assert len(events) == 1
assert events[0].userId == "user_123"
assert events[0].traits["email"] == "user@example.com"
@pytest.mark.asyncio
async def test_invalid_event_goes_to_error_store(processor):
"""Story 6: Invalid events → error store (not dropped)"""
# Arrange
raw_event = RawEvent(
payload={"type": "identify"}, # Missing userId and anonymousId
stream="users"
)
# Act
await processor.enqueue(raw_event)
# Assert
errors = await processor.error_store.query_errors(limit=1)
assert len(errors) == 1
assert "Missing required field" in errors[0]["error"]
assert errors[0]["raw_payload"]["type"] == "identify"
Example: API Integration Test
# tests/integration/test_api_integration.py
import pytest
from fastapi.testclient import TestClient
from eventkit.main import app
@pytest.fixture
def client():
return TestClient(app)
def test_collect_endpoint_accepts_event(client):
"""Story 1: POST /collect accepts any JSON"""
response = client.post(
"/collect/users",
json={"type": "identify", "userId": "user_123", "traits": {}}
)
assert response.status_code == 202
assert response.json()["message"] == "Event received"
assert response.json()["data"]["received"] is True
def test_batch_events(client):
"""Story 1: Batch events (array) supported"""
response = client.post(
"/collect/events",
json=[
{"type": "track", "event": "Button Clicked", "userId": "user_1"},
{"type": "track", "event": "Page Viewed", "userId": "user_2"}
]
)
assert response.status_code == 202
assert response.json()["data"]["received"] == 2
Principle: Validate throughput and latency targets.
Tools: pytest-benchmark, locust
Example: Throughput Test
# tests/performance/test_throughput.py
import pytest
import asyncio
import time
from eventkit.schema.raw import RawEvent
from eventkit.processing.processor import Processor
@pytest.mark.asyncio
async def test_throughput_10k_events_per_second(processor):
"""Story 1: Throughput target (10k events/sec)"""
num_events = 10_000
events = [
RawEvent(
payload={
"type": "track",
"event": "test_event",
"userId": f"user_{i % 1000}" # Simulate 1k unique users
},
stream="events"
)
for i in range(num_events)
]
start = time.time()
# Process events
tasks = [processor.enqueue(event) for event in events]
await asyncio.gather(*tasks)
elapsed = time.time() - start
throughput = num_events / elapsed
print(f"Throughput: {throughput:.0f} events/sec")
assert throughput >= 10_000, f"Only {throughput:.0f} events/sec"
Example: Latency Test
# tests/performance/test_latency.py
import pytest
import time
from fastapi.testclient import TestClient
from eventkit.main import app
@pytest.fixture
def client():
return TestClient(app)
def test_collect_endpoint_latency(client):
"""Story 1: p95 latency <100ms"""
latencies = []
num_requests = 1000
for i in range(num_requests):
start = time.time()
response = client.post(
"/collect/events",
json={"type": "track", "event": "test", "userId": f"user_{i}"}
)
latency = (time.time() - start) * 1000 # ms
latencies.append(latency)
assert response.status_code == 202
latencies.sort()
p50 = latencies[int(num_requests * 0.50)]
p95 = latencies[int(num_requests * 0.95)]
p99 = latencies[int(num_requests * 0.99)]
print(f"Latency p50: {p50:.1f}ms, p95: {p95:.1f}ms, p99: {p99:.1f}ms")
assert p50 < 50, f"p50 latency too high: {p50:.1f}ms"
assert p95 < 100, f"p95 latency too high: {p95:.1f}ms"
assert p99 < 200, f"p99 latency too high: {p99:.1f}ms"
# tests/conftest.py
import pytest
import asyncio
from eventkit.schema.raw import RawEvent
from eventkit.adapters.segment import SegmentAdapter
from eventkit.processing.sequencer import Sequencer
@pytest.fixture(scope="session")
def event_loop():
"""Create event loop for async tests"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture
def sample_raw_event():
"""Reusable raw event for tests"""
return RawEvent(
payload={
"type": "identify",
"userId": "test_user",
"traits": {"email": "test@example.com"}
},
stream="users"
)
@pytest.fixture
def adapter():
"""Reusable adapter instance"""
return SegmentAdapter()
@pytest.fixture
def sequencer():
"""Reusable sequencer instance"""
return Sequencer(num_partitions=16)
# tests/unit/stores/test_firestore.py
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from eventkit.stores.firestore import FirestoreEventStore
from eventkit.schema.events import IdentifyEvent
@pytest.fixture
def mock_firestore_client():
"""Mock Firestore client"""
with patch("google.cloud.firestore.AsyncClient") as mock:
client = MagicMock()
mock.return_value = client
yield client
@pytest.mark.asyncio
async def test_write_batch(mock_firestore_client):
"""Test batch write to Firestore"""
store = FirestoreEventStore(project_id="test", collection="events")
events = [
IdentifyEvent(
userId=f"user_{i}",
traits={"index": i},
timestamp=datetime.now(timezone.utc)
)
for i in range(100)
]
await store.write(events)
# Verify batch was called
assert mock_firestore_client.batch.called
# .github/workflows/test.yml
name: Tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11", "3.12"]
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
pip install -e ".[dev]"
- name: Start Firestore Emulator
run: |
gcloud emulators firestore start --host-port=localhost:8080 &
sleep 5
env:
FIRESTORE_EMULATOR_HOST: localhost:8080
- name: Run unit tests
run: pytest tests/unit/ -v --cov=src/eventkit --cov-report=xml
- name: Run integration tests
run: pytest tests/integration/ -v
env:
FIRESTORE_EMULATOR_HOST: localhost:8080
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
files: ./coverage.xml
- name: Type check
run: mypy src/eventkit
- name: Lint
run: ruff check src/
# Pattern: test_{method}_{scenario}_{expected_result}
def test_adapt_identify_with_valid_payload_returns_success()
def test_adapt_identify_with_missing_user_id_returns_error()
def test_get_partition_id_with_same_user_id_returns_same_partition()
# Pattern: test_{feature}_{scenario}
async def test_full_pipeline_valid_event_reaches_event_store()
async def test_full_pipeline_invalid_event_goes_to_error_store()
def test_api_collect_endpoint_accepts_batch_events()
# Pattern: test_{metric}_{target}
async def test_throughput_10k_events_per_second()
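def test_latency_p95_under_100ms()
# Run a single test, verbose, without output capture
pytest tests/unit/schema/test_raw.py::test_raw_event_accepts_arbitrary_fields -v -s
# Drop into the debugger on failure
pytest --pdb
# Show print output (disable output capture)
pytest -s
# Show local variables in tracebacks
pytest -l
# Generate an HTML coverage report and open it
pytest --cov=src/eventkit --cov-report=html
open htmlcov/index.html
# Show uncovered lines in the terminal
pytest --cov=src/eventkit --cov-report=term-missing
# Fail the run if coverage drops below 80%
pytest --cov=src/eventkit --cov-fail-under=80
def test_sequencer_performance(benchmark, sequencer):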
"""Benchmark sequencer routing"""
event = IdentifyEvent(
userId="user_123",
traits={},
timestamp=datetime.now(timezone.utc)
)
result = benchmark(sequencer.get_partition_id, event)
assert 0 <= result < sequencer.num_partitions
Run benchmarks:
pytest tests/performance/ --benchmark-only
# locustfile.py
from locust import HttpUser, task, between
class EventKitUser(HttpUser):
wait_time = between(0.1, 0.5)
@task
def send_identify_event(self):
self.client.post(
"/collect/users",
json={
"type": "identify",
"userId": "user_123",
"traits": {"email": "user@example.com"}
}
)
Run load test:
locust -f locustfile.py --host http://localhost:8000
# tests/factories.py
import factory
from datetime import datetime, timezone
from eventkit.schema.events import IdentifyEvent
class IdentifyEventFactory(factory.Factory):
class Meta:
model = IdentifyEvent
type = "identify"
userId = factory.Sequence(lambda n: f"user_{n}")
traits = factory.Dict({"email": factory.Faker("email")})
timestamp = factory.LazyFunction(lambda: datetime.now(timezone.utc))
# Usage in tests
def test_something():
event = IdentifyEventFactory()
assert event.userId.startswith("user_")
- All tests pass (`pytest`)
- Coverage >80% (`pytest --cov`)
- Type checking passes (`mypy src/eventkit`)
- Linting passes (`ruff check src/`)
- Integration tests with Firestore emulator pass
- Performance tests meet targets (if applicable)
- New tests added for new functionality
- Docstrings updated