
## DataStream

DataStream enables local flag evaluation by maintaining a WebSocket connection to Schematic and caching flag rules, company, and user data locally (or in a shared cache such as Redis). Flag checks are evaluated locally via a WASM rules engine, eliminating per-check network requests.

> **Async-only:** DataStream and Replicator Mode are only available on the `AsyncSchematic` client. The synchronous `Schematic` client does not support either feature — use `AsyncSchematic` (shown in all examples below) if you need them.

### Installation

DataStream requires additional dependencies for WebSocket connections and local flag evaluation. Install them with the `datastream` extra:

```bash
pip install 'schematichq[datastream]'
# or
poetry add schematichq -E datastream
```

To use the Redis-backed shared cache (see below), also install `redis`:

```bash
pip install 'schematichq[datastream]' redis
```

### Key Features

- **Real-Time Updates**: Automatically updates cached data when changes occur on the backend.
- **Local Flag Evaluation**: Flag checks are evaluated locally via WASM, eliminating per-check network requests.
- **Configurable Caching**: Supports in-memory caching (default) and custom `AsyncCacheProvider` implementations including a built-in Redis provider.
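
The bundled in-memory and Redis providers cover most cases, but a custom provider can also be supplied. As a rough sketch, assuming `AsyncCacheProvider` exposes async `get`/`set`/`delete` methods keyed by string (the method names here are illustrative; check the `schematic.cache` module for the actual interface), a dict-backed TTL cache might look like:

```python
import asyncio
import time
from typing import Any, Dict, Optional, Tuple


class DictTTLCache:
    """A minimal async cache with per-entry TTL, backed by a plain dict."""

    def __init__(self, default_ttl_ms: Optional[int] = None) -> None:
        # key -> (value, absolute expiry on the monotonic clock, or None)
        self._store: Dict[str, Tuple[Any, Optional[float]]] = {}
        self._default_ttl_ms = default_ttl_ms

    async def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._store[key]  # lazily evict expired entries on read
            return None
        return value

    async def set(self, key: str, value: Any, ttl_ms: Optional[int] = None) -> None:
        ttl = ttl_ms if ttl_ms is not None else self._default_ttl_ms
        expires_at = time.monotonic() + ttl / 1000 if ttl is not None else None
        self._store[key] = (value, expires_at)

    async def delete(self, key: str) -> None:
        self._store.pop(key, None)


async def demo() -> None:
    cache = DictTTLCache(default_ttl_ms=50)
    await cache.set("flag:some-flag-key", {"enabled": True})
    print(await cache.get("flag:some-flag-key"))  # → {'enabled': True}
    await asyncio.sleep(0.1)
    print(await cache.get("flag:some-flag-key"))  # → None (entry expired)


asyncio.run(demo())
```

An instance of such a class could then be passed to the cache slots on `DataStreamConfig` in place of the defaults.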

### How to Enable DataStream

Set `use_datastream=True` on `AsyncSchematicConfig`:

```python
import asyncio
from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig

async def main():
config = AsyncSchematicConfig(
use_datastream=True,
datastream=DataStreamConfig(
cache_ttl=300_000, # 5 minutes, in ms
),
)

async with AsyncSchematic("YOUR_API_KEY", config) as client:
is_enabled = await client.check_flag(
"some-flag-key",
company={"id": "your-company-id"},
user={"id": "your-user-id"},
)

asyncio.run(main())
```

### Configuration Options

All fields live on `DataStreamConfig`.

| Option | Type | Default | Description |
|---|---|---|---|
| `cache_ttl` | `Optional[int]` | 24 hours | Cache TTL in milliseconds. `None` means no expiration. |
| `company_cache` | `AsyncCacheProvider` | in-memory | Cache for full company records. |
| `company_lookup_cache` | `AsyncCacheProvider` | in-memory | Cache mapping company keys → company IDs. |
| `user_cache` | `AsyncCacheProvider` | in-memory | Cache for full user records. |
| `user_lookup_cache` | `AsyncCacheProvider` | in-memory | Cache mapping user keys → user IDs. |
| `flag_cache` | `AsyncCacheProvider` | in-memory | Cache for flag rules. |
| `replicator_mode` | `bool` | `False` | Enable Replicator Mode (see below). |
| `replicator_health_url` | `Optional[str]` | `http://localhost:8090/ready` | Replicator health check URL. |
| `replicator_health_check` | `Optional[int]` | 30000 | Health check interval in milliseconds. |
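
For example, per the table above, passing `cache_ttl=None` keeps cached entries indefinitely (a config fragment, not a complete program):

```python
config = AsyncSchematicConfig(
    use_datastream=True,
    datastream=DataStreamConfig(
        cache_ttl=None,  # cached entries never expire
    ),
)
```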

### Using Redis as a Shared Cache

The SDK ships with a `RedisCache` provider built on `redis.asyncio`. Pass a Redis client into the cache slots on `DataStreamConfig` to share state across multiple processes:

```python
import asyncio
import redis.asyncio as aioredis
from schematic.cache import RedisCache
from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig

async def main():
redis_client = aioredis.from_url("redis://localhost:6379")
cache_ttl_ms = 60 * 60 * 1000 # 1 hour

config = AsyncSchematicConfig(
use_datastream=True,
datastream=DataStreamConfig(
cache_ttl=cache_ttl_ms,
company_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
company_lookup_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
user_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
user_lookup_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
flag_cache=RedisCache(redis_client, default_ttl_ms=cache_ttl_ms),
),
)

async with AsyncSchematic("YOUR_API_KEY", config) as client:
await client.check_flag("some-flag-key", company={"id": "your-company-id"})

asyncio.run(main())
```

`RedisCache` accepts a `prefix` argument (default `"schematic"`) if you need to namespace keys — this must match the prefix used by any other SDKs or the replicator writing to the same Redis instance.
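
For example, to namespace all cached entries under a custom prefix (reusing the `redis_client` and `cache_ttl_ms` values from the example above; `"myapp"` is a placeholder):

```python
# Every SDK instance and the replicator writing to this Redis instance
# must use the same prefix, or they will not see each other's entries.
flag_cache = RedisCache(redis_client, default_ttl_ms=cache_ttl_ms, prefix="myapp")
```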

### Replicator Mode

When running the [`schematic-datastream-replicator`](https://github.com/schematichq/schematic-datastream-replicator) service, configure the client to operate in **Replicator Mode**. The replicator holds the single WebSocket connection to Schematic and populates a shared cache; SDK instances read from that cache and evaluate flags locally without opening their own WebSocket connections.

**Replicator Mode requires a shared cache** (e.g. Redis) so the SDK can read data written by the external replicator process. Configure the cache slots on `DataStreamConfig` exactly as in the Redis example above.

#### How to Enable Replicator Mode

```python
import asyncio
import redis.asyncio as aioredis
from schematic.cache import RedisCache
from schematic.client import AsyncSchematic, AsyncSchematicConfig, DataStreamConfig

async def main():
redis_client = aioredis.from_url("redis://localhost:6379")

config = AsyncSchematicConfig(
use_datastream=True,
datastream=DataStreamConfig(
replicator_mode=True,
cache_ttl=None, # Match the replicator's unlimited default
company_cache=RedisCache(redis_client),
company_lookup_cache=RedisCache(redis_client),
user_cache=RedisCache(redis_client),
user_lookup_cache=RedisCache(redis_client),
flag_cache=RedisCache(redis_client),
),
)

async with AsyncSchematic("YOUR_API_KEY", config) as client:
is_enabled = await client.check_flag(
"some-flag-key",
company={"id": "your-company-id"},
)

asyncio.run(main())
```

#### Cache TTL Configuration

Set the SDK's `cache_ttl` to match the replicator's cache TTL. The replicator defaults to an unlimited cache TTL. If the SDK uses a shorter TTL (the default is 24 hours), locally updated cache entries (e.g. after track events) will be written back with the shorter TTL and eventually evicted from the shared cache, even though the replicator originally set them with no expiration.

If you have configured a custom cache TTL on the replicator, use the same value here.

#### Advanced Configuration

The client automatically configures sensible defaults for Replicator Mode, but you can customize the health check endpoint and interval:

```python
config = AsyncSchematicConfig(
use_datastream=True,
datastream=DataStreamConfig(
replicator_mode=True,
cache_ttl=None,
replicator_health_url="http://my-replicator:8090/ready",
replicator_health_check=60_000, # 60 seconds, in ms
# ... shared cache providers
),
)
```

#### Default Configuration

- **Replicator Health URL**: `http://localhost:8090/ready`
- **Health Check Interval**: 30 seconds
- **Cache TTL**: 24 hours (SDK default; should be set to match the replicator's TTL, which defaults to unlimited)

When running in Replicator Mode, the client will:
- Skip establishing WebSocket connections
- Periodically check if the replicator service is ready
- Use cached data populated by the external replicator service
- Fall back to direct API calls if the replicator is not available

## Webhook Verification

Schematic can send webhooks to notify your application of events. To ensure the security of these webhooks, Schematic signs each request using HMAC-SHA256. The Python SDK provides utility functions to verify these signatures.
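
Conceptually, HMAC-SHA256 verification recomputes the signature over the raw request body with your shared signing secret and compares it to the received signature in constant time. The sketch below uses only the standard library to illustrate the idea; the header carrying the signature and the exact payload format are SDK-specific, so prefer the SDK's own verification helpers in practice:

```python
import hashlib
import hmac


def verify_signature(payload: bytes, signature_hex: str, secret: str) -> bool:
    """Recompute the HMAC-SHA256 of the raw payload and compare in constant time."""
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    # compare_digest avoids timing side channels that leak signature bytes
    return hmac.compare_digest(expected, signature_hex)


body = b'{"event": "example"}'
secret = "whsec_example"  # placeholder signing secret
sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
print(verify_signature(body, sig, secret))       # → True
print(verify_signature(b"tampered", sig, secret))  # → False
```

Always verify against the raw request bytes, not a re-serialized JSON body, since re-serialization can reorder keys or change whitespace and invalidate the signature.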

## Beta Status
This SDK is in **Preview**, and there may be breaking changes between versions without a major
version update.

To ensure a reproducible environment (and minimize risk of breaking changes), we recommend pinning a specific package version.

## Contributing
