Skip to content

mn2rb/Streamix

Repository files navigation

Streamix Queue

PyPI Downloads Python 3.10+ License: MIT

Lightweight Redis Streams event messenger for service-to-service communication.

Features

  • Simple API: Just publish() and consume() functions
  • Redis Streams: Built on battle-tested Redis infrastructure
  • Consumer Groups: Multiple services can consume the same events with group-based tracking
  • Automatic Retries: Configurable retry limit with exponential backoff support
  • Dead-Letter Queue: Failed messages sent to <stream>:failed for inspection
  • Stale Message Recovery: Automatic reclaim of messages from crashed consumers
  • Structured Schema: Messages include id, event, data, retries, and timestamps
  • Type Hints: Full Python type annotations for IDE support
  • Minimal Dependencies: Only Redis client required

Installation

pip install streamix-queue

Quick Start

Consumer Service A

from streamix_queue import consume

def on_user_created(data):
    print(f"User created: {data['user_id']}")
    # If handler raises an exception, message is automatically retried

consume(
    "user.created",
    on_user_created,
    redis_url="redis://localhost:6379/0",
    stream="app.events",
    group="service-a",
)

Publisher Service B

from streamix_queue import publish

publish(
    "user.created",
    {"user_id": "123", "email": "alice@example.com"},
    redis_url="redis://localhost:6379/0",
    stream="app.events",
    group="service-a",
)

How It Works

  1. Publish: Event sent to Redis Stream with structured schema
  2. Consumer Group: Maintains message delivery state and ownership
  3. Processing: Consumer reads and processes events
  4. Success: Message acknowledged (removed from pending list)
  5. Failure: On exception, message retried; after limit exceeded, moved to DLQ
  6. Dead-Letter: Permanently failed messages stored in <stream>:failed for debugging

API Reference

publish(event, data, **kwargs)

Publish an event to the stream.

Parameters:

  • event (str): Event name (e.g., "user.created")
  • data (dict): Event payload
  • redis_url (str, default="redis://localhost:6379/0"): Redis connection URL
  • stream (str, default="app.events"): Stream name
  • group (str, default="app.workers"): Consumer group name

Returns: StreamMessage object with id, event, data, retries, timestamps

consume(event, handler, **kwargs)

Start a consumer that listens for events.

Parameters:

  • event (str): Event name to listen for
  • handler (callable): Function called with message data (or message object if it accepts 2+ args)
  • redis_url (str, default="redis://localhost:6379/0"): Redis connection URL
  • stream (str, default="app.events"): Stream name
  • group (str, default="app.workers"): Consumer group name
  • consumer (str, optional): Consumer instance name (auto-generated if None)
  • retry_limit (int, default=3): Max retries before sending to DLQ
  • batch_size (int, default=10): Messages per batch
  • block_ms (int, default=5000): Blocking timeout for XREADGROUP
  • claim_idle_ms (int, default=60000): Idle time threshold for stale message reclaim

Handler signature:

# Simple - receives data only
def handler(data):
    pass

# Advanced - receives data and full message
def handler(data, message):
    print(message.id)       # Message UUID
    print(message.retries)  # Retry count
    print(message.event)    # Original event name

Configuration Examples

Multiple consumers for same event

# Service A
consume("order.placed", on_order_placed, group="service-a", consumer="worker-1")

# Service B - same event, different group
consume("order.placed", on_order_placed_b, group="service-b", consumer="worker-1")

Different streams per environment

# Dev
publish("user.updated", {...}, stream="dev.events", group="dev-workers")

# Prod
publish("user.updated", {...}, stream="prod.events", group="prod-workers")

Adjust retry behavior

consume(
    "payment.processed",
    handle_payment,
    retry_limit=5,  # More retries
    block_ms=10000,  # Longer blocking timeouts
    claim_idle_ms=120000,  # Reclaim after 2 minutes
)

Message Schema

Every message follows this structure:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "event": "user.created",
  "data": {
    "user_id": "123",
    "email": "alice@example.com"
  },
  "retries": 0,
  "timestamps": {
    "created_at": "2026-04-24T17:45:00+00:00",
    "updated_at": "2026-04-24T17:45:00+00:00"
  }
}

Error Handling & Dead-Letter Queue

By default, messages are retried up to 3 times. After exceeding the retry limit, they're sent to the dead-letter stream:

DLQ Stream: <stream>:failed (e.g., app.events:failed)

DLQ Message Example:

{
  "id": "...",
  "event": "user.created",
  "data": {
    "original_id": "550e8400-...",
    "original_event": "user.created",
    "original_data": {"user_id": "123"},
    "source_stream_id": "1713982500001-0",
    "retries": 3,
    "error": "Traceback: Connection timeout..."
  },
  "retries": 3,
  "timestamps": {...}
}

Running in Production

Docker Example

FROM python:3.12-slim

WORKDIR /app
RUN pip install streamix-queue

COPY handlers.py .

CMD ["python", "handlers.py"]

Kubernetes Example

apiVersion: v1
kind: Pod
metadata:
  name: streamix-consumer
spec:
  containers:
  - name: consumer
    image: myapp:latest
    env:
    - name: REDIS_URL
      value: "redis://redis:6379/0"
    - name: STREAM
      value: "app.events"
    - name: GROUP
      value: "service-a"

Performance Tips

  1. Batch Size: Increase batch_size for high throughput (10-50)
  2. Block Timeout: Increase block_ms to reduce CPU usage (5000-30000)
  3. Consumer Instances: Run multiple consumers in the same group for parallel processing
  4. Redis Persistence: Enable AOF/RDB for durability

Troubleshooting

Messages stuck in pending

Check the consumer group pending entries:

from redis import Redis

r = Redis.from_url("redis://localhost:6379/0")
pending = r.xpending("app.events", "service-a")
print(pending)

Inspect dead-letter stream

from redis import Redis

r = Redis.from_url("redis://localhost:6379/0")
failed = r.xread({"app.events:failed": "0"}, count=10)
for stream, messages in failed:
    for msg_id, data in messages:
        print(msg_id, data)

License

MIT License - see LICENSE file for details

Contributing

Contributions welcome! Please feel free to submit a Pull Request.

Support

For issues, questions, or feature requests, please open an issue on GitHub.

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages