forked from vitrixLab/AIOpsLab
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathreact.py
More file actions
130 lines (98 loc) · 4.49 KB
/
react.py
File metadata and controls
130 lines (98 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""Naive ReAct client for AIOpsLab.
Yao, S., Zhao, J., Yu, D., Du, N., Shafran, I., Narasimhan, K., & Cao, Y. (2022).
React: Synergizing reasoning and acting in language models. arXiv preprint arXiv:2210.03629.
Code: https://github.com/ysymyth/ReAct
Paper: https://arxiv.org/abs/2210.03629
"""
import asyncio
import json
import tiktoken
from aiopslab.orchestrator import Orchestrator
from aiopslab.orchestrator.problems.registry import ProblemRegistry
from clients.utils.llm import GPTClient
from clients.utils.templates import DOCS
RESP_INSTR = """DO NOT REPEAT ACTIONS! Respond with:
Thought: <your thought on the previous output>
Action: <your action towards mitigating>
"""
def count_message_tokens(message, enc):
# Each message format adds ~4 tokens of overhead
tokens = 4 # <|start|>role/name + content + <|end|>
tokens += len(enc.encode(message.get("content", "")))
return tokens
def trim_history_to_token_limit(history, max_tokens=120000, model="gpt-4"):
enc = tiktoken.encoding_for_model(model)
trimmed = []
total_tokens = 0
# Always include the last message
last_msg = history[-1]
last_msg_tokens = count_message_tokens(last_msg, enc)
if last_msg_tokens > max_tokens:
# If even the last message is too big, truncate its content
truncated_content = enc.decode(enc.encode(last_msg["content"])[:max_tokens - 4])
return [{"role": last_msg["role"], "content": truncated_content}]
trimmed.insert(0, last_msg)
total_tokens += last_msg_tokens
# Add earlier messages in reverse until limit is reached
for message in reversed(history[:-1]):
message_tokens = count_message_tokens(message, enc)
if total_tokens + message_tokens > max_tokens:
break
trimmed.insert(0, message)
total_tokens += message_tokens
return trimmed
class Agent:
def __init__(self):
self.history = []
self.llm = GPTClient()
def init_context(self, problem_desc: str, instructions: str, apis: str):
"""Initialize the context for the agent."""
self.shell_api = self._filter_dict(apis, lambda k, _: "exec_shell" in k)
self.submit_api = self._filter_dict(apis, lambda k, _: "submit" in k)
self.telemetry_apis = self._filter_dict(
apis, lambda k, _: "exec_shell" not in k and "submit" not in k
)
stringify_apis = lambda apis: "\n\n".join(
[f"{k}\n{v}" for k, v in apis.items()]
)
self.system_message = DOCS.format(
prob_desc=problem_desc,
telemetry_apis=stringify_apis(self.telemetry_apis),
shell_api=stringify_apis(self.shell_api),
submit_api=stringify_apis(self.submit_api),
)
self.task_message = instructions
self.history.append({"role": "system", "content": self.system_message})
self.history.append({"role": "user", "content": self.task_message})
async def get_action(self, input) -> str:
"""Wrapper to interface the agent with OpsBench.
Args:
input (str): The input from the orchestrator/environment.
Returns:
str: The response from the agent.
"""
self.history.append({"role": "user", "content": self._add_instr(input)})
trimmed_history = trim_history_to_token_limit(self.history)
response = self.llm.run(trimmed_history)
self.history.append({"role": "assistant", "content": response[0]})
return response[0]
def _filter_dict(self, dictionary, filter_func):
return {k: v for k, v in dictionary.items() if filter_func(k, v)}
def _add_instr(self, input):
return input + "\n\n" + RESP_INSTR
if __name__ == "__main__":
problems = ProblemRegistry().PROBLEM_REGISTRY
for pid in problems:
agent = Agent()
orchestrator = Orchestrator()
orchestrator.register_agent(agent, name="react")
try:
problem_desc, instructs, apis = orchestrator.init_problem(pid)
agent.init_context(problem_desc, instructs, apis)
full_output = asyncio.run(orchestrator.start_problem(max_steps=30))
results = full_output.get("results", {})
filename = f"react_{pid}.json"
with open(filename, "w") as f:
json.dump(results, f, indent=2)
except Exception as e:
print(f"Error while running problem {pid}: {e}")