-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmulti_agent_communication_demo.py
More file actions
372 lines (294 loc) · 11.2 KB
/
multi_agent_communication_demo.py
File metadata and controls
372 lines (294 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
"""
Demo: Multi-Agent Communication System
Demonstrates message passing, protocols, and channels for agent communication.
Run: python -m react_agent_framework.examples.multi_agent_communication_demo
"""
import time
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from react_agent_framework.multi_agent.communication import (
Message,
MessageType,
MessagePriority,
MessageBus,
ACLProtocol,
DirectChannel,
BroadcastChannel,
MulticastChannel,
)
# Module-level Rich console; every demo function below prints through it.
console = Console()
def print_section(title: str):
    """Print a section banner: a 70-character rule, the title, and the rule again.

    Args:
        title: Heading text, rendered in bold yellow between the two rules.
    """
    rule = f"[bold cyan]{'=' * 70}[/bold cyan]"
    # A blank line precedes the banner and another one follows it.
    console.print("\n" + rule)
    console.print(f"[bold yellow]{title}[/bold yellow]")
    console.print(rule + "\n")
def demo_1_direct_messaging():
    """Demo 1: Direct Point-to-Point Messaging

    Registers two agents on a fresh ``MessageBus``, sends a greeting from
    agent-1 to agent-2, sends a reply built with ``create_reply`` back to
    agent-1, and finally prints the bus's sent/received counters.
    """
    print_section("Demo 1: Direct Point-to-Point Messaging")

    # Both endpoints are registered on the bus before any traffic is sent.
    bus = MessageBus()
    for agent_id in ("agent-1", "agent-2"):
        bus.register_agent(agent_id)
    console.print("[bold]Registered agents:[/bold] agent-1, agent-2\n")

    # Outbound leg: agent-1 -> agent-2.
    console.print("[cyan]agent-1 → agent-2:[/cyan] 'Hello, Agent 2!'")
    greeting = Message(
        sender="agent-1",
        receiver="agent-2",
        message_type=MessageType.INFORM,
        content="Hello, Agent 2!",
    )
    bus.send(greeting)

    # The first message in agent-2's inbox is the one the reply is built from.
    incoming = bus.receive("agent-2")[0]
    console.print(f"[green]✓ agent-2 received:[/green] '{incoming.content}'\n")

    # Return leg: agent-2 -> agent-1.
    console.print("[cyan]agent-2 → agent-1:[/cyan] 'Hello back, Agent 1!'")
    answer = incoming.create_reply(
        sender="agent-2",
        content="Hello back, Agent 1!",
    )
    bus.send(answer)

    returned = bus.receive("agent-1")[0]
    console.print(f"[green]✓ agent-1 received reply:[/green] '{returned.content}'\n")

    # Summarise the counters reported by the bus.
    stats = bus.get_stats()
    for label, key in (
        ("Messages sent", "total_sent"),
        ("Messages received", "total_received"),
    ):
        console.print(f"[bold]{label}:[/bold] {stats[key]}")
def demo_2_broadcast():
    """Demo 2: Broadcast Communication

    Registers a controller and three workers, has the controller broadcast a
    high-priority start command through a ``BroadcastChannel``, lets each
    worker read at most one message, then prints every agent's queue size.
    """
    print_section("Demo 2: Broadcast Communication")

    bus = MessageBus()
    channel = BroadcastChannel(bus)

    # The first entry is the sender; the remaining entries are the listeners.
    agents = ["controller", "worker-1", "worker-2", "worker-3"]
    controller, *workers = agents
    for name in agents:
        bus.register_agent(name)
    console.print(f"[bold]Registered agents:[/bold] {', '.join(agents)}\n")

    console.print("[cyan]controller → * (broadcast):[/cyan] 'Start processing task-123'")
    channel.broadcast(
        sender=controller,
        content={"command": "start", "task_id": "task-123"},
        priority=MessagePriority.HIGH,
    )

    console.print("\n[bold]Workers receiving broadcast:[/bold]")
    for worker in workers:
        delivered = channel.receive(worker, max_messages=1)
        if not delivered:
            # Nothing arrived for this worker; print nothing for it.
            continue
        console.print(f" [green]✓ {worker}:[/green] Received {delivered[0].content}")

    console.print("\n[bold]Queue sizes after broadcast:[/bold]")
    for name in agents:
        pending = bus.get_queue_size(name)
        console.print(f" {name}: {pending} messages")
def demo_3_pub_sub_topics():
    """Demo 3: Pub/Sub with Topics (Multicast)

    Subscribes sub-1 and sub-2 to the 'updates' topic and sub-3 to 'alerts' on
    a ``MulticastChannel``, publishes one message per topic, and after each
    publish prints what every subscriber received.
    """
    print_section("Demo 3: Pub/Sub with Topics (Multicast)")

    bus = MessageBus()
    channel = MulticastChannel(bus)

    agents = ["publisher", "sub-1", "sub-2", "sub-3"]
    publisher, *subscribers = agents
    for name in agents:
        bus.register_agent(name)
    console.print(f"[bold]Registered agents:[/bold] {', '.join(agents)}\n")

    def report_inboxes():
        """Print the first received message of each subscriber, or a placeholder."""
        console.print("\n[bold]Messages received:[/bold]")
        for sub in subscribers:
            inbox = channel.receive(sub)
            if not inbox:
                console.print(f" [dim]{sub}: (no messages)[/dim]")
                continue
            console.print(f" [green]✓ {sub}:[/green] '{inbox[0].content}'")

    # All subscriptions are made first, then listed.
    console.print("[bold]Subscriptions:[/bold]")
    for subscriber, topic in (
        ("sub-1", "updates"),
        ("sub-2", "updates"),
        ("sub-3", "alerts"),
    ):
        channel.subscribe(subscriber, topic)
    console.print(" sub-1 → 'updates'")
    console.print(" sub-2 → 'updates'")
    console.print(" sub-3 → 'alerts'\n")

    # Round one: the 'updates' topic.
    console.print("[cyan]publisher → topic 'updates':[/cyan] 'New version released'")
    channel.publish(
        sender=publisher,
        topic="updates",
        content="New version released",
    )
    report_inboxes()

    # Round two: the 'alerts' topic.
    console.print("\n[cyan]publisher → topic 'alerts':[/cyan] 'System maintenance at 2am'")
    channel.publish(
        sender=publisher,
        topic="alerts",
        content="System maintenance at 2am",
    )
    report_inboxes()
def demo_4_acl_protocol():
    """Demo 4: ACL Protocol Communication

    Plays a three-step exchange between a requester and a provider: the
    requester sends a REQUEST for a "search" action, and the provider answers
    with an AGREE followed by an INFORM carrying the results. Each message is
    built by ``ACLProtocol`` and carried over a ``MessageBus``.
    """
    print_section("Demo 4: ACL Protocol Communication")

    bus = MessageBus()
    protocol = ACLProtocol()

    def deliver(message, recipient):
        """Send ``message`` and return the first message ``recipient`` receives."""
        bus.send(message)
        return bus.receive(recipient)[0]

    for agent_id in ("requester", "provider"):
        bus.register_agent(agent_id)
    console.print("[bold]Registered agents:[/bold] requester, provider\n")

    # Step 1: REQUEST
    console.print("[cyan]1. requester → provider:[/cyan] REQUEST 'search'")
    request_msg = protocol.request(
        sender="requester",
        receiver="provider",
        action="search",
        params={"query": "multi-agent systems"},
    )
    seen_by_provider = deliver(request_msg, "provider")
    console.print(
        f"[green]✓ provider received:[/green] {seen_by_provider.metadata['performative']}\n"
    )

    # Step 2: AGREE, linked to the request through reply_to.
    console.print("[cyan]2. provider → requester:[/cyan] AGREE to perform search")
    agreement = protocol.agree(
        sender="provider",
        receiver="requester",
        action="search",
        reply_to=request_msg.message_id,
    )
    seen_by_requester = deliver(agreement, "requester")
    console.print(
        f"[green]✓ requester received:[/green] {seen_by_requester.metadata['performative']}\n"
    )

    # Step 3: INFORM with the result payload, also linked to the request.
    console.print("[cyan]3. provider → requester:[/cyan] INFORM search results")
    result_payload = {
        "results": ["Paper 1", "Paper 2", "Paper 3"],
        "count": 3,
    }
    notification = protocol.inform(
        sender="provider",
        receiver="requester",
        proposition=result_payload,
        reply_to=request_msg.message_id,
    )
    seen_by_requester = deliver(notification, "requester")
    console.print(
        f"[green]✓ requester received:[/green] {seen_by_requester.content['proposition']}"
    )
def demo_5_message_priorities():
    """Demo 5: Message Priorities

    Sends three messages to one receiver in ascending priority order (LOW,
    NORMAL, CRITICAL), then reads up to ten messages back and prints them in
    the order the bus returns them, each tagged with its priority name.

    The demo expects the bus to hand back higher priorities first; that
    ordering is decided by ``MessageBus.receive``, not by this function.
    """
    print_section("Demo 5: Message Priorities")

    bus = MessageBus()
    bus.register_agent("receiver")
    console.print("[bold]Sending messages with different priorities:[/bold]\n")

    # (announcement line, message type, content, priority), in send order.
    outgoing = (
        (
            "[dim]Sending LOW priority:[/dim] 'Regular update'",
            MessageType.INFORM,
            "Regular update",
            MessagePriority.LOW,
        ),
        (
            "[white]Sending NORMAL priority:[/white] 'Task completed'",
            MessageType.INFORM,
            "Task completed",
            MessagePriority.NORMAL,
        ),
        (
            "[red]Sending CRITICAL priority:[/red] 'System error!'",
            MessageType.ERROR,
            "System error!",
            MessagePriority.CRITICAL,
        ),
    )
    for announcement, message_type, content, priority in outgoing:
        console.print(announcement)
        bus.send(Message(
            sender="sender",
            receiver="receiver",
            message_type=message_type,
            content=content,
            priority=priority,
        ))

    # Built once: the mapping does not depend on the message being printed.
    priority_colors = {
        MessagePriority.LOW: "dim",
        MessagePriority.NORMAL: "white",
        MessagePriority.HIGH: "yellow",
        MessagePriority.URGENT: "orange1",
        MessagePriority.CRITICAL: "red",
    }

    console.print("\n[bold]Receiving messages (ordered by priority):[/bold]")
    messages = bus.receive("receiver", max_messages=10)
    for i, msg in enumerate(messages, 1):
        priority_name = MessagePriority(msg.priority).name
        # Unknown priorities fall back to plain white.
        color = priority_colors.get(msg.priority, "white")
        console.print(f" {i}. [{color}][{priority_name}][/{color}] {msg.content}")
def demo_6_message_bus_stats():
    """Demo 6: Message Bus Statistics

    Pushes four messages of different types through a three-agent bus, drains
    two of the inboxes, and renders the counters from ``bus.get_stats()`` as a
    Rich table followed by a per-type breakdown.
    """
    print_section("Demo 6: Message Bus Statistics")

    bus = MessageBus()
    agents = ["agent-1", "agent-2", "agent-3"]
    for name in agents:
        bus.register_agent(name)

    # (sender, receiver, type, content); "*" is the receiver of the broadcast.
    traffic = (
        ("agent-1", "agent-2", MessageType.INFORM, "Hello"),
        ("agent-2", "agent-3", MessageType.REQUEST, "Data"),
        ("agent-1", "*", MessageType.BROADCAST, "Announcement"),
        ("agent-3", "agent-1", MessageType.RESPONSE, "Result"),
    )
    for sender, receiver, kind, body in traffic:
        bus.send(Message(sender, receiver, kind, body))

    # Only agent-2 and agent-3 read their inboxes; agent-1 never does.
    for reader in ("agent-2", "agent-3"):
        bus.receive(reader)

    stats = bus.get_stats()

    table = Table(show_header=True, title="Message Bus Statistics")
    table.add_column("Metric", style="cyan")
    table.add_column("Value", style="yellow", justify="right")
    for label, key in (
        ("Total Sent", "total_sent"),
        ("Total Received", "total_received"),
        ("Registered Agents", "registered_agents"),
        ("Total Queued", "total_queued"),
        ("Dead Letters", "dead_letters"),
    ):
        table.add_row(label, str(stats[key]))
    console.print(table)

    console.print("\n[bold]Messages by Type:[/bold]")
    for msg_type, count in stats["messages_by_type"].items():
        console.print(f" {msg_type}: {count}")
def main():
    """Run all demos.

    Prints an intro panel, runs the six demos in order with a one-second
    pause between consecutive demos, and prints a completion panel.

    A ``KeyboardInterrupt`` is reported and swallowed. Any other exception is
    reported and then re-raised so the traceback is not lost.

    Raises:
        Exception: whatever a demo raised, after the error line is printed.
    """
    # Local import: only the error path needs it. Rich is already a
    # dependency of this module.
    from rich.markup import escape

    console.print(Panel.fit(
        "[bold yellow]Multi-Agent Communication System Demo[/bold yellow]\n"
        "Message passing, protocols, and channels for agent collaboration",
        border_style="cyan"
    ))

    demos = (
        demo_1_direct_messaging,
        demo_2_broadcast,
        demo_3_pub_sub_topics,
        demo_4_acl_protocol,
        demo_5_message_priorities,
        demo_6_message_bus_stats,
    )

    try:
        for index, demo in enumerate(demos):
            if index:
                # Pause between demos, but not before the first one.
                time.sleep(1)
            demo()
        console.print(Panel.fit(
            "[bold green]✓ All communication demos completed![/bold green]",
            border_style="green"
        ))
    except KeyboardInterrupt:
        console.print("\n[yellow]Demo interrupted by user[/yellow]")
    except Exception as e:
        # The exception text is arbitrary; escape it so square brackets in the
        # message are printed literally instead of being parsed as Rich markup
        # (which could garble the output or raise inside this handler).
        console.print(f"\n[red]Error: {escape(str(e))}[/red]")
        raise


if __name__ == "__main__":
    main()