Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
69bde8b
Upgrades valkey to v9 relaxes pin to maj ver
chrisk314 Nov 30, 2025
e16803b
Relaxes rabbitmq pin to maj v4
chrisk314 Nov 30, 2025
89f902e
Adds redis deps
chrisk314 Nov 30, 2025
932d6cc
Adds redis connector impl with tests
chrisk314 Nov 30, 2025
6cfb75e
Redis connection in DI
chrisk314 Nov 30, 2025
1f28655
Import schema object from plugboard_schemas
chrisk314 Feb 25, 2026
3a3bc2d
Some simplifications for receive
chrisk314 Feb 25, 2026
01c3ba6
Simplifies RedisChannel impl by passing send/recv fn
chrisk314 Feb 25, 2026
b2dd347
Avoids unnecessary lock for pubsub mode
chrisk314 Feb 25, 2026
f3f4516
Fixup mypy errors
chrisk314 Feb 25, 2026
3a12906
Adds missing redis deps for tests in github actions
chrisk314 Feb 25, 2026
cd32c60
Swaps valkey for redis, updates docker service versions
chrisk314 Feb 25, 2026
5551244
Exports Redis classes in connector module
chrisk314 Feb 25, 2026
5c7f297
Updates connector docs
chrisk314 Feb 25, 2026
b660d2c
Doc strings
chrisk314 Feb 25, 2026
ed01a22
Redis URL env var mention
chrisk314 Feb 25, 2026
503f401
Fixup rabbitmq docker image tag
chrisk314 Feb 25, 2026
c59d682
Adds missing Redis settings
chrisk314 Feb 25, 2026
8047c48
Redis url and password setup for testing
chrisk314 Feb 25, 2026
f7aed89
Updates deps
chrisk314 Mar 1, 2026
7d6618f
Only create redis and rabbitmq conn if required
chrisk314 Mar 1, 2026
9ad3b3e
Corrections for Redis auth config
chrisk314 Mar 1, 2026
291b541
Fixup redis type annotation
chrisk314 Mar 1, 2026
0045cc4
Fixes issue due to subscribe messages
chrisk314 Mar 11, 2026
e4cf491
Simplifies cache usage in state backend to avoid bug
chrisk314 Mar 11, 2026
33c0f64
Removes unrequired async-lru dep and upgrades others
chrisk314 Mar 11, 2026
ce3815f
chore: uv lock upgrade
chrisk314 Mar 23, 2026
8216ea0
Merge branch 'main' into feat/redis-connector
chrisk314 Mar 23, 2026
f564e59
chore: uv lock upgrade
chrisk314 Mar 23, 2026
822f803
chore: Coverage ignores
chrisk314 Mar 23, 2026
ef86b91
test: Adds tests for broker env vars unset cases
chrisk314 Mar 23, 2026
f182157
test: Adds tests for no process found cases in state backends
chrisk314 Mar 23, 2026
07349e7
chore: Fixup lint issues
chrisk314 Mar 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .github/workflows/lint-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:
cache-dependency-glob: "uv.lock"

- name: Install project
run: uv sync --group test
run: uv sync --group test --extra redis

- name: Run unit tests
run: COVERAGE_FILE=.coverage.py${{ matrix.python_version }}.unit uv run coverage run -m pytest ./tests/unit/
Expand Down Expand Up @@ -133,17 +133,18 @@ jobs:
cache-dependency-glob: "uv.lock"

- name: Install project
run: uv sync --group test
run: uv sync --group test --extra redis

- name: Run backing services
run: |
docker compose up postgres rabbitmq -d
docker compose up -d
sleep 10 # Wait for services to start

- name: Run integration tests
run: COVERAGE_FILE=.coverage.py${{ matrix.python_version }}.integration uv run coverage run -m pytest ./tests/integration/ -m "not tuner"
env:
RABBITMQ_URL: amqp://user:password@localhost:5672/
REDIS_URL: redis://default:password@localhost:6379/
RAY_ENABLE_UV_RUN_RUNTIME_ENV: 0
PLUGBOARD_IO_READ_TIMEOUT: 5.0

Expand Down Expand Up @@ -177,12 +178,13 @@ jobs:
cache-dependency-glob: "uv.lock"

- name: Install project
run: uv sync --group test
run: uv sync --group test --extra redis

- name: Run tuner tests
run: COVERAGE_FILE=.coverage.py${{ matrix.python_version }}.integration.tuner uv run coverage run -m pytest ./tests/integration/ -m "tuner"
env:
RABBITMQ_URL: amqp://user:password@localhost:5672/
REDIS_URL: redis://default:password@localhost:6379/
RAY_ENABLE_UV_RUN_RUNTIME_ENV: 0
PLUGBOARD_IO_READ_TIMEOUT: 5.0

Expand Down
18 changes: 10 additions & 8 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: plugboard

services:
rabbitmq:
image: rabbitmq:4.0-rc-management-alpine
image: rabbitmq:4.2-management-alpine
container_name: rabbitmq
ports:
- 5672:5672
Expand All @@ -17,7 +17,7 @@ services:
restart: always

postgres:
image: postgres:18-alpine
image: postgres:18.2-alpine
container_name: postgres
environment:
- POSTGRES_USER=plugboard
Expand All @@ -31,14 +31,16 @@ services:
- main
restart: always

valkey:
image: valkey/valkey:8.0-alpine
container_name: valkey
command: valkey-server --dir /var/lib/valkey --bind 0.0.0.0 -::1 --protected-mode no
redis:
image: redis:8.6-alpine
container_name: redis
environment:
- REDIS_PASSWORD=password
command: redis-server --bind 0.0.0.0 --requirepass password
ports:
- 6379:6379
volumes:
- valkey-data:/var/lib/valkey
- redis-data:/data
networks:
- main
restart: always
Expand All @@ -50,4 +52,4 @@ networks:
volumes:
rabbitmq-data:
postgres-data:
valkey-data:
redis-data:
2 changes: 2 additions & 0 deletions docs/api/connector/connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@
- RabbitMQChannel
- RayConnector
- RayChannel
- RedisConnector
- RedisChannel
- ZMQConnector
- ZMQChannel
1 change: 1 addition & 0 deletions docs/usage/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Plugboard can make use of a message broker for data exchange between components
| Option Name | Description | Default Value |
|----------------------------|----------------------------------------------------------|---------------|
| `RABBITMQ_URL` | URL for RabbitMQ AMQP message broker (must include credentials if required) | |
| `REDIS_URL` | URL for Redis message broker (must include credentials if required) | |

## Job ID

Expand Down
3 changes: 3 additions & 0 deletions plugboard/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from plugboard.connector.connector_builder import ConnectorBuilder
from plugboard.connector.rabbitmq_channel import RabbitMQChannel, RabbitMQConnector
from plugboard.connector.ray_channel import RayChannel, RayConnector
from plugboard.connector.redis_channel import RedisChannel, RedisConnector
from plugboard.connector.serde_channel import SerdeChannel
from plugboard.connector.zmq_channel import ZMQChannel, ZMQConnector

Expand All @@ -20,6 +21,8 @@
"RabbitMQConnector",
"RayChannel",
"RayConnector",
"RedisChannel",
"RedisConnector",
"SerdeChannel",
"ZMQChannel",
"ZMQConnector",
Expand Down
16 changes: 12 additions & 4 deletions plugboard/connector/rabbitmq_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,14 @@ def __init__(self, *args: _t.Any, **kwargs: _t.Any) -> None:
self._recv_channel: _t.Optional[RabbitMQChannel] = None
self._recv_channel_lock = asyncio.Lock()

def __getstate__(self) -> dict:
def __getstate__(self) -> dict: # pragma: no cover
state = self.__dict__.copy()
for attr in ("_send_channel", "_send_channel_lock", "_recv_channel", "_recv_channel_lock"):
if attr in state:
del state[attr]
return state

def __setstate__(self, state: dict) -> None:
def __setstate__(self, state: dict) -> None: # pragma: no cover
self.__dict__.update(state)
self._send_channel = None
self._send_channel_lock = asyncio.Lock()
Expand All @@ -134,9 +134,13 @@ def __setstate__(self, state: dict) -> None:

@inject
async def connect_send(
self, rabbitmq_conn: AbstractRobustConnection = Provide[DI.rabbitmq_conn]
self, rabbitmq_conn: AbstractRobustConnection | None = Provide[DI.rabbitmq_conn]
) -> RabbitMQChannel:
"""Returns a `RabbitMQ` channel for sending messages."""
if rabbitmq_conn is None:
raise RuntimeError(
"RabbitMQ connection not available. Ensure RabbitMQ URL is configured."
)
async with self._send_channel_lock:
if self._send_channel is not None:
return self._send_channel
Expand All @@ -150,9 +154,13 @@ async def connect_send(

@inject
async def connect_recv(
self, rabbitmq_conn: AbstractRobustConnection = Provide[DI.rabbitmq_conn]
self, rabbitmq_conn: AbstractRobustConnection | None = Provide[DI.rabbitmq_conn]
) -> RabbitMQChannel:
"""Returns a `RabbitMQ` channel for receiving messages."""
if rabbitmq_conn is None:
raise RuntimeError(
"RabbitMQ connection not available. Ensure RabbitMQ URL is configured."
)
cm = self._recv_channel_lock if self.spec.mode != ConnectorMode.PUBSUB else nullcontext()
async with cm:
if self._recv_channel is not None:
Expand Down
195 changes: 195 additions & 0 deletions plugboard/connector/redis_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
"""Provides RedisChannel and RedisConnector."""

from __future__ import annotations

import asyncio
import typing as _t

from plugboard_schemas.connector import ConnectorMode
from that_depends import Provide, inject

from plugboard.connector.connector import Connector
from plugboard.connector.serde_channel import SerdeChannel
from plugboard.exceptions import ChannelClosedError
from plugboard.utils import DI, depends_on_optional


try:
from redis.asyncio import Redis
from redis.asyncio.client import PubSub
except ImportError: # pragma: no cover
pass


class RedisChannel(SerdeChannel):
"""`RedisChannel` for sending and receiving messages via Redis."""

@depends_on_optional("redis")
def __init__(
self,
*args: _t.Any,
key: str,
send_fn: _t.Optional[_t.Callable[[bytes], _t.Awaitable[None]]] = None,
recv_fn: _t.Optional[_t.Callable[[], _t.Awaitable[bytes]]] = None,
pubsub: _t.Optional[PubSub] = None,
**kwargs: _t.Any,
) -> None:
"""Instantiates a `RedisChannel`.

Uses Redis to provide communication between components on different processes.
Requires a Redis server to be running with the URL set in the `REDIS_URL`
environment variable.

Args:
key: The Redis key for the channel.
send_fn: Optional; A callable for sending messages to the Redis channel.
recv_fn: Optional; A callable for receiving messages from the Redis channel.
pubsub: Optional; The Redis `PubSub` instance, used in pub-sub mode.
"""
super().__init__(*args, **kwargs)
self._key = key
self._send_fn = send_fn
self._recv_fn = recv_fn
self._pubsub = pubsub

# Set initial state based on intended usage
self._is_send_closed = send_fn is None
self._is_recv_closed = recv_fn is None

async def send(self, msg: bytes) -> None:
"""Send a message to the Redis channel."""
if self._is_send_closed or self._send_fn is None:
raise ChannelClosedError("Channel is closed for sending")
await self._send_fn(msg)

async def recv(self) -> bytes:
"""Receive a message from the Redis channel."""
if self._is_recv_closed or self._recv_fn is None:
raise ChannelClosedError("Channel is closed for receiving")
return await self._recv_fn()

async def close(self) -> None:
"""Closes the `RedisChannel`."""
# If we are a sender, send the close message (via super().close())
if not self._is_send_closed:
await super().close()
self._is_send_closed = True

if self._pubsub is not None:
await self._pubsub.unsubscribe()
await self._pubsub.close()
self._pubsub = None

self._is_recv_closed = True


class RedisConnector(Connector):
"""`RedisConnector` connects components via Redis."""

@depends_on_optional("redis")
def __init__(self, *args: _t.Any, **kwargs: _t.Any) -> None:
"""Instantiates a `RedisConnector`.

Uses Redis to connect components via either pipeline (list-based) or pub-sub
(channel-based) mode. Requires a Redis server to be running with the URL set
in the `REDIS_URL` environment variable.
"""
super().__init__(*args, **kwargs)
self._topic: str = (
str(self.spec.source) if self.spec.mode == ConnectorMode.PUBSUB else self.spec.id
)
self._send_channel: _t.Optional[RedisChannel] = None
self._send_channel_lock = asyncio.Lock()
self._recv_channel: _t.Optional[RedisChannel] = None
self._recv_channel_lock = asyncio.Lock()

def __getstate__(self) -> dict: # pragma: no cover
state = self.__dict__.copy()
for attr in ("_send_channel", "_recv_channel", "_send_channel_lock", "_recv_channel_lock"):
if attr in state:
del state[attr]
return state

def __setstate__(self, state: dict) -> None: # pragma: no cover
self.__dict__.update(state)
self._send_channel = None
self._send_channel_lock = asyncio.Lock()
self._recv_channel = None
self._recv_channel_lock = asyncio.Lock()

@inject
async def _get_key(self, job_id: str = Provide[DI.job_id]) -> str:
return f"{job_id}.{self._topic}"

@inject
async def connect_send(
self, redis_client: Redis | None = Provide[DI.redis_client]
) -> RedisChannel:
"""Returns a `RedisChannel` for sending messages."""
if redis_client is None:
raise RuntimeError("Redis client not available. Ensure Redis URL is configured.")
async with self._send_channel_lock:
if self._send_channel is not None:
return self._send_channel

key = await self._get_key()
send_fn = self._build_send_fn(redis_client, key)
self._send_channel = RedisChannel(key=key, send_fn=send_fn)
return self._send_channel

def _build_send_fn(
self, redis_client: Redis, key: str
) -> _t.Callable[[bytes], _t.Awaitable[None]]:
if self.spec.mode == ConnectorMode.PIPELINE:

async def send_fn(msg: bytes) -> None:
await redis_client.lpush(key, msg) # type: ignore[misc]
else:

async def send_fn(msg: bytes) -> None:
await redis_client.publish(key, msg)

return send_fn

@inject
async def connect_recv(
self, redis_client: Redis | None = Provide[DI.redis_client]
) -> RedisChannel:
"""Returns a `RedisChannel` for receiving messages."""
if redis_client is None:
raise RuntimeError("Redis client not available. Ensure Redis URL is configured.")
key = await self._get_key()
if self.spec.mode == ConnectorMode.PIPELINE:
async with self._recv_channel_lock:
if self._recv_channel is not None:
return self._recv_channel
recv_fn = self._build_recv_fn(redis_client, key)
channel = RedisChannel(key=key, recv_fn=recv_fn)
self._recv_channel = channel
else: # ConnectorMode.PUBSUB
pubsub = redis_client.pubsub(ignore_subscribe_messages=True)
await pubsub.subscribe(key)
recv_fn = self._build_recv_fn(redis_client, key, pubsub=pubsub)
channel = RedisChannel(key=key, recv_fn=recv_fn, pubsub=pubsub)
return channel

def _build_recv_fn(
self, redis_client: Redis, key: str, pubsub: _t.Optional[PubSub] = None
) -> _t.Callable[[], _t.Awaitable[bytes]]:
if self.spec.mode == ConnectorMode.PIPELINE:

async def recv_fn() -> bytes:
result = await redis_client.brpop([key], timeout=None) # type: ignore[misc]
return result[1]
else:
if pubsub is None:
raise ValueError("PubSub instance required for PUBSUB mode")

async def recv_fn() -> bytes:
# NOTE : We use `listen()` here due to non-sensical `get_message()` behaviour with
# : `ignore_subscribe_messages=True`.
# : See: https://github.com/redis/redis-py/issues/733#issuecomment-1956647495
message = await asyncio.wait_for(anext(pubsub.listen()), timeout=None)
return message["data"]

return recv_fn
Loading
Loading