forked from SREGym/SREGym
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
359 lines (299 loc) · 13.8 KB
/
main.py
File metadata and controls
359 lines (299 loc) · 13.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
import argparse
import asyncio
import csv
import logging
import os
import sys
import threading
from datetime import datetime
from pathlib import Path
from rich.console import Console
from logger import init_logger
from sregym.agent_launcher import AgentLauncher
from sregym.agent_registry import get_agent, list_agents
from sregym.conductor.conductor import Conductor, ConductorConfig
from sregym.conductor.conductor_api import request_shutdown, run_api
from sregym.conductor.constants import StartProblemResult
LAUNCHER = AgentLauncher()
logger = logging.getLogger(__name__)
_driver_results: list[dict] = []
def get_current_datetime_formatted():
now = datetime.now()
formatted_datetime = now.strftime("%m%d_%H%M")
return formatted_datetime
def driver_loop(
conductor: Conductor,
problem_filter: str | None = None,
agent_to_run: str | None = None,
use_external_harness: bool = False,
n_attempts: int = 1,
):
"""
Deploy each problem and wait for HTTP grading via POST /submit.
Returns a list of flattened dicts with results per problem.
Args:
conductor: The Conductor instance
problem_filter: Optional problem ID to run. If specified, only this problem will be run.
agent_to_run: Agent name to run (required unless use_external_harness is True).
use_external_harness: If True, inject fault and exit without running evaluation logic.
n_attempts: Number of end-to-end attempts to run each problem.
"""
async def driver():
console = Console()
# give the API a moment to bind
await asyncio.sleep(1)
# Verify agent exists in registry (skip if using external harness)
if not use_external_harness:
available_agents = list_agents(path=Path(os.path.dirname(os.path.abspath(__file__))) / "agents.yaml").keys()
if agent_to_run not in available_agents:
console.log(f"⚠️ Agent '{agent_to_run}' not found in registry. Available agents: {available_agents}")
sys.exit(1)
console.log(f"Starting agent now: {agent_to_run}")
conductor.register_agent(agent_to_run)
# Start K8s API proxy to hide chaos engineering namespaces from the agent
console.log("🔒 Starting Kubernetes API proxy to hide chaos namespaces...")
conductor.start_k8s_proxy()
LAUNCHER.set_agent_kubeconfig(conductor.get_agent_kubeconfig_path())
all_results_for_agent = []
# Get all problem IDs and filter if needed
problem_ids = conductor.problems.get_problem_ids()
all_problem_ids = conductor.problems.get_problem_ids(all=True)
if problem_filter:
if problem_filter not in all_problem_ids:
console.log(f"⚠️ Problem '{problem_filter}' not found in registry. Available problems: {problem_ids}")
sys.exit(1)
problem_ids = [problem_filter]
console.log(f"🎯 Running single problem: {problem_filter}")
# sanity check: are there any specified problem ids that do not exist in the registry?
unknown_problem_ids = set(problem_ids) - set(all_problem_ids)
if unknown_problem_ids:
console.log(
f"⚠️ These problem ids do not exist in the registry and they will be skipped: {unknown_problem_ids}"
)
for unknown_problem_id in unknown_problem_ids:
problem_ids.remove(unknown_problem_id)
for pid in problem_ids:
conductor.problem_id = pid
# Keep a record of results for this problem in a temp file in case an attempt fails
tmp_path = f"_running_{pid}_{agent_to_run}_results.csv"
for attempt in range(1, n_attempts + 1):
console.log(f"\n🔍 Starting problem: {pid} (Attempt {attempt} of {n_attempts})")
result = await conductor.start_problem()
if result == StartProblemResult.SKIPPED_KHAOS_REQUIRED:
console.log(f"⏭️ Skipping problem '{pid}': requires Khaos but running on emulated cluster")
break # Skip to next problem
# If using external harness, fault is injected - exit now
if use_external_harness:
console.log(f"✅ Fault injected for problem '{pid}'. Exiting for external harness.")
return []
assert agent_to_run is not None
reg = get_agent(agent_to_run, path=Path(os.path.dirname(os.path.abspath(__file__))) / "agents.yaml")
if reg:
await LAUNCHER.ensure_started(reg)
# Poll until grading completes or agent exits
while conductor.submission_stage != "done":
# Check if agent process has exited
agent_proc = LAUNCHER._procs.get(agent_to_run)
if agent_proc:
agent_proc.proc.poll()
if agent_proc.proc.returncode is not None:
console.log(f"⚠️ Agent process exited with return code {agent_proc.proc.returncode}")
break
await asyncio.sleep(1)
console.log(f"✅ Completed {pid}: results={conductor.results}")
# Wait for agent process to complete naturally before cleanup
# This allows the agent to finish saving trajectories and other cleanup tasks
if not use_external_harness:
agent_proc = LAUNCHER._procs.get(agent_to_run)
if agent_proc:
console.log("⏳ Waiting for agent process to complete...")
timeout = 30 # seconds
elapsed = 0
while elapsed < timeout:
agent_proc.proc.poll()
if agent_proc.proc.returncode is not None:
console.log(f"✅ Agent process completed with return code {agent_proc.proc.returncode}")
break
await asyncio.sleep(1)
elapsed += 1
else:
console.log(f"⚠️ Agent process did not complete within {timeout}s, will force cleanup")
snapshot = {
"problem_id": pid,
"attempt": attempt,
}
for stage, outcome in conductor.results.items():
if isinstance(outcome, dict):
for k, v in outcome.items():
snapshot[f"{stage}.{k}"] = v
else:
snapshot[stage] = outcome
all_results_for_agent.append(snapshot)
fieldnames = sorted({key for row in all_results_for_agent for key in row})
with open(tmp_path, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(all_results_for_agent)
logger.info(
f"⏳ Attempt {attempt} of {n_attempts} for problem {pid} complete - Intermediate results written to {tmp_path}"
)
if attempt == n_attempts:
current_date_time = get_current_datetime_formatted()
csv_path = f"{current_date_time}_{pid}_{agent_to_run}_results.csv"
os.replace(tmp_path, csv_path)
logger.info(f"✅ Problem {pid} for agent {agent_to_run} complete! Results written to {csv_path}")
# Cleanup agent process so a fresh one can be started for the next problem
if not use_external_harness:
LAUNCHER.cleanup_agent(agent_to_run)
console.log(f"🧹 Cleaned up agent process for {agent_to_run}")
# Stop K8s API proxy when all problems are done
if not use_external_harness:
console.log("🔓 Stopping Kubernetes API proxy...")
conductor.stop_k8s_proxy()
return [{agent_to_run: all_results_for_agent}]
return asyncio.run(driver())
def _run_driver_and_shutdown(
conductor: Conductor,
problem_filter: str | None = None,
agent_to_run: str | None = None,
use_external_harness: bool = False,
n_attempts: int = 1,
):
"""Run the benchmark driver, stash results, then tell the API to exit."""
try:
results = driver_loop(
conductor,
problem_filter=problem_filter,
agent_to_run=agent_to_run,
use_external_harness=use_external_harness,
n_attempts=n_attempts,
)
global _driver_results
_driver_results = results
except Exception:
logger.exception("Driver thread crashed")
finally:
LAUNCHER.cleanup_all()
request_shutdown()
def main(args):
# set up the logger
init_logger()
# Initialize Noise Manager if config is provided or default config exists
nm = None
noise_config_path = args.noise_config
default_noise_config = "sregym/generators/noise/noise_config.yaml"
# Use default path if no argument provided but default file exists
if not noise_config_path and os.path.exists(default_noise_config):
noise_config_path = default_noise_config
if noise_config_path:
try:
from sregym.generators.noise.manager import get_noise_manager
nm = get_noise_manager()
nm.load_config(noise_config_path)
logger.info(f"✅ Noise manager initialized with config: {noise_config_path}")
except Exception as e:
logger.warning(f"⚠️ Failed to initialize noise manager: {e}")
os.environ["MODEL_ID"] = args.model
config = ConductorConfig(deploy_loki=not args.use_external_harness)
conductor = Conductor(config=config)
LAUNCHER.enable_container_isolation(force_build=args.force_build)
# Start the driver in the background; it will call request_shutdown() when finished
driver_thread = threading.Thread(
target=_run_driver_and_shutdown,
args=(conductor, args.problem, args.agent, args.use_external_harness, args.n_attempts),
name="driver",
daemon=True,
)
driver_thread.start()
# Start the Conductor HTTP API in the MAIN thread (blocking)
try:
run_api(conductor)
except KeyboardInterrupt:
# If interrupted, still try to shut down cleanly
LAUNCHER.cleanup_all()
request_shutdown()
finally:
# Stop any remaining agent containers/processes
LAUNCHER.cleanup_all()
# Stop noise manager if it was initialized
if nm:
try:
logger.info("Stopping noise manager...")
nm.stop()
except Exception as e:
logger.error(f"⚠️ Error stopping noise manager: {e}")
# Give driver a moment to finish setting results
driver_thread.join(timeout=5)
# When API shuts down, collect results from driver
results = _driver_results
if results:
aggregated = {}
for entry in results:
for agent_name, agent_rows in entry.items():
aggregated.setdefault(agent_name, []).extend(agent_rows)
for agent_name, agent_results in aggregated.items():
fieldnames = sorted({key for row in agent_results for key in row})
current_date_time = get_current_datetime_formatted()
csv_path = f"{current_date_time}_{agent_name}_ALL_results.csv"
with open(csv_path, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(agent_results)
logger.info(f"✅ Benchmark complete! Results for {agent_name} written to {csv_path}")
else:
logger.warning("⚠️ No results to write.")
if __name__ == "__main__":
# separate run, use exit
sys.exit(0)
else:
# function call run, return results
return results
if __name__ == "__main__":
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Run SREGym benchmark suite")
parser.add_argument(
"--problem",
type=str,
default=None,
help="Run only a specific problem by its ID (e.g., 'target_port')",
)
parser.add_argument(
"--agent",
type=str,
default=None,
help="Agent to run by its name (e.g., 'stratus')",
)
parser.add_argument(
"--model",
type=str,
default="gpt-5-nano",
help="Run only a specific model backend (e.g., 'gpt-5', 'gemini-2.5-pro', 'claude-sonnet-4', 'moonshot')",
)
parser.add_argument(
"--use-external-harness", action="store_true", help="For use in external harnesses, deploy the fault and exit."
)
parser.add_argument(
"--noise-config",
type=str,
default=None,
help="Path to noise configuration YAML file",
)
parser.add_argument(
"--n-attempts",
type=int,
default=1,
help="Number of attempts to run each problem (default: 1)",
)
parser.add_argument(
"--force-build",
action="store_true",
help="Force rebuild the agent Docker image even if it already exists (use after updating dependencies or build scripts)",
)
args = parser.parse_args()
# Validate that --agent is provided when not using external harness
if not args.use_external_harness and args.agent is None:
parser.error("--agent is required when --use-external-harness is not set")
# Validate that n_attempts is positive
if args.n_attempts < 1:
parser.error("--n-attempts must be a positive integer")
main(args)