forked from i-am-bee/beeai-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathremote.py
More file actions
63 lines (51 loc) · 1.96 KB
/
remote.py
File metadata and controls
63 lines (51 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import asyncio
import sys
import traceback
from pydantic import BaseModel
from beeai_framework.agents.experimental.remote import RemoteAgent
from beeai_framework.errors import FrameworkError
from beeai_framework.memory.unconstrained_memory import UnconstrainedMemory
from beeai_framework.workflows import Workflow
from examples.helpers.io import ConsoleReader
async def main() -> None:
reader = ConsoleReader()
class State(BaseModel):
topic: str
research: str | None = None
output: str | None = None
async def research(state: State) -> None:
agent = RemoteAgent(
agent_name="gpt-researcher", url="http://127.0.0.1:8333/api/v1/acp", memory=UnconstrainedMemory()
)
# Run the agent and observe events
response = await agent.run(state.topic).on(
"update",
lambda data, _: (reader.write("Agent 🤖 (debug) : ", data)),
)
state.research = response.result.text
async def podcast(state: State) -> None:
agent = RemoteAgent(
agent_name="podcast-creator", url="http://127.0.0.1:8333/api/v1/acp", memory=UnconstrainedMemory()
)
# Run the agent and observe events
response = await agent.run(state.research or "").on(
"update",
lambda data, _: (reader.write("Agent 🤖 (debug) : ", data)),
)
state.output = response.result.text
# Define the structure of the workflow graph
workflow = Workflow(State)
workflow.add_step("research", research)
workflow.add_step("podcast", podcast)
# Execute the workflow
result = await workflow.run(State(topic="Connemara"))
print("\n*********************")
print("Topic: ", result.state.topic)
print("Research: ", result.state.research)
print("Output: ", result.state.output)
if __name__ == "__main__":
try:
asyncio.run(main())
except FrameworkError as e:
traceback.print_exc()
sys.exit(e.explain())