From a6d1ff614baf3bb501218af3302596cadbf1f9a5 Mon Sep 17 00:00:00 2001 From: ryan echternacht Date: Tue, 5 May 2026 16:31:39 -0400 Subject: [PATCH 1/2] fix discrepancies to docs --- README.md | 179 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 175 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index cce6876..0ff84a4 100644 --- a/README.md +++ b/README.md @@ -697,12 +697,183 @@ client = Schematic( ) ``` -## Beta Status +## DataStream + +DataStream enables local flag evaluation by maintaining a WebSocket connection to Schematic and caching flag rules, company, and user data locally (or in a shared cache such as Redis). Flag checks are evaluated locally via a WASM rules engine, eliminating per-check network requests. + +> **Async-only:** DataStream and Replicator Mode are only available on the `AsyncSchematic` client. The synchronous `Schematic` client does not support either feature — use `AsyncSchematic` (shown in all examples below) if you need them. + +### Installation + +DataStream requires additional dependencies for WebSocket connections and local flag evaluation. Install them with the `datastream` extra: + +```bash +pip install 'schematichq[datastream]' +# or +poetry add schematichq -E datastream +``` + +To use the Redis-backed shared cache (see below), also install `redis`: + +```bash +pip install 'schematichq[datastream]' redis +``` + +### Key Features + +- **Real-Time Updates**: Automatically updates cached data when changes occur on the backend. +- **Local Flag Evaluation**: Flag checks are evaluated locally via WASM, eliminating per-check network requests. +- **Configurable Caching**: Supports in-memory caching (default) and custom `AsyncCacheProvider` implementations including a built-in Redis provider. + +### How to Enable DataStream + +Set `use_datastream=True` on `AsyncSchematicConfig`: + +```python +import asyncio +from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig + +async def main(): + config = AsyncSchematicConfig( + use_datastream=True, + datastream=DataStreamConfig( + cache_ttl=300_000, # 5 minutes, in ms + ), + ) + + async with AsyncSchematic("YOUR_API_KEY", config) as client: + is_enabled = await client.check_flag( + "some-flag-key", + company={"id": "your-company-id"}, + user={"id": "your-user-id"}, + ) + +asyncio.run(main()) +``` + +### Configuration Options + +All fields live on `DataStreamConfig`. + +| Option | Type | Default | Description | +|---|---|---|---| +| `cache_ttl` | `Optional[int]` | 24 hours | Cache TTL in milliseconds. `None` means no expiration. | +| `company_cache` | `AsyncCacheProvider` | in-memory | Cache for full company records. | +| `company_lookup_cache` | `AsyncCacheProvider` | in-memory | Cache mapping company keys → company IDs. | +| `user_cache` | `AsyncCacheProvider` | in-memory | Cache for full user records. | +| `user_lookup_cache` | `AsyncCacheProvider` | in-memory | Cache mapping user keys → user IDs. | +| `flag_cache` | `AsyncCacheProvider` | in-memory | Cache for flag rules. | +| `replicator_mode` | `bool` | `False` | Enable Replicator Mode (see below). | +| `replicator_health_url` | `Optional[str]` | `http://localhost:8090/ready` | Replicator health check URL. | +| `replicator_health_check` | `Optional[int]` | 30000 | Health check interval in milliseconds. | + +### Using Redis as a Shared Cache + +The SDK ships with a `RedisCache` provider built on `redis.asyncio`. Pass a Redis client into the cache slots on `DataStreamConfig` to share state across multiple processes: + +```python +import asyncio +import redis.asyncio as aioredis +from schematic.cache import RedisCache +from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig -This SDK is in **Preview**, and there may be breaking changes between versions without a major -version update. +async def main(): + redis_client = aioredis.from_url("redis://localhost:6379") + cache_ttl_ms = 60 * 60 * 1000 # 1 hour -To ensure a reproducible environment (and minimize risk of breaking changes), we recommend pinning a specific package version. + config = AsyncSchematicConfig( + use_datastream=True, + datastream=DataStreamConfig( + cache_ttl=cache_ttl_ms, + company_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), + company_lookup_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), + user_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), + user_lookup_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), + flag_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), + ), + ) + + async with AsyncSchematic("YOUR_API_KEY", config) as client: + await client.check_flag("some-flag-key", company={"id": "your-company-id"}) + +asyncio.run(main()) +``` + +`RedisCache` accepts a `prefix` argument (default `"schematic"`) if you need to namespace keys — this must match the prefix used by any other SDKs or the replicator writing to the same Redis instance. + +### Replicator Mode + +When running the [`schematic-datastream-replicator`](https://github.com/schematichq/schematic-datastream-replicator) service, configure the client to operate in **Replicator Mode**. The replicator holds the single WebSocket connection to Schematic and populates a shared cache; SDK instances read from that cache and evaluate flags locally without opening their own WebSocket connections. + +**Replicator Mode requires a shared cache** (e.g. Redis) so the SDK can read data written by the external replicator process. Configure the cache slots on `DataStreamConfig` exactly as in the Redis example above. + +#### How to Enable Replicator Mode + +```python +import asyncio +import redis.asyncio as aioredis +from schematic.cache import RedisCache +from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig + +async def main(): + redis_client = aioredis.from_url("redis://localhost:6379") + + config = AsyncSchematicConfig( + use_datastream=True, + datastream=DataStreamConfig( + replicator_mode=True, + cache_ttl=None, # Match the replicator's unlimited default + company_cache=RedisCache(redis_client), + company_lookup_cache=RedisCache(redis_client), + user_cache=RedisCache(redis_client), + user_lookup_cache=RedisCache(redis_client), + flag_cache=RedisCache(redis_client), + ), + ) + + async with AsyncSchematic("YOUR_API_KEY", config) as client: + is_enabled = await client.check_flag( + "some-flag-key", + company={"id": "your-company-id"}, + ) + +asyncio.run(main()) +``` + +#### Cache TTL Configuration + +Set the SDK's `cache_ttl` to match the replicator's cache TTL. The replicator defaults to an unlimited cache TTL. If the SDK uses a shorter TTL (the default is 24 hours), locally updated cache entries (e.g. after track events) will be written back with the shorter TTL and eventually evicted from the shared cache, even though the replicator originally set them with no expiration. + +If you have configured a custom cache TTL on the replicator, use the same value here. + +#### Advanced Configuration + +The client automatically configures sensible defaults for Replicator Mode, but you can customize the health check endpoint and interval: + +```python +config = AsyncSchematicConfig( + use_datastream=True, + datastream=DataStreamConfig( + replicator_mode=True, + cache_ttl=None, + replicator_health_url="http://my-replicator:8090/ready", + replicator_health_check=60_000, # 60 seconds, in ms + # ... shared cache providers + ), +) +``` + +#### Default Configuration + +- **Replicator Health URL**: `http://localhost:8090/ready` +- **Health Check Interval**: 30 seconds +- **Cache TTL**: 24 hours (SDK default; should be set to match the replicator's TTL, which defaults to unlimited) + +When running in Replicator Mode, the client will: +- Skip establishing WebSocket connections +- Periodically check if the replicator service is ready +- Use cached data populated by the external replicator service +- Fall back to direct API calls if the replicator is not available ## Contributing From b63b498d569f5b35c073d13b2548bf493037cda9 Mon Sep 17 00:00:00 2001 From: ryan echternacht Date: Fri, 8 May 2026 15:00:23 -0400 Subject: [PATCH 2/2] Remove duplicate section --- README.md | 178 ------------------------------------------------------ 1 file changed, 178 deletions(-) diff --git a/README.md b/README.md index 0ff84a4..c01408e 100644 --- a/README.md +++ b/README.md @@ -285,184 +285,6 @@ client.check_flag( ) ``` -## DataStream - -DataStream enables local flag evaluation by maintaining a WebSocket connection to Schematic and caching flag rules, company, and user data locally (or in a shared cache such as Redis). Flag checks are evaluated locally via a WASM rules engine, eliminating per-check network requests. - -> **Async-only:** DataStream and Replicator Mode are only available on the `AsyncSchematic` client. The synchronous `Schematic` client does not support either feature — use `AsyncSchematic` (shown in all examples below) if you need them. - -### Installation - -DataStream requires additional dependencies for WebSocket connections and local flag evaluation. Install them with the `datastream` extra: - -```bash -pip install 'schematichq[datastream]' -# or -poetry add schematichq -E datastream -``` - -To use the Redis-backed shared cache (see below), also install `redis`: - -```bash -pip install 'schematichq[datastream]' redis -``` - -### Key Features - -- **Real-Time Updates**: Automatically updates cached data when changes occur on the backend. -- **Local Flag Evaluation**: Flag checks are evaluated locally via WASM, eliminating per-check network requests. -- **Configurable Caching**: Supports in-memory caching (default) and custom `AsyncCacheProvider` implementations including a built-in Redis provider. - -### How to Enable DataStream - -Set `use_datastream=True` on `AsyncSchematicConfig`: - -```python -import asyncio -from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig - -async def main(): - config = AsyncSchematicConfig( - use_datastream=True, - datastream=DataStreamConfig( - cache_ttl=300_000, # 5 minutes, in ms - ), - ) - - async with AsyncSchematic("YOUR_API_KEY", config) as client: - is_enabled = await client.check_flag( - "some-flag-key", - company={"id": "your-company-id"}, - user={"id": "your-user-id"}, - ) - -asyncio.run(main()) -``` - -### Configuration Options - -All fields live on `DataStreamConfig`. - -| Option | Type | Default | Description | -|---|---|---|---| -| `cache_ttl` | `Optional[int]` | 24 hours | Cache TTL in milliseconds. `None` means no expiration. | -| `company_cache` | `AsyncCacheProvider` | in-memory | Cache for full company records. | -| `company_lookup_cache` | `AsyncCacheProvider` | in-memory | Cache mapping company keys → company IDs. | -| `user_cache` | `AsyncCacheProvider` | in-memory | Cache for full user records. | -| `user_lookup_cache` | `AsyncCacheProvider` | in-memory | Cache mapping user keys → user IDs. | -| `flag_cache` | `AsyncCacheProvider` | in-memory | Cache for flag rules. | -| `replicator_mode` | `bool` | `False` | Enable Replicator Mode (see below). | -| `replicator_health_url` | `Optional[str]` | `http://localhost:8090/ready` | Replicator health check URL. | -| `replicator_health_check` | `Optional[int]` | 30000 | Health check interval in milliseconds. | - -### Using Redis as a Shared Cache - -The SDK ships with a `RedisCache` provider built on `redis.asyncio`. Pass a Redis client into the cache slots on `DataStreamConfig` to share state across multiple processes: - -```python -import asyncio -import redis.asyncio as aioredis -from schematic.cache import RedisCache -from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig - -async def main(): - redis_client = aioredis.from_url("redis://localhost:6379") - cache_ttl_ms = 60 * 60 * 1000 # 1 hour - - config = AsyncSchematicConfig( - use_datastream=True, - datastream=DataStreamConfig( - cache_ttl=cache_ttl_ms, - company_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), - company_lookup_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), - user_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), - user_lookup_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), - flag_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms), - ), - ) - - async with AsyncSchematic("YOUR_API_KEY", config) as client: - await client.check_flag("some-flag-key", company={"id": "your-company-id"}) - -asyncio.run(main()) -``` - -`RedisCache` accepts a `prefix` argument (default `"schematic"`) if you need to namespace keys — this must match the prefix used by any other SDKs or the replicator writing to the same Redis instance. - -### Replicator Mode - -When running the [`schematic-datastream-replicator`](https://github.com/schematichq/schematic-datastream-replicator) service, configure the client to operate in **Replicator Mode**. The replicator holds the single WebSocket connection to Schematic and populates a shared cache; SDK instances read from that cache and evaluate flags locally without opening their own WebSocket connections. - -**Replicator Mode requires a shared cache** (e.g. Redis) so the SDK can read data written by the external replicator process. Configure the cache slots on `DataStreamConfig` exactly as in the Redis example above. - -#### How to Enable Replicator Mode - -```python -import asyncio -import redis.asyncio as aioredis -from schematic.cache import RedisCache -from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig - -async def main(): - redis_client = aioredis.from_url("redis://localhost:6379") - - config = AsyncSchematicConfig( - use_datastream=True, - datastream=DataStreamConfig( - replicator_mode=True, - cache_ttl=None, # Match the replicator's unlimited default - company_cache=RedisCache(redis_client), - company_lookup_cache=RedisCache(redis_client), - user_cache=RedisCache(redis_client), - user_lookup_cache=RedisCache(redis_client), - flag_cache=RedisCache(redis_client), - ), - ) - - async with AsyncSchematic("YOUR_API_KEY", config) as client: - is_enabled = await client.check_flag( - "some-flag-key", - company={"id": "your-company-id"}, - ) - -asyncio.run(main()) -``` - -#### Cache TTL Configuration - -Set the SDK's `cache_ttl` to match the replicator's cache TTL. The replicator defaults to an unlimited cache TTL. If the SDK uses a shorter TTL (the default is 24 hours), locally updated cache entries (e.g. after track events) will be written back with the shorter TTL and eventually evicted from the shared cache, even though the replicator originally set them with no expiration. - -If you have configured a custom cache TTL on the replicator, use the same value here. - -#### Advanced Configuration - -The client automatically configures sensible defaults for Replicator Mode, but you can customize the health check endpoint and interval: - -```python -config = AsyncSchematicConfig( - use_datastream=True, - datastream=DataStreamConfig( - replicator_mode=True, - cache_ttl=None, - replicator_health_url="http://my-replicator:8090/ready", - replicator_health_check=60_000, # 60 seconds, in ms - # ... shared cache providers - ), -) -``` - -#### Default Configuration - -- **Replicator Health URL**: `http://localhost:8090/ready` -- **Health Check Interval**: 30 seconds -- **Cache TTL**: 24 hours (SDK default; should be set to match the replicator's TTL, which defaults to unlimited) - -When running in Replicator Mode, the client will: -- Skip establishing WebSocket connections -- Periodically check if the replicator service is ready -- Use cached data populated by the external replicator service -- Fall back to direct API calls if the replicator is not available - ## Webhook Verification Schematic can send webhooks to notify your application of events. To ensure the security of these webhooks, Schematic signs each request using HMAC-SHA256. The Python SDK provides utility functions to verify these signatures.