diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..85f9a92 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,32 @@ +name: CI + +on: + push: + branches: ["main", "develop"] + pull_request: + branches: ["main", "develop"] + +jobs: + test: + name: Test (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + + - name: Install dependencies + run: pip install -e ".[dev,async]" + + - name: Run tests + run: pytest tests/ -v --tb=short diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..1946675 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,91 @@ +name: Release + +on: + push: + tags: + - "v*" + +permissions: + contents: read + +jobs: + # ── 1. Run tests before publishing ──────────────────────────────────────── + test: + name: Test before release + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + + - name: Install dependencies + run: pip install -e ".[dev,async]" + + - name: Run tests + run: pytest tests/ -v --tb=short + + # ── 2. 
Build source distribution and wheel ──────────────────────────────── + build: + name: Build distribution + runs-on: ubuntu-latest + needs: test + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + + - name: Install build + run: pip install build + + - name: Build sdist and wheel + run: python -m build + + - name: Verify tag matches package version + run: | + TAG="${GITHUB_REF_NAME#v}" + VERSION=$(python -c " + import re, pathlib + content = pathlib.Path('pyproject.toml').read_text() + print(re.search(r'^version\s*=\s*\"(.+?)\"', content, re.MULTILINE).group(1)) + ") + echo "Tag: $TAG | Package version: $VERSION" + if [ "$TAG" != "$VERSION" ]; then + echo "ERROR: tag '$TAG' does not match package version '$VERSION'" + exit 1 + fi + + - name: Upload build artifacts + uses: actions/upload-artifact@v4 + with: + name: dist + path: dist/ + + # ── 3. Publish to PyPI via OIDC Trusted Publisher ───────────────────────── + publish: + name: Publish to PyPI + runs-on: ubuntu-latest + needs: build + environment: pypi + + permissions: + id-token: write # required for OIDC trusted publisher + + steps: + - name: Download build artifacts + uses: actions/download-artifact@v4 + with: + name: dist + path: dist/ + + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/CHANGELOG.md b/CHANGELOG.md index d363b41..6301fc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,57 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [0.8.4] - 2026-03-21 + +### Added + +- `AsyncLogTideClient`: full async client using `aiohttp` with the same API as the + sync client — supports `async with`, `await client.start()`, and all logging, + flush, query, and stream methods as coroutines (`pip install logtide-sdk[async]`) +- `LogTideHandler`: standard `logging.Handler` for drop-in integration with Python's + built-in logging module — forwards records to LogTide with structured exception + metadata when `exc_info=True` is used +- `PayloadLimitsOptions`: configurable safeguards against 413 errors — per-field size + cap, total entry size cap, named field exclusion, and automatic base64 removal +- `LogTideStarletteMiddleware`: standalone Starlette ASGI middleware independent of + FastAPI (`pip install logtide-sdk[starlette]`) +- `serialize_exception()` exported at top level for use in custom integrations +- `payload_limits` field on `ClientOptions` + +### Changed + +- **BREAKING** API paths updated to match v1 server contract: + - `POST /api/logs` → `POST /api/v1/ingest` + - `GET /api/logs` → `GET /api/v1/logs` + - `GET /api/logs/trace/{id}` → `GET /api/v1/logs/trace/{id}` + - `GET /api/logs/stats` → `GET /api/v1/logs/aggregated` + - `GET /api/logs/stream` → `GET /api/v1/logs/stream` +- **BREAKING** Auth header changed from `Authorization: Bearer ` to `X-API-Key: ` +- **BREAKING** Error metadata key changed from `"error"` to `"exception"`; value is now a + structured object with `type`, `message`, `language`, `stacktrace` (array of + `{file, function, line}` frames), and `raw` +- **BREAKING** `stream()` no longer blocks — it runs in a background daemon thread and + returns a `Callable[[], None]` stop function immediately +- **BREAKING** Buffer overflow no longer raises `BufferFullError`; logs are silently + dropped and `logs_dropped` is incremented (`BufferFullError` class is kept for + backwards-compatible catch blocks) +- `requests.Session` is now created once and reused across all HTTP calls for + 
connection reuse and reduced TCP overhead +- `datetime.utcnow()` replaced with `datetime.now(timezone.utc)` throughout; + `LogEntry.time` now includes `+00:00` timezone suffix (ISO 8601 compliant) +- Middleware `__init__.py` now uses per-framework `try/except` guards — importing + `logtide_sdk.middleware` no longer fails if only a subset of frameworks are installed + +### Fixed + +- Flask, Django, and FastAPI middleware `_log_error` methods were passing raw + `Exception` objects into the metadata dict instead of serializing them — exceptions + are now serialized via `serialize_exception()` +- `log()` triggered `flush()` while holding `_buffer_lock`, causing a potential + deadlock under concurrent access — flush is now triggered outside the lock +- `__version__` in `__init__.py` was incorrectly set to `"0.1.0"` despite the + package being at `0.1.2` + ## [0.1.0] - 2026-01-13 ### Added @@ -28,4 +79,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - FastAPI middleware for auto-logging HTTP requests - Full type hints support for Python 3.8+ -[0.1.0]: https://github.com/logtide-dev/logtide-sdk-python/releases/tag/v0.1.0 +[0.8.4]: https://github.com/logtide-dev/logtide-python/compare/v0.1.0...v0.8.4 +[0.1.0]: https://github.com/logtide-dev/logtide-python/releases/tag/v0.1.0 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 0fc2117..978d801 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -6,7 +6,7 @@ Thank you for your interest in contributing! 1. Clone the repository: ```bash -git clone https://github.com/logtide-dev/logtide-sdk-python.git +git clone https://github.com/logtide-dev/logtide-python.git -cd logtide-sdk-python +cd logtide-python ``` diff --git a/README.md b/README.md index 1fcd357..63c08b0 100644 --- a/README.md +++ b/README.md @@ -8,34 +8,36 @@ PyPI License Python - Release + Release

- Official Python SDK for LogTide with automatic batching, retry logic, circuit breaker, query API, live streaming, and middleware support. + Official Python SDK for LogTide — self-hosted log management with async client, logging integration, batching, retry, circuit breaker, and middleware.

--- ## Features +- **Sync & async clients** — `LogTideClient` (requests) and `AsyncLogTideClient` (aiohttp) +- **stdlib `logging` integration** — drop-in `LogTideHandler` for existing logging setups - **Automatic batching** with configurable size and interval - **Retry logic** with exponential backoff - **Circuit breaker** pattern for fault tolerance -- **Max buffer size** with drop policy to prevent memory leaks +- **Payload limits** — field truncation, base64 removal, field exclusion, max entry size +- **Max buffer size** with silent drop policy to prevent memory leaks - **Query API** for searching and filtering logs - **Live tail** with Server-Sent Events (SSE) - **Trace ID context** for distributed tracing - **Global metadata** added to all logs -- **Structured error serialization** -- **Internal metrics** (logs sent, errors, latency, etc.) -- **Flask, Django & FastAPI middleware** for auto-logging HTTP requests +- **Structured exception serialization** with parsed stack frames +- **Internal metrics** (logs sent, errors, latency, circuit breaker trips) +- **Flask, Django, FastAPI & Starlette middleware** for auto-logging HTTP requests - **Full Python 3.8+ support** with type hints ## Requirements - Python 3.8 or higher -- pip or poetry ## Installation @@ -46,20 +48,23 @@ pip install logtide-sdk ### Optional Dependencies ```bash -# For async support +# Async client (AsyncLogTideClient) pip install logtide-sdk[async] -# For Flask middleware +# Flask middleware pip install logtide-sdk[flask] -# For Django middleware +# Django middleware pip install logtide-sdk[django] -# For FastAPI middleware +# FastAPI middleware pip install logtide-sdk[fastapi] +# Starlette middleware (standalone, without FastAPI) +pip install logtide-sdk[starlette] + # Install all extras -pip install logtide-sdk[async,flask,django,fastapi] +pip install logtide-sdk[async,flask,django,fastapi,starlette] ``` ## Quick Start @@ -74,11 +79,10 @@ client = LogTideClient( ) ) -# Send logs 
client.info('api-gateway', 'Server started', {'port': 3000}) client.error('database', 'Connection failed', Exception('Timeout')) -# Graceful shutdown (automatic via atexit, but can be called manually) +# Graceful shutdown (also registered automatically via atexit) client.close() ``` @@ -92,109 +96,160 @@ client.close() |--------|------|---------|-------------| | `api_url` | `str` | **required** | Base URL of your LogTide instance | | `api_key` | `str` | **required** | Project API key (starts with `lp_`) | -| `batch_size` | `int` | `100` | Number of logs to batch before sending | -| `flush_interval` | `int` | `5000` | Interval in ms to auto-flush logs | +| `batch_size` | `int` | `100` | Logs per batch before an immediate flush | +| `flush_interval` | `int` | `5000` | Auto-flush interval in ms | ### Advanced Options | Option | Type | Default | Description | |--------|------|---------|-------------| -| `max_buffer_size` | `int` | `10000` | Max logs in buffer (prevents memory leak) | -| `max_retries` | `int` | `3` | Max retry attempts on failure | -| `retry_delay_ms` | `int` | `1000` | Initial retry delay (exponential backoff) | -| `circuit_breaker_threshold` | `int` | `5` | Failures before opening circuit | -| `circuit_breaker_reset_ms` | `int` | `30000` | Time before retrying after circuit opens | -| `enable_metrics` | `bool` | `True` | Track internal metrics | -| `debug` | `bool` | `False` | Enable debug logging to console | -| `global_metadata` | `dict` | `{}` | Metadata added to all logs | -| `auto_trace_id` | `bool` | `False` | Auto-generate trace IDs for logs | - -### Example: Full Configuration +| `max_buffer_size` | `int` | `10000` | Max buffered logs; excess are silently dropped | +| `max_retries` | `int` | `3` | Max retry attempts on send failure | +| `retry_delay_ms` | `int` | `1000` | Initial retry delay (doubles each attempt) | +| `circuit_breaker_threshold` | `int` | `5` | Consecutive failures before opening circuit | +| `circuit_breaker_reset_ms` | 
`int` | `30000` | Time before testing a half-open circuit | +| `debug` | `bool` | `False` | Print debug output to console | +| `global_metadata` | `dict` | `{}` | Metadata merged into every log entry | +| `auto_trace_id` | `bool` | `False` | Auto-generate a UUID trace ID per log | +| `payload_limits` | `PayloadLimitsOptions` | see below | Safeguards against oversized payloads | + +### Payload Limits + +`PayloadLimitsOptions` prevents 413 errors from oversized entries. + +| Field | Default | Description | +|-------|---------|-------------| +| `max_field_size` | `10 * 1024` (10 KB) | Max length of any single string field | +| `max_log_size` | `100 * 1024` (100 KB) | Max total serialized entry size | +| `exclude_fields` | `[]` | Field names replaced with `"[EXCLUDED]"` | +| `truncation_marker` | `"...[TRUNCATED]"` | Appended to truncated strings | ```python -import os +from logtide_sdk import LogTideClient, ClientOptions, PayloadLimitsOptions client = LogTideClient( ClientOptions( api_url='http://localhost:8080', api_key='lp_your_api_key_here', - - # Batching - batch_size=100, - flush_interval=5000, - - # Buffer management - max_buffer_size=10000, - - # Retry with exponential backoff (1s -> 2s -> 4s) - max_retries=3, - retry_delay_ms=1000, - - # Circuit breaker - circuit_breaker_threshold=5, - circuit_breaker_reset_ms=30000, - - # Metrics & debugging - enable_metrics=True, - debug=True, - - # Global context - global_metadata={ - 'env': os.getenv('APP_ENV'), - 'version': '1.0.0', - 'hostname': os.uname().nodename, - }, - - # Auto trace IDs - auto_trace_id=False, + payload_limits=PayloadLimitsOptions( + max_field_size=5 * 1024, + exclude_fields=['password', 'token'], + ), ) ) ``` +Base64-encoded strings (data URIs or long base64 blobs) are automatically replaced with `"[BASE64 DATA REMOVED]"`. 
+ --- -## Logging Methods +## Sync Client -### Basic Logging +### Logging Methods ```python -from logtide_sdk import LogLevel - -client.debug('service-name', 'Debug message') -client.info('service-name', 'Info message', {'userId': 123}) -client.warn('service-name', 'Warning message') -client.error('service-name', 'Error message', {'custom': 'data'}) -client.critical('service-name', 'Critical message') +client.debug('service', 'Debug message') +client.info('service', 'Info message', {'userId': 123}) +client.warn('service', 'Warning message') +client.error('service', 'Error message', {'custom': 'data'}) +client.critical('service', 'Critical message') ``` -### Error Logging with Auto-Serialization +### Exception Auto-Serialization -The SDK automatically serializes `Exception` objects: +Pass an `Exception` directly to `error()` or `critical()` — it is serialized automatically: ```python try: raise RuntimeError('Database timeout') except Exception as e: - # Automatically serializes error with stack trace client.error('database', 'Query failed', e) ``` -Generated log metadata: +Generated metadata: ```json { - "error": { - "name": "RuntimeError", + "exception": { + "type": "RuntimeError", "message": "Database timeout", - "stack": "Traceback (most recent call last):\n ..." + "language": "python", + "stacktrace": [ + {"file": "app.py", "function": "run_query", "line": 42} + ], + "raw": "Traceback (most recent call last):\n ..." } } ``` --- -## Trace ID Context +## Async Client + +`AsyncLogTideClient` is the async equivalent, using `aiohttp`. Best used as an async context manager. 
+ +```bash +pip install logtide-sdk[async] +``` + +```python +import asyncio +from logtide_sdk import AsyncLogTideClient, ClientOptions + +async def main(): + async with AsyncLogTideClient(ClientOptions( + api_url='http://localhost:8080', + api_key='lp_your_api_key_here', + )) as client: + await client.info('my-service', 'Hello from async!') + await client.error('my-service', 'Something failed', Exception('oops')) + +asyncio.run(main()) +``` -Track requests across services with trace IDs. +Manual lifecycle (without context manager): + +```python +client = AsyncLogTideClient(options) +await client.start() # starts background flush loop +try: + await client.info('svc', 'message') +finally: + await client.close() +``` + +All sync logging, query, stream, and metrics methods have async equivalents. + +--- + +## stdlib `logging` Integration + +`LogTideHandler` is a standard `logging.Handler` — drop it into any existing logging setup. + +```python +import logging +from logtide_sdk import LogTideClient, ClientOptions, LogTideHandler + +client = LogTideClient(ClientOptions( + api_url='http://localhost:8080', + api_key='lp_your_api_key_here', +)) + +handler = LogTideHandler(client=client, service='my-service') +handler.setLevel(logging.WARNING) + +logger = logging.getLogger(__name__) +logger.addHandler(handler) + +# These are forwarded to LogTide automatically +logger.warning('Low disk space') +logger.error('Unhandled exception', exc_info=True) +``` + +Exception info is serialized with full structured stack frames when `exc_info=True` is used. + +--- + +## Trace ID Context ### Manual Trace ID @@ -202,10 +257,10 @@ Track requests across services with trace IDs. 
client.set_trace_id('request-123') client.info('api', 'Request received') -client.info('database', 'Querying users') +client.info('db', 'Querying users') client.info('api', 'Response sent') -client.set_trace_id(None) # Clear context +client.set_trace_id(None) # clear ``` ### Scoped Trace ID (Context Manager) @@ -214,7 +269,7 @@ client.set_trace_id(None) # Clear context with client.with_trace_id('request-456'): client.info('api', 'Processing in context') client.warn('cache', 'Cache miss') -# Trace ID automatically restored after context +# Trace ID automatically restored after block ``` ### Auto-Generated Trace ID @@ -229,8 +284,6 @@ with client.with_new_trace_id(): ## Query API -Search and retrieve logs programmatically. - ### Basic Query ```python @@ -263,13 +316,11 @@ result = client.query(QueryOptions(q='timeout', limit=50)) ```python logs = client.get_by_trace_id('trace-123') -print(f"Trace has {len(logs)} logs") ``` ### Aggregated Statistics ```python -from datetime import datetime, timedelta from logtide_sdk import AggregatedStatsOptions stats = client.get_aggregated_stats( @@ -288,47 +339,46 @@ for service in stats.top_services: ## Live Streaming (SSE) -Stream logs in real-time using Server-Sent Events. +`stream()` runs in a background daemon thread and returns immediately with a stop function. ```python def handle_log(log): print(f"[{log['time']}] {log['level']}: {log['message']}") -def handle_error(error): - print(f"Stream error: {error}") - -client.stream( +stop = client.stream( on_log=handle_log, - on_error=handle_error, - filters={ - 'service': 'api-gateway', - 'level': 'error', - } + on_error=lambda e: print(f"Stream error: {e}"), + filters={'service': 'api-gateway', 'level': 'error'}, ) -# Note: This blocks. Run in separate thread for production. +# ... later, to stop: +stop() +``` + +Async streaming runs as a cancellable coroutine: + +```python +task = asyncio.create_task(client.stream(on_log=handle_log)) +# ... 
later: +task.cancel() ``` --- ## Metrics -Track SDK performance and health. - ```python metrics = client.get_metrics() -print(f"Logs sent: {metrics.logs_sent}") -print(f"Logs dropped: {metrics.logs_dropped}") -print(f"Errors: {metrics.errors}") -print(f"Retries: {metrics.retries}") -print(f"Avg latency: {metrics.avg_latency_ms}ms") -print(f"Circuit breaker trips: {metrics.circuit_breaker_trips}") +print(f"Logs sent: {metrics.logs_sent}") +print(f"Logs dropped: {metrics.logs_dropped}") +print(f"Errors: {metrics.errors}") +print(f"Retries: {metrics.retries}") +print(f"Avg latency: {metrics.avg_latency_ms:.1f}ms") +print(f"Circuit breaker trips: {metrics.circuit_breaker_trips}") -# Get circuit breaker state -print(client.get_circuit_breaker_state()) # CLOSED|OPEN|HALF_OPEN +print(client.get_circuit_breaker_state()) # CLOSED | OPEN | HALF_OPEN -# Reset metrics client.reset_metrics() ``` @@ -336,11 +386,7 @@ client.reset_metrics() ## Middleware Integration -LogTide provides ready-to-use middleware for popular frameworks. - -### Flask Middleware - -Auto-log all HTTP requests and responses. 
+### Flask ```python from flask import Flask @@ -348,13 +394,10 @@ from logtide_sdk import LogTideClient, ClientOptions from logtide_sdk.middleware import LogTideFlaskMiddleware app = Flask(__name__) - -client = LogTideClient( - ClientOptions( - api_url='http://localhost:8080', - api_key='lp_your_api_key_here', - ) -) +client = LogTideClient(ClientOptions( + api_url='http://localhost:8080', + api_key='lp_your_api_key_here', +)) LogTideFlaskMiddleware( app, @@ -366,31 +409,25 @@ LogTideFlaskMiddleware( ) ``` -**Logged automatically:** -- Request: `GET /api/users` -- Response: `GET /api/users 200 (45ms)` -- Errors: `Request error: Internal Server Error` - -### Django Middleware +### Django ```python # settings.py -MIDDLEWARE = [ - 'logtide_sdk.middleware.LogTideDjangoMiddleware', -] - from logtide_sdk import LogTideClient, ClientOptions -LOGTIDE_CLIENT = LogTideClient( - ClientOptions( - api_url='http://localhost:8080', - api_key='lp_your_api_key_here', - ) -) +LOGTIDE_CLIENT = LogTideClient(ClientOptions( + api_url='http://localhost:8080', + api_key='lp_your_api_key_here', +)) LOGTIDE_SERVICE_NAME = 'django-api' + +MIDDLEWARE = [ + 'logtide_sdk.middleware.LogTideDjangoMiddleware', + # ... 
+] ``` -### FastAPI Middleware +### FastAPI ```python from fastapi import FastAPI @@ -398,21 +435,36 @@ from logtide_sdk import LogTideClient, ClientOptions from logtide_sdk.middleware import LogTideFastAPIMiddleware app = FastAPI() +client = LogTideClient(ClientOptions( + api_url='http://localhost:8080', + api_key='lp_your_api_key_here', +)) -client = LogTideClient( - ClientOptions( - api_url='http://localhost:8080', - api_key='lp_your_api_key_here', - ) -) +app.add_middleware(LogTideFastAPIMiddleware, client=client, service_name='fastapi-api') +``` -app.add_middleware( - LogTideFastAPIMiddleware, - client=client, - service_name='fastapi-api', -) +### Starlette (standalone) + +```bash +pip install logtide-sdk[starlette] +``` + +```python +from starlette.applications import Starlette +from logtide_sdk import LogTideClient, ClientOptions +from logtide_sdk.middleware import LogTideStarletteMiddleware + +app = Starlette() +client = LogTideClient(ClientOptions( + api_url='http://localhost:8080', + api_key='lp_your_api_key_here', +)) + +app.add_middleware(LogTideStarletteMiddleware, client=client, service_name='starlette-api') ``` +All middleware auto-logs requests, responses (with duration and status code), and errors (with serialized exception metadata). Health check paths (`/health`, `/healthz`) are skipped by default. + --- ## Examples @@ -421,72 +473,40 @@ See the [examples/](./examples) directory for complete working examples: - **[basic.py](./examples/basic.py)** - Simple usage - **[advanced.py](./examples/advanced.py)** - All advanced features -- **[flask_example.py](./examples/flask_example.py)** - Flask integration -- **[fastapi_example.py](./examples/fastapi_example.py)** - FastAPI integration --- ## Best Practices -### 1. 
Always Close on Shutdown +### Use Global Metadata ```python -import atexit - -# Automatic cleanup (already registered by client) -# Or manually: -atexit.register(client.close) +client = LogTideClient(ClientOptions( + api_url='http://localhost:8080', + api_key='lp_your_api_key_here', + global_metadata={ + 'env': os.getenv('APP_ENV', 'production'), + 'version': '2.0.0', + 'region': 'eu-west-1', + }, +)) ``` -### 2. Use Global Metadata +### Monitor Metrics in Production ```python -client = LogTideClient( - ClientOptions( - api_url='http://localhost:8080', - api_key='lp_your_api_key_here', - global_metadata={ - 'env': os.getenv('ENV'), - 'version': '1.0.0', - 'region': 'us-east-1', - }, - ) -) -``` - -### 3. Enable Debug Mode in Development - -```python -client = LogTideClient( - ClientOptions( - api_url='http://localhost:8080', - api_key='lp_your_api_key_here', - debug=os.getenv('ENV') == 'development', - ) -) -``` - -### 4. Monitor Metrics in Production - -```python -import time import threading -def monitor_metrics(): +def _monitor(): while True: - metrics = client.get_metrics() - - if metrics.logs_dropped > 0: - print(f"Warning: Logs dropped: {metrics.logs_dropped}") - - if metrics.circuit_breaker_trips > 0: - print("Error: Circuit breaker is OPEN!") - + m = client.get_metrics() + if m.logs_dropped > 0: + print(f"WARNING: {m.logs_dropped} logs dropped") + if m.circuit_breaker_trips > 0: + print("ERROR: Circuit breaker tripped") time.sleep(60) -# Run in background thread -monitor_thread = threading.Thread(target=monitor_metrics, daemon=True) -monitor_thread.start() +threading.Thread(target=_monitor, daemon=True).start() ``` --- @@ -497,10 +517,10 @@ Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for gui ## License -MIT License - see [LICENSE](LICENSE) for details. +MIT License — see [LICENSE](LICENSE) for details. 
## Links - [LogTide Website](https://logtide.dev) - [Documentation](https://logtide.dev/docs/sdks/python/) -- [GitHub Issues](https://github.com/logtide-dev/logtide-sdk-python/issues) +- [GitHub Issues](https://github.com/logtide-dev/logtide-python/issues) diff --git a/examples/advanced.py b/examples/advanced.py index 99261a9..7b2f955 100644 --- a/examples/advanced.py +++ b/examples/advanced.py @@ -92,7 +92,7 @@ print("Top services:", stats.top_services) print("Top errors:", stats.top_errors) -# Live streaming +# Live streaming — non-blocking, returns a stop function def handle_log(log): print(f"[{log['time']}] {log['level']}: {log['message']}") @@ -101,8 +101,10 @@ def handle_error(error): print(f"Stream error: {error}") -# Note: This blocks. Run in separate thread for production -# client.stream(on_log=handle_log, on_error=handle_error, filters={'level': 'error'}) +# stream() starts a background daemon thread and returns immediately +stop = client.stream(on_log=handle_log, on_error=handle_error, filters={"level": "error"}) +# When done streaming: +# stop() # Metrics metrics = client.get_metrics() diff --git a/logtide_sdk/__init__.py b/logtide_sdk/__init__.py index e495a39..34a2fb9 100644 --- a/logtide_sdk/__init__.py +++ b/logtide_sdk/__init__.py @@ -1,8 +1,17 @@ """LogTide SDK - Official Python SDK for LogTide.""" -from .client import LogTideClient +from .client import LogTideClient, serialize_exception + +_has_async = False +try: + from .async_client import AsyncLogTideClient + + _has_async = True +except ImportError: + pass # type: ignore[assignment] from .enums import CircuitState, LogLevel from .exceptions import BufferFullError, CircuitBreakerOpenError, LogTideError +from .handler import LogTideHandler from .models import ( AggregatedStatsOptions, AggregatedStatsResponse, @@ -10,14 +19,19 @@ ClientOptions, LogEntry, LogsResponse, + PayloadLimitsOptions, QueryOptions, ) -__version__ = "0.1.0" +__version__ = "0.8.4" __all__ = [ - # Client + # Clients 
"LogTideClient", + # Logging integration + "LogTideHandler", + # Error serialization utility + "serialize_exception", # Models "LogEntry", "ClientOptions", @@ -26,6 +40,7 @@ "ClientMetrics", "LogsResponse", "AggregatedStatsResponse", + "PayloadLimitsOptions", # Enums "LogLevel", "CircuitState", @@ -34,3 +49,6 @@ "CircuitBreakerOpenError", "BufferFullError", ] + +if _has_async: + __all__.append("AsyncLogTideClient") diff --git a/logtide_sdk/async_client.py b/logtide_sdk/async_client.py new file mode 100644 index 0000000..8488485 --- /dev/null +++ b/logtide_sdk/async_client.py @@ -0,0 +1,523 @@ +"""Async LogTide SDK client using aiohttp.""" + +import asyncio +import dataclasses +import json +import time +import uuid +from threading import Lock as ThreadingLock +from typing import Any, Callable, Dict, List, Optional, Union + +try: + import aiohttp +except ImportError: + raise ImportError( + "aiohttp is required for AsyncLogTideClient. " + "Install it with: pip install logtide-sdk[async]" + ) + +from .circuit_breaker import CircuitBreaker +from .client import _process_value, serialize_exception +from .enums import CircuitState, LogLevel +from .exceptions import CircuitBreakerOpenError +from .models import ( + AggregatedStatsOptions, + AggregatedStatsResponse, + ClientMetrics, + ClientOptions, + LogEntry, + LogsResponse, + PayloadLimitsOptions, + QueryOptions, +) + + +class AsyncLogTideClient: + """ + Async LogTide SDK Client. + + Async equivalent of LogTideClient using aiohttp. Designed for use in + asyncio-based applications. Best used as an async context manager. 
+ + Example: + async with AsyncLogTideClient(ClientOptions(...)) as client: + await client.info('my-service', 'Hello from async!') + + Or with manual lifecycle management: + client = AsyncLogTideClient(options) + await client.start() # begin background flush loop + try: + await client.info('my-service', 'message') + finally: + await client.close() + """ + + def __init__(self, options: ClientOptions) -> None: + """ + Initialize async LogTide client. + + Args: + options: Client configuration options (same as LogTideClient) + """ + self.options = options + self._buffer: List[LogEntry] = [] + self._trace_id: Optional[str] = None + self._buffer_lock: Optional[asyncio.Lock] = None # created lazily in first async call + self._metrics_lock = ThreadingLock() + self._metrics = ClientMetrics() + self._circuit_breaker = CircuitBreaker( + threshold=options.circuit_breaker_threshold, + reset_timeout_ms=options.circuit_breaker_reset_ms, + ) + self._latency_window: List[float] = [] + self._payload_limits = options.payload_limits or PayloadLimitsOptions() + self._session: Optional[aiohttp.ClientSession] = None + self._flush_task: Optional[Any] = None # asyncio.Task[None] + self._closed = False + + if self.options.debug: + print(f"[LogTide] Async client initialized: {options.api_url}") + + # ----------------------------------------------------------------------- + # Lifecycle + # ----------------------------------------------------------------------- + + async def start(self) -> None: + """ + Start the background flush loop. Called automatically by __aenter__. + Only needed when not using the async context manager. + """ + # Eagerly create the session so concurrent callers don't race on first use. 
+ if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + if self.options.flush_interval > 0 and self._flush_task is None: + self._flush_task = asyncio.create_task(self._flush_loop()) + + async def __aenter__(self) -> "AsyncLogTideClient": + await self.start() + return self + + async def __aexit__(self, *args: Any) -> None: + await self.close() + + async def close(self) -> None: + """Cancel the flush loop, flush remaining logs, and close the HTTP session.""" + if self._closed: + return + + # Set _closed immediately so new log() calls are rejected from this point. + # We then drain the buffer directly, bypassing the _closed guard in flush(). + self._closed = True + + if self._flush_task is not None: + self._flush_task.cancel() + try: + await self._flush_task + except asyncio.CancelledError: + pass + + await self._drain() + + if self._session is not None and not self._session.closed: + await self._session.close() + + if self.options.debug: + print("[LogTide] Async client closed") + + # ----------------------------------------------------------------------- + # Trace ID helpers + # ----------------------------------------------------------------------- + + def set_trace_id(self, trace_id: Optional[str]) -> None: + """Set trace ID for subsequent logs.""" + self._trace_id = trace_id + + def get_trace_id(self) -> Optional[str]: + """Return the current trace ID.""" + return self._trace_id + + # ----------------------------------------------------------------------- + # Logging methods + # ----------------------------------------------------------------------- + + async def log(self, entry: LogEntry) -> None: + """ + Buffer a log entry. Silently drops when buffer is full. 
+ + Args: + entry: Pre-built log entry + """ + if self._closed: + return + + if entry.trace_id is None: + if self.options.auto_trace_id: + entry.trace_id = str(uuid.uuid4()) + elif self._trace_id is not None: + entry.trace_id = self._trace_id + + if entry.metadata is None: + entry.metadata = {} + + if self.options.global_metadata: + entry.metadata = {**self.options.global_metadata, **entry.metadata} + + self._apply_payload_limits(entry) + + should_flush = False + if self._buffer_lock is None: + self._buffer_lock = asyncio.Lock() + async with self._buffer_lock: + if len(self._buffer) >= self.options.max_buffer_size: + if self.options.debug: + print(f"[LogTide] Buffer full, dropping log: {entry.message}") + with self._metrics_lock: + self._metrics.logs_dropped += 1 + return + self._buffer.append(entry) + if len(self._buffer) >= self.options.batch_size: + should_flush = True + + if should_flush: + await self.flush() + + async def debug( + self, service: str, message: str, metadata: Optional[Dict[str, Any]] = None + ) -> None: + """Log a DEBUG-level message.""" + await self.log( + LogEntry( + service=service, + level=LogLevel.DEBUG, + message=message, + metadata=metadata or {}, + ) + ) + + async def info( + self, service: str, message: str, metadata: Optional[Dict[str, Any]] = None + ) -> None: + """Log an INFO-level message.""" + await self.log( + LogEntry( + service=service, + level=LogLevel.INFO, + message=message, + metadata=metadata or {}, + ) + ) + + async def warn( + self, service: str, message: str, metadata: Optional[Dict[str, Any]] = None + ) -> None: + """Log a WARN-level message.""" + await self.log( + LogEntry( + service=service, + level=LogLevel.WARN, + message=message, + metadata=metadata or {}, + ) + ) + + async def error( + self, + service: str, + message: str, + metadata_or_error: Union[Dict[str, Any], Exception, None] = None, + ) -> None: + """Log an ERROR-level message. 
Accepts an Exception for automatic serialization.""" + metadata = self._process_metadata_or_error(metadata_or_error) + await self.log( + LogEntry( + service=service, + level=LogLevel.ERROR, + message=message, + metadata=metadata, + ) + ) + + async def critical( + self, + service: str, + message: str, + metadata_or_error: Union[Dict[str, Any], Exception, None] = None, + ) -> None: + """Log a CRITICAL-level message. Accepts an Exception for automatic serialization.""" + metadata = self._process_metadata_or_error(metadata_or_error) + await self.log( + LogEntry( + service=service, + level=LogLevel.CRITICAL, + message=message, + metadata=metadata, + ) + ) + + # ----------------------------------------------------------------------- + # Flush & send + # ----------------------------------------------------------------------- + + async def flush(self) -> None: + """Flush all buffered logs to the LogTide API. No-op after close().""" + if self._closed: + return + await self._drain() + + async def _drain(self) -> None: + """Drain the buffer unconditionally (used internally, including during close).""" + if self._buffer_lock is None: + self._buffer_lock = asyncio.Lock() + async with self._buffer_lock: + if not self._buffer: + return + logs_to_send = self._buffer[:] + self._buffer.clear() + + await self._send_logs_with_retry(logs_to_send) + + # ----------------------------------------------------------------------- + # Query / read API + # ----------------------------------------------------------------------- + + async def query(self, options: QueryOptions) -> LogsResponse: + """Query logs with optional filters.""" + params: Dict[str, Any] = {"limit": options.limit, "offset": options.offset} + if options.service: + params["service"] = options.service + if options.level: + params["level"] = options.level.value + if options.q: + params["q"] = options.q + if options.from_time: + params["from"] = options.from_time.isoformat() + if options.to_time: + params["to"] = 
options.to_time.isoformat() + + async with self._get_session().get( + f"{self.options.api_url}/api/v1/logs", + headers=self._get_headers(), + params=params, + ) as response: + response.raise_for_status() + data = await response.json() + return LogsResponse(logs=data.get("logs", []), total=data.get("total", 0)) + + async def get_by_trace_id(self, trace_id: str) -> List[Dict[str, Any]]: + """Return all log entries for a given trace ID.""" + async with self._get_session().get( + f"{self.options.api_url}/api/v1/logs/trace/{trace_id}", + headers=self._get_headers(), + ) as response: + response.raise_for_status() + return await response.json() + + async def get_aggregated_stats( + self, options: AggregatedStatsOptions + ) -> AggregatedStatsResponse: + """Return aggregated statistics over a time range.""" + params: Dict[str, Any] = { + "from": options.from_time.isoformat(), + "to": options.to_time.isoformat(), + "interval": options.interval, + } + if options.service: + params["service"] = options.service + + async with self._get_session().get( + f"{self.options.api_url}/api/v1/logs/aggregated", + headers=self._get_headers(), + params=params, + ) as response: + response.raise_for_status() + data = await response.json() + return AggregatedStatsResponse( + timeseries=data.get("timeseries", []), + top_services=data.get("top_services", []), + top_errors=data.get("top_errors", []), + ) + + async def stream( + self, + on_log: Callable[[Dict[str, Any]], None], + on_error: Optional[Callable[[Exception], None]] = None, + filters: Optional[Dict[str, str]] = None, + ) -> None: + """ + Stream logs in real-time via SSE. This coroutine runs until cancelled. + + Wrap with asyncio.create_task() to run concurrently. + + Example: + task = asyncio.create_task(client.stream(on_log=handle_log)) + # ... 
later: + task.cancel() + """ + params: Dict[str, str] = dict(filters or {}) + params["token"] = self.options.api_key + + async with self._get_session().get( + f"{self.options.api_url}/api/v1/logs/stream", + headers=self._get_headers(), + params=params, + ) as response: + response.raise_for_status() + async for line_bytes in response.content: + line = line_bytes.decode("utf-8").strip() + if line.startswith("data: "): + try: + on_log(json.loads(line[6:])) + except Exception as e: + if on_error: + on_error(e) + + # ----------------------------------------------------------------------- + # Metrics + # ----------------------------------------------------------------------- + + def get_metrics(self) -> ClientMetrics: + """Return a snapshot of current SDK metrics.""" + with self._metrics_lock: + return dataclasses.replace(self._metrics) + + def reset_metrics(self) -> None: + """Reset all metrics to zero.""" + with self._metrics_lock: + self._metrics = ClientMetrics() + self._latency_window.clear() + + def get_circuit_breaker_state(self) -> CircuitState: + """Return the current circuit breaker state.""" + return self._circuit_breaker.state + + # ----------------------------------------------------------------------- + # Private helpers + # ----------------------------------------------------------------------- + + def _get_session(self) -> aiohttp.ClientSession: + """Return (or lazily create) the aiohttp session.""" + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession() + return self._session + + def _get_headers(self) -> Dict[str, str]: + return { + "X-API-Key": self.options.api_key, + "Content-Type": "application/json", + } + + async def _flush_loop(self) -> None: + """Background coroutine: flush on a fixed interval until closed.""" + interval = self.options.flush_interval / 1000.0 + while not self._closed: + await asyncio.sleep(interval) + if not self._closed: + await self.flush() + + async def _send_logs_with_retry(self, logs: 
List[LogEntry]) -> None: + """Send a batch with exponential backoff and circuit breaker.""" + attempt = 0 + delay = self.options.retry_delay_ms / 1000.0 + state_before = self._circuit_breaker.state + + while attempt <= self.options.max_retries: + try: + if self._circuit_breaker.state == CircuitState.OPEN: + if self.options.debug: + print("[LogTide] Circuit breaker open, skipping send") + with self._metrics_lock: + self._metrics.logs_dropped += len(logs) + raise CircuitBreakerOpenError("Circuit breaker is open") + + start_time = time.time() + await self._send_logs(logs) + latency = (time.time() - start_time) * 1000 + + self._circuit_breaker.record_success() + self._update_latency(latency) + + with self._metrics_lock: + self._metrics.logs_sent += len(logs) + + if self.options.debug: + print(f"[LogTide] Sent {len(logs)} logs ({latency:.2f}ms)") + + return + + except CircuitBreakerOpenError: + break + + except Exception as e: + attempt += 1 + self._circuit_breaker.record_failure() + + with self._metrics_lock: + self._metrics.errors += 1 + if attempt <= self.options.max_retries: + self._metrics.retries += 1 + + if attempt > self.options.max_retries: + if self.options.debug: + print( + f"[LogTide] Failed to send logs after {attempt} attempts: {e}" + ) + with self._metrics_lock: + self._metrics.logs_dropped += len(logs) + break + + if self.options.debug: + print( + f"[LogTide] Retry {attempt}/{self.options.max_retries} in {delay}s" + ) + + await asyncio.sleep(delay) + delay *= 2 + + if (self._circuit_breaker.state == CircuitState.OPEN + and state_before != CircuitState.OPEN): + with self._metrics_lock: + self._metrics.circuit_breaker_trips += 1 + + async def _send_logs(self, logs: List[LogEntry]) -> None: + """POST a serialized batch to /api/v1/ingest.""" + payload = {"logs": [log.to_dict() for log in logs]} + async with self._get_session().post( + f"{self.options.api_url}/api/v1/ingest", + headers=self._get_headers(), + json=payload, + ) as response: + 
response.raise_for_status() + + def _process_metadata_or_error( + self, metadata_or_error: Union[Dict[str, Any], Exception, None] + ) -> Dict[str, Any]: + if metadata_or_error is None: + return {} + if isinstance(metadata_or_error, dict): + return metadata_or_error + return {"exception": serialize_exception(metadata_or_error)} + + def _apply_payload_limits(self, entry: LogEntry) -> None: + """Enforce payload limits on entry.metadata in-place.""" + if not entry.metadata: + return + lim = self._payload_limits + entry.metadata = _process_value(entry.metadata, "root", lim) + + raw = json.dumps(entry.to_dict()) + if len(raw.encode()) > lim.max_log_size: + if self.options.debug: + print( + f"[LogTide] Log entry too large ({len(raw)} bytes), truncating metadata" + ) + entry.metadata = {"_truncated": True, "_original_size": len(raw.encode())} + + def _update_latency(self, latency: float) -> None: + with self._metrics_lock: + self._latency_window.append(latency) + if len(self._latency_window) > 100: + self._latency_window.pop(0) + if self._latency_window: + self._metrics.avg_latency_ms = sum(self._latency_window) / len( + self._latency_window + ) diff --git a/logtide_sdk/circuit_breaker.py b/logtide_sdk/circuit_breaker.py index a90c3cb..d33552b 100644 --- a/logtide_sdk/circuit_breaker.py +++ b/logtide_sdk/circuit_breaker.py @@ -53,7 +53,9 @@ def record_failure(self) -> None: self._failure_count += 1 self._last_failure_time = time.time() - if self._failure_count >= self.threshold: + # A single failure in HALF_OPEN immediately re-opens the circuit. + # In CLOSED state, only open once the threshold is reached. 
+ if self._state == CircuitState.HALF_OPEN or self._failure_count >= self.threshold: self._state = CircuitState.OPEN def call(self, func: Callable[[], T]) -> T: diff --git a/logtide_sdk/client.py b/logtide_sdk/client.py index 344a9ca..f090aac 100644 --- a/logtide_sdk/client.py +++ b/logtide_sdk/client.py @@ -1,18 +1,21 @@ """Main LogTide SDK client implementation.""" import atexit +import dataclasses +import json +import re import time import traceback import uuid from contextlib import contextmanager -from threading import Lock, Timer +from threading import Event, Lock, Thread, Timer from typing import Any, Callable, Dict, Iterator, List, Optional, Union import requests from .circuit_breaker import CircuitBreaker from .enums import CircuitState, LogLevel -from .exceptions import BufferFullError, CircuitBreakerOpenError +from .exceptions import CircuitBreakerOpenError from .models import ( AggregatedStatsOptions, AggregatedStatsResponse, @@ -20,16 +23,95 @@ ClientOptions, LogEntry, LogsResponse, + PayloadLimitsOptions, QueryOptions, ) +# --------------------------------------------------------------------------- +# Module-level helpers (importable by async_client and middleware) +# --------------------------------------------------------------------------- + +_BASE64_RE = re.compile(r"^[A-Za-z0-9+/=]{100,}$") + + +def _looks_like_base64(s: str) -> bool: + """Return True if the string looks like base64-encoded or data-URI data.""" + if s.startswith("data:"): + return True + return bool(_BASE64_RE.match(s.replace("\n", "").replace("\r", ""))) + + +def serialize_exception(exc: BaseException) -> Dict[str, Any]: + """ + Serialize an exception into a structured format. + + Returns a dict with keys: type, message, language, stacktrace, raw. + stacktrace is a list of frame dicts: {file, function, line}. + Chained exceptions (exc.__cause__) are serialized recursively as 'cause'. 
+ """ + frames: List[Dict[str, Any]] = [] + tb = exc.__traceback__ + while tb is not None: + frame = tb.tb_frame + frames.append( + { + "file": frame.f_code.co_filename, + "function": frame.f_code.co_name, + "line": tb.tb_lineno, + } + ) + tb = tb.tb_next + + result: Dict[str, Any] = { + "type": type(exc).__name__, + "message": str(exc), + "language": "python", + "stacktrace": frames, + "raw": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)), + } + + if exc.__cause__ is not None: + result["cause"] = serialize_exception(exc.__cause__) + + return result + + +def _process_value(value: Any, path: str, lim: PayloadLimitsOptions) -> Any: + """Recursively apply payload limits to a metadata value.""" + field_name = path.split(".")[-1] + if field_name in lim.exclude_fields: + return "[EXCLUDED]" + + if value is None: + return value + + if isinstance(value, str): + if len(value) >= 100 and _looks_like_base64(value): + return "[BASE64 DATA REMOVED]" + if len(value) > lim.max_field_size: + return value[: lim.max_field_size] + lim.truncation_marker + return value + + if isinstance(value, dict): + return {k: _process_value(v, f"{path}.{k}", lim) for k, v in value.items()} + + if isinstance(value, list): + return [_process_value(v, f"{path}[{i}]", lim) for i, v in enumerate(value)] + + return value + + +# --------------------------------------------------------------------------- +# Main client +# --------------------------------------------------------------------------- + class LogTideClient: """ LogTide SDK Client. - Main client for sending logs to LogTide with automatic batching, - retry logic, circuit breaker, and query capabilities. + Main client for sending structured logs to LogTide with automatic batching, + retry logic, circuit breaker, connection reuse, and query capabilities. 
""" def __init__(self, options: ClientOptions) -> None: @@ -52,28 +134,33 @@ def __init__(self, options: ClientOptions) -> None: self._latency_window: List[float] = [] self._flush_timer: Optional[Timer] = None self._closed = False + self._payload_limits = options.payload_limits or PayloadLimitsOptions() + + # Persistent HTTP session for connection reuse across requests + self._session = requests.Session() - # Register cleanup on exit + # Register cleanup on interpreter exit atexit.register(self.close) - # Start flush timer if interval is set + # Start timer-based auto-flush if options.flush_interval > 0: self._schedule_flush() if self.options.debug: print(f"[LogTide] Client initialized: {options.api_url}") + # ----------------------------------------------------------------------- + # Trace ID helpers + # ----------------------------------------------------------------------- + def set_trace_id(self, trace_id: Optional[str]) -> None: """ Set trace ID for subsequent logs. Args: - trace_id: Trace ID or None to clear + trace_id: Trace ID string, or None to clear """ - if trace_id is not None: - self._trace_id = self._normalize_trace_id(trace_id) - else: - self._trace_id = None + self._trace_id = trace_id def get_trace_id(self) -> Optional[str]: """ @@ -87,7 +174,8 @@ def get_trace_id(self) -> Optional[str]: @contextmanager def with_trace_id(self, trace_id: str) -> Iterator[None]: """ - Context manager for scoped trace ID. + Context manager that sets a trace ID for the duration of the block, + then restores the previous value. 
Args: trace_id: Trace ID to use within context @@ -97,7 +185,7 @@ def with_trace_id(self, trace_id: str) -> Iterator[None]: client.info('api', 'Processing request') """ old_trace_id = self._trace_id - self.set_trace_id(trace_id) + self._trace_id = trace_id try: yield finally: @@ -106,19 +194,23 @@ def with_trace_id(self, trace_id: str) -> Iterator[None]: @contextmanager def with_new_trace_id(self) -> Iterator[None]: """ - Context manager with auto-generated trace ID. + Context manager with an auto-generated UUID trace ID. Example: with client.with_new_trace_id(): client.info('worker', 'Background job') """ - new_trace_id = str(uuid.uuid4()) - with self.with_trace_id(new_trace_id): + with self.with_trace_id(str(uuid.uuid4())): yield + # ----------------------------------------------------------------------- + # Logging methods + # ----------------------------------------------------------------------- + def log(self, entry: LogEntry) -> None: """ - Log a custom entry. + Log a pre-built entry. Applies trace ID, global metadata, and + payload limits before buffering. Silently drops when buffer is full. 
Args: entry: Log entry to send @@ -126,44 +218,45 @@ def log(self, entry: LogEntry) -> None: if self._closed: return - # Add trace ID if set + # Inject trace ID if entry.trace_id is None: if self.options.auto_trace_id: entry.trace_id = str(uuid.uuid4()) elif self._trace_id is not None: entry.trace_id = self._trace_id - # Merge global metadata + # Coerce None to {} so unpacking never raises TypeError + if entry.metadata is None: + entry.metadata = {} + + # Merge global metadata (entry metadata wins on collision) if self.options.global_metadata: entry.metadata = {**self.options.global_metadata, **entry.metadata} + # Apply payload limits before buffering + self._apply_payload_limits(entry) + + should_flush = False with self._buffer_lock: - # Check buffer size if len(self._buffer) >= self.options.max_buffer_size: if self.options.debug: print(f"[LogTide] Buffer full, dropping log: {entry.message}") - with self._metrics_lock: self._metrics.logs_dropped += 1 - raise BufferFullError("Log buffer is full") + return self._buffer.append(entry) - - # Auto-flush if batch size reached if len(self._buffer) >= self.options.batch_size: - self.flush() + should_flush = True + + # Flush outside the lock to avoid a deadlock on re-entry + if should_flush: + self.flush() def debug( self, service: str, message: str, metadata: Optional[Dict[str, Any]] = None ) -> None: - """ - Log debug message. - - Args: - service: Service name - message: Log message - metadata: Optional metadata dictionary - """ + """Log a DEBUG-level message.""" self.log( LogEntry( service=service, @@ -176,14 +269,7 @@ def debug( def info( self, service: str, message: str, metadata: Optional[Dict[str, Any]] = None ) -> None: - """ - Log info message. 
- - Args: - service: Service name - message: Log message - metadata: Optional metadata dictionary - """ + """Log an INFO-level message.""" self.log( LogEntry( service=service, @@ -196,14 +282,7 @@ def info( def warn( self, service: str, message: str, metadata: Optional[Dict[str, Any]] = None ) -> None: - """ - Log warning message. - - Args: - service: Service name - message: Log message - metadata: Optional metadata dictionary - """ + """Log a WARN-level message.""" self.log( LogEntry( service=service, @@ -220,12 +299,12 @@ def error( metadata_or_error: Union[Dict[str, Any], Exception, None] = None, ) -> None: """ - Log error message. + Log an ERROR-level message. Args: service: Service name message: Log message - metadata_or_error: Metadata dict or Exception object + metadata_or_error: Metadata dict or Exception (serialized automatically) """ metadata = self._process_metadata_or_error(metadata_or_error) self.log( @@ -244,12 +323,12 @@ def critical( metadata_or_error: Union[Dict[str, Any], Exception, None] = None, ) -> None: """ - Log critical message. + Log a CRITICAL-level message. 
Args: service: Service name message: Log message - metadata_or_error: Metadata dict or Exception object + metadata_or_error: Metadata dict or Exception (serialized automatically) """ metadata = self._process_metadata_or_error(metadata_or_error) self.log( @@ -261,30 +340,33 @@ def critical( ) ) - def flush(self) -> None: - """Flush buffered logs to LogTide API.""" - if self._closed: - return + # ----------------------------------------------------------------------- + # Flush & send + # ----------------------------------------------------------------------- + def flush(self) -> None: + """Flush all buffered logs to the LogTide API immediately.""" with self._buffer_lock: if not self._buffer: return - logs_to_send = self._buffer[:] self._buffer.clear() - # Send logs with retry logic self._send_logs_with_retry(logs_to_send) + # ----------------------------------------------------------------------- + # Query / read API + # ----------------------------------------------------------------------- + def query(self, options: QueryOptions) -> LogsResponse: """ Query logs with filters. 
Args: - options: Query options + options: Query options (service, level, time range, full-text search) Returns: - Logs response with results + LogsResponse with matched logs and total count Raises: requests.RequestException: On API error @@ -293,7 +375,6 @@ def query(self, options: QueryOptions) -> LogsResponse: "limit": options.limit, "offset": options.offset, } - if options.service: params["service"] = options.service if options.level: @@ -305,29 +386,28 @@ def query(self, options: QueryOptions) -> LogsResponse: if options.to_time: params["to"] = options.to_time.isoformat() - response = requests.get( - f"{self.options.api_url}/api/logs", + response = self._session.get( + f"{self.options.api_url}/api/v1/logs", headers=self._get_headers(), params=params, timeout=30, ) response.raise_for_status() - data = response.json() return LogsResponse(logs=data.get("logs", []), total=data.get("total", 0)) def get_by_trace_id(self, trace_id: str) -> List[Dict[str, Any]]: """ - Get logs by trace ID. + Get all logs belonging to a trace ID. Args: - trace_id: Trace ID to search for + trace_id: Trace ID to look up Returns: - List of log entries + List of log entry dicts """ - response = requests.get( - f"{self.options.api_url}/api/logs/trace/{trace_id}", + response = self._session.get( + f"{self.options.api_url}/api/v1/logs/trace/{trace_id}", headers=self._get_headers(), timeout=30, ) @@ -338,31 +418,29 @@ def get_aggregated_stats( self, options: AggregatedStatsOptions ) -> AggregatedStatsResponse: """ - Get aggregated statistics. + Get aggregated log statistics over a time range. 
Args: - options: Aggregation options + options: Time range, interval, and optional service filter Returns: - Aggregated stats response + AggregatedStatsResponse with timeseries, top services, and top errors """ params: Dict[str, Any] = { "from": options.from_time.isoformat(), "to": options.to_time.isoformat(), "interval": options.interval, } - if options.service: params["service"] = options.service - response = requests.get( - f"{self.options.api_url}/api/logs/stats", + response = self._session.get( + f"{self.options.api_url}/api/v1/logs/aggregated", headers=self._get_headers(), params=params, timeout=30, ) response.raise_for_status() - data = response.json() return AggregatedStatsResponse( timeseries=data.get("timeseries", []), @@ -375,132 +453,151 @@ def stream( on_log: Callable[[Dict[str, Any]], None], on_error: Optional[Callable[[Exception], None]] = None, filters: Optional[Dict[str, str]] = None, - ) -> None: + ) -> Callable[[], None]: """ Stream logs in real-time via Server-Sent Events. + Runs in a background daemon thread and returns immediately. + Args: - on_log: Callback for each log entry - on_error: Optional error callback - filters: Optional filters (service, level) + on_log: Callback invoked for each incoming log entry dict + on_error: Optional callback for connection or parse errors + filters: Optional SSE filters, e.g. {'service': 'api', 'level': 'error'} - Example: - def handle_log(log): - print(f"{log['level']}: {log['message']}") + Returns: + A stop callable — call it to terminate the stream. - client.stream(on_log=handle_log, filters={'level': 'error'}) + Example: + stop = client.stream(on_log=handle_log, filters={'level': 'error'}) + # ... 
later: + stop() """ - params = filters or {} - url = f"{self.options.api_url}/api/logs/stream" + params: Dict[str, str] = dict(filters or {}) + params["token"] = self.options.api_key + url = f"{self.options.api_url}/api/v1/logs/stream" + stop_event = Event() - try: - with requests.get( - url, - headers=self._get_headers(), - params=params, - stream=True, - timeout=None, - ) as response: - response.raise_for_status() - - for line in response.iter_lines(): - if not line: - continue - - line_str = line.decode("utf-8") - if line_str.startswith("data: "): - try: - import json - - log_data = json.loads(line_str[6:]) - on_log(log_data) - except Exception as e: - if on_error: - on_error(e) - - except Exception as e: - if on_error: - on_error(e) - else: - raise + def _run() -> None: + try: + with self._session.get( + url, + params=params, + stream=True, + timeout=None, + headers=self._get_headers(), + ) as response: + response.raise_for_status() + for line in response.iter_lines(): + if stop_event.is_set(): + break + if not line: + continue + line_str = line.decode("utf-8") if isinstance(line, bytes) else line + if line_str.startswith("data: "): + try: + log_data = json.loads(line_str[6:]) + on_log(log_data) + except Exception as e: + if on_error: + on_error(e) + except Exception as e: + if not stop_event.is_set() and on_error: + on_error(e) + + t = Thread(target=_run, daemon=True) + t.start() + + def stop() -> None: + stop_event.set() + + return stop + + # ----------------------------------------------------------------------- + # Metrics + # ----------------------------------------------------------------------- def get_metrics(self) -> ClientMetrics: """ - Get SDK metrics. + Return a snapshot of the current SDK metrics. 
Returns: - Current metrics + ClientMetrics dataclass with counters and average latency """ with self._metrics_lock: - return ClientMetrics( - logs_sent=self._metrics.logs_sent, - logs_dropped=self._metrics.logs_dropped, - errors=self._metrics.errors, - retries=self._metrics.retries, - avg_latency_ms=self._metrics.avg_latency_ms, - circuit_breaker_trips=self._metrics.circuit_breaker_trips, - ) + return dataclasses.replace(self._metrics) def reset_metrics(self) -> None: - """Reset SDK metrics.""" + """Reset all SDK metrics to zero.""" with self._metrics_lock: self._metrics = ClientMetrics() self._latency_window.clear() def get_circuit_breaker_state(self) -> CircuitState: """ - Get circuit breaker state. + Return the current circuit breaker state. Returns: - Current circuit state + CircuitState enum value (CLOSED, OPEN, or HALF_OPEN) """ return self._circuit_breaker.state + # ----------------------------------------------------------------------- + # Lifecycle + # ----------------------------------------------------------------------- + def close(self) -> None: - """Close client and flush remaining logs.""" + """Flush remaining logs, cancel the timer, and close the HTTP session.""" if self._closed: return self._closed = True - # Cancel flush timer if self._flush_timer: self._flush_timer.cancel() - # Flush remaining logs self.flush() + self._session.close() if self.options.debug: print("[LogTide] Client closed") def __del__(self) -> None: - """Destructor to ensure cleanup.""" - self.close() + """Destructor — ensures cleanup if close() was not called explicitly.""" + try: + self.close() + except Exception: + pass - # Private methods + # ----------------------------------------------------------------------- + # Private helpers + # ----------------------------------------------------------------------- + + def _get_headers(self) -> Dict[str, str]: + """Return HTTP headers for all API requests.""" + return { + "X-API-Key": self.options.api_key, + "Content-Type": 
"application/json", + } def _send_logs_with_retry(self, logs: List[LogEntry]) -> None: - """Send logs with retry logic and exponential backoff.""" + """Send a batch of logs with exponential backoff and circuit breaker.""" attempt = 0 delay = self.options.retry_delay_ms / 1000.0 + state_before = self._circuit_breaker.state while attempt <= self.options.max_retries: try: - # Check circuit breaker if self._circuit_breaker.state == CircuitState.OPEN: if self.options.debug: print("[LogTide] Circuit breaker open, skipping send") - with self._metrics_lock: self._metrics.logs_dropped += len(logs) raise CircuitBreakerOpenError("Circuit breaker is open") - # Send logs start_time = time.time() self._send_logs(logs) latency = (time.time() - start_time) * 1000 - # Record success self._circuit_breaker.record_success() self._update_latency(latency) @@ -513,7 +610,6 @@ def _send_logs_with_retry(self, logs: List[LogEntry]) -> None: return except CircuitBreakerOpenError: - # Don't retry if circuit is open break except Exception as e: @@ -527,123 +623,100 @@ def _send_logs_with_retry(self, logs: List[LogEntry]) -> None: if attempt > self.options.max_retries: if self.options.debug: - print(f"[LogTide] Failed to send logs after {attempt} attempts: {e}") - + print( + f"[LogTide] Failed to send logs after {attempt} attempts: {e}" + ) with self._metrics_lock: self._metrics.logs_dropped += len(logs) break - # Exponential backoff if self.options.debug: - print(f"[LogTide] Retry {attempt}/{self.options.max_retries} in {delay}s") + print( + f"[LogTide] Retry {attempt}/{self.options.max_retries} in {delay}s" + ) + + # Abort retries if the client was closed while we were in-flight. + # The session is gone — all remaining attempts would fail anyway. 
+ if self._closed: + with self._metrics_lock: + self._metrics.logs_dropped += len(logs) + break time.sleep(delay) delay *= 2 - # Track circuit breaker trips - if self._circuit_breaker.state == CircuitState.OPEN: + # Only count a trip when the circuit *transitions* to OPEN during this call, + # not on every subsequent call while it's already open. + if (self._circuit_breaker.state == CircuitState.OPEN + and state_before != CircuitState.OPEN): with self._metrics_lock: self._metrics.circuit_breaker_trips += 1 def _send_logs(self, logs: List[LogEntry]) -> None: - """ - Send logs to LogTide API. - - Args: - logs: Logs to send - - Raises: - requests.RequestException: On API error - """ + """POST a batch of serialized log entries to /api/v1/ingest.""" payload = {"logs": [log.to_dict() for log in logs]} - - response = requests.post( - f"{self.options.api_url}/api/logs", + response = self._session.post( + f"{self.options.api_url}/api/v1/ingest", headers=self._get_headers(), json=payload, timeout=30, ) response.raise_for_status() - def _get_headers(self) -> Dict[str, str]: - """Get HTTP headers for API requests.""" - return { - "Authorization": f"Bearer {self.options.api_key}", - "Content-Type": "application/json", - } - def _schedule_flush(self) -> None: - """Schedule automatic flush.""" + """Schedule the next timer-based auto-flush.""" if self._closed: return - interval = self.options.flush_interval / 1000.0 self._flush_timer = Timer(interval, self._auto_flush) self._flush_timer.daemon = True self._flush_timer.start() def _auto_flush(self) -> None: - """Auto-flush callback.""" + """Timer callback: flush then reschedule.""" if not self._closed: self.flush() self._schedule_flush() - def _normalize_trace_id(self, trace_id: str) -> str: - """ - Normalize trace ID. 
- - Args: - trace_id: Input trace ID - - Returns: - Normalized trace ID string - """ - # Simply return the trace ID as-is - # Accept any string as a valid trace ID - return trace_id - def _process_metadata_or_error( self, metadata_or_error: Union[Dict[str, Any], Exception, None] ) -> Dict[str, Any]: """ - Process metadata or error parameter. - - Args: - metadata_or_error: Metadata dict or Exception - - Returns: - Metadata dictionary with error serialized if applicable + Normalise the metadata_or_error parameter used by error() and critical(). + Exceptions are serialized to a structured 'exception' key. """ if metadata_or_error is None: return {} - if isinstance(metadata_or_error, dict): return metadata_or_error + return {"exception": serialize_exception(metadata_or_error)} - # Serialize exception - return { - "error": { - "name": type(metadata_or_error).__name__, - "message": str(metadata_or_error), - "stack": traceback.format_exc(), + def _apply_payload_limits(self, entry: LogEntry) -> None: + """Enforce payload limits on entry.metadata in-place.""" + if not entry.metadata: + return + + lim = self._payload_limits + entry.metadata = _process_value(entry.metadata, "root", lim) + + # Enforce total entry size + raw = json.dumps(entry.to_dict()) + if len(raw.encode()) > lim.max_log_size: + if self.options.debug: + print( + f"[LogTide] Log entry too large ({len(raw)} bytes), truncating metadata" + ) + entry.metadata = { + "_truncated": True, + "_original_size": len(raw.encode()), } - } def _update_latency(self, latency: float) -> None: - """ - Update latency metrics with rolling window. 
- - Args: - latency: Latency in milliseconds - """ + """Update the rolling average latency (100-sample window).""" with self._metrics_lock: self._latency_window.append(latency) - - # Keep window size at 100 if len(self._latency_window) > 100: self._latency_window.pop(0) - - # Calculate average if self._latency_window: self._metrics.avg_latency_ms = sum(self._latency_window) / len( self._latency_window diff --git a/logtide_sdk/handler.py b/logtide_sdk/handler.py new file mode 100644 index 0000000..0c57b94 --- /dev/null +++ b/logtide_sdk/handler.py @@ -0,0 +1,95 @@ +"""Python standard-library logging integration for LogTide SDK.""" + +import logging + +from .client import LogTideClient, serialize_exception +from .enums import LogLevel +from .models import LogEntry + + +class LogTideHandler(logging.Handler): + """ + A standard logging.Handler that forwards log records to LogTideClient. + + Drop-in integration for applications already using Python's logging module. + + Example: + import logging + from logtide_sdk import LogTideClient, ClientOptions, LogTideHandler + + client = LogTideClient(ClientOptions(api_url=..., api_key=...)) + handler = LogTideHandler(client=client, service='my-app') + handler.setLevel(logging.INFO) + + logging.getLogger().addHandler(handler) + + # Now standard logging calls are forwarded to LogTide: + logging.info('Server started') + logging.error('Unhandled exception', exc_info=True) + + Exception info from exc_info=True is automatically serialized into a + structured 'exception' metadata key. + """ + + def __init__( + self, + client: LogTideClient, + service: str, + level: int = logging.NOTSET, + ) -> None: + """ + Initialize the handler. 
+ + Args: + client: An active LogTideClient instance + service: Service name attached to every forwarded log entry + level: Minimum logging level (default: NOTSET — accept all records) + """ + super().__init__(level) + self.client = client + self.service = service + + def emit(self, record: logging.LogRecord) -> None: + """ + Forward a LogRecord to LogTide. + + Called by the logging framework for each matching log record. Never + raises — falls back to logging.Handler.handleError on exceptions. + """ + try: + logtide_level = self._map_level(record.levelno) + + metadata = { + "logger": record.name, + "module": record.module, + "funcName": record.funcName, + "lineno": record.lineno, + "pathname": record.pathname, + } + + # Serialize attached exception info + if record.exc_info and record.exc_info[1] is not None: + metadata["exception"] = serialize_exception(record.exc_info[1]) + + self.client.log( + LogEntry( + service=self.service, + level=logtide_level, + message=self.format(record), + metadata=metadata, + ) + ) + except Exception: + self.handleError(record) # stdlib fallback — does not re-raise + + def _map_level(self, levelno: int) -> LogLevel: + """Map a stdlib logging level integer to a LogTide LogLevel.""" + if levelno >= logging.CRITICAL: + return LogLevel.CRITICAL + if levelno >= logging.ERROR: + return LogLevel.ERROR + if levelno >= logging.WARNING: + return LogLevel.WARN + if levelno >= logging.INFO: + return LogLevel.INFO + return LogLevel.DEBUG diff --git a/logtide_sdk/middleware/__init__.py b/logtide_sdk/middleware/__init__.py index 4916726..b3a6628 100644 --- a/logtide_sdk/middleware/__init__.py +++ b/logtide_sdk/middleware/__init__.py @@ -1,11 +1,36 @@ """Middleware for LogTide SDK.""" -from .django import LogTideDjangoMiddleware -from .fastapi import LogTideFastAPIMiddleware -from .flask import LogTideFlaskMiddleware - -__all__ = [ - "LogTideFlaskMiddleware", - "LogTideDjangoMiddleware", - "LogTideFastAPIMiddleware", -] +# Each middleware is guarded 
by a try/except so that importing this package +# does not fail when only a subset of framework dependencies are installed. +# __all__ is built dynamically so that `from logtide_sdk.middleware import *` +# never raises AttributeError for frameworks that are not installed. + +__all__ = [] + +try: + from .flask import LogTideFlaskMiddleware + + __all__.append("LogTideFlaskMiddleware") +except ImportError: + pass # type: ignore[assignment] + +try: + from .django import LogTideDjangoMiddleware + + __all__.append("LogTideDjangoMiddleware") +except ImportError: + pass # type: ignore[assignment] + +try: + from .fastapi import LogTideFastAPIMiddleware + + __all__.append("LogTideFastAPIMiddleware") +except ImportError: + pass # type: ignore[assignment] + +try: + from .starlette import LogTideStarletteMiddleware + + __all__.append("LogTideStarletteMiddleware") +except ImportError: + pass # type: ignore[assignment] diff --git a/logtide_sdk/middleware/django.py b/logtide_sdk/middleware/django.py index f911601..a1a4844 100644 --- a/logtide_sdk/middleware/django.py +++ b/logtide_sdk/middleware/django.py @@ -12,7 +12,7 @@ "Install it with: pip install logtide-sdk[django]" ) -from ..client import LogTideClient +from ..client import LogTideClient, serialize_exception class LogTideDjangoMiddleware: @@ -53,7 +53,7 @@ def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]) -> None: self.log_errors: bool = getattr(settings, "LOGTIDE_LOG_ERRORS", True) self.include_headers: bool = getattr(settings, "LOGTIDE_INCLUDE_HEADERS", False) self.skip_health_check: bool = getattr(settings, "LOGTIDE_SKIP_HEALTH_CHECK", True) - self.skip_paths: list = getattr(settings, "LOGTIDE_SKIP_PATHS", []) + self.skip_paths: list = list(getattr(settings, "LOGTIDE_SKIP_PATHS", [])) if self.skip_health_check: self.skip_paths.extend(["/health", "/health/", "/healthz", "/healthz/"]) @@ -64,15 +64,14 @@ def __call__(self, request: HttpRequest) -> HttpResponse: if self._should_skip(request.path): 
return self.get_response(request) - # Extract trace ID from headers - trace_id = request.headers.get("X-Trace-ID") - if trace_id: - self.client.set_trace_id(trace_id) + # Extract trace ID from headers (kept local — not set on the shared client + # to avoid race conditions across concurrent requests). + trace_id: Optional[str] = request.headers.get("X-Trace-ID") # Log request start_time = time.time() if self.log_requests: - self._log_request(request) + self._log_request(request, trace_id) # Process request try: @@ -81,13 +80,13 @@ def __call__(self, request: HttpRequest) -> HttpResponse: # Log error if self.log_errors: duration_ms = (time.time() - start_time) * 1000 - self._log_error(request, e, duration_ms) + self._log_error(request, e, duration_ms, trace_id) raise # Log response if self.log_responses: duration_ms = (time.time() - start_time) * 1000 - self._log_response(request, response, duration_ms) + self._log_response(request, response, duration_ms, trace_id) return response @@ -95,16 +94,17 @@ def _should_skip(self, path: str) -> bool: """Check if path should be skipped.""" return path in self.skip_paths - def _log_request(self, request: HttpRequest) -> None: + def _log_request(self, request: HttpRequest, trace_id: Optional[str] = None) -> None: """Log incoming request.""" metadata = { "method": request.method, "path": request.path, "ip": self._get_client_ip(request), } - if self.include_headers: metadata["headers"] = dict(request.headers) + if trace_id: + metadata["trace_id"] = trace_id self.client.info( self.service_name, @@ -113,7 +113,7 @@ def _log_request(self, request: HttpRequest) -> None: ) def _log_response( - self, request: HttpRequest, response: HttpResponse, duration_ms: float + self, request: HttpRequest, response: HttpResponse, duration_ms: float, trace_id: Optional[str] = None ) -> None: """Log response.""" metadata = { @@ -122,16 +122,16 @@ def _log_response( "status": response.status_code, "duration_ms": round(duration_ms, 2), } - if 
self.include_headers: metadata["response_headers"] = dict(response.items()) + if trace_id: + metadata["trace_id"] = trace_id message = ( f"{request.method} {request.path} " f"{response.status_code} ({duration_ms:.0f}ms)" ) - # Use appropriate log level based on status code if response.status_code >= 500: self.client.error(self.service_name, message, metadata) elif response.status_code >= 400: @@ -140,18 +140,21 @@ def _log_response( self.client.info(self.service_name, message, metadata) def _log_error( - self, request: HttpRequest, error: Exception, duration_ms: float + self, request: HttpRequest, error: Exception, duration_ms: float, trace_id: Optional[str] = None ) -> None: """Log error.""" + metadata = { + "method": request.method, + "path": request.path, + "duration_ms": round(duration_ms, 2), + "exception": serialize_exception(error), + } + if trace_id: + metadata["trace_id"] = trace_id self.client.error( self.service_name, f"Request error: {request.method} {request.path} - {str(error)}", - { - "method": request.method, - "path": request.path, - "duration_ms": round(duration_ms, 2), - "error": error, - }, + metadata, ) def _get_client_ip(self, request: HttpRequest) -> Optional[str]: diff --git a/logtide_sdk/middleware/fastapi.py b/logtide_sdk/middleware/fastapi.py index 43c3ff0..c8d973d 100644 --- a/logtide_sdk/middleware/fastapi.py +++ b/logtide_sdk/middleware/fastapi.py @@ -1,169 +1,17 @@ """FastAPI middleware for LogTide SDK.""" -import time -from typing import Callable, Optional - try: - from fastapi import FastAPI, Request, Response - from starlette.middleware.base import BaseHTTPMiddleware - from starlette.types import ASGIApp + import fastapi # noqa: F401 — validates FastAPI is installed except ImportError: raise ImportError( - "FastAPI and Starlette are required for LogTideFastAPIMiddleware. " - "Install them with: pip install logtide-sdk[fastapi]" + "FastAPI is required for LogTideFastAPIMiddleware. 
" + "Install it with: pip install logtide-sdk[fastapi]" ) -from ..client import LogTideClient - - -class LogTideFastAPIMiddleware(BaseHTTPMiddleware): - """ - FastAPI middleware for automatic request/response logging. - - Example: - app = FastAPI() - client = LogTideClient(ClientOptions(...)) - - app.add_middleware( - LogTideFastAPIMiddleware, - client=client, - service_name='fastapi-api' - ) - """ - - def __init__( - self, - app: ASGIApp, - client: LogTideClient, - service_name: str, - log_requests: bool = True, - log_responses: bool = True, - log_errors: bool = True, - include_headers: bool = False, - skip_health_check: bool = True, - skip_paths: Optional[list] = None, - ) -> None: - """ - Initialize FastAPI middleware. - - Args: - app: ASGI application - client: LogTide client - service_name: Service name for logs - log_requests: Log incoming requests - log_responses: Log responses - log_errors: Log errors - include_headers: Include headers in metadata - skip_health_check: Skip /health and /healthz - skip_paths: List of paths to skip - """ - super().__init__(app) - self.client = client - self.service_name = service_name - self.log_requests = log_requests - self.log_responses = log_responses - self.log_errors = log_errors - self.include_headers = include_headers - self.skip_paths = skip_paths or [] - - if skip_health_check: - self.skip_paths.extend(["/health", "/healthz", "/docs", "/redoc", "/openapi.json"]) - - async def dispatch(self, request: Request, call_next: Callable) -> Response: - """Process request and response.""" - # Check if should skip - if self._should_skip(request.url.path): - return await call_next(request) - - # Extract trace ID from headers - trace_id = request.headers.get("x-trace-id") - if trace_id: - self.client.set_trace_id(trace_id) - - # Log request - start_time = time.time() - if self.log_requests: - self._log_request(request) - - # Process request - try: - response = await call_next(request) - except Exception as e: - # Log error - if 
self.log_errors: - duration_ms = (time.time() - start_time) * 1000 - self._log_error(request, e, duration_ms) - raise - - # Log response - if self.log_responses: - duration_ms = (time.time() - start_time) * 1000 - self._log_response(request, response, duration_ms) - - return response - - def _should_skip(self, path: str) -> bool: - """Check if path should be skipped.""" - return path in self.skip_paths - - def _log_request(self, request: Request) -> None: - """Log incoming request.""" - metadata = { - "method": request.method, - "path": request.url.path, - "ip": self._get_client_ip(request), - } - - if self.include_headers: - metadata["headers"] = dict(request.headers) - - self.client.info( - self.service_name, - f"{request.method} {request.url.path}", - metadata, - ) - - def _log_response(self, request: Request, response: Response, duration_ms: float) -> None: - """Log response.""" - metadata = { - "method": request.method, - "path": request.url.path, - "status": response.status_code, - "duration_ms": round(duration_ms, 2), - } - - if self.include_headers: - metadata["response_headers"] = dict(response.headers) - - message = ( - f"{request.method} {request.url.path} " - f"{response.status_code} ({duration_ms:.0f}ms)" - ) - - # Use appropriate log level based on status code - if response.status_code >= 500: - self.client.error(self.service_name, message, metadata) - elif response.status_code >= 400: - self.client.warn(self.service_name, message, metadata) - else: - self.client.info(self.service_name, message, metadata) +from .starlette import LogTideStarletteMiddleware - def _log_error(self, request: Request, error: Exception, duration_ms: float) -> None: - """Log error.""" - self.client.error( - self.service_name, - f"Request error: {request.method} {request.url.path} - {str(error)}", - { - "method": request.method, - "path": request.url.path, - "duration_ms": round(duration_ms, 2), - "error": error, - }, - ) +# LogTideFastAPIMiddleware is a type alias for 
backwards compatibility. +# FastAPI is built on Starlette so the same middleware class works for both. +LogTideFastAPIMiddleware = LogTideStarletteMiddleware - def _get_client_ip(self, request: Request) -> Optional[str]: - """Get client IP address.""" - x_forwarded_for = request.headers.get("x-forwarded-for") - if x_forwarded_for: - return x_forwarded_for.split(",")[0].strip() - return request.client.host if request.client else None +__all__ = ["LogTideFastAPIMiddleware"] diff --git a/logtide_sdk/middleware/flask.py b/logtide_sdk/middleware/flask.py index 8011143..5f1b373 100644 --- a/logtide_sdk/middleware/flask.py +++ b/logtide_sdk/middleware/flask.py @@ -4,14 +4,15 @@ from typing import Optional try: - from flask import Flask, Request, Response, g, request + from flask import Flask, Request, Response, g, make_response, request + import werkzeug.exceptions as _werkzeug_exc except ImportError: raise ImportError( "Flask is required for LogTideFlaskMiddleware. " "Install it with: pip install logtide-sdk[flask]" ) -from ..client import LogTideClient +from ..client import LogTideClient, serialize_exception class LogTideFlaskMiddleware: @@ -63,7 +64,7 @@ def __init__( self.log_errors = log_errors self.include_headers = include_headers self.include_body = include_body - self.skip_paths = skip_paths or [] + self.skip_paths: list = list(skip_paths or []) if skip_health_check: self.skip_paths.extend(["/health", "/healthz"]) @@ -98,10 +99,11 @@ def _before_request(self) -> None: if self.include_body and request.is_json: metadata["body"] = request.get_json(silent=True) - # Extract trace ID from headers + # Extract trace ID from headers (kept local — not set on the shared client + # to avoid race conditions across concurrent requests). 
trace_id = request.headers.get("X-Trace-ID") if trace_id: - self.client.set_trace_id(trace_id) + metadata["trace_id"] = trace_id self.client.info( self.service_name, @@ -131,8 +133,12 @@ def _after_request(self, response: Response) -> Response: if self.include_headers: metadata["response_headers"] = dict(response.headers) - if self.include_body and response.is_json: - metadata["response_body"] = response.get_json(silent=True) + if self.include_body and response.content_type and "application/json" in response.content_type: + try: + import json as _json + metadata["response_body"] = _json.loads(response.get_data(as_text=True)) + except Exception: + pass # Use appropriate log level based on status code if response.status_code >= 500: @@ -156,10 +162,19 @@ def _after_request(self, response: Response) -> Response: return response - def _error_handler(self, error: Exception) -> None: - """Log error.""" + def _error_handler(self, error: Exception) -> Response: + """Log error and return a response. + + Returning a Response (instead of re-raising) ensures Flask still executes + after_request handlers, so _after_request can log the 500 response too. 
+ """ + if isinstance(error, _werkzeug_exc.HTTPException): + resp: Response = error.get_response() + else: + resp = make_response("Internal Server Error", 500) + if not self.log_errors or self._should_skip(request.path): - raise error + return resp duration_ms = 0 if hasattr(g, "logtide_start_time"): @@ -172,8 +187,8 @@ def _error_handler(self, error: Exception) -> None: "method": request.method, "path": request.path, "duration_ms": round(duration_ms, 2), - "error": error, + "exception": serialize_exception(error), }, ) - raise error + return resp diff --git a/logtide_sdk/middleware/starlette.py b/logtide_sdk/middleware/starlette.py new file mode 100644 index 0000000..2372222 --- /dev/null +++ b/logtide_sdk/middleware/starlette.py @@ -0,0 +1,174 @@ +"""Standalone Starlette ASGI middleware for LogTide SDK.""" + +import time +from typing import Callable, List, Optional + +try: + from starlette.middleware.base import BaseHTTPMiddleware + from starlette.requests import Request + from starlette.responses import Response + from starlette.types import ASGIApp +except ImportError: + raise ImportError( + "Starlette is required for LogTideStarletteMiddleware. " + "Install it with: pip install logtide-sdk[starlette]" + ) + +from ..client import LogTideClient, serialize_exception + + +class LogTideStarletteMiddleware(BaseHTTPMiddleware): + """ + Standalone Starlette ASGI middleware for automatic request/response logging. + + Works with any Starlette-based application (including FastAPI). + Use LogTideFastAPIMiddleware if you prefer the FastAPI-flavoured import path. 
+ + Example: + from starlette.applications import Starlette + from logtide_sdk.middleware import LogTideStarletteMiddleware + + app = Starlette() + app.add_middleware( + LogTideStarletteMiddleware, + client=client, + service_name='starlette-api', + ) + """ + + def __init__( + self, + app: ASGIApp, + client: LogTideClient, + service_name: str, + log_requests: bool = True, + log_responses: bool = True, + log_errors: bool = True, + include_headers: bool = False, + skip_health_check: bool = True, + skip_paths: Optional[List[str]] = None, + ) -> None: + """ + Initialize Starlette middleware. + + Args: + app: ASGI application + client: LogTide client instance + service_name: Service name attached to every log entry + log_requests: Log each incoming request + log_responses: Log each response (with status and duration) + log_errors: Log unhandled exceptions + include_headers: Include request/response headers in metadata + skip_health_check: Skip /health, /healthz, /docs, /redoc, /openapi.json + skip_paths: Additional exact paths to skip + """ + super().__init__(app) + self.client = client + self.service_name = service_name + self.log_requests = log_requests + self.log_responses = log_responses + self.log_errors = log_errors + self.include_headers = include_headers + self.skip_paths: List[str] = list(skip_paths or []) + + if skip_health_check: + self.skip_paths.extend( + ["/health", "/healthz", "/docs", "/redoc", "/openapi.json"] + ) + + async def dispatch(self, request: Request, call_next: Callable) -> Response: + """Process request and response, logging each phase.""" + if self._should_skip(request.url.path): + return await call_next(request) + + # Extract trace ID from request headers (kept local — not set on the shared client + # to avoid race conditions across concurrent requests). 
+ trace_id: Optional[str] = request.headers.get("x-trace-id") + + start_time = time.time() + + if self.log_requests: + self._log_request(request, trace_id) + + try: + response = await call_next(request) + except Exception as e: + if self.log_errors: + duration_ms = (time.time() - start_time) * 1000 + self._log_error(request, e, duration_ms, trace_id) + raise + + if self.log_responses: + duration_ms = (time.time() - start_time) * 1000 + self._log_response(request, response, duration_ms, trace_id) + + return response + + def _should_skip(self, path: str) -> bool: + return path in self.skip_paths + + def _log_request(self, request: Request, trace_id: Optional[str] = None) -> None: + metadata = { + "method": request.method, + "path": request.url.path, + "ip": self._get_client_ip(request), + } + if self.include_headers: + metadata["headers"] = dict(request.headers) + if trace_id: + metadata["trace_id"] = trace_id + + self.client.info( + self.service_name, + f"{request.method} {request.url.path}", + metadata, + ) + + def _log_response( + self, request: Request, response: Response, duration_ms: float, trace_id: Optional[str] = None + ) -> None: + metadata = { + "method": request.method, + "path": request.url.path, + "status": response.status_code, + "duration_ms": round(duration_ms, 2), + } + if self.include_headers: + metadata["response_headers"] = dict(response.headers) + if trace_id: + metadata["trace_id"] = trace_id + + message = ( + f"{request.method} {request.url.path} " + f"{response.status_code} ({duration_ms:.0f}ms)" + ) + + if response.status_code >= 500: + self.client.error(self.service_name, message, metadata) + elif response.status_code >= 400: + self.client.warn(self.service_name, message, metadata) + else: + self.client.info(self.service_name, message, metadata) + + def _log_error( + self, request: Request, error: Exception, duration_ms: float, trace_id: Optional[str] = None + ) -> None: + metadata = { + "method": request.method, + "path": request.url.path, 
+ "duration_ms": round(duration_ms, 2), + "exception": serialize_exception(error), + } + if trace_id: + metadata["trace_id"] = trace_id + self.client.error( + self.service_name, + f"Request error: {request.method} {request.url.path} - {str(error)}", + metadata, + ) + + def _get_client_ip(self, request: Request) -> Optional[str]: + x_forwarded_for = request.headers.get("x-forwarded-for") + if x_forwarded_for: + return x_forwarded_for.split(",")[0].strip() + return request.client.host if request.client else None diff --git a/logtide_sdk/models.py b/logtide_sdk/models.py index 88378c8..3619809 100644 --- a/logtide_sdk/models.py +++ b/logtide_sdk/models.py @@ -1,12 +1,22 @@ """Data models for LogTide SDK.""" from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Dict, List, Optional from .enums import LogLevel +@dataclass +class PayloadLimitsOptions: + """Options for controlling log payload sizes to prevent 413 errors.""" + + max_field_size: int = 10 * 1024 # 10 KB — truncate individual string fields above this + max_log_size: int = 100 * 1024 # 100 KB — drop metadata entirely if entry exceeds this + exclude_fields: List[str] = field(default_factory=list) # field names to redact + truncation_marker: str = "...[TRUNCATED]" # appended to truncated strings + + @dataclass class LogEntry: """Single log entry.""" @@ -21,7 +31,7 @@ class LogEntry: def __post_init__(self) -> None: """Initialize default values.""" if self.time is None: - self.time = datetime.utcnow().isoformat() + "Z" + self.time = datetime.now(timezone.utc).isoformat() def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization.""" @@ -52,6 +62,7 @@ class ClientOptions: debug: bool = False global_metadata: Dict[str, Any] = field(default_factory=dict) auto_trace_id: bool = False + payload_limits: Optional[PayloadLimitsOptions] = None @dataclass diff --git a/pyproject.toml b/pyproject.toml index 
48e8b88..adf6346 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta" [project] name = "logtide-sdk" -version = "0.1.2" -description = "Official Python SDK for LogTide - Self-hosted log management with batching, retry logic, circuit breaker, query API, and middleware support" +version = "0.8.4" +description = "Official Python SDK for LogTide - Self-hosted log management with async client, logging integration, batching, retry, circuit breaker, and middleware" readme = "README.md" license = {text = "MIT"} authors = [{name = "Polliog", email = "giuseppe@solture.it"}] @@ -20,9 +20,11 @@ keywords = [ "self-hosted", "tracing", "circuit-breaker", + "asyncio", "flask", "django", - "fastapi" + "fastapi", + "starlette" ] classifiers = [ "Development Status :: 4 - Beta", @@ -36,6 +38,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", "Topic :: System :: Logging", "Topic :: Software Development :: Libraries :: Python Modules", + "Framework :: AsyncIO", ] dependencies = [ @@ -44,6 +47,7 @@ dependencies = [ [project.optional-dependencies] async = ["aiohttp>=3.9.0"] +starlette = ["starlette>=0.27.0"] dev = [ "pytest>=7.4.0", "pytest-asyncio>=0.21.0", @@ -57,10 +61,10 @@ django = ["django>=3.2.0"] fastapi = ["fastapi>=0.100.0", "starlette>=0.27.0"] [project.urls] -Homepage = "https://github.com/logtide-dev/logtide-sdk-python" -Repository = "https://github.com/logtide-dev/logtide-sdk-python" +Homepage = "https://github.com/logtide-dev/logtide-python" +Repository = "https://github.com/logtide-dev/logtide-python" Documentation = "https://logtide.dev/docs" -Issues = "https://github.com/logtide-dev/logtide-sdk-python/issues" +Issues = "https://github.com/logtide-dev/logtide-python/issues" [tool.setuptools.packages.find] where = ["."] @@ -93,3 +97,4 @@ testpaths = ["tests"] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] +asyncio_mode = "auto" diff --git a/tests/test_client.py 
b/tests/test_client.py index 003ce46..39193f1 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -94,7 +94,7 @@ def test_auto_trace_id(): def test_error_serialization(): - """Test error serialization.""" + """Test error serialization produces structured exception metadata.""" client = LogTideClient( ClientOptions( api_url="http://localhost:8080", @@ -107,15 +107,17 @@ def test_error_serialization(): except Exception as e: client.error("test", "Error occurred", e) - assert "error" in client._buffer[0].metadata - assert client._buffer[0].metadata["error"]["name"] == "ValueError" - assert client._buffer[0].metadata["error"]["message"] == "Test error" + exc = client._buffer[0].metadata["exception"] + assert exc["type"] == "ValueError" + assert exc["message"] == "Test error" + assert exc["language"] == "python" + assert isinstance(exc["stacktrace"], list) client.close() def test_buffer_management(): - """Test buffer size limits.""" + """Test buffer size limits: logs are silently dropped when full.""" client = LogTideClient( ClientOptions( api_url="http://localhost:8080", @@ -124,16 +126,14 @@ def test_buffer_management(): ) ) - # Fill buffer for i in range(5): client.info("test", f"Message {i}") - # Should raise BufferFullError - try: - client.info("test", "Message 6") - assert False, "Should have raised BufferFullError" - except Exception as e: - assert "buffer is full" in str(e).lower() + # Buffer is now at capacity — 6th log must be dropped, not raise + client.info("test", "Message 6") + + assert len(client._buffer) == 5 + assert client.get_metrics().logs_dropped == 1 client.close() diff --git a/tests/test_v084.py b/tests/test_v084.py new file mode 100644 index 0000000..ed55997 --- /dev/null +++ b/tests/test_v084.py @@ -0,0 +1,481 @@ +"""Tests covering v0.8.4 changes and new features.""" + +import logging +from datetime import datetime, timezone +from unittest.mock import MagicMock, patch + +import pytest + +from logtide_sdk import ( + ClientOptions, + 
LogEntry, + LogLevel, + LogTideClient, + LogTideHandler, + PayloadLimitsOptions, + serialize_exception, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def make_client(**kwargs) -> LogTideClient: + """Create a client with no timer and a large batch size (no auto-flush).""" + defaults = { + "api_url": "http://localhost:8080", + "api_key": "test_key", + "flush_interval": 0, + "batch_size": 1000, + } + defaults.update(kwargs) + return LogTideClient(ClientOptions(**defaults)) + + +def patched_session(client: LogTideClient, status: int = 200) -> MagicMock: + """Replace client._session with a MagicMock and return it.""" + mock_resp = MagicMock() + mock_resp.raise_for_status.return_value = None + mock_session = MagicMock() + mock_session.post.return_value = mock_resp + mock_session.get.return_value = mock_resp + client._session = mock_session + return mock_session + + +# --------------------------------------------------------------------------- +# API paths +# --------------------------------------------------------------------------- + + +def test_ingest_path(): + """POST logs must go to /api/v1/ingest.""" + client = make_client() + mock = patched_session(client) + + client.info("svc", "msg") + client.flush() + + mock.post.assert_called_once() + url = mock.post.call_args[0][0] + assert url == "http://localhost:8080/api/v1/ingest" + client.close() + + +def test_query_path(): + """GET logs must go to /api/v1/logs.""" + from logtide_sdk import QueryOptions + + client = make_client() + mock = patched_session(client) + mock.get.return_value.json.return_value = {"logs": [], "total": 0} + + client.query(QueryOptions()) + + url = mock.get.call_args[0][0] + assert url == "http://localhost:8080/api/v1/logs" + client.close() + + +def test_trace_id_path(): + """GET by trace ID must go to /api/v1/logs/trace/{id}.""" + client = make_client() + mock = 
patched_session(client) + mock.get.return_value.json.return_value = [] + + client.get_by_trace_id("abc123") + + url = mock.get.call_args[0][0] + assert url == "http://localhost:8080/api/v1/logs/trace/abc123" + client.close() + + +def test_aggregated_stats_path(): + """GET aggregated stats must go to /api/v1/logs/aggregated.""" + from logtide_sdk import AggregatedStatsOptions + + client = make_client() + mock = patched_session(client) + mock.get.return_value.json.return_value = { + "timeseries": [], + "top_services": [], + "top_errors": [], + } + + client.get_aggregated_stats( + AggregatedStatsOptions( + from_time=datetime(2026, 1, 1, tzinfo=timezone.utc), + to_time=datetime(2026, 1, 2, tzinfo=timezone.utc), + ) + ) + + url = mock.get.call_args[0][0] + assert url == "http://localhost:8080/api/v1/logs/aggregated" + client.close() + + +# --------------------------------------------------------------------------- +# Auth header +# --------------------------------------------------------------------------- + + +def test_auth_header_uses_x_api_key(): + """Requests must use X-API-Key header, not Authorization: Bearer.""" + client = make_client(api_key="lp_secret") + mock = patched_session(client) + + client.info("svc", "msg") + client.flush() + + headers = mock.post.call_args[1]["headers"] + assert headers.get("X-API-Key") == "lp_secret" + assert "Authorization" not in headers + client.close() + + +# --------------------------------------------------------------------------- +# Error serialization +# --------------------------------------------------------------------------- + + +def test_serialize_exception_structure(): + """serialize_exception must return the expected structured keys.""" + try: + raise RuntimeError("boom") + except RuntimeError as e: + result = serialize_exception(e) + + assert result["type"] == "RuntimeError" + assert result["message"] == "boom" + assert result["language"] == "python" + assert isinstance(result["stacktrace"], list) + assert "raw" in 
result + + +def test_serialize_exception_stackframes(): + """Each stack frame must contain file, function, and line.""" + try: + raise ValueError("frame test") + except ValueError as e: + result = serialize_exception(e) + + assert len(result["stacktrace"]) > 0 + frame = result["stacktrace"][-1] + assert "file" in frame + assert "function" in frame + assert "line" in frame + + +def test_serialize_exception_chained_cause(): + """Chained exceptions must appear under the 'cause' key.""" + try: + try: + raise ValueError("inner") + except ValueError as inner: + raise RuntimeError("outer") from inner + except RuntimeError as e: + result = serialize_exception(e) + + assert "cause" in result + assert result["cause"]["type"] == "ValueError" + + +def test_error_method_uses_exception_key(): + """client.error() with an Exception must produce metadata['exception'].""" + client = make_client() + + try: + raise TypeError("type error") + except TypeError as e: + client.error("svc", "something failed", e) + + meta = client._buffer[0].metadata + assert "exception" in meta + assert meta["exception"]["type"] == "TypeError" + client.close() + + +# --------------------------------------------------------------------------- +# datetime.utcnow replacement +# --------------------------------------------------------------------------- + + +def test_log_entry_time_is_timezone_aware(): + """LogEntry.time must include timezone offset (not naive UTC).""" + entry = LogEntry(service="svc", level=LogLevel.INFO, message="hello") + assert entry.time is not None + # timezone.utc isoformat produces '+00:00' suffix + assert "+00:00" in entry.time or "Z" in entry.time + + +# --------------------------------------------------------------------------- +# Buffer full — silent drop, no exception +# --------------------------------------------------------------------------- + + +def test_buffer_full_drops_silently(): + """Buffer overflow must drop logs silently without raising.""" + client = 
make_client(max_buffer_size=3) + + for i in range(3): + client.info("svc", f"msg {i}") + + # Must not raise + client.info("svc", "overflow") + + assert len(client._buffer) == 3 + assert client.get_metrics().logs_dropped == 1 + client.close() + + +def test_buffer_full_increments_dropped_counter(): + """Each dropped log must increment the logs_dropped counter.""" + client = make_client(max_buffer_size=2) + + client.info("svc", "a") + client.info("svc", "b") + client.info("svc", "c") # dropped + client.info("svc", "d") # dropped + + assert client.get_metrics().logs_dropped == 2 + client.close() + + +# --------------------------------------------------------------------------- +# requests.Session reuse +# --------------------------------------------------------------------------- + + +def test_session_is_reused_across_calls(): + """All HTTP calls must use the same requests.Session instance.""" + client = make_client() + mock = patched_session(client) + mock.get.return_value.json.return_value = {"logs": [], "total": 0} + + client.info("svc", "msg") + client.flush() + + from logtide_sdk import QueryOptions + + client.query(QueryOptions()) + + # post and get were both called on the same mock session + assert mock.post.called + assert mock.get.called + client.close() + + +# --------------------------------------------------------------------------- +# Payload limits +# --------------------------------------------------------------------------- + + +def test_payload_limits_field_truncation(): + """String fields longer than max_field_size must be truncated.""" + limits = PayloadLimitsOptions(max_field_size=10, truncation_marker="[T]") + client = make_client(payload_limits=limits) + + client.info("svc", "msg", {"data": "A" * 50}) + + value = client._buffer[0].metadata["data"] + assert value == "A" * 10 + "[T]" + client.close() + + +def test_payload_limits_exclude_fields(): + """Fields in exclude_fields must be replaced with [EXCLUDED].""" + limits = 
PayloadLimitsOptions(exclude_fields=["password"]) + client = make_client(payload_limits=limits) + + client.info("svc", "login", {"username": "alice", "password": "secret"}) + + assert client._buffer[0].metadata["password"] == "[EXCLUDED]" + assert client._buffer[0].metadata["username"] == "alice" + client.close() + + +def test_payload_limits_base64_removal(): + """Long base64-looking strings must be replaced.""" + limits = PayloadLimitsOptions() + client = make_client(payload_limits=limits) + + b64 = "A" * 150 # all base64 chars, long enough to trigger detection + client.info("svc", "msg", {"image": b64}) + + assert client._buffer[0].metadata["image"] == "[BASE64 DATA REMOVED]" + client.close() + + +def test_payload_limits_max_log_size(): + """Entries exceeding max_log_size must have metadata replaced with truncation marker.""" + limits = PayloadLimitsOptions(max_log_size=100) # very small limit + client = make_client(payload_limits=limits) + + client.info("svc", "msg", {"big": "X" * 200}) + + meta = client._buffer[0].metadata + assert meta.get("_truncated") is True + client.close() + + +# --------------------------------------------------------------------------- +# stream() returns a stop callable +# --------------------------------------------------------------------------- + + +def test_stream_returns_callable(): + """stream() must return a callable stop function immediately.""" + client = make_client() + + # Patch the session so the background thread hits a mock instead of the network + mock_session = MagicMock() + mock_response = MagicMock() + mock_response.__enter__ = lambda s: s + mock_response.__exit__ = MagicMock(return_value=False) + mock_response.raise_for_status.return_value = None + mock_response.iter_lines.return_value = iter([]) + mock_session.get.return_value = mock_response + client._session = mock_session + + stop = client.stream(on_log=lambda log: None) + assert callable(stop) + + stop() # must not raise + client.close() + + +# 
--------------------------------------------------------------------------- +# LogTideHandler +# --------------------------------------------------------------------------- + + +def test_logtide_handler_emit(): + """LogTideHandler must forward log records to LogTideClient.""" + client = make_client() + handler = LogTideHandler(client=client, service="test-svc") + + logger = logging.getLogger("test_logtide_handler_emit") + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + logger.info("hello from logging") + + assert len(client._buffer) == 1 + entry = client._buffer[0] + assert entry.service == "test-svc" + assert entry.level == LogLevel.INFO + assert "hello from logging" in entry.message + + logger.removeHandler(handler) + client.close() + + +def test_logtide_handler_level_mapping(): + """Handler must map stdlib levels to LogTide levels correctly.""" + client = make_client() + handler = LogTideHandler(client=client, service="svc") + + cases = [ + (logging.DEBUG, LogLevel.DEBUG), + (logging.INFO, LogLevel.INFO), + (logging.WARNING, LogLevel.WARN), + (logging.ERROR, LogLevel.ERROR), + (logging.CRITICAL, LogLevel.CRITICAL), + ] + for std_level, expected in cases: + assert handler._map_level(std_level) == expected + + client.close() + + +def test_logtide_handler_exception_metadata(): + """Records with exc_info must include a structured 'exception' in metadata.""" + client = make_client() + handler = LogTideHandler(client=client, service="svc") + + logger = logging.getLogger("test_logtide_handler_exc") + logger.addHandler(handler) + logger.setLevel(logging.ERROR) + + try: + raise ValueError("handler exc test") + except ValueError: + logger.error("something broke", exc_info=True) + + meta = client._buffer[0].metadata + assert "exception" in meta + assert meta["exception"]["type"] == "ValueError" + + logger.removeHandler(handler) + client.close() + + +# --------------------------------------------------------------------------- +# Async client (basic, no 
network) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_async_client_buffers_logs(): + """AsyncLogTideClient must buffer logs without sending.""" + from logtide_sdk import AsyncLogTideClient + + client = AsyncLogTideClient( + ClientOptions( + api_url="http://localhost:8080", + api_key="test_key", + flush_interval=0, + batch_size=1000, + ) + ) + + await client.info("svc", "async hello") + await client.error("svc", "async error", {"k": "v"}) + + assert len(client._buffer) == 2 + assert client._buffer[0].level == LogLevel.INFO + assert client._buffer[1].level == LogLevel.ERROR + + # Don't flush (no real server) — just verify buffering + client._closed = True # prevent flush in close() + if client._session: + await client._session.close() + + +@pytest.mark.asyncio +async def test_async_client_auth_header(): + """AsyncLogTideClient must use X-API-Key header.""" + from unittest.mock import AsyncMock, MagicMock + + from logtide_sdk import AsyncLogTideClient + + client = AsyncLogTideClient( + ClientOptions( + api_url="http://localhost:8080", + api_key="lp_async_key", + flush_interval=0, + batch_size=1, # triggers flush on first log + ) + ) + + mock_response = AsyncMock() + mock_response.raise_for_status = MagicMock(return_value=None) + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=False) + + mock_session = MagicMock() + mock_session.closed = False + mock_session.post.return_value = mock_response + mock_session.close = AsyncMock() + client._session = mock_session + + await client.info("svc", "trigger flush") + + mock_session.post.assert_called_once() + headers = mock_session.post.call_args[1]["headers"] + assert headers["X-API-Key"] == "lp_async_key" + assert "Authorization" not in headers + + await client._session.close()