Python implementation of the Mesh communication library.
Install via pip:
pip install marketrix-ai-mesh

Define your Redis connection and service details.
from mesh import Mesh, MeshConfig, ServiceDiscoveryConfig
config = MeshConfig(
redis={"host": "localhost", "port": 6379}, # Shared Redis Config
service_discovery=ServiceDiscoveryConfig(
heartbeat_interval=2,
heartbeat_threshold=3
),
# host="127.0.0.1", # Optional: Auto-detected if omitted
port=0
)

Initialize and start the mesh node.
import asyncio
async def main():
    """Start a mesh node, run the application, and close the node.

    The node is built from the module-level ``config``. Closing happens in a
    ``finally`` clause so the node is closed even when the application logic
    raises.
    """
    mesh = Mesh(config)
    await mesh.start()
    try:
        # ... your application logic ...
        pass
    finally:
        # Always close the node, even if the application logic failed.
        await mesh.close()
if __name__ == "__main__":
asyncio.run(main())

Register a service name to handle incoming connections.
async def stream_handler(channel, metadata):
    """Serve one incoming streaming connection.

    Prints the session id found under ``'x-session-id'`` in ``metadata``, then
    answers every message received on ``channel`` with a fixed reply.

    Args:
        channel: Object exposing awaitable ``send(msg)`` and
            ``on_data(callback)`` methods.
        metadata: Mapping of connection metadata; read with ``.get``.

    Errors raised while consuming the stream are reported on stdout instead
    of being silently discarded; they are not re-raised.
    """
    print(f"Accepted connection from session: {metadata.get('x-session-id')}")

    async def handler(msg):
        # Acknowledge every incoming message with the same reply.
        print(f"Received: {msg['payload']}")
        response = {"function_name": "reply", "payload": "Got it!"}
        await channel.send(response)

    try:
        await channel.on_data(handler)
    except Exception as exc:
        # Keep the best-effort behaviour (no re-raise) but make the failure visible.
        print(f"Stream handler error: {exc}")
async def unary_handler(msg, metadata):
    """Handle a single request/reply call.

    Prints the request payload and the metadata, then returns a fixed reply
    message.
    """
    reply = {"function_name": "reply", "payload": "Got Unary!"}
    print(f"Received Unary: {msg['payload']}")
    print(f"Metadata: {metadata}")
    return reply
mesh.register_service("my-service-name") \
.on_request_channel(stream_handler) \
.on_request_reply(unary_handler)

Discover and connect to another service.
channel = await mesh.service("target-service-name").request_channel("session-id-123")
out_msg = {"function_name": "greet", "payload": "Hello World"}
await channel.send(out_msg)
# Reading messages (Callback handles backpressure)
async def reply_handler(msg):
    """Print the payload of an incoming reply message."""
    print(f"Reply: {msg['payload']}")
await channel.on_data(reply_handler)
await channel.close()

Mesh supports on_data(handler) for consuming messages with backpressure. The method returns an Awaitable that resolves when the stream is closed, allowing you to await the entire processing session.
async def handler(msg):
    """Process one message by awaiting ``process_message``."""
    # The next message is consumed only after this function returns
    await process_message(msg)
await channel.on_data(handler)