diff --git a/.gitignore b/.gitignore index 4ca0ece3c..430b374ec 100644 --- a/.gitignore +++ b/.gitignore @@ -148,6 +148,10 @@ dmypy.json checkpoints/ experiments/ benchmark*/ +!pufferlib/ocean/benchmark/ +!pufferlib/ocean/benchmark/** +# But re-ignore caches inside it +pufferlib/ocean/benchmark/**/__pycache__/ wandb/ .neptune/ raylib*/ diff --git a/README.md b/README.md index c1e2e137e..2fc82a481 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ python setup.py build_ext --inplace --force ## Install (HPC cluster) -For clusters where the host glibc is too old or you need a CUDA toolchain that's not pinned by the OS, PufferDrive uses a **mixed Singularity + venv** layout: +For the NYU cluster, PufferDrive recommends a **mixed Singularity + venv** layout: - **Singularity image** (read-only, system-wide): supplies CUDA + cuDNN. - **ext3 overlay** (writable via `--fakeroot`, host the miniforge3 base interpreter at `/ext3/miniforge3` only). diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index bafc523b2..7a500f7e8 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -190,67 +190,163 @@ show_human_logs = True ; Options: List[str to path], str to path (e.g., "resources/drive/training/binaries/map_001.bin"), None render_map = none -[eval] -; Set to True to enable periodic multi-scenario evaluation during training -multi_scenario_eval = False -; Set to True to enable periodic multi-scenario render during training (one -; rollout per scenario, output mp4 per scenario via the EGL render pipeline -; or HTML replay via viz.generate_interactive_replay depending on -; multi_scenario_render_backend). Does not affect multi_scenario_eval. -multi_scenario_render = True -; Epoch interval between render runs. Independent of eval_interval so metric -; eval can run on a tighter schedule than the more expensive render. -multi_scenario_render_interval = 250 -; Render backend for multi_scenario_render: "html" (CPU, viz.generate_interactive_replay) -; or "egl" (C-side render.h → EGL → PBO → ffmpeg libx264, one mp4 per scenario). -multi_scenario_render_backend = egl -; Frequency of evaluation during training (in epochs) -eval_interval = 25 -; When True, inline eval zeroes road-segment dropout + perturbations and -; enforces red-light stops. Metrics then reflect performance under clean -; conditions. The live training policy is re-aligned to the clean env's -; obs shape via _swap_policy_obs_counts — safe because the GigaFlow -; encoder is count-invariant (shared MLP + max-pool over segments). -clean_eval = True -num_agents = 512 -; Batch size for eval_multi_scenarios (number of scenarios per batch) -; Path to dataset used for evaluation -map_dir = "pufferlib/resources/drive/binaries/carla_py123d" -; Simulation mode for evaluation: "gigaflow" or "replay" -; gigaflow — procedurally spawn agents on CARLA towns (needs map-only .bin -; files in pufferlib/resources/drive/binaries/carla_py123d) -; replay — play logged trajectories from WOMD/nuPlan scenarios (needs -; trajectory-bearing .bin files in pufferlib/resources/drive/binaries/womd) -multi_scenario_simulation_mode = "gigaflow" -; Total number of scenarios to evaluate -multi_scenario_num_scenarios = 250 -; Per-scenario step count for replay-mode eval (also used as resample_frequency). -; 91 = WOMD (9.1s @ 10Hz). 201 = nuPlan (20.1s @ 10Hz). Ignored for gigaflow -; mode, which always uses a hardcoded 3000-step procedural episode. -scenario_length = 201 -; Cap the render rollout at this many steps. 
-render_max_steps = 201
-backend = PufferEnv
-; WOSAC (Waymo Open Sim Agents Challenge) evaluation settings
-; If True, enables evaluation on realism metrics each time we save a checkpoint
-wosac_realism_eval = False
-wosac_num_rollouts = 32 ; Number of policy rollouts per scene
-wosac_init_steps = 10 ; When to start the simulation
-wosac_num_agents = 256 ; Total number of WOSAC agents to evaluate
-wosac_control_mode = "control_wosac" ; Control the tracks to predict
-wosac_init_mode = "create_all_valid" ; Initialize from the tracks to predict
-wosac_goal_radius = 2.0 ; Can shrink goal radius for WOSAC evaluation
-wosac_sanity_check = False
-wosac_aggregate_results = True ; Only return aggregate results across all scenes
-; If True, enable human replay evaluation (pair policy-controlled agent with human replays)
-human_replay_eval = False
-human_replay_control_mode = "control_sdc_only" ; Control only the self-driving car
-human_replay_num_agents = 64 ; This equals the number of scenarios, since we control one agent in each
-; Evaluating different driving behaviours learned by the policy
-driving_behaviours_eval = True
-driving_behaviours_eval_config = "pufferlib/config/ocean/driving_behaviours_eval.ini"
-driving_behaviours_eval_interval = 250
-render_driving_behaviours = True
+; ===========================================================================
+; Evaluation suites
+;
+; Each [eval.<name>] section is one Evaluator instance. EvalManager discovers
+; them automatically (any section under [eval] with a `type` field).
+; Sections without a `type` field are templates — referenced from other
+; sections via `inherits = "<section>"`.
+;
+; Field reference:
+;   type         — registered evaluator class (multi_scenario, behavior_class,
+;                  human_replay, wosac)
+;   enabled      — true|false
+;   interval     — epochs between runs (0 disables)
+;   mode         — "inline" (block training) | "subprocess" (spawn process)
+;   inherits     — pull defaults from another section, recursively
+;   clean        — true → zero perturbations + dropout + enforce red lights
+;   render       — true → capture mp4(s) during rollout
+;   render_views — list of camera views: sim_state, bev, topdown_sim, bev_all
+;   env.<key>    — any [env] override (dotted key)
+;   eval.<key>   — evaluator-specific knob (e.g. num_scenarios)
+;   vec.<key>    — any [vec] override
+; ===========================================================================
+
+[eval.validation_gigaflow]
+type = "multi_scenario"
+enabled = true
+interval = 250
+mode = "inline"
+clean = true
+render = true
+render_views = ["sim_state", "bev"]
+env.simulation_mode = "gigaflow"
+env.map_dir = "pufferlib/resources/drive/binaries/carla_py123d"
+env.num_maps = 8
+env.num_agents = 400
+env.min_agents_per_env = 50
+env.max_agents_per_env = 50
+env.scenario_length = 3000
+env.resample_frequency = 3000
+; One rollout per CARLA map.
+eval.num_scenarios = 8
+eval.render_num_scenarios = 5
+eval.render_max_steps = 300
+
+; ---------------------------------------------------------------------------
+; Driving-behaviour evaluation: nuPlan scenes labeled by scene type. Each
+; behavior is one [eval.behaviors_*] section. All inherit from the template
+; below — change shared knobs in one place. A sketch of adding a new
+; category follows.
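+;
+; Adding a category is one short section. Hypothetical "u_turn" example
+; (illustration only: label and copy matching .bin files into the folder
+; first, since behavior_class errors on an empty map_dir):
+;
+;   [eval.behaviors_u_turn]
+;   inherits = "behaviors_defaults"
+;   type = "behavior_class"
+;   enabled = true
+;   env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/u_turn"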
+; --------------------------------------------------------------------------- + +[eval.behaviors_defaults] +enabled = false +interval = 250 +mode = "inline" +clean = true +render = true +render_views = ["sim_state", "bev"] +env.simulation_mode = "replay" +env.control_mode = "control_sdc_only" +env.init_mode = "create_all_valid" +env.scenario_length = 201 +env.max_partner_observations = 32 +eval.num_scenarios = 50 +eval.render_num_scenarios = 2 +eval.render_max_steps = 200 + +[eval.behaviors_full_dir] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/nuplan_mini_train_bins" + +[eval.behaviors_hard_stop] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/hard_stop" + +[eval.behaviors_highway_straight] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/highway_straight" + +[eval.behaviors_lane_change] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/lane_change" + +[eval.behaviors_merge] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/merge" + +[eval.behaviors_parked_cars] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/parked_cars" + +[eval.behaviors_roundabout] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/roundabout" + +[eval.behaviors_stopped_traffic] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/stopped_traffic" + +[eval.behaviors_traffic_light_green] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/traffic_light_green" + +[eval.behaviors_traffic_light_stop] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/traffic_light_stop" + +[eval.behaviors_unprotected_left] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/unprotected_left" + +[eval.behaviors_unprotected_right] +inherits = "behaviors_defaults" +type = "behavior_class" +enabled = true +env.map_dir = "/scratch/ev2237/data/nuplan/categories_v021/unprotected_right" + +; --------------------------------------------------------------------------- +; Optional: WOSAC realism eval. Off by default. 
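+; Flip `enabled = true` to turn it on. Because mode = "subprocess", the
+; trainer saves a checkpoint right before each firing epoch, so the child
+; process evaluates this epoch's weights (see
+; EvalManager.has_subprocess_evals_at in manager.py).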
+; --------------------------------------------------------------------------- + +[eval.wosac] +type = "wosac" +enabled = false +interval = 500 +mode = "subprocess" +clean = true +render = false +env.control_mode = "control_wosac" +env.init_mode = "create_all_valid" +env.init_steps = 10 +env.goal_radius = 2.0 +eval.wosac_num_rollouts = 32 +eval.wosac_num_agents = 256 +eval.wosac_sanity_check = false +eval.wosac_aggregate_results = true ; [sweep.train.learning_rate] ; distribution = log_normal diff --git a/pufferlib/config/ocean/driving_behaviours_eval.ini b/pufferlib/config/ocean/driving_behaviours_eval.ini deleted file mode 100644 index 02896db99..000000000 --- a/pufferlib/config/ocean/driving_behaviours_eval.ini +++ /dev/null @@ -1,64 +0,0 @@ -; Configuration for driving behaviour evaluation using nuPlan mini-train -; scenarios labeled by scene type. Built from py123d 0.2.1 reconvert of -; .bin files under /scratch/ev2237/data/nuplan/categories_v021/. -; -; Eval runs in REPLAY mode (simulation_mode=replay, control_mode=control_sdc_only) -; using the same reward weights as training (no reward conditioning). Scenario -; length is 201 (nuPlan with duration_s=20 at 10Hz → 20.1s). -; -; Categories with an empty folder are omitted — driving_behaviours_eval errors -; if map_dir has no .bin files. Add new categories by labeling more scenes -; (see scripts/render_scenario.py --view bev) and copying them into the -; corresponding /scratch/ev2237/data/nuplan/categories_v021// folder. - -[eval_full_dir] -map_dir = "/scratch/ev2237/data/nuplan/nuplan_mini_train_bins" -scenario_length = 201 -; Random-sample this many bins from map_dir each eval pass (fresh sample -; per pass). Cap keeps wall-clock manageable; 876-bin full sweep would -; take ~25 min, 50 bins takes ~1.5 min. -num_scenarios = 50 - -[eval_hard_stop] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/hard_stop" -scenario_length = 201 - -[eval_highway_straight] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/highway_straight" -scenario_length = 201 - -[eval_lane_change] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/lane_change" -scenario_length = 201 - -[eval_merge] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/merge" -scenario_length = 201 - -[eval_parked_cars] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/parked_cars" -scenario_length = 201 - -[eval_roundabout] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/roundabout" -scenario_length = 201 - -[eval_stopped_traffic] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/stopped_traffic" -scenario_length = 201 - -[eval_traffic_light_green] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/traffic_light_green" -scenario_length = 201 - -[eval_traffic_light_stop] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/traffic_light_stop" -scenario_length = 201 - -[eval_unprotected_left] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/unprotected_left" -scenario_length = 201 - -[eval_unprotected_right] -map_dir = "/scratch/ev2237/data/nuplan/categories_v021/unprotected_right" -scenario_length = 201 diff --git a/pufferlib/ocean/benchmark/evaluators/__init__.py b/pufferlib/ocean/benchmark/evaluators/__init__.py new file mode 100644 index 000000000..d7594bffc --- /dev/null +++ b/pufferlib/ocean/benchmark/evaluators/__init__.py @@ -0,0 +1,33 @@ +"""Unified evaluator framework for PufferDrive. + +Each Evaluator subclass owns one rollout pattern. 
The EvalManager (parent
+package) discovers evaluators from `[eval.<name>]` sections in drive.ini
+and dispatches them inline (during training) or as subprocesses.
+
+See docs/eval_unification.md for the full design rationale.
+"""
+
+from pufferlib.ocean.benchmark.evaluators.base import EvalResult, Evaluator
+from pufferlib.ocean.benchmark.evaluators.behavior_class import BehaviorClassEvaluator
+from pufferlib.ocean.benchmark.evaluators.human_replay import HumanReplayEvaluator
+from pufferlib.ocean.benchmark.evaluators.multi_scenario import MultiScenarioEvaluator
+from pufferlib.ocean.benchmark.evaluators.wosac import WOSACEvaluator
+
+# Type registry for [eval.<name>].type → class lookup. Manager uses this
+# to instantiate the right subclass per config section.
+EVALUATOR_REGISTRY = {
+    "multi_scenario": MultiScenarioEvaluator,
+    "behavior_class": BehaviorClassEvaluator,
+    "human_replay": HumanReplayEvaluator,
+    "wosac": WOSACEvaluator,
+}
+
+__all__ = [
+    "EVALUATOR_REGISTRY",
+    "EvalResult",
+    "Evaluator",
+    "MultiScenarioEvaluator",
+    "BehaviorClassEvaluator",
+    "HumanReplayEvaluator",
+    "WOSACEvaluator",
+]
diff --git a/pufferlib/ocean/benchmark/evaluators/base.py b/pufferlib/ocean/benchmark/evaluators/base.py
new file mode 100644
index 000000000..2ecd92827
--- /dev/null
+++ b/pufferlib/ocean/benchmark/evaluators/base.py
@@ -0,0 +1,304 @@
+"""Evaluator base class + default rollout loop + EvalResult dataclass."""
+
+import time
+from dataclasses import dataclass, field
+from typing import ClassVar
+
+
+@dataclass
+class EvalResult:
+    metrics: dict
+    frames: list = field(default_factory=list)
+
+
+class Evaluator:
+    """Base class for all evaluators.
+
+    Subclasses typically override only `_should_stop` (the loop termination
+    condition) and `env_overrides`. The default `rollout` runs a step loop
+    suitable for "stream of episode infos until target count reached" evals.
+
+    To diverge from the default loop entirely, override `rollout` directly.
+    """
+
+    type_name: ClassVar[str] = ""
+
+    def __init__(self, name: str, config: dict, train_config: dict):
+        # `name` = the [eval.<name>] section name. Used as the wandb prefix.
+        self.name = name
+        # `config` = merged per-evaluator config (after inheritance + clean
+        # macro expansion). Has nested `env`, `vec`, plus flat scalar knobs.
+        self.config = config
+        # `train_config` = the full training config from drive.ini, used as
+        # the base layer that `config` overrides on top of.
+        self.train_config = train_config
+
+        # Common scalars pulled out for ergonomics.
+        self.enabled: bool = bool(config.get("enabled", True))
+        self.interval: int = int(config.get("interval", 0))
+        self.mode: str = config.get("mode", "inline")
+        self.render: bool = bool(config.get("render", False))
+        self.render_views: list = list(config.get("render_views", ["sim_state"]))
+        self.clean: bool = bool(config.get("clean", True))
+
+    # -- Config hooks ---------------------------------------------------
+
+    def env_overrides(self) -> dict:
+        """Per-evaluator [env] overrides. Defaults to whatever the section
+        wrote under `env.*`. Subclasses can override to add baseline knobs."""
+        return dict(self.config.get("env", {}))
+
+    def vec_overrides(self) -> dict:
+        """Per-evaluator [vec] overrides. Default: serial single-worker —
+        the safe default for replay-style evals where each worker is a
+        single bin replay.
Subclasses that want parallel throughput + (gigaflow validation) override this.""" + base = {"backend": "PufferEnv", "num_envs": 1} + base.update(self.config.get("vec", {})) + return base + + # -- Rollout (default) ---------------------------------------------- + + def rollout(self, vecenv, policy, args) -> EvalResult: + """Default rollout: reset → step → collect infos → aggregate. + + Times the inner work and adds `eval_seconds` to metrics so wandb + panels show wall-clock cost per evaluator. Subclasses tune + behavior by overriding `_run_rollout_loop` (and optionally + `_render_pass`); only override this method if the loop shape + itself needs to differ. + """ + t0 = time.time() + metrics = self._run_rollout_loop(vecenv, policy, args) + frames = self._render_pass(vecenv, policy, args) if self.render else [] + metrics["eval_seconds"] = float(time.time() - t0) + return EvalResult(metrics=metrics, frames=frames) + + def _run_rollout_loop(self, vecenv, policy, args) -> dict: + import numpy as np + import torch + + import pufferlib + + device = args["train"]["device"] + num_agents = vecenv.observation_space.shape[0] + state = self._init_lstm_state(num_agents, policy, device, args) + + obs = self._initial_reset(vecenv, args) + + infos_collected: list = [] + steps = 0 + while not self._should_stop(args, infos_collected, steps): + with torch.no_grad(): + ob_t = torch.as_tensor(obs).to(device) + logits, _ = policy.forward_eval(ob_t, state) + action, _, _ = pufferlib.pytorch.sample_logits(logits, deterministic=True) + action = action.cpu().numpy().reshape(vecenv.action_space.shape) + if isinstance(logits, torch.distributions.Normal): + action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) + + obs, _, terminals, truncations, infos = vecenv.step(action) + infos_collected.extend(self._flatten_infos(infos)) + # Mask LSTM state per-agent for envs that just terminated or + # truncated — those agents' next obs is from a fresh scenario + # and the recurrent memory of the previous one would bias + # the policy. Either signal alone means "episode over, env + # reset," so OR them. + if state: + done = np.asarray(terminals).astype(bool) | np.asarray(truncations).astype(bool) + mask = torch.as_tensor(~done, device=device, dtype=state["lstm_h"].dtype).reshape(-1, 1) + state["lstm_h"] *= mask + state["lstm_c"] *= mask + steps += 1 + + return self._aggregate_infos(infos_collected) + + # -- Loop hooks (subclass-overridable) ------------------------------ + + def _initial_reset(self, vecenv, args): + """Return the initial observation. Default: synchronous reset.""" + obs, _ = vecenv.reset() + return obs + + def _init_lstm_state(self, num_agents, policy, device, args) -> dict: + if not args["train"].get("use_rnn"): + return {} + import torch + + return dict( + lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), + lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), + ) + + def _should_stop(self, args, infos_collected, steps) -> bool: + """Loop termination. Subclasses must override.""" + raise NotImplementedError + + def _flatten_infos(self, infos) -> list: + """Pufferlib backends return either a list-of-list (multi-worker) or + a single list (PufferEnv backend). 
Flatten to a list of dicts.""" + out = [] + if not infos: + return out + for sub in infos: + if not sub: + continue + if isinstance(sub, list): + out.extend(sub) + else: + out.append(sub) + return out + + def _aggregate_infos(self, infos: list) -> dict: + """Default: numeric mean per key, plus a num_scenarios_completed count.""" + if not infos: + return {"num_scenarios_completed": 0} + import numpy as np + + out = {"num_scenarios_completed": float(len(infos))} + keys = set().union(*(d.keys() for d in infos)) + for k in keys: + vals = [d[k] for d in infos if isinstance(d.get(k), (int, float))] + if vals: + out[k] = float(np.mean(vals)) + return out + + # -- Render (default EGL → ffmpeg mp4 pipeline) ---------------------- + + def _render_pass(self, vecenv, policy, args) -> list: + """Build a fresh PufferEnv with `render_mode=headless`, render one + clip per (scenario, view), return mp4 paths. Returns [] for non-egl + backends. Subclasses customize the render env via `_render_env_overrides`. + """ + backend = args.get("render_backend", "egl") + if backend != "egl": + return [] + + import importlib + from pathlib import Path + + import pufferlib + + # Per-evaluator subdir so each evaluator's mp4s don't get re-globbed + # by the next evaluator's _render_view (every evaluator runs at the + # same global_step, so a shared dir + step glob would collect every + # earlier evaluator's mp4s into this one's result.frames). + out_dir = Path(args.get("render_results_dir") or args.get("eval_results_dir") or ".") / "mp4" / self.name + out_dir.mkdir(parents=True, exist_ok=True) + + package = args.get("package", "ocean") + module_name = "pufferlib.ocean" if package == "ocean" else f"pufferlib.environments.{package}" + env_module = importlib.import_module(module_name) + make_env = env_module.env_creator(args["env_name"]) + + render_env_kwargs = self._render_env_overrides(args) + # Stamp epoch + training step into the filename so successive + # epochs produce distinct mp4s and wandb's render carousel shows + # policy evolution. Epoch is the human-readable index ("which + # checkpoint did this come from"); global_step is the precise + # env-step count. Both fall back to 0 for ad-hoc CLI runs. + epoch = int(args.get("epoch") or 0) + global_step = int(args.get("global_step") or 0) + step_suffix = f"_epoch{epoch}_step{global_step}" + + all_paths = [] + for view in self.render_views: + view_idx = _VIEW_NAME_TO_IDX.get(view, 0) + view_suffix = step_suffix + ("" if view == "sim_state" else f"_{view}") + + vec = pufferlib.vector.make( + make_env, + env_args=[], + env_kwargs=render_env_kwargs, + backend="PufferEnv", + num_envs=1, + ) + target = vec if not hasattr(vec, "envs") else vec.envs[0] + internal = getattr(target, "num_envs", 1) + for e in range(internal): + target.set_video_suffix(view_suffix, env_idx=e) + + paths = self._render_view(vec, target, policy, args, view_idx, out_dir, step_suffix) + vec.close() + all_paths.extend(paths) + return all_paths + + def _render_env_overrides(self, args) -> dict: + """Build env kwargs for the render env. Default: same as the + metric-pass env plus `render_mode=headless`. Subclasses override + to inject things like a random starting_map (gigaflow validation) + or a shrunken bin set (behavior class).""" + out = dict(args["env"]) + out["render_mode"] = "headless" + return out + + def _render_view(self, vecenv, target_env, policy, args, view_idx, out_dir, step_suffix) -> list: + """One rollout per render-env, writes one mp4 per active env per view. 
+ Caps how many internal envs actually feed ffmpeg pipes via + `eval.render_num_scenarios` so render cost stays bounded.""" + import os + + import numpy as np + import torch + + import pufferlib + + device = args["train"]["device"] + num_agents = vecenv.observation_space.shape[0] + + eval_cfg = self.config.get("eval", {}) + for required in ("render_num_scenarios", "render_max_steps"): + if required not in eval_cfg: + raise KeyError( + f"[eval.{self.name}] has render=true but eval.{required} is not set. " + "Render is expensive — set it explicitly per evaluator." + ) + num_scenarios = int(eval_cfg["render_num_scenarios"]) + max_steps = int(eval_cfg["render_max_steps"]) + + saved_cwd = os.getcwd() + os.chdir(out_dir) + # Glob for files written this pass: every mp4 has the step suffix, + # so a step_suffix-prefixed glob filters out accumulated mp4s from + # prior epochs (the dir is shared across runs). + step_glob = f"*{step_suffix}*.mp4" + try: + state = self._init_lstm_state(num_agents, policy, device, args) + scenarios_processed = 0 + while scenarios_processed < num_scenarios: + ob, _ = vecenv.reset() + scenarios = vecenv.get_state() + num_in_batch = len(scenarios) + # Cap how many envs render this iteration: the C kernel + # steps the full batch regardless, but only the first + # `to_render` envs feed ffmpeg pipes. + to_render = min(num_in_batch, num_scenarios - scenarios_processed) + if state: + state["lstm_h"].zero_() + state["lstm_c"].zero_() + for _ in range(max_steps): + with torch.no_grad(): + ob_t = torch.as_tensor(ob).to(device) + logits, _ = policy.forward_eval(ob_t, state) + action, _, _ = pufferlib.pytorch.sample_logits(logits, deterministic=True) + action = action.cpu().numpy().reshape(vecenv.action_space.shape) + if isinstance(logits, torch.distributions.Normal): + action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) + ob, _, _, _, _ = vecenv.step(action) + for e in range(to_render): + target_env.render(env_idx=e, view_mode=view_idx) + for e in range(to_render): + target_env.close_client(env_idx=e) + scenarios_processed += to_render + finally: + os.chdir(saved_cwd) + + return sorted(out_dir.glob(step_glob)) + + +_VIEW_NAME_TO_IDX = { + "sim_state": 0, + "bev": 1, + "topdown_sim": 2, + "bev_all": 3, +} diff --git a/pufferlib/ocean/benchmark/evaluators/behavior_class.py b/pufferlib/ocean/benchmark/evaluators/behavior_class.py new file mode 100644 index 000000000..6dabaea9d --- /dev/null +++ b/pufferlib/ocean/benchmark/evaluators/behavior_class.py @@ -0,0 +1,47 @@ +"""BehaviorClassEvaluator — one nuPlan behavior category at a time. + +Inherits HumanReplayEvaluator's loop. 
Adds optional fresh random sampling +each pass when `num_scenarios` < total bins (via a tmp symlink dir).""" + +import os +import random +import shutil +import tempfile +from typing import ClassVar + +from pufferlib.ocean.benchmark.evaluators.human_replay import HumanReplayEvaluator + + +class BehaviorClassEvaluator(HumanReplayEvaluator): + type_name: ClassVar[str] = "behavior_class" + + def __init__(self, name, config, train_config): + super().__init__(name, config, train_config) + self._sampled_dir = None # tmp symlink dir created per pass + + def env_overrides(self) -> dict: + env = super().env_overrides() + map_dir = env.get("map_dir", "") + if not map_dir or not os.path.isdir(map_dir): + return env + + num_scenarios = int(self.config.get("eval", {}).get("num_scenarios", 0)) + all_bins = [f for f in os.listdir(map_dir) if f.endswith(".bin")] + if num_scenarios > 0 and num_scenarios < len(all_bins): + sampled = random.sample(all_bins, num_scenarios) + self._sampled_dir = tempfile.mkdtemp(prefix=f"{self.name}_") + for fname in sampled: + os.symlink(os.path.join(map_dir, fname), os.path.join(self._sampled_dir, fname)) + env["map_dir"] = self._sampled_dir + n = num_scenarios + else: + n = len(all_bins) + env["num_agents"] = n + env["num_maps"] = n + env["num_eval_scenarios"] = n + return env + + def cleanup(self): + if self._sampled_dir and os.path.isdir(self._sampled_dir): + shutil.rmtree(self._sampled_dir, ignore_errors=True) + self._sampled_dir = None diff --git a/pufferlib/ocean/benchmark/evaluators/human_replay.py b/pufferlib/ocean/benchmark/evaluators/human_replay.py new file mode 100644 index 000000000..d400a09f6 --- /dev/null +++ b/pufferlib/ocean/benchmark/evaluators/human_replay.py @@ -0,0 +1,48 @@ +"""HumanReplayEvaluator — replay mode + control_sdc_only, one rollout per +bin in the map_dir, mean of per-episode info dicts. + +Inherits the default rollout loop from `Evaluator`; only overrides +`_should_stop` to terminate once every bin has produced one info.""" + +import os +from typing import ClassVar + +from pufferlib.ocean.benchmark.evaluators.base import Evaluator + + +class HumanReplayEvaluator(Evaluator): + type_name: ClassVar[str] = "human_replay" + + def env_overrides(self) -> dict: + env = { + "simulation_mode": "replay", + "control_mode": "control_sdc_only", + "init_mode": "create_all_valid", + "eval_mode": 1, + "termination_mode": 0, + "reward_randomization": False, + } + env.update(self.config.get("env", {})) + # 1 SDC per bin → num_agents == num_maps == num_eval_scenarios. + # Override the train-config defaults (num_agents=1024 etc.) since + # they don't apply in replay+control_sdc_only mode. + # Without setting num_eval_scenarios, the C-side replay branch caps + # env_count at drive.py's default of 16 — only 16 of N bins would + # ever instantiate as envs. + map_dir = env.get("map_dir", "") + if map_dir and os.path.isdir(map_dir): + n_bins = len([f for f in os.listdir(map_dir) if f.endswith(".bin")]) + env["num_agents"] = n_bins + env["num_maps"] = n_bins + env["num_eval_scenarios"] = n_bins + return env + + def _should_stop(self, args, infos_collected, steps) -> bool: + # All bins run as parallel envs in one batched rollout (because + # num_eval_scenarios = num_maps). Every env truncates on the same + # tick at scenario_length, vec_log emits one aggregate dict at + # that boundary, and we have the full per-bin average. Stop right + # after that — waiting longer just deterministically re-runs the + # same bins (resample_frequency is effectively never in replay). 
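+        # Worked example (values from the [eval.behaviors_*] sections): with
+        # scenario_length = 201 (nuPlan, 20.1 s at 10 Hz), every env truncates
+        # on tick 201 and the loop exits once `steps` reaches 202, one step
+        # past the shared truncation boundary.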
+ scenario_length = int(args["env"]["scenario_length"]) + return steps >= scenario_length + 1 diff --git a/pufferlib/ocean/benchmark/evaluators/multi_scenario.py b/pufferlib/ocean/benchmark/evaluators/multi_scenario.py new file mode 100644 index 000000000..80d6f95a0 --- /dev/null +++ b/pufferlib/ocean/benchmark/evaluators/multi_scenario.py @@ -0,0 +1,38 @@ +"""MultiScenarioEvaluator — gigaflow validation eval. C-side eval_mode +cycles maps sequentially in one batched rollout, so the base loop + +PufferEnv defaults handle parallelism without multi-process workers.""" + +from typing import ClassVar + +from pufferlib.ocean.benchmark.evaluators.base import Evaluator + + +class MultiScenarioEvaluator(Evaluator): + type_name: ClassVar[str] = "multi_scenario" + + def env_overrides(self) -> dict: + env = { + "eval_mode": 1, + "termination_mode": 0, + "reward_randomization": False, + } + env.update(self.config.get("env", {})) + return env + + def _should_stop(self, args, infos_collected, steps) -> bool: + target = int(self.config.get("eval", {}).get("num_scenarios", 1)) + return len(infos_collected) >= target + + def _render_env_overrides(self, args) -> dict: + # Random starting_map per render epoch — every epoch shows a + # different bin from the dir rather than the same alphabetical + # first-N. Pin by setting env.starting_map explicitly in the + # [eval.] section. + import random + + out = super()._render_env_overrides(args) + if "starting_map" not in self.config.get("env", {}): + num_maps = int(out.get("num_maps", 1)) + if num_maps > 1: + out["starting_map"] = random.randint(0, num_maps - 1) + return out diff --git a/pufferlib/ocean/benchmark/evaluators/wosac.py b/pufferlib/ocean/benchmark/evaluators/wosac.py new file mode 100644 index 000000000..b8a7d82af --- /dev/null +++ b/pufferlib/ocean/benchmark/evaluators/wosac.py @@ -0,0 +1,41 @@ +"""WOSACEvaluator — Waymo Open Sim Agents Challenge realism eval. + +Wraps the existing WOSACEvaluator class in benchmark/evaluator.py — that +file owns the realism math (per-feature likelihood under learned +estimators) and the per-scene multi-rollout structure. This adapter +fits it into the unified Evaluator interface. +""" + +from typing import ClassVar + +from pufferlib.ocean.benchmark.evaluators.base import Evaluator + + +class WOSACEvaluator(Evaluator): + type_name: ClassVar[str] = "wosac" + + def env_overrides(self) -> dict: + env = { + "control_mode": "control_wosac", + "init_mode": "create_all_valid", + "eval_mode": 1, + "termination_mode": 0, + "reward_randomization": False, + } + env.update(self.config.get("env", {})) + return env + + def _run_rollout_loop(self, vecenv, policy, args) -> dict: + # Inner class pulls pandas/matplotlib — keep the import inside the + # rollout so the wrapper class can be imported in environments + # that don't have those (e.g. unit-test smoke envs). + from pufferlib.ocean.benchmark.evaluator import WOSACEvaluator as _WOSACInner + + inner = _WOSACInner(args) + df = inner.evaluate(args, vecenv, policy) + # df has one row per scene; aggregate to a single dict. 
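+        # (Mean of every numeric column across scenes, e.g. realism_meta_score;
+        # the totals and spread extras are added below.)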
+        results = df.mean(numeric_only=True).to_dict()
+        results["total_num_agents"] = float(df["num_agents_per_scene"].sum())
+        results["total_unique_scenarios"] = float(df.index.unique().shape[0])
+        results["realism_meta_score_std"] = float(df["realism_meta_score"].std())
+        return {k: (float(v) if hasattr(v, "item") else v) for k, v in results.items()}
diff --git a/pufferlib/ocean/benchmark/manager.py b/pufferlib/ocean/benchmark/manager.py
new file mode 100644
index 000000000..b3f6cb2b2
--- /dev/null
+++ b/pufferlib/ocean/benchmark/manager.py
@@ -0,0 +1,325 @@
+"""EvalManager — discovers `[eval.<name>]` sections, instantiates Evaluators,
+dispatches them inline or as subprocesses, logs results.
+
+Config schema (see docs/eval_unification.md):
+
+    [eval.<name>]
+    type = "<type_name>"
+    enabled = true|false
+    interval = <epochs>
+    mode = "inline" | "subprocess"
+    inherits = "<section>"           # optional, recursive merge
+    clean = true|false
+    render = true|false
+    render_views = ["sim_state", ...]
+    env.<key> = <value>              # any [env] override
+    eval.<key> = <value>             # evaluator-specific knobs
+    vec.<key> = <value>              # any [vec] override
+
+Sections without a `type` field are templates (only usable via `inherits`).
+"""
+
+import copy
+import glob
+import importlib
+import json
+import os
+import subprocess
+import sys
+import time
+from pathlib import Path
+
+import pufferlib
+
+from pufferlib.ocean.benchmark.evaluators import EVALUATOR_REGISTRY, EvalResult, Evaluator
+
+# clean_eval macro — env knobs to zero/enforce. Per-section explicit values
+# win over the macro (see _build_section_config).
+CLEAN_EVAL_OVERRIDES = {
+    "lane_segment_dropout": 0.0,
+    "boundary_segment_dropout": 0.0,
+    "partner_blindness_prob": 0.0,
+    "phantom_braking_prob": 0.0,
+    "phantom_braking_trigger_prob": 0.0,
+    "traffic_light_behavior": 1,
+}
+
+
+class EvalManager:
+    def __init__(self, evaluators: list, train_config: dict, run_id: str = None):
+        self.evaluators = evaluators
+        self.train_config = train_config
+        # `run_id` is needed to resolve the latest checkpoint for subprocess
+        # evals. None is fine if no evaluator is mode=subprocess.
+        self.run_id = run_id
+
+    @classmethod
+    def from_config(cls, train_config: dict, run_id: str = None) -> "EvalManager":
+        sections = _discover_eval_sections(train_config)
+        evaluators = []
+        for name, raw in sections.items():
+            cfg = _build_section_config(name, raw, sections)
+            type_name = cfg.get("type")
+            if type_name is None:
+                # Template section — referenced via inherits but not instantiated.
+                continue
+            cls_for_type = EVALUATOR_REGISTRY.get(type_name)
+            if cls_for_type is None:
+                raise ValueError(
+                    f"[eval.{name}] type='{type_name}' is not registered. "
+                    f"Known types: {sorted(EVALUATOR_REGISTRY.keys())}"
+                )
+            evaluators.append(cls_for_type(name=name, config=cfg, train_config=train_config))
+        return cls(evaluators=evaluators, train_config=train_config, run_id=run_id)
+
+    def has_subprocess_evals_at(self, epoch: int) -> bool:
+        """True if any enabled subprocess evaluator would fire at this epoch.
+        Training loop uses this to decide whether to save_checkpoint() before
+        calling maybe_run() — subprocesses load the checkpoint from disk."""
+        for ev in self.evaluators:
+            if not ev.enabled or ev.mode != "subprocess" or ev.interval <= 0:
+                continue
+            if epoch % ev.interval == 0:
+                return True
+        return False
+
+    def latest_checkpoint(self, env_name: str) -> str:
+        """Return the path to the most recent model_*.pt under the experiment
+        dir. Falls back to train_config['load_model_path'] if no checkpoints
+        have been written yet (e.g. resume-from path before first save).
+        Returns None if neither resolves."""
+        if self.run_id and self.train_config.get("data_dir"):
+            model_dir = os.path.join(self.train_config["data_dir"], f"{env_name}_{self.run_id}", "models")
+            if os.path.isdir(model_dir):
+                files = glob.glob(os.path.join(model_dir, "model_*.pt"))
+                if files:
+                    return max(files, key=os.path.getctime)
+        return self.train_config.get("load_model_path")
+
+    def maybe_run(self, epoch: int, policy, env_name: str, logger=None, global_step=None) -> dict:
+        """Called from the training loop. Runs every enabled evaluator
+        whose `interval` divides `epoch`. Returns a dict of {eval_name → metrics}."""
+        results = {}
+        for ev in self.evaluators:
+            if not ev.enabled:
+                continue
+            if ev.interval <= 0:
+                continue
+            if epoch % ev.interval != 0:
+                continue
+            res = self._run_one(
+                ev, policy=policy, env_name=env_name, logger=logger, global_step=global_step, epoch=epoch
+            )
+            results[ev.name] = res
+        return results
+
+    def run_one_by_name(
+        self, name: str, policy, env_name: str, logger=None, global_step=None, epoch=None
+    ) -> EvalResult:
+        """Run a single named evaluator regardless of interval. Used for
+        the subprocess CLI entry and for standalone `puffer eval --evaluator <name>`."""
+        for ev in self.evaluators:
+            if ev.name == name:
+                return self._run_one(
+                    ev, policy=policy, env_name=env_name, logger=logger, global_step=global_step, epoch=epoch
+                )
+        raise KeyError(f"No evaluator named '{name}'. Known: {[e.name for e in self.evaluators]}")
+
+    def _run_one(self, ev: Evaluator, policy, env_name: str, logger, global_step, epoch=None) -> EvalResult:
+        if ev.mode == "subprocess":
+            res = self._run_subprocess(ev, env_name=env_name, global_step=global_step, epoch=epoch)
+        else:
+            res = self._run_inline(ev, policy=policy, env_name=env_name, global_step=global_step, epoch=epoch)
+        if logger is not None:
+            self._log(ev, res, logger=logger, global_step=global_step)
+        if hasattr(ev, "cleanup"):
+            ev.cleanup()
+        return res
+
+    def _run_inline(self, ev: Evaluator, policy, env_name: str, global_step, epoch=None) -> EvalResult:
+        args = self._build_eval_args(ev, env_name=env_name, global_step=global_step, epoch=epoch)
+
+        package = args.get("package", "ocean")
+        module_name = "pufferlib.ocean" if package == "ocean" else f"pufferlib.environments.{package}"
+        env_module = importlib.import_module(module_name)
+        make_env = env_module.env_creator(env_name)
+
+        vec_kwargs = ev.vec_overrides()
+        backend = vec_kwargs.get("backend", "PufferEnv")
+        num_envs = int(vec_kwargs.get("num_envs", 1))
+
+        # PufferEnv is the default: Drive's C kernel batches all internal
+        # envs in one call so we get per-map parallelism without paying
+        # fork/IPC cost, and render shares the single ffmpeg pipeline.
+        # Multiprocessing is opt-in via vec.backend in an [eval.<name>]
+        # section, for evals that genuinely need it (memory-split for big
+        # replay sweeps, hetero scenarios, async overlap on long rollouts).
+        # The two backends have incompatible call shapes; branch here.
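+        # A hypothetical opt-in from drive.ini (section name and values are
+        # illustrative; "Multiprocessing" assumes a backend string the
+        # vectorizer accepts, as "PufferEnv" is elsewhere in this file):
+        #
+        #   [eval.big_replay_sweep]
+        #   vec.backend = "Multiprocessing"
+        #   vec.num_envs = 8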
+        if backend == "PufferEnv":
+            vecenv = pufferlib.vector.make(
+                make_env, env_args=[], env_kwargs=args["env"], backend=backend, num_envs=num_envs
+            )
+        else:
+            vec_call_kwargs = dict(vec_kwargs)
+            vec_call_kwargs.setdefault("num_workers", num_envs)
+            vec_call_kwargs.setdefault("batch_size", num_envs)
+            vecenv = pufferlib.vector.make(
+                [make_env] * num_envs,
+                env_args=[[]] * num_envs,
+                env_kwargs=[args["env"] for _ in range(num_envs)],
+                **vec_call_kwargs,
+            )
+        try:
+            res = ev.rollout(vecenv, policy, args)
+        finally:
+            vecenv.close()
+        return res
+
+    def _run_subprocess(self, ev: Evaluator, env_name: str, global_step, epoch=None) -> EvalResult:
+        out_path = Path(self.train_config.get("data_dir", ".")) / "eval_subprocess_out" / f"{ev.name}.json"
+        out_path.parent.mkdir(parents=True, exist_ok=True)
+        cfg_path = out_path.with_suffix(".cfg.json")
+        with open(cfg_path, "w") as f:
+            json.dump({"name": ev.name, "global_step": global_step, "epoch": epoch}, f)
+
+        cmd = [
+            sys.executable,
+            "-m",
+            "pufferlib.pufferl",
+            "eval",
+            env_name,
+            "--evaluator",
+            ev.name,
+            "--out",
+            str(out_path),
+        ]
+        # Subprocess loads the freshest checkpoint on disk. Caller (training
+        # loop) is responsible for save_checkpoint() before this fires —
+        # see has_subprocess_evals_at.
+        ckpt = self.latest_checkpoint(env_name)
+        if ckpt:
+            cmd += ["--load-model-path", ckpt]
+        subprocess.run(cmd, check=True)
+        with open(out_path) as f:
+            payload = json.load(f)
+        return EvalResult(metrics=payload.get("metrics", {}), frames=payload.get("frames", []))
+
+    def _build_eval_args(self, ev: Evaluator, env_name: str, global_step, epoch=None) -> dict:
+        args = copy.deepcopy(self.train_config)
+        args["env"].update(ev.env_overrides())
+        args.setdefault("vec", {})
+        args["vec"].update(ev.vec_overrides())
+        args["env_name"] = env_name
+        args["global_step"] = global_step
+        args["epoch"] = epoch
+        args["seed"] = int(self.train_config.get("train", {}).get("seed", 42)) or 42
+        # Pass through evaluator-private fields that subclasses look up on args.
+        ev_eval = ev.config.get("eval", {})
+        if ev_eval:
+            args.setdefault("eval", {})
+            args["eval"].update(ev_eval)
+        return args
+
+    def _log(self, ev: Evaluator, result: EvalResult, logger, global_step):
+        if not result.metrics and not result.frames:
+            return
+        log_dict = {f"{ev.name}/{k}": float(v) for k, v in result.metrics.items() if isinstance(v, (int, float))}
+        if hasattr(logger, "local_writer") and logger.local_writer and global_step is not None:
+            for k, v in log_dict.items():
+                logger.local_writer.add_scalar(k, v, global_step)
+        if hasattr(logger, "log") and log_dict:
+            if global_step is not None:
+                logger.log(log_dict, global_step)
+            else:
+                logger.log(log_dict)
+        if result.frames and hasattr(logger, "log"):
+            try:
+                import wandb
+
+                videos = [
+                    wandb.Video(str(p), fps=30, format="mp4", caption=Path(p).stem)
+                    for p in result.frames
+                    if str(p).endswith(".mp4")
+                ]
+                if videos:
+                    payload = {f"{ev.name}/render": videos if len(videos) > 1 else videos[0]}
+                    if global_step is not None:
+                        logger.log(payload, global_step)
+                    else:
+                        logger.log(payload)
+            except ImportError:
+                pass
+
+
+def _discover_eval_sections(args: dict) -> dict:
+    """Pull `[eval.<name>]` sections out of the parsed config.
+
+    `load_config` flattens dotted section names into a nested dict. So
+    `[eval.foo]` becomes `args["eval"]["foo"]`.
+    We collect every direct child of `args["eval"]` that's itself a
+    dict and treat it as a section."""
+    eval_root = args.get("eval", {})
+    if not isinstance(eval_root, dict):
+        return {}
+    sections = {}
+    for name, body in eval_root.items():
+        if isinstance(body, dict):
+            sections[name] = body
+    return sections
+
+
+def _build_section_config(name: str, raw: dict, all_sections: dict) -> dict:
+    """Resolve `inherits` chain + `clean` macro + dotted-key flattening."""
+    chain = []
+    current_name = name
+    current_raw = raw
+    visited = set()
+    while True:
+        if current_name in visited:
+            raise ValueError(f"Cyclic 'inherits' chain involving [eval.{current_name}]")
+        visited.add(current_name)
+        chain.append(current_raw)
+        parent_name = current_raw.get("inherits")
+        if parent_name is None:
+            break
+        if parent_name not in all_sections:
+            raise ValueError(f"[eval.{current_name}].inherits='{parent_name}' is not a known section")
+        current_name = parent_name
+        current_raw = all_sections[parent_name]
+
+    merged = {}
+    for level in reversed(chain):
+        # Deepcopy each level before merging — _deep_merge recurses into
+        # nested dicts in `merged` and mutates them in place. Without the
+        # copy, `merged["env"]` would alias the parent section's env dict;
+        # the child merge would mutate the parent in place, and every
+        # subsequent evaluator that inherits from the same parent would
+        # see the last-processed child's overrides instead of its own.
+        _deep_merge(merged, _expand_dotted(copy.deepcopy(level)))
+
+    if merged.get("clean", True):
+        env_section = merged.setdefault("env", {})
+        for k, v in CLEAN_EVAL_OVERRIDES.items():
+            env_section.setdefault(k, v)
+
+    return merged
+
+
+def _expand_dotted(raw: dict) -> dict:
+    """`{"env.simulation_mode": "replay"}` → `{"env": {"simulation_mode": "replay"}}`."""
+    out = {}
+    for k, v in raw.items():
+        if "." in k:
+            head, _, tail = k.partition(".")
+            sub = out.setdefault(head, {})
+            sub[tail] = v
+        else:
+            out[k] = v
+    return out
+
+
+def _deep_merge(dst: dict, src: dict):
+    for k, v in src.items():
+        if isinstance(v, dict) and isinstance(dst.get(k), dict):
+            _deep_merge(dst[k], v)
+        else:
+            dst[k] = v
diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py
index 7dc537ca5..e417e3473 100644
--- a/pufferlib/ocean/drive/drive.py
+++ b/pufferlib/ocean/drive/drive.py
@@ -458,6 +458,7 @@ def step(self, actions):
                 min_agents_per_env=self.min_agents_per_env,
                 max_agents_per_env=self.max_agents_per_env,
                 num_eval_scenarios=self.current_num_eval_scenarios,  # Use the dynamic size here
+                goal_radius=self.goal_radius,
             )

         # In eval mode, don't wrap counter - allows termination condition to work correctly
diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index d501a2bc4..e6b275d22 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -7,8 +7,6 @@
 import numbers
 import warnings

-import pandas as pd
-
 warnings.filterwarnings("error", category=RuntimeWarning)

@@ -257,6 +255,8 @@ def __init__(self, config, vecenv, policy, logger=None):
         self.losses = {}
         self.best_score = -float("inf")
         self.ema_max = 0.0
+        # Set later by train() via EvalManager.from_config (before eval hooks fire).
+ self._eval_manager = None # Dashboard self.model_size = sum(p.numel() for p in policy.parameters() if p.requires_grad) @@ -457,198 +457,23 @@ def train(self): except Exception as e: print(f"Failed to export model weights: {e}") - if self.config["eval"]["wosac_realism_eval"] and ( - self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training - ): - pufferlib.utils.run_wosac_eval_in_subprocess(self.config, self.logger, self.global_step) - - if self.config["eval"]["human_replay_eval"] and ( - self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training - ): - pufferlib.utils.run_human_replay_eval_in_subprocess(self.config, self.logger, self.global_step) - - if self.config["eval"]["wosac_realism_eval"] and ( - self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training - ): - pufferlib.utils.run_wosac_eval_in_subprocess(self.config, self.logger, self.global_step) - - if self.config["eval"]["human_replay_eval"] and ( - self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training - ): - pufferlib.utils.run_human_replay_eval_in_subprocess(self.config, self.logger, self.global_step) - - behaviours_eval_enabled = self.config["eval"].get("driving_behaviours_eval", False) - behaviours_eval_interval = int(self.config["eval"].get("driving_behaviours_eval_interval", 25)) - behaviours_config = self.config.get("driving_behaviours_eval") - if ( - behaviours_eval_enabled - and behaviours_config - and behaviours_eval_interval > 0 - and (self.epoch % behaviours_eval_interval == 0 or done_training) - ): - self.save_checkpoint() - pufferlib.utils.run_driving_behaviours_eval_in_subprocess( - self.config, self.logger, self.global_step, behaviours_config - ) - if self.config["eval"].get("render_driving_behaviours"): - self._render_driving_behaviours(behaviours_config) - - if self.config["eval"]["multi_scenario_eval"] and ( - self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training - ): - # Get evaluation settings from config - eval_simulation_mode = self.config["eval"]["multi_scenario_simulation_mode"] - num_agents_eval = self.config["eval"]["num_agents"] - map_dir = self.config["eval"]["map_dir"] - - # Inline eval runs "clean" by default — perturbations + dropout off, - # red-light stops enforced — so the logged validation metrics - # track progress under controlled conditions rather than noisy - # training perturbations. The live training policy's road slicing - # is re-aligned to the clean env at eval time via - # _swap_policy_obs_counts inside eval_multi_scenarios. - clean_eval = self.config["eval"].get("clean_eval", True) - eval_overrides = build_eval_overrides( - simulation_mode=eval_simulation_mode, - num_agents=num_agents_eval, - num_scenarios=self.config["eval"]["multi_scenario_num_scenarios"], - map_dir=map_dir, - num_carla_maps=self.config["eval"].get("num_carla_maps", 8), - clean=clean_eval, - scenario_length=self.config["eval"].get("scenario_length"), - ) - - # Build eval args by applying overrides to training config - eval_args = load_eval_multi_scenarios_config( + # All evaluation is now driven by the unified EvalManager. Each + # [eval.] section in drive.ini is one evaluator instance; + # the manager fires any whose interval divides this epoch. See + # docs/eval_unification.md for the design. + if self._eval_manager is not None: + # Subprocess evals load the policy from disk. Save the latest + # checkpoint first so they see this epoch's weights, not the + # last save_checkpoint() from `checkpoint_interval`. 
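+            # Inline evals, by contrast, receive self.uncompiled_policy in
+            # memory and need no checkpoint round-trip.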
+ if self._eval_manager.has_subprocess_evals_at(self.epoch): + self.save_checkpoint() + self._eval_manager.maybe_run( + epoch=self.epoch, + policy=self.uncompiled_policy, env_name=self.config["env"], - model_path=None, # No saved model - using current policy in memory - eval_overrides=eval_overrides, - ) - # Add inline-specific settings - eval_args["global_step"] = self.global_step # Log by global step for TensorBoard - eval_args["num_scenarios"] = self.config["eval"]["multi_scenario_num_scenarios"] - eval_args["eval_simulation"] = eval_simulation_mode - - # Mark this as inline evaluation and set results folder in experiments - eval_args["inline_eval"] = True # Flag to indicate inline evaluation during training - experiment_name = f"{self.config['env']}_{self.logger.run_id}" - eval_args["load_model_path"] = os.path.join( - self.config["data_dir"], experiment_name, "models", f"inline_epoch_{self.epoch}.pt" + logger=self.logger, + global_step=self.global_step, ) - # For inline eval, results go in experiments folder instead of benchmark - eval_args["eval_results_dir"] = os.path.join( - self.config["data_dir"], - experiment_name, - "validation", - f"epoch_{self.epoch}", - self.config["eval"]["multi_scenario_simulation_mode"], - ) - - # Call eval_multi_scenarios inline with current policy and logger - print(f"\n🔄 Running multi-scenario evaluation at step {self.global_step}...") - eval_multi_scenarios( - env_name=self.config["env"], - args=eval_args, - vecenv=None, # Let it create its own eval environment - policy=self.uncompiled_policy, # Pass current policy - logger=self.logger, # Pass logger for TensorBoard logging - metric_prefix="validation", # Use validation_ prefix - quiet=True, # Suppress verbose output during inline eval - clean=clean_eval, - ) - - # Multi-scenario render — independent interval so the heavier render - # path doesn't have to fire every eval_interval. Mirrors the block - # above but calls eval_multi_scenarios_render with render=True and - # the configured backend ("egl" by default on this branch, writes - # one mp4 per scenario via the C render.h pipeline). 
- if self.config["eval"]["multi_scenario_render"] and ( - self.epoch % self.config["eval"]["multi_scenario_render_interval"] == 0 or done_training - ): - render_simulation_mode = self.config["eval"]["multi_scenario_simulation_mode"] - num_agents_render = self.config["eval"]["num_agents"] - render_map_dir = self.config["eval"]["map_dir"] - clean_render = self.config["eval"].get("clean_eval", True) - - render_overrides = build_eval_overrides( - simulation_mode=render_simulation_mode, - num_agents=num_agents_render, - num_scenarios=self.config["eval"]["multi_scenario_num_scenarios"], - map_dir=render_map_dir, - num_carla_maps=self.config["eval"].get("num_carla_maps", 8), - clean=clean_render, - scenario_length=self.config["eval"].get("scenario_length"), - ) - - render_args = load_eval_multi_scenarios_config( - env_name=self.config["env"], - model_path=None, - eval_overrides=render_overrides, - ) - render_args["global_step"] = self.global_step - render_args["num_scenarios"] = self.config["eval"]["multi_scenario_num_scenarios"] - render_args["eval_simulation"] = render_simulation_mode - render_args["render"] = True # master on/off for the render branch - render_args["render_obs"] = False # HTML-only; EGL path ignores - - render_args["inline_eval"] = True - experiment_name = f"{self.config['env']}_{self.logger.run_id}" - render_args["load_model_path"] = os.path.join( - self.config["data_dir"], experiment_name, "models", f"inline_epoch_{self.epoch}.pt" - ) - render_args["eval_results_dir"] = os.path.join( - self.config["data_dir"], - experiment_name, - "renders", - f"epoch_{self.epoch:08d}", - self.config["eval"]["multi_scenario_simulation_mode"], - ) - - backend_name = self.config["eval"]["multi_scenario_render_backend"] - print(f"\n🎬 Running multi-scenario {backend_name} render at step {self.global_step}...") - # Render failures (missing map dir, corrupted .bin files, ffmpeg - # absent, EGL unavailable, etc.) should NEVER crash training — the - # render is a logging side-channel. Catch any exception here, log - # it, and let training keep going. The upstream eval_multi_scenarios - # metric call is separate and already ran, so metric eval continues - # to work even if video rendering is broken. - # Multi-view EGL render: run the full render fn once per view - # (sim_state then bev). Each call creates a fresh vecenv that - # starts at scenario 0, runs all scenarios with one camera, and - # tears down. Doing both views in ONE rollout would not work - # because Drive.step's resample fires at the last step and - # advances starting_map_counter — a re-reset would replay the - # NEXT batch instead of the original one. - _bev_views = [(0, "", "sim_state"), (1, "_bev", "bev")] if backend_name == "egl" else [(0, "", "sim_state")] - for _vmode, _vsuffix, _vlabel in _bev_views: - try: - eval_multi_scenarios_render( - env_name=self.config["env"], - args=dict(render_args), - vecenv=None, - policy=self.uncompiled_policy, - logger=self.logger, - metric_prefix=f"render_{_vlabel}", - quiet=True, - render_backend=backend_name, - view_mode=_vmode, - video_suffix=_vsuffix, - log_view_label=_vlabel, - # Configurable cap: eval.render_max_steps. Default 50 until - # the mystery ~500-c_render-call abort is properly diagnosed. - # Set to 0/negative to disable the cap entirely. 
- render_max_steps=(self.config["eval"].get("render_max_steps", 50) or None), - clean=clean_render, - ) - except Exception as e: - import traceback - - print( - f"\n⚠️ multi_scenario_render failed (view={_vlabel}) at step {self.global_step}: " - f"{type(e).__name__}: {e}" - ) - traceback.print_exc() - print("Training continues.") return logs @@ -959,90 +784,6 @@ def mean_and_log(self): self.logger.log(logs, agent_steps) return logs - def _render_driving_behaviours(self, behaviours_config): - """Render one scenario per driving behaviour class using eval_multi_scenarios_render.""" - import random as _random - - EVAL_SECTIONS_PREFIX = "eval_" - backend_name = self.config["eval"].get("multi_scenario_render_backend", "egl") - bev_views = [(0, "", "sim_state"), (1, "_bev", "bev")] if backend_name == "egl" else [(0, "", "sim_state")] - - for class_name, class_cfg in behaviours_config.items(): - if not class_name.startswith(EVAL_SECTIONS_PREFIX): - continue - map_dir = class_cfg.get("map_dir", "") - if isinstance(map_dir, str): - map_dir = map_dir.strip('"').strip("'") - if not os.path.isdir(map_dir) or not any(f.endswith(".bin") for f in os.listdir(map_dir)): - continue - - short = class_name[len(EVAL_SECTIONS_PREFIX) :] - num_maps = len([f for f in os.listdir(map_dir) if f.endswith(".bin")]) - # Render under clean-eval conditions (zero dropout, zero - # perturbations, enforced red lights) so the mp4s show what - # the policy does under controlled eval, not the noisy - # training-time perturbations. Matches run_driving_behaviours - # _eval_in_subprocess, so the video matches the metric eval. - render_overrides = build_eval_overrides( - simulation_mode="replay", - num_agents=1, - num_scenarios=1, - map_dir=map_dir, - clean=True, - ) - render_overrides["env"]["control_mode"] = "control_sdc_only" - render_overrides["env"]["num_maps"] = num_maps - render_overrides["env"]["scenario_length"] = class_cfg.get("scenario_length", 91) - # Pick a random starting map index so each render epoch shows a - # different scenario from the directory. Without this, the env - # picks scenario 0 every time and we'd always render the same - # first .bin alphabetically. 
- render_overrides["env"]["starting_map"] = _random.randint(0, num_maps - 1) - - render_args = load_eval_multi_scenarios_config( - env_name=self.config["env"], - model_path=None, - eval_overrides=render_overrides, - ) - experiment_name = f"{self.config['env']}_{self.logger.run_id}" - render_args["global_step"] = self.global_step - render_args["num_scenarios"] = 1 - render_args["eval_simulation"] = "replay" - render_args["render"] = True - render_args["inline_eval"] = True - render_args["eval_results_dir"] = os.path.join( - self.config["data_dir"], - experiment_name, - "renders", - f"epoch_{self.epoch:08d}", - "driving_behaviours", - short, - ) - - for vmode, vsuffix, vlabel in bev_views: - try: - eval_multi_scenarios_render( - env_name=self.config["env"], - args=dict(render_args), - vecenv=None, - policy=self.uncompiled_policy, - logger=self.logger, - metric_prefix=f"driving_behaviours/{short}", - render_key_prefix=f"driving_behaviours/{short}/render/{vlabel}", - quiet=True, - render_backend=backend_name, - view_mode=vmode, - video_suffix=vsuffix, - log_view_label=vlabel, - render_max_steps=(self.config["eval"].get("render_max_steps", 50) or None), - clean=True, - ) - except Exception as e: - import traceback - - print(f"DrivingBehavioursRender [{short}] view={vlabel}: {type(e).__name__}: {e}") - traceback.print_exc() - def close(self): self.vecenv.close() self.utilization.stop() @@ -1635,10 +1376,13 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None, early_stop **args["train"], env=env_name, eval=args.get("eval", {}), - driving_behaviours_eval=args.get("driving_behaviours_eval"), ) pufferl = PuffeRL(train_config, vecenv, policy, logger) + from pufferlib.ocean.benchmark.manager import EvalManager + + pufferl._eval_manager = EvalManager.from_config(args, run_id=logger.run_id if logger else None) + # Restore optimizer state + step counters when resuming from a checkpoint. # save_checkpoint writes models/model__.pt and trainer_state.pt # (sibling of models/) — so trainer_state.pt is one dir above the .pt path. @@ -1723,994 +1467,66 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None, early_stop return all_logs -def eval(env_name, args=None, vecenv=None, policy=None): - """Evaluate a policy.""" - - args = args or load_config(env_name) - args["env"]["termination_mode"] = 0 - - wosac_enabled = args["eval"]["wosac_realism_eval"] - human_replay_enabled = args["eval"]["human_replay_eval"] - - if wosac_enabled: - args["env"]["map_dir"] = args["eval"]["map_dir"] - dataset_name = args["env"]["map_dir"].split("/")[-1] - - print(f"Running WOSAC realism evaluation with {dataset_name} dataset.\n") - from pufferlib.ocean.benchmark.evaluator import WOSACEvaluator - - backend = args["eval"]["backend"] - assert backend == "PufferEnv" or not wosac_enabled, "WOSAC evaluation only supports PufferEnv backend." 
- - # Configure environment for WOSAC - args["vec"] = dict(backend=backend, num_envs=1) - args["env"]["init_mode"] = args["eval"]["wosac_init_mode"] - args["env"]["control_mode"] = args["eval"]["wosac_control_mode"] - args["env"]["init_steps"] = args["eval"]["wosac_init_steps"] - args["env"]["goal_behavior"] = args["eval"]["wosac_goal_behavior"] - args["env"]["goal_radius"] = args["eval"]["wosac_goal_radius"] - - # Batch size configuration - num_scenes_per_batch = args["eval"]["wosac_batch_size"] - args["env"]["num_agents"] = num_scenes_per_batch * 10 - args["env"]["num_maps"] = args["eval"]["wosac_scenario_pool_size"] - - # Create environment and policy - vecenv = vecenv or load_env(env_name, args) - policy = policy or load_policy(args, vecenv, env_name) - - # Make eval class instance - evaluator = WOSACEvaluator(args) - - # Obtain scores - df_results = evaluator.evaluate(args, vecenv, policy) - - # Average results over scenarios - results_dict = df_results.mean().to_dict() - results_dict["total_num_agents"] = df_results["num_agents_per_scene"].sum() - results_dict["total_unique_scenarios"] = df_results.index.unique().shape[0] - results_dict["realism_meta_score_std"] = df_results["realism_meta_score"].std() - results_dict = {k: v.item() if hasattr(v, "item") else v for k, v in results_dict.items()} - - import json - - print("\nWOSAC_METRICS_START") - print(json.dumps(results_dict)) - print("WOSAC_METRICS_END") - vecenv.close() - return results_dict - - elif human_replay_enabled: - args["env"]["map_dir"] = args["eval"]["map_dir"] - dataset_name = args["env"]["map_dir"].split("/")[-1] - print(f"Running human replay evaluation with {dataset_name} dataset.\n") - from pufferlib.ocean.benchmark.evaluator import HumanReplayEvaluator - - backend = args["eval"].get("backend", "PufferEnv") - args["env"]["map_dir"] = args["eval"]["map_dir"] - args["env"]["num_agents"] = args["eval"]["human_replay_num_agents"] - args["env"]["num_maps"] = len([f for f in os.listdir(args["env"]["map_dir"]) if f.endswith(".bin")]) - - args["vec"] = dict(backend=backend, num_envs=1) - args["env"]["control_mode"] = args["eval"]["human_replay_control_mode"] - args["env"]["scenario_length"] = args["eval"].get("scenario_length", 201) - - vecenv = vecenv or load_env(env_name, args) - policy = policy or load_policy(args, vecenv, env_name) - - print(f"Effective number of scenarios used: {len(vecenv.driver_env.agent_offsets) - 1}") - - evaluator = HumanReplayEvaluator(args) - - # Run rollouts with human replays - results = evaluator.rollout(args, vecenv, policy) - - import json - - print("HUMAN_REPLAY_METRICS_START") - print(json.dumps(results)) - print("HUMAN_REPLAY_METRICS_END") - - return results - - else: # Standard evaluation: Render - backend = args["vec"]["backend"] - if backend != "PufferEnv": - backend = "Serial" - - args["vec"] = dict(backend=backend, num_envs=1) - vecenv = vecenv or load_env(env_name, args) - policy = policy or load_policy(args, vecenv, env_name) - - ob, info = vecenv.reset() - driver = vecenv.driver_env - num_agents = vecenv.observation_space.shape[0] - device = args["train"]["device"] - - state = {} - if args["train"]["use_rnn"]: - state = dict( - lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), - lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), - ) - - frames = [] - while True: - render = driver.render() - if len(frames) < args["save_frames"]: - frames.append(render) - - # Screenshot Ocean envs with F12, gifs with control + F12 - if driver.render_mode == "ansi": 
- print("\033[0;0H" + render + "\n") - time.sleep(1 / args["fps"]) - elif driver.render_mode == "rgb_array": - pass - # import cv2 - # render = cv2.cvtColor(render, cv2.COLOR_RGB2BGR) - # cv2.imshow('frame', render) - # cv2.waitKey(1) - # time.sleep(1/args['fps']) - - with torch.no_grad(): - ob = torch.as_tensor(ob).to(device) - logits, value = policy.forward_eval(ob, state) - action, logprob, _ = pufferlib.pytorch.sample_logits(logits) - action = action.cpu().numpy().reshape(vecenv.action_space.shape) - - if isinstance(logits, torch.distributions.Normal): - action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) - - ob = vecenv.step(action)[0] - - if len(frames) > 0 and len(frames) == args["save_frames"]: - import imageio - - imageio.mimsave(args["gif_path"], frames, fps=args["fps"], loop=0) - frames.append("Done") - - -def load_eval_multi_scenarios_config(env_name, model_path=None, eval_overrides=None): - """Load config for evaluation, merging experiment YAML with defaults.""" - args = load_config(env_name) - if model_path: - experiment_dir = os.path.dirname(os.path.dirname(model_path)) - config_yaml_path = os.path.join(experiment_dir, "config.yaml") - EXCLUDE_KEYS = eval_overrides["env"].keys() - # Override Policy and RNN dimensions from training config - if os.path.exists(config_yaml_path): - print(f"Found config.yaml at {config_yaml_path}. Merging with defaults...") - with open(config_yaml_path, "r") as f: - yaml_config = yaml.safe_load(f) - - for section in ["env", "policy", "rnn"]: - if section in yaml_config and isinstance(yaml_config[section], dict): - for k, v in yaml_config[section].items(): - if k not in EXCLUDE_KEYS: - args[section][k] = v - - # Also copy root-level keys like rnn_name, policy_name - for key in ["rnn_name", "policy_name"]: - if key in yaml_config: - args[key] = yaml_config[key] - - # Update use_rnn based on rnn_name - args["train"]["use_rnn"] = args["rnn_name"] is not None - - # Override env parameters from evaluation config - if eval_overrides: - for section, section_overrides in eval_overrides.items(): - if isinstance(section_overrides, dict): - for k, v in section_overrides.items(): - args[section][k] = v - else: - args[section] = section_overrides - - return args - - -def build_eval_overrides( - simulation_mode, num_agents, num_scenarios, map_dir=None, num_carla_maps=8, clean=False, scenario_length=None -): - """Build evaluation overrides for a given simulation mode. - - Args: - simulation_mode: "gigaflow" or "replay" - num_agents: agent slot budget for evaluation - map_dir: replay dataset directory, required for replay mode - clean: if True, run a "clean" eval — zero road-segment dropout and - enforce red-light stops. Only safe when the policy is rebuilt - from the eval env (standalone eval / render_scenario.py). Inline - eval during training reuses the live training policy, whose - encoder was built for the training obs shape; zeroing dropout - there changes the obs shape and triggers a CUDA device-side - assert. Perturbation probabilities (partner_blindness, - phantom_braking) are always forced to zero at eval — they're - pure randomness, they don't change the obs shape, and eval - should be deterministic regardless of clean mode. - scenario_length: replay-mode scenarios per-step count (also used as - resample_frequency). Defaults to 91 — WOMD's 9.1s @ 10Hz. nuPlan - scenes from the categorized py123d pipeline want 201 (20.1s). - Ignored in gigaflow mode (procedural episodes always run for the - hardcoded 3000-step budget). 
- """ - # Common reward coefficients (same for both modes) - common_env = { - "eval_mode": 1, - "collision_behavior": 1, - "offroad_behavior": 1, - "traffic_light_behavior": 1 if clean else 0, - "reward_randomization": False, - "reward_vehicle_collision": 3.0, - "reward_offroad_collision": 3.0, - "reward_ade": 0.0, - "reward_goal": 1.0, - "reward_overspeed": 0.05, - "reward_comfort": 0.05, - "reward_velocity": 0.0025, - "reward_lane_align": 0.025, - "reward_lane_center": 0.0038, - "reward_timestep": 0.000025, - # Always zero perturbations at eval. These don't change obs shape so - # it's safe to force even for inline eval, and a deterministic eval - # is what we want for tracking progress. - "partner_blindness_prob": 0.0, - "phantom_braking_prob": 0.0, - "phantom_braking_trigger_prob": 0.0, - } - - if clean: - # Dropout changes the obs shape. Only safe when the policy is - # rebuilt from the eval env (standalone eval / render_scenario). - # NEVER pass clean=True from an inline-eval call site — the live - # training policy's encoder was built for the training obs shape. - common_env["lane_segment_dropout"] = 0.0 - common_env["boundary_segment_dropout"] = 0.0 - - if simulation_mode == "gigaflow": - eval_overrides = { - "env": { - **common_env, - "simulation_mode": "gigaflow", - "min_agents_per_env": 50, - "max_agents_per_env": 50, - "resample_frequency": 3000, - "scenario_length": 3000, - # Point at the py123d-converted CARLA towns added to this branch. - # The older binaries/carla dir predates the 123Drive pipeline and - # is not populated on emerge/temp_training. - "map_dir": map_dir or "pufferlib/resources/drive/binaries/carla_py123d", - "num_maps": num_carla_maps, - "num_agents": num_agents, - "termination_mode": 0.0, - } - } - elif simulation_mode == "replay": - replay_len = scenario_length if scenario_length is not None else 91 - eval_overrides = { - "env": { - **common_env, - "simulation_mode": "replay", - "resample_frequency": replay_len, - "scenario_length": replay_len, - "max_agents_per_env": 64, - "map_dir": map_dir or "pufferlib/resources/drive/binaries/womd", - "num_maps": num_scenarios, - "num_agents": num_agents, - "min_agents_per_env": 1, - "termination_mode": 0.0, - # "control_mode": "control_sdc_only", - }, - } - else: - raise ValueError(f"Invalid simulation_mode: {simulation_mode}. Must be 'gigaflow' or 'replay'.") - - return eval_overrides - - -@contextlib.contextmanager -def _swap_policy_obs_counts(policy, vecenv): - """Temporarily align the policy's road-segment slicing with the eval env. - - Training uses dropout > 0 → smaller obs_{lane,boundary}_segment_count. - Clean eval uses dropout = 0 → larger counts, larger obs buffer. The - GigaFlow encoder (lane_encoder / boundary_encoder) is a shared MLP - applied per-segment with max-pool — its weights are count-invariant. - Only the obs-buffer slicing in DriveBackbone.forward depends on these - counts, so we can just swap them for the duration of the eval and the - same training policy works on the larger clean obs. - """ - try: - eval_env = vecenv.driver_env - new_lane = int(eval_env.obs_lane_segment_count) - new_boundary = int(eval_env.obs_boundary_segment_count) - except AttributeError: - # If the eval env doesn't expose these (unknown wrapper), skip the - # swap — forward will still work when training and eval obs shapes - # coincide (clean=False or no dropout configured). 
-        yield
-        return
-
-    targets = []
-    for m in policy.modules():
-        if hasattr(m, "obs_lane_segment_count") and hasattr(m, "obs_boundary_segment_count"):
-            targets.append(m)
-
-    saved = [(m.obs_lane_segment_count, m.obs_boundary_segment_count) for m in targets]
-    try:
-        for m in targets:
-            m.obs_lane_segment_count = new_lane
-            m.obs_boundary_segment_count = new_boundary
-        yield
-    finally:
-        for m, (orig_lane, orig_boundary) in zip(targets, saved):
-            m.obs_lane_segment_count = orig_lane
-            m.obs_boundary_segment_count = orig_boundary
-
-
-def verify_scenario_coverage(csv_path: str, num_scenarios: int) -> dict:
-    """
-    Verify that episode_metrics.csv contains all expected scenarios.
-
-    Args:
-        csv_path: Path to episode_metrics.csv
-        num_scenarios: Expected number of scenarios (e.g., 1000)
-
-    Returns:
-        dict with keys:
-        - complete: bool - True if all scenarios present
-        - expected_count: number of expected scenarios
-        - found_count: number of unique scenarios found
-        - missing: sorted list of missing map names
-        - extra: sorted list of unexpected map names
-        - duplicates: dict mapping map_name -> count (if >1)
-    """
-    df = pd.read_csv(csv_path)
-
-    # Expected: map_000, map_001, ..., map_{num_scenarios-1}
-    expected = {f"map_{i:03d}" for i in range(num_scenarios)}
-    found = set(df["map_name"].unique())
-
-    missing = expected - found
-    extra = found - expected
+def eval(env_name, args=None, vecenv=None, policy=None, evaluator_name=None, out_path=None):
+    """Run a single named evaluator from drive.ini.

-    # Check for duplicates
-    counts = df["map_name"].value_counts()
-    duplicates = {name: count for name, count in counts.items() if count > 1}
+    Standalone form: `puffer eval puffer_drive --evaluator <name>`. The
+    evaluator's config (env/vec overrides, render flag, etc.) comes from
+    the [eval.<name>] section. Loads the policy from `--load-model-path`.

-    complete = len(missing) == 0
-
-    return {
-        "complete": complete,
-        "expected_count": num_scenarios,
-        "found_count": len(found),
-        "missing": sorted(missing),
-        "extra": sorted(extra),
-        "duplicates": duplicates,
-    }
-
-
-def verify_scenario_coverage_gigaflow(csv_path: str, num_scenarios: int) -> dict:
-    """
-    Verify gigaflow evaluation CSV: maps repeat across scenarios, so check total
-    row count rather than unique map names.
+    Subprocess form: `--out <path>` writes the result dict to a JSON file
+    so the parent EvalManager can read structured metrics back without
+    parsing stdout.
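+
+    A sketch of both invocations (the evaluator name and model path here
+    are illustrative, not fixed):
+
+        # standalone: metrics printed between the JSON markers on stdout
+        puffer eval puffer_drive --evaluator behaviors_hard_stop --load-model-path experiments/run/models/model_000100.pt
+
+        # subprocess form used by EvalManager during training
+        puffer eval puffer_drive --evaluator behaviors_hard_stop --out /tmp/behaviors_hard_stop.json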
""" - df = pd.read_csv(csv_path) - total_rows = len(df) - complete = total_rows == num_scenarios - return { - "complete": complete, - "expected_count": num_scenarios, - "found_count": total_rows, - } - - -# Helper functions for eval_multi_scenarios and eval_multi_scenarios_render -def _export_metrics(global_infos, eval_folder, num_scenarios, quiet, verify_coverage=False, simulation_mode="replay"): - """Export episode and summary CSVs, return avg_infos dict.""" - # Episode Metrics - try: - df_episodes = pd.DataFrame(global_infos) - first_cols = ["episode_id", "map_name"] - other_cols = [col for col in df_episodes.columns if col not in first_cols] - new_col_order = first_cols + other_cols - df_episodes = df_episodes[new_col_order] - - if verify_coverage: - df_episodes = df_episodes.sort_values(by=["map_name", "episode_id"]) - - episode_csv_path = os.path.join(eval_folder, "episode_metrics.csv") - df_episodes.to_csv(episode_csv_path, index=False) - if not quiet: - print(f"\n✅ Per-episode metrics exported to {episode_csv_path}") - - if verify_coverage: - if simulation_mode == "gigaflow": - result = verify_scenario_coverage_gigaflow(episode_csv_path, num_scenarios) - if not quiet: - if result["complete"]: - print(f"✅ All {num_scenarios} episodes present in CSV") - else: - print( - f"⚠️ Episode count mismatch: expected {result['expected_count']}, found {result['found_count']}" - ) - else: - result = verify_scenario_coverage(episode_csv_path, num_scenarios) - if not quiet: - if result["complete"]: - print(f"✅ All {num_scenarios} scenarios present in CSV") - else: - print(f"⚠️ Scenario coverage incomplete:") - print(f" Expected: {result['expected_count']}, Found: {result['found_count']}") - if result["missing"]: - print(f" Missing ({len(result['missing'])}): {result['missing']}") - if result["extra"]: - print(f" Extra: {result['extra'][:10]}...") - if result["duplicates"]: - print(f" Duplicates: {len(result['duplicates'])} scenarios have multiple entries") - for name, count in sorted(result["duplicates"].items()): - print(f" {name}: {count} entries") - except Exception as e: - print(f"\n⚠️ Could not export per-episode CSV. 
Error: {e}") - print("Global infos data:", global_infos) - - # Evaluation average metrics - avg_infos = {} - for k, v in global_infos.items(): - if k == "num_scenarios": - avg_infos[k] = np.sum(v) - elif v and isinstance(v[0], numbers.Number): - avg_infos[k] = np.mean(v) - df_summary = pd.DataFrame(list(avg_infos.items()), columns=["Metric", "Average"]) - summary_csv_path = os.path.join(eval_folder, "evaluation_summary.csv") - df_summary.to_csv(summary_csv_path, index=False) - if not quiet: - print(f"\n✅ Average results exported to {summary_csv_path}") - print(df_summary.to_string(index=False)) - - return avg_infos - - -def _log_eval_metrics(logger, avg_infos, args, metric_prefix, quiet): - """Log metrics to TensorBoard/wandb if logger is provided.""" - if logger is None or args.get("global_step") is None: - return - - global_step = args["global_step"] - - # Create log dict with metric prefix (use / for TensorBoard grouping) - log_dict = {} - for metric_key, metric_value in avg_infos.items(): - if isinstance(metric_value, (int, float)): - log_dict[f"{metric_prefix}/{metric_key}"] = float(metric_value) - - # Log to TensorBoard if available - if hasattr(logger, "local_writer") and logger.local_writer: - for key, value in log_dict.items(): - logger.local_writer.add_scalar(key, value, global_step) - if not quiet: - print(f"✅ Logged {len(log_dict)} validation metrics to TensorBoard at step {global_step}") - - # Also log to wandb/neptune if available - if hasattr(logger, "log"): - logger.log(log_dict, global_step) - - -def eval_multi_scenarios( - env_name, - args=None, - vecenv=None, - policy=None, - logger=None, - metric_prefix="validation", - quiet=False, - clean=False, -): - t0 = time.time() - - if args is None: - tmp_args = load_config(env_name) - model_path = tmp_args.get("load_model_path") - num_agents_eval = tmp_args["eval"]["num_agents"] - map_dir = tmp_args["eval"]["map_dir"] - - # CLI standalone entry point: read clean_eval from the eval section - # so users can enable it via --eval.clean-eval. Inline callers pass - # clean= directly and come in through the args-provided branch. 
-        clean_from_config = tmp_args["eval"].get("clean_eval", False)
-        eval_overrides = build_eval_overrides(
-            simulation_mode=tmp_args["eval_simulation"],
-            num_agents=num_agents_eval,
-            num_scenarios=tmp_args["num_scenarios"],
-            map_dir=map_dir,
-            num_carla_maps=tmp_args.get("num_carla_maps", 8),
-            clean=clean_from_config,
-            scenario_length=tmp_args["eval"].get("scenario_length"),
-        )
-        args = load_eval_multi_scenarios_config(env_name, model_path, eval_overrides)
-        clean = clean or clean_from_config
-
-    # Reproducibility — same approach as training
-    seed = args["train"]["seed"] or 42
-    np.random.seed(seed)
-    torch.manual_seed(seed)
-
-    backend = args["vec"]["backend"]
-    num_scenarios = args["num_scenarios"]
-
-    num_workers = min(args["vec"]["num_envs"], num_scenarios)
-
-    # Distribute scenarios across workers
-    scenarios_per_worker = num_scenarios // num_workers
-    remainder = num_scenarios % num_workers
-    current_start = 0
-    env_kwargs_list = []
-    for j in range(num_workers):
-        worker_kwargs = copy.deepcopy(args["env"])
-        worker_num_scenario = scenarios_per_worker + (1 if j < remainder else 0)
-        worker_kwargs["starting_map"] = current_start
-        worker_kwargs["num_eval_scenarios"] = worker_num_scenario
-        env_kwargs_list.append(worker_kwargs)
-        current_start += worker_num_scenario
-
-    print(f"Distributing {num_scenarios} scenarios across {num_workers} workers:")
-    for j, w in enumerate(env_kwargs_list):
-        start = w["starting_map"]
-        count = w["num_eval_scenarios"]
-        print(f"  Worker {j}: maps {start}-{start + count - 1} ({count} scenarios)")
-
-    args["vec"] = dict(backend=backend, num_envs=num_workers, num_workers=num_workers, batch_size=num_workers)
-
-    if vecenv is None:
-        package = args["package"]
-        module_name = "pufferlib.ocean" if package == "ocean" else f"pufferlib.environments.{package}"
-        env_module = importlib.import_module(module_name)
-        make_env = env_module.env_creator(env_name)
-        # Pass as lists to preserve per-worker env_kwargs
-        env_creators = [make_env] * num_workers
-        env_args = [[]] * num_workers
-        vecenv = pufferlib.vector.make(env_creators, env_args=env_args, env_kwargs=env_kwargs_list, **args["vec"])
+    from pufferlib.ocean.benchmark.manager import EvalManager

-    policy = policy or load_policy(args, vecenv, env_name)
-    policy.eval()
-    num_agents = vecenv.observation_space.shape[0]
-    device = args["train"]["device"]
+    args = args or load_config(env_name)

-    state = {}
-    if args["train"]["use_rnn"]:
-        state = dict(
-            lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device),
-            lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device),
+    if evaluator_name is None:
+        evaluator_name = args.get("evaluator")
+    if evaluator_name is None:
+        raise pufferlib.APIUsageError(
+            "puffer eval requires --evaluator <name>; named [eval.<name>]
sections live in drive.ini" ) - # Folder for evaluation results - # For inline evaluation during training, use eval_results_dir in experiments folder - # For standalone evaluation, use benchmark folder - if "inline_eval" in args and args["inline_eval"] and "eval_results_dir" in args: - eval_folder = args["eval_results_dir"] - else: - # Standalone evaluation path (in benchmark folder) - model_path = args["load_model_path"] - if model_path is None: - eval_folder = os.path.join("benchmark", "no_policy", args["eval_simulation"]) - else: - model_filename_with_ext = os.path.basename(model_path) - model_name = os.path.splitext(model_filename_with_ext)[0] - models_dir = os.path.dirname(model_path) - experiment_dir = os.path.dirname(models_dir) - experiment_name = os.path.basename(experiment_dir) - eval_folder = os.path.join("benchmark", experiment_name, model_name, args["eval_simulation"]) - os.makedirs(eval_folder, exist_ok=True) - - global_infos = {} - scenarios_processed = 0 - vecenv.async_reset(42) - - ob, _, _, _, infos, _, _ = vecenv.recv() - # Clean eval may use different road-dropout than training. The shared - # training policy's obs slicing needs to be aligned with this env; see - # _swap_policy_obs_counts. - swap_ctx = _swap_policy_obs_counts(policy, vecenv) if clean else contextlib.nullcontext() - with swap_ctx, tqdm(total=num_scenarios, desc="Processing scenarios", disable=quiet) as pbar: - while scenarios_processed < num_scenarios: - # Reset LSTM - if args["train"]["use_rnn"]: - state = dict( - lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), - lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), - ) - - for _ in range(args["env"]["scenario_length"]): - with torch.no_grad(): - ob = torch.as_tensor(ob).to(device) - logits, _ = policy.forward_eval(ob, state) - action, _, _ = pufferlib.pytorch.sample_logits(logits, deterministic=True) - action = action.cpu().numpy().reshape(vecenv.action_space.shape) - - if isinstance(logits, torch.distributions.Normal): - action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) - - ob, _, _, _, infos = vecenv.step(action) - - # Multi-worker backend returns infos as list of lists (one per worker) - if infos and infos[0]: - for sub_env in infos: - for env_idx, summary in enumerate(sub_env): - env_map_name = summary["map_name"].split("/")[-1].split(".")[0] - summary["episode_id"] = env_idx - summary["map_name"] = env_map_name - scenarios_processed += 1 - pbar.update(1) - - for k, v in summary.items(): - if k not in global_infos: - global_infos[k] = [] - global_infos[k].append(v) - - avg_infos = _export_metrics( - global_infos, - eval_folder, - num_scenarios, - quiet, - verify_coverage=True, - simulation_mode=args["env"]["simulation_mode"], + manager = EvalManager.from_config(args) + + # Build a fresh vecenv inside the manager via the evaluator's overrides. + # Policy can come from a checkpoint (load_model_path) or be passed in. + if policy is None: + # Need a probe vecenv just to construct the policy with the right + # obs/action spaces. Use the matching evaluator's env_overrides so + # the obs shape matches what the rollout will see. + target = next((e for e in manager.evaluators if e.name == evaluator_name), None) + if target is None: + raise KeyError(f"No [eval.{evaluator_name}] section found. 
Known: {[e.name for e in manager.evaluators]}") + probe_args = manager._build_eval_args(target, env_name=env_name, global_step=None) + probe_vec = load_env(env_name, probe_args) + policy = load_policy(probe_args, probe_vec, env_name) + probe_vec.close() + + result = manager.run_one_by_name( + evaluator_name, + policy=policy, + env_name=env_name, + logger=None, + global_step=args.get("global_step"), ) - print(f"\nTotal evaluation time: {time.time() - t0:.2f} seconds for {num_scenarios} scenarios.") - _log_eval_metrics(logger, avg_infos, args, metric_prefix, quiet) - - # Close vectorized environment to avoid file descriptor leaks - vecenv.close() - - -def eval_multi_scenarios_render( - env_name, - args=None, - vecenv=None, - policy=None, - logger=None, - metric_prefix="validation", - quiet=False, - render_backend="html", - view_mode=0, - video_suffix="", - log_view_label="render", - render_max_steps=None, - render_key_prefix=None, - clean=False, -): - # Set fixed seed for reproducible evaluation - np.random.seed(42) - torch.manual_seed(42) - - if args is None: - tmp_args = load_config(env_name) - model_path = tmp_args.get("load_model_path") - num_agents_eval = tmp_args["eval"]["num_agents"] - map_dir = tmp_args["eval"]["map_dir"] - clean_from_config = tmp_args["eval"].get("clean_eval", False) - eval_overrides = build_eval_overrides( - simulation_mode=tmp_args["eval_simulation"], - num_agents=num_agents_eval, - num_scenarios=tmp_args["num_scenarios"], - map_dir=map_dir, - num_carla_maps=tmp_args.get("num_carla_maps", 8), - clean=clean_from_config, - scenario_length=tmp_args["eval"].get("scenario_length"), - ) - args = load_eval_multi_scenarios_config(env_name, model_path, eval_overrides) - clean = clean or clean_from_config - - backend = args["vec"]["backend"] - if backend != "PufferEnv": - backend = "Serial" - - args["vec"] = dict(backend=backend, num_envs=1) - args["env"]["num_eval_scenarios"] = args["num_scenarios"] # first batch: fill as many scenarios as fit - - # Backend selection. - # "html" — the existing viz.generate_interactive_replay path (CPU-only, - # self-contained HTML per scenario). - # "egl" — the C-side render.h → make_client → client_record_frame - # pipeline (EGL GPU context, PBO double-buffer readback, - # writev → ffmpeg libx264, one mp4 per scenario). - egl_mode = bool(args.get("render")) and render_backend == "egl" - html_mode = bool(args.get("render")) and not egl_mode - if egl_mode: - # Force the C env to RENDER_HEADLESS so make_client spawns ffmpeg and - # (under DRIVE_HAS_EGL) switches the active GL context to the GPU. 
- args["env"]["render_mode"] = "headless" - - vecenv = vecenv or load_env(env_name, args) - - policy = policy or load_policy(args, vecenv, env_name) - policy.eval() - num_agents = vecenv.observation_space.shape[0] - device = args["train"]["device"] - - state = {} - if args["train"]["use_rnn"]: - state = dict( - lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), - lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), - ) - - # Folder for evaluation results - # For inline evaluation during training, use eval_results_dir in experiments folder - # For standalone evaluation, use benchmark folder - if "inline_eval" in args and args["inline_eval"] and "eval_results_dir" in args: - eval_folder = args["eval_results_dir"] - else: - # Standalone evaluation path (in benchmark folder) - model_path = args["load_model_path"] - if model_path is None: - eval_folder = os.path.join("benchmark", "no_policy", args["eval_simulation"]) - else: - model_filename_with_ext = os.path.basename(model_path) - model_name = os.path.splitext(model_filename_with_ext)[0] - models_dir = os.path.dirname(model_path) - experiment_dir = os.path.dirname(models_dir) - experiment_name = os.path.basename(experiment_dir) - eval_folder = os.path.join("benchmark", experiment_name, model_name, args["eval_simulation"]) - os.makedirs(eval_folder, exist_ok=True) - - saved_cwd = None - mp4_folder = None - gif_folder = None - if html_mode: - gif_folder = eval_folder + "/gif" - os.makedirs(gif_folder, exist_ok=True) - if egl_mode: - mp4_folder = os.path.join(eval_folder, "mp4") - os.makedirs(mp4_folder, exist_ok=True) - # C-side make_client writes .mp4 into the process cwd. We - # chdir into mp4_folder so every scenario's file lands in the right - # place, then restore cwd after the rollout loop. - saved_cwd = os.getcwd() - os.chdir(mp4_folder) - - global_infos = {} - num_scenarios = args["num_scenarios"] - - # Apply per-env video suffix once before any render. make_client reads - # env->video_suffix on the first render to build the ffmpeg filename, so - # this must fire before any step. We don't yet know how many internal - # envs are in the vecenv (vecenv.get_state() only works after reset), - # so set on a generous prefix and let extras be no-ops. - if egl_mode and video_suffix: - _target_env_pre = vecenv if not hasattr(vecenv, "envs") else vecenv.envs[0] - # Drive exposes its internal C-level vec env count via num_envs. - # Use it as the loop bound so we never call set_video_suffix on an - # out-of-range env_id (which would corrupt memory before the C - # bounds check landed). - _internal_num_envs = getattr(_target_env_pre, "num_envs", 1) - for _e in range(_internal_num_envs): - try: - _target_env_pre.set_video_suffix(video_suffix, env_idx=_e) - except Exception: - break - scenarios_processed = 0 - # PufferEnv native backend: vecenv IS the Drive env (no .envs list). - # Serial/Multiprocessing: need vecenv.envs[0] to reach the underlying env. - target_env = vecenv if not hasattr(vecenv, "envs") else vecenv.envs[0] - - # Align the live training policy's obs slicing with the (potentially - # clean) eval env for the render. Same swap as eval_multi_scenarios. 
- swap_ctx = _swap_policy_obs_counts(policy, vecenv) if clean else contextlib.nullcontext() - with swap_ctx, tqdm(total=num_scenarios, desc="Processing scenarios", disable=quiet) as pbar: - while scenarios_processed < num_scenarios: - ob, _ = vecenv.reset() - - # Get initial states for all environments in the batch - scenarios = vecenv.get_state() - num_envs_in_batch = len(scenarios) - batch_start = scenarios_processed - - # Prepare batch_size_eval for the resample that fires at end of the step loop. - # That resample will load the NEXT batch, so cap it at remaining_after_this. - remaining_after_this = num_scenarios - scenarios_processed - num_envs_in_batch - target_env.batch_size_eval = max(1, remaining_after_this) - - map_names = [] - for env_idx in range(num_envs_in_batch): - map_names.append(scenarios[env_idx]["map_name"].split("/")[-1].split(".")[0]) - - # Reset LSTM - if args["train"]["use_rnn"]: - state = dict( - lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), - lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), - ) - - # Initialize histories as lists of lists (one list per environment). - # Only needed for the HTML replay path — EGL writes mp4 frames - # directly to ffmpeg via c_render each step. - if html_mode: - agent_histories = [[] for _ in range(num_envs_in_batch)] - traffic_histories = [[] for _ in range(num_envs_in_batch)] - trajectory_histories = [[] for _ in range(num_envs_in_batch)] - all_agents_obs_histories = [[] for _ in range(num_envs_in_batch)] - - _render_steps = args["env"]["scenario_length"] - if render_max_steps is not None: - _render_steps = min(_render_steps, render_max_steps) - for t in range(_render_steps): - if html_mode: - current_scenarios = vecenv.get_state() - start_obs_index = 0 - - # Loop through every environment in the batch to record its history - for env_idx in range(num_envs_in_batch): - env_scenario = current_scenarios[env_idx] - - agent_histories[env_idx].append( - pufferlib.viz.fill_agents_state( - env_scenario, use_trajectory="trajectory" in args["env"]["action_type"] - ) - ) - traffic_histories[env_idx].append(pufferlib.viz.fill_traffics_state(env_scenario, t)) - - if "trajectory" in args["env"]["action_type"]: - trajectory_histories[env_idx].append(pufferlib.viz.fill_trajectories(env_scenario, t)) - - # Collect observation dictionaries for ALL active agents in THIS environment at timestep t - if args["render_obs"]: - step_obs_dict = {} - if env_idx > 0: - start_obs_index += current_scenarios[env_idx - 1]["active_agent_count"] - for agent_idx in range(env_scenario["active_agent_count"]): - agent_id = env_scenario["active_agent_indices"][agent_idx] - step_obs_dict[int(agent_id)] = pufferlib.viz.extract_obs_frame( - ob, - env_scenario, - args, - timestep=t, - obs_index=start_obs_index + agent_idx, - agent_idx=agent_idx, - head_north=True, - ) - all_agents_obs_histories[env_idx].append(step_obs_dict) - - with torch.no_grad(): - ob = torch.as_tensor(ob).to(device) - logits, _ = policy.forward_eval(ob, state) - action, _, _ = pufferlib.pytorch.sample_logits(logits, deterministic=True) - action = action.cpu().numpy().reshape(vecenv.action_space.shape) + print("EVAL_RESULT_JSON_START") + import json - if isinstance(logits, torch.distributions.Normal): - action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) - - ob, _, _, _, infos = vecenv.step(action) - - if egl_mode: - # Flush one frame per env through c_render → client_record_frame - # → PBO async readback → writev → ffmpeg pipe. 
make_client is - # called lazily on the first render per env (sets up ffmpeg + - # GPU context) and close_client at scenario end flushes the - # trailing PBO frame. - for e in range(num_envs_in_batch): - target_env.render(env_idx=e, view_mode=view_mode) - - # Serial backend returns infos as single list (infos[0] is the env's info list) - if infos and infos[0]: - for env_idx, summary in enumerate(infos[0]): - env_map_name = summary["map_name"].split("/")[-1].split(".")[0] - summary["episode_id"] = batch_start + env_idx - summary["env_id"] = env_idx - summary["map_name"] = env_map_name - - for k, v in summary.items(): - if k not in global_infos: - global_infos[k] = [] - global_infos[k].append(v) - - if html_mode: - # Loop through every environment to generate its specific HTML replay - for env_idx in range(num_envs_in_batch): - global_episode_id = batch_start + env_idx - # Ensure we don't render padding environments if num_scenarios isn't perfectly divisible by batch_size - if global_episode_id >= num_scenarios: - break - env_map_name = map_names[env_idx] - - pufferlib.viz.generate_interactive_replay( - current_scenarios[env_idx], - agent_histories[env_idx], - traffic_histories[env_idx], - trajectory_histories[env_idx], - all_agents_obs_histories[env_idx], - f"{gif_folder}/{env_map_name}_{global_episode_id:03d}.html", - head_north=True, - ) + print(json.dumps({"name": evaluator_name, "metrics": result.metrics})) + print("EVAL_RESULT_JSON_END") - if egl_mode: - # Close every env's Client so ffmpeg gets EOF on its input pipe, - # the trailing PBO frame is flushed, and libx264 writes the mp4 - # trailer. Without this, the mp4 files are either empty or one - # frame short. - import sys as _sys_cc + if out_path: + with open(out_path, "w") as f: + json.dump( + {"name": evaluator_name, "metrics": result.metrics, "frames": [str(p) for p in result.frames]}, + f, + ) - _sys_cc.stderr.write( - f"[render-instr] starting close_client loop num_envs_in_batch={num_envs_in_batch}\n" - ) - _sys_cc.stderr.flush() - for e in range(num_envs_in_batch): - _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) calling\n") - _sys_cc.stderr.flush() - target_env.close_client(env_idx=e) - _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) returned\n") - _sys_cc.stderr.flush() - - scenarios_processed += num_envs_in_batch - pbar.update(num_envs_in_batch) - - import sys as _sys_instr - - _sys_instr.stderr.write("[render-instr] rollout loop done\n") - _sys_instr.stderr.flush() - - # render_key_prefix overrides metric_prefix for wandb media uploads only. - # This lets callers keep metric_prefix for scalar metrics while using a - # different namespace for renders (e.g. driving_behaviours//render/). - _upload_prefix = render_key_prefix if render_key_prefix is not None else metric_prefix - - if html_mode: - pufferlib.viz.build_gallery_index(gif_folder) - if logger is not None: - try: - import wandb - - html_paths = sorted(os.path.join(gif_folder, f) for f in os.listdir(gif_folder) if f.endswith(".html")) - if html_paths: - step = args.get("global_step") - # Stable key per (category, view); each render epoch overwrites - # the same wandb panel rather than fanning out by scenario UUID. 
- html_log = {_upload_prefix: wandb.Html(html_paths[-1])} - if hasattr(logger, "log"): - logger.log(html_log, step) if step is not None else logger.log(html_log) - if not quiet: - print(f"Uploaded {len(html_paths)} render HTML(s) to wandb") - except Exception as e: - if not quiet: - print(f"Failed to upload render HTMLs to wandb: {e}") - - if saved_cwd is not None: - os.chdir(saved_cwd) - _sys_instr.stderr.write("[render-instr] chdir restored\n") - _sys_instr.stderr.flush() - - avg_infos = _export_metrics(global_infos, eval_folder, num_scenarios, quiet, verify_coverage=False) - _sys_instr.stderr.write("[render-instr] _export_metrics done\n") - _sys_instr.stderr.flush() - _log_eval_metrics(logger, avg_infos, args, metric_prefix, quiet) - _sys_instr.stderr.write("[render-instr] _log_eval_metrics done\n") - _sys_instr.stderr.flush() - - if egl_mode and mp4_folder and logger is not None: - try: - import wandb - - mp4_paths = sorted(os.path.join(mp4_folder, f) for f in os.listdir(mp4_folder) if f.endswith(".mp4")) - if mp4_paths: - # Log under a single stable key per (category, view) so successive - # renders show up in the same wandb panel as a time series. - # The scenario UUID lives in the caption, not the key. - videos = [ - wandb.Video(p, fps=30, format="mp4", caption=os.path.splitext(os.path.basename(p))[0]) - for p in mp4_paths - ] - video_log = {_upload_prefix: videos if len(videos) > 1 else videos[0]} - step = args.get("global_step") - if hasattr(logger, "log"): - logger.log(video_log, step) if step is not None else logger.log(video_log) - if not quiet: - print(f"Uploaded {len(mp4_paths)} render mp4(s) to wandb") - except Exception as e: - if not quiet: - print(f"Failed to upload render mp4s to wandb: {e}") - - # Close vectorized environment to avoid file descriptor leaks - vecenv.close() + return result.metrics def sweep(args=None, env_name=None): @@ -3016,22 +1832,6 @@ def puffer_type(value): args["train"]["use_rnn"] = args["rnn_name"] is not None - # Load driving behaviours eval config if specified - behaviours_config_path = args.get("eval", {}).get("driving_behaviours_eval_config") - if behaviours_config_path: - if isinstance(behaviours_config_path, str): - behaviours_config_path = behaviours_config_path.strip('"').strip("'") - if os.path.exists(behaviours_config_path): - print(f"Loading driving behaviours eval config from {behaviours_config_path}") - bp = configparser.ConfigParser(inline_comment_prefixes=(";", "#")) - bp.read(behaviours_config_path) - behaviours = {} - for section in bp.sections(): - behaviours[section] = {k: puffer_type(v) for k, v in bp[section].items()} - args["driving_behaviours_eval"] = behaviours - else: - print(f"Warning: driving_behaviours_eval_config not found: {behaviours_config_path}") - # Use World size to divide Num_Agents / minibatch size in DDP if "LOCAL_RANK" in os.environ: world_size = int(os.environ.get("WORLD_SIZE", 1)) @@ -3053,12 +1853,22 @@ def main(): if mode == "train": train(env_name=env_name) elif mode == "eval": - eval(env_name=env_name) - elif mode == "eval_multi_scenarios": - eval_multi_scenarios(env_name=env_name) - elif mode == "eval_multi_scenarios_render": - eval_multi_scenarios_render(env_name=env_name) - print("") + # Pull --evaluator and --out from argv before load_config consumes them. 
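+        # e.g. `puffer eval puffer_drive --evaluator behaviors_hard_stop --out /tmp/m.json`
+        # (section name illustrative) leaves plain `puffer eval puffer_drive`
+        # plus any remaining overrides for load_config to parse.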
+ evaluator_name = None + out_path = None + i = 0 + while i < len(sys.argv): + arg = sys.argv[i] + if arg == "--evaluator" and i + 1 < len(sys.argv): + evaluator_name = sys.argv[i + 1] + del sys.argv[i : i + 2] + continue + if arg == "--out" and i + 1 < len(sys.argv): + out_path = sys.argv[i + 1] + del sys.argv[i : i + 2] + continue + i += 1 + eval(env_name=env_name, evaluator_name=evaluator_name, out_path=out_path) elif mode == "sweep": sweep(env_name=env_name) elif mode == "controlled_exp": diff --git a/pufferlib/utils.py b/pufferlib/utils.py index a425dcef3..9efc0c628 100644 --- a/pufferlib/utils.py +++ b/pufferlib/utils.py @@ -8,309 +8,6 @@ import json -def run_human_replay_eval_in_subprocess(config, logger, global_step): - """ - Run human replay evaluation in a subprocess and log metrics to wandb. - - """ - try: - run_id = logger.run_id - model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}") - model_files = glob.glob(os.path.join(model_dir, "model_*.pt")) - - if not model_files: - print("No model files found for human replay evaluation") - return - - latest_cpt = max(model_files, key=os.path.getctime) - - # Prepare evaluation command - eval_config = config["eval"] - cmd = [ - sys.executable, - "-m", - "pufferlib.pufferl", - "eval", - config["env"], - "--load-model-path", - latest_cpt, - "--eval.wosac-realism-eval", - "False", - "--eval.human-replay-eval", - "True", - "--eval.human-replay-num-agents", - str(eval_config["human_replay_num_agents"]), - "--eval.human-replay-control-mode", - str(eval_config["human_replay_control_mode"]), - ] - - # Run human replay evaluation in subprocess - result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, cwd=os.getcwd()) - - if result.returncode == 0: - # Extract JSON from stdout between markers - stdout = result.stdout - if "HUMAN_REPLAY_METRICS_START" in stdout and "HUMAN_REPLAY_METRICS_END" in stdout: - start = stdout.find("HUMAN_REPLAY_METRICS_START") + len("HUMAN_REPLAY_METRICS_START") - end = stdout.find("HUMAN_REPLAY_METRICS_END") - json_str = stdout[start:end].strip() - human_replay_metrics = json.loads(json_str) - - # Log to wandb if available - if hasattr(logger, "wandb") and logger.wandb: - logger.wandb.log( - { - "eval/human_replay_collision_rate": human_replay_metrics["collision_rate"], - "eval/human_replay_offroad_rate": human_replay_metrics["offroad_rate"], - "eval/human_replay_completion_rate": human_replay_metrics["completion_rate"], - }, - step=global_step, - ) - else: - print(f"Human replay evaluation failed with exit code {result.returncode}: {result.stderr}") - - except subprocess.TimeoutExpired: - print("Human replay evaluation timed out") - except Exception as e: - print(f"Failed to run human replay evaluation: {e}") - - -def run_wosac_eval_in_subprocess(config, logger, global_step): - """ - Run WOSAC evaluation in a subprocess and log metrics to wandb. - - Args: - config: Configuration dictionary containing data_dir, env, and wosac settings - logger: Logger object with run_id and optional wandb attribute - epoch: Current training epoch - global_step: Current global training step - - Returns: - None. Prints error messages if evaluation fails. 
- """ - try: - run_id = logger.run_id - model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}") - model_files = glob.glob(os.path.join(model_dir, "model_*.pt")) - - if not model_files: - print("No model files found for WOSAC evaluation") - return - - latest_cpt = max(model_files, key=os.path.getctime) - - # Prepare evaluation command - eval_config = config.get("eval", {}) - cmd = [ - sys.executable, - "-m", - "pufferlib.pufferl", - "eval", - config["env"], - "--load-model-path", - latest_cpt, - "--eval.wosac-realism-eval", - "True", - "--eval.wosac-num-agents", - str(eval_config.get("wosac_num_agents", 256)), - "--eval.wosac-init-mode", - str(eval_config.get("wosac_init_mode", "create_all_valid")), - "--eval.wosac-control-mode", - str(eval_config.get("wosac_control_mode", "control_wosac")), - "--eval.wosac-init-steps", - str(eval_config.get("wosac_init_steps", 10)), - "--eval.wosac-goal-radius", - str(eval_config.get("wosac_goal_radius", 2.0)), - "--eval.wosac-sanity-check", - str(eval_config.get("wosac_sanity_check", False)), - "--eval.wosac-aggregate-results", - str(eval_config.get("wosac_aggregate_results", True)), - ] - - # Run WOSAC evaluation in subprocess - result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, cwd=os.getcwd()) - - if result.returncode == 0: - # Extract JSON from stdout between markers - stdout = result.stdout - if "WOSAC_METRICS_START" in stdout and "WOSAC_METRICS_END" in stdout: - start = stdout.find("WOSAC_METRICS_START") + len("WOSAC_METRICS_START") - end = stdout.find("WOSAC_METRICS_END") - json_str = stdout[start:end].strip() - wosac_metrics = json.loads(json_str) - - # Log to wandb if available - if hasattr(logger, "wandb") and logger.wandb: - logger.wandb.log( - { - "eval/wosac_realism_meta_score": wosac_metrics["realism_meta_score"], - "eval/wosac_ade": wosac_metrics["ade"], - "eval/wosac_min_ade": wosac_metrics["min_ade"], - "eval/wosac_total_num_agents": wosac_metrics["total_num_agents"], - }, - step=global_step, - ) - else: - print(f"WOSAC evaluation failed with exit code {result.returncode}") - print(f"Error: {result.stderr}") - - # Check for memory issues - stderr_lower = result.stderr.lower() - if "out of memory" in stderr_lower or "cuda out of memory" in stderr_lower: - print("GPU out of memory. Skipping this WOSAC evaluation.") - - except subprocess.TimeoutExpired: - print("WOSAC evaluation timed out after 600 seconds") - except MemoryError as e: - print(f"WOSAC evaluation ran out of memory. Skipping this evaluation: {e}") - except Exception as e: - print(f"Failed to run WOSAC evaluation: {type(e).__name__}: {e}") - - -def run_driving_behaviours_eval_in_subprocess(config, logger, global_step, behaviours_config): - """ - Run driving behaviours evaluation for each of the specified scenario classes in a subprocess. - - For each class defined in behaviours_config, calls `puffer eval puffer_drive` with: - - simulation_mode=replay, control_mode=control_sdc_only, init_mode=create_all_valid - - map_dir and num_agents from the class config - Parses HUMAN_REPLAY_METRICS_START/END JSON from stdout and logs to wandb under - driving_behaviours//. 
- """ - sampled_dirs = [] # temp symlink dirs created for num_scenarios sampling - try: - run_id = logger.run_id - model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}") - model_files = glob.glob(os.path.join(model_dir, "models", "model_*.pt")) - - if not model_files: - print("DrivingBehavioursEval: no model files found, skipping.") - return - - latest_cpt = max(model_files, key=os.path.getctime) - EVAL_SECTIONS_PREFIX = "eval_" - classes = [(name, cfg) for name, cfg in behaviours_config.items() if name.startswith(EVAL_SECTIONS_PREFIX)] - - all_results = {} - for class_name, class_cfg in classes: - map_dir = class_cfg.get("map_dir", "") - if isinstance(map_dir, str): - map_dir = map_dir.strip('"').strip("'") - if not os.path.isdir(map_dir): - print( - f"DrivingBehavioursEval [{class_name[len(EVAL_SECTIONS_PREFIX) :]}]: map_dir not found, skipping ({map_dir})" - ) - continue - all_bins = [f for f in os.listdir(map_dir) if f.endswith(".bin")] - if not all_bins: - print( - f"DrivingBehavioursEval [{class_name[len(EVAL_SECTIONS_PREFIX) :]}]: no .bin files in {map_dir}, skipping" - ) - continue - # Optional cap: random-sample N bins each eval pass via a fresh - # symlink dir. Different scenes per pass; better population estimate - # without paying for the full directory. - num_scenarios = class_cfg.get("num_scenarios") - if num_scenarios and int(num_scenarios) < len(all_bins): - k = int(num_scenarios) - sampled = random.sample(all_bins, k) - tmp_dir = tempfile.mkdtemp(prefix=f"db_eval_{class_name}_") - for fname in sampled: - os.symlink(os.path.join(map_dir, fname), os.path.join(tmp_dir, fname)) - map_dir = tmp_dir - sampled_dirs.append(tmp_dir) - num_agents = len([f for f in os.listdir(map_dir) if f.endswith(".bin")]) - scenario_length = class_cfg.get("scenario_length", 201) - short = class_name[len(EVAL_SECTIONS_PREFIX) :] - - cmd = [ - sys.executable, - "-m", - "pufferlib.pufferl", - "eval", - config["env"], - "--load-model-path", - latest_cpt, - "--eval.wosac-realism-eval", - "False", - "--eval.human-replay-eval", - "True", - "--eval.map-dir", - map_dir, - "--eval.human-replay-num-agents", - str(num_agents), - "--eval.human-replay-control-mode", - str(config["eval"].get("human_replay_control_mode", "control_sdc_only")), - "--env.simulation-mode", - "replay", - "--env.init-mode", - "create_all_valid", - "--eval.scenario-length", - str(scenario_length), - # Clean-eval overrides. Mirrors build_eval_overrides(clean=True): - # red lights enforced, no road-segment dropout, no partner - # blindness or phantom braking, wider partner budget. Subprocess - # re-parses the ini so training-time CLI overrides don't leak in - # here. (eval_mode is on ev/clean-eval branch, not this one.) 
- "--env.traffic-light-behavior", - "1", - "--env.lane-segment-dropout", - "0.0", - "--env.boundary-segment-dropout", - "0.0", - "--env.partner-blindness-prob", - "0.0", - "--env.phantom-braking-prob", - "0.0", - "--env.phantom-braking-trigger-prob", - "0.0", - "--env.max-partner-observations", - "32", - ] - - print(f"DrivingBehavioursEval: running class '{short}' with map_dir={map_dir}") - try: - result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, cwd=os.getcwd()) - if result.returncode == 0: - stdout = result.stdout - if "HUMAN_REPLAY_METRICS_START" in stdout and "HUMAN_REPLAY_METRICS_END" in stdout: - start = stdout.find("HUMAN_REPLAY_METRICS_START") + len("HUMAN_REPLAY_METRICS_START") - end = stdout.find("HUMAN_REPLAY_METRICS_END") - metrics = json.loads(stdout[start:end].strip()) - all_results[class_name] = metrics - print(f"DrivingBehavioursEval [{short}]: {metrics}") - else: - print(f"DrivingBehavioursEval [{short}]: no metrics found in output") - else: - print( - f"DrivingBehavioursEval [{short}]: subprocess failed (exit {result.returncode}): {result.stderr[-500:]}" - ) - except subprocess.TimeoutExpired: - print(f"DrivingBehavioursEval [{short}]: timed out") - except Exception as e: - print(f"DrivingBehavioursEval [{short}]: error: {e}") - - # Log all class results to wandb - if hasattr(logger, "wandb") and logger.wandb and all_results: - payload = {} - for class_name, metrics in all_results.items(): - short = class_name[len(EVAL_SECTIONS_PREFIX) :] - for k, v in metrics.items(): - try: - payload[f"driving_behaviours/{short}/{k}"] = float(v) - except (TypeError, ValueError): - pass - if payload: - payload["train_step"] = global_step - logger.wandb.log(payload, step=global_step) - - except Exception as e: - print(f"DrivingBehavioursEval: unexpected error: {e}") - finally: - for d in sampled_dirs: - shutil.rmtree(d, ignore_errors=True) - - def render_videos(config, vecenv, logger, epoch, global_step, bin_path): """ Generate and log training videos using C-based rendering. diff --git a/tests/test_eval_manager.py b/tests/test_eval_manager.py new file mode 100644 index 000000000..d40280558 --- /dev/null +++ b/tests/test_eval_manager.py @@ -0,0 +1,496 @@ +"""Smoke tests for EvalManager config parsing + dispatch. + +Doesn't load the full pufferl.py module (which pulls heavy training deps). +Verifies parser correctness, dispatch gating, info-flattening shape +handling, behavior-class symlink cleanup, and the train/section/macro +override resolution stack. 
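+
+The drive.ini shape these fixtures model (hypothetical section names;
+a section without `type` is an inherit-only template):
+
+    [eval.behaviors_defaults]
+    interval = 250
+    env.scenario_length = 201
+
+    [eval.behaviors_hard_stop]
+    type = behavior_class
+    inherits = behaviors_defaults
+    env.map_dir = /tmp/hard_stop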
+""" + +import os +import sys + +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from pufferlib.ocean.benchmark.evaluators import EvalResult, Evaluator +from pufferlib.ocean.benchmark.evaluators.behavior_class import BehaviorClassEvaluator +from pufferlib.ocean.benchmark.manager import ( + CLEAN_EVAL_OVERRIDES, + EvalManager, + _build_section_config, + _expand_dotted, +) + + +def test_dotted_expand(): + """`_expand_dotted` should turn `{"env.X": v}` flat keys into a nested + `{"env": {"X": v}}` dict so configparser-style ini keys round-trip.""" + raw = {"env.simulation_mode": "replay", "interval": 25} + out = _expand_dotted(raw) + assert out == {"env": {"simulation_mode": "replay"}, "interval": 25} + + +def test_inheritance_chain(): + """Single-level inheritance: child should pull all parent fields it + doesn't explicitly override, including nested env.*.""" + sections = { + "behaviors_defaults": { + "type": "behavior_class", + "interval": 250, + "env.simulation_mode": "replay", + "env.scenario_length": 201, + }, + "behaviors_hard_stop": { + "inherits": "behaviors_defaults", + "env.map_dir": "/tmp/hard_stop", + }, + } + cfg = _build_section_config("behaviors_hard_stop", sections["behaviors_hard_stop"], sections) + assert cfg["type"] == "behavior_class" + assert cfg["interval"] == 250 + assert cfg["env"]["simulation_mode"] == "replay" + assert cfg["env"]["scenario_length"] == 201 + assert cfg["env"]["map_dir"] == "/tmp/hard_stop" + + +def test_inheritance_three_levels(): + """Three-level chain (C → B → A): nearest ancestor wins per field; + grandparent fields survive when no descendant overrides them.""" + sections = { + "A": {"interval": 100, "env.scenario_length": 91, "env.map_dir": "/A"}, + "B": {"inherits": "A", "interval": 200, "env.scenario_length": 201}, + "C": {"inherits": "B", "env.map_dir": "/C", "render": True}, + } + cfg = _build_section_config("C", sections["C"], sections) + assert cfg["interval"] == 200, "B should win over A on interval" + assert cfg["env"]["scenario_length"] == 201, "B should win over A on scenario_length" + assert cfg["env"]["map_dir"] == "/C", "C should win over A and B on map_dir" + assert cfg["render"] is True, "C's own field" + + +def test_inheritance_self_cycle_detected(): + """A section that inherits from itself must raise rather than spin + forever in the chain walk.""" + sections = {"a": {"inherits": "a"}} + with pytest.raises(ValueError, match="Cyclic"): + _build_section_config("a", sections["a"], sections) + + +def test_inheritance_child_wins(): + """When child and parent both set the same key (top-level scalar and + nested env.*), the child's value should appear in the merged config.""" + sections = { + "parent": {"interval": 250, "env.scenario_length": 201}, + "child": {"inherits": "parent", "interval": 100, "env.scenario_length": 91}, + } + cfg = _build_section_config("child", sections["child"], sections) + assert cfg["interval"] == 100 + assert cfg["env"]["scenario_length"] == 91 + + +def test_inheritance_does_not_alias_parent_env(): + """Regression: _build_section_config used to merge by reference and mutate + the shared parent's env dict, so processing N siblings that all override + env.map_dir would leave every sibling reporting the LAST-processed sibling's + map_dir (the parent's env was mutated in place).""" + sections = { + "defaults": {"env.scenario_length": 201, "interval": 5}, + "child_a": {"inherits": "defaults", "env.map_dir": "/path/a"}, + "child_b": {"inherits": "defaults", 
"env.map_dir": "/path/b"}, + "child_c": {"inherits": "defaults", "env.map_dir": "/path/c"}, + } + cfg_a = _build_section_config("child_a", sections["child_a"], sections) + cfg_b = _build_section_config("child_b", sections["child_b"], sections) + cfg_c = _build_section_config("child_c", sections["child_c"], sections) + assert cfg_a["env"]["map_dir"] == "/path/a" + assert cfg_b["env"]["map_dir"] == "/path/b" + assert cfg_c["env"]["map_dir"] == "/path/c" + # The parent must remain untouched, even after building all children. + assert "map_dir" not in sections["defaults"].get("env", {}) + + +def test_inheritance_cycle_detected(): + """A two-section cycle (a→b→a) must raise rather than spin forever.""" + sections = { + "a": {"inherits": "b"}, + "b": {"inherits": "a"}, + } + with pytest.raises(ValueError, match="Cyclic"): + _build_section_config("a", sections["a"], sections) + + +def test_inheritance_unknown_parent(): + """`inherits = "nonexistent"` should fail loudly rather than silently + skip the missing parent.""" + sections = { + "child": {"inherits": "nonexistent"}, + } + with pytest.raises(ValueError, match="not a known section"): + _build_section_config("child", sections["child"], sections) + + +def test_clean_macro_applied_by_default(): + """`clean = true` (the default) injects every key from CLEAN_EVAL_OVERRIDES + into the merged env config, zeroing perturbations + enforcing red lights.""" + sections = {"foo": {"type": "multi_scenario"}} + cfg = _build_section_config("foo", sections["foo"], sections) + for k, v in CLEAN_EVAL_OVERRIDES.items(): + assert cfg["env"][k] == v + + +def test_clean_macro_disabled_when_clean_false(): + """`clean = false` opts out of the macro — none of the perturbation + knobs get injected; they fall back to whatever the train config has.""" + sections = {"foo": {"type": "multi_scenario", "clean": False}} + cfg = _build_section_config("foo", sections["foo"], sections) + for k in CLEAN_EVAL_OVERRIDES: + assert k not in cfg.get("env", {}) + + +def test_clean_macro_loses_to_explicit_override(): + """An explicit env.* value in the section beats the macro default for + that same key — useful when a particular eval wants to keep some + perturbation on for a targeted test.""" + sections = { + "foo": { + "type": "multi_scenario", + "env.lane_segment_dropout": 0.5, # explicit > macro default of 0.0 + } + } + cfg = _build_section_config("foo", sections["foo"], sections) + assert cfg["env"]["lane_segment_dropout"] == 0.5 + + +def test_manager_from_config_skips_template_sections(): + """Sections without a `type` field are templates (parents only) — they + should NOT be instantiated as Evaluators, only inherited from.""" + train_config = { + "eval": { + "behaviors_defaults": {"interval": 250, "env.scenario_length": 201}, + "behaviors_hard_stop": { + "type": "behavior_class", + "inherits": "behaviors_defaults", + "env.map_dir": "/tmp/hard_stop", + }, + }, + } + mgr = EvalManager.from_config(train_config) + names = [e.name for e in mgr.evaluators] + assert "behaviors_hard_stop" in names + assert "behaviors_defaults" not in names # template, no `type` field + + +def test_render_num_scenarios_inheritable(): + """eval.* keys inherit from parent template just like env.* keys do — + so a behaviors_defaults template's render budget is shared by every + child class without each having to re-declare it.""" + sections = { + "defaults": { + "type": "behavior_class", + "interval": 250, + "eval.num_scenarios": 50, + "eval.render_num_scenarios": 2, + }, + "hard_stop": { + "inherits": 
"defaults", + "env.map_dir": "/tmp/hard_stop", + }, + } + cfg = _build_section_config("hard_stop", sections["hard_stop"], sections) + assert cfg["eval"]["num_scenarios"] == 50 + assert cfg["eval"]["render_num_scenarios"] == 2 + + +def test_manager_unknown_type_raises(): + """A section with `type = ""` must fail loudly at + EvalManager construction rather than silently skipping the section.""" + train_config = {"eval": {"foo": {"type": "totally_made_up"}}} + with pytest.raises(ValueError, match="not registered"): + EvalManager.from_config(train_config) + + +def test_has_subprocess_evals_at(): + """has_subprocess_evals_at(epoch) should return True iff at least one + enabled subprocess-mode evaluator's interval divides the epoch — the + training loop uses this to decide whether to save_checkpoint() before + firing evals (subprocesses load the policy from disk).""" + train_config = { + "eval": { + "inline_one": {"type": "human_replay", "interval": 25, "mode": "inline"}, + "subprocess_one": {"type": "human_replay", "interval": 100, "mode": "subprocess"}, + "subprocess_disabled": { + "type": "human_replay", + "interval": 100, + "mode": "subprocess", + "enabled": False, + }, + } + } + mgr = EvalManager.from_config(train_config) + assert mgr.has_subprocess_evals_at(epoch=100) is True # subprocess_one fires + assert mgr.has_subprocess_evals_at(epoch=25) is False # only inline at 25 + assert mgr.has_subprocess_evals_at(epoch=50) is False # nothing at 50 + + +def test_latest_checkpoint_finds_newest_pt(tmp_path): + """latest_checkpoint should resolve to the most-recently-written .pt + under data_dir/_/models/ — subprocess evals depend on + this to load the freshest weights.""" + import time + + model_dir = tmp_path / "puffer_drive_run123" / "models" + model_dir.mkdir(parents=True) + p_old = model_dir / "model_puffer_drive_001.pt" + p_old.write_text("a") + time.sleep(0.05) + p_new = model_dir / "model_puffer_drive_002.pt" + p_new.write_text("b") + + train_config = {"data_dir": str(tmp_path), "eval": {}} + mgr = EvalManager.from_config(train_config, run_id="run123") + assert mgr.latest_checkpoint("puffer_drive") == str(p_new) + + +def test_latest_checkpoint_falls_back_to_load_model_path(tmp_path): + """When no checkpoint dir exists yet (resume-from-elsewhere before + first save), latest_checkpoint should return train_config['load_model_path'] + so the very first eval still has weights to evaluate.""" + train_config = { + "data_dir": str(tmp_path), + "load_model_path": "/some/resume/path.pt", + "eval": {}, + } + mgr = EvalManager.from_config(train_config, run_id="run123") + # No models dir exists → falls back to load_model_path + assert mgr.latest_checkpoint("puffer_drive") == "/some/resume/path.pt" + + +# -- Tier A: dispatch + invariants ----------------------------------------- + + +def test_maybe_run_dispatches_by_interval_and_enabled(monkeypatch): + """maybe_run should fire only enabled evaluators whose interval divides epoch.""" + train_config = { + "eval": { + "fires_at_25": {"type": "human_replay", "interval": 25}, + "fires_at_250": {"type": "human_replay", "interval": 250}, + "disabled": {"type": "human_replay", "interval": 25, "enabled": False}, + "zero_interval": {"type": "human_replay", "interval": 0}, + } + } + mgr = EvalManager.from_config(train_config) + + calls = [] + + def fake_run(ev, *, policy, env_name, logger, global_step, epoch): + calls.append(ev.name) + return EvalResult(metrics={}) + + monkeypatch.setattr(mgr, "_run_one", fake_run) + + mgr.maybe_run(epoch=25, policy=None, 
env_name="puffer_drive") + assert calls == ["fires_at_25"], "only the 25-interval evaluator fires at epoch 25" + calls.clear() + + mgr.maybe_run(epoch=250, policy=None, env_name="puffer_drive") + assert sorted(calls) == ["fires_at_25", "fires_at_250"], "both fire at epoch 250" + calls.clear() + + mgr.maybe_run(epoch=50, policy=None, env_name="puffer_drive") + assert calls == ["fires_at_25"], "only fires_at_25 at epoch 50; nothing else" + calls.clear() + + mgr.maybe_run(epoch=33, policy=None, env_name="puffer_drive") + assert calls == [], "nothing fires when no interval divides the epoch" + + +def test_flatten_infos_handles_shape_variations(): + """_flatten_infos must accept both list-of-list (multi-worker) and + flat-list (PufferEnv) info shapes, plus None / empty entries.""" + + class _Stub(Evaluator): + type_name = "_stub_flatten" + + def _should_stop(self, *args, **kwargs): + return True + + s = _Stub("test", {}, {}) + assert s._flatten_infos(None) == [] + assert s._flatten_infos([]) == [] + assert s._flatten_infos([None, None]) == [] + assert s._flatten_infos([[], []]) == [] + + d1, d2, d3 = {"a": 1}, {"b": 2}, {"c": 3} + # Multi-worker backend: list of per-worker info lists + assert s._flatten_infos([[d1], [d2]]) == [d1, d2] + assert s._flatten_infos([[d1, d2], [d3]]) == [d1, d2, d3] + # PufferEnv backend: flat list of info dicts + assert s._flatten_infos([d1, d2]) == [d1, d2] + + +def test_behavior_class_sets_num_eval_scenarios(tmp_path): + """BehaviorClassEvaluator must set num_eval_scenarios alongside + num_agents/num_maps. Without it, the C-side replay branch caps at + drive.py's default of 16, so any category with >16 bins (or any + eval.num_scenarios > 16 sampling target) silently truncates.""" + map_dir = tmp_path / "bins" + map_dir.mkdir() + for i in range(50): + (map_dir / f"map_{i}.bin").write_text("x") + + # Sampling branch: num_scenarios < total bins. + cfg_sampled = { + "type": "behavior_class", + "env": {"map_dir": str(map_dir)}, + "eval": {"num_scenarios": 50}, + } + ev_s = BehaviorClassEvaluator("sampled", cfg_sampled, train_config={}) + env_s = ev_s.env_overrides() + assert env_s["num_agents"] == 50 + assert env_s["num_maps"] == 50 + assert env_s["num_eval_scenarios"] == 50 + ev_s.cleanup() + + # All-bins branch: num_scenarios > total bins, no sampling. + cfg_full = { + "type": "behavior_class", + "env": {"map_dir": str(map_dir)}, + "eval": {"num_scenarios": 999}, + } + ev_f = BehaviorClassEvaluator("full", cfg_full, train_config={}) + env_f = ev_f.env_overrides() + assert env_f["num_agents"] == 50 + assert env_f["num_maps"] == 50 + assert env_f["num_eval_scenarios"] == 50 + + +def test_behavior_class_cleanup_removes_symlink_dir(tmp_path): + """BehaviorClassEvaluator builds a tmp symlink dir when sampling. 
+
+
+def test_behavior_class_cleanup_removes_symlink_dir(tmp_path):
+    """BehaviorClassEvaluator builds a tmp symlink dir when sampling.
+    cleanup() must remove it; otherwise tmp dirs accumulate across eval runs."""
+    map_dir = tmp_path / "bins"
+    map_dir.mkdir()
+    for i in range(5):
+        (map_dir / f"map_{i}.bin").write_text("a")
+
+    config = {
+        "type": "behavior_class",
+        "env": {"map_dir": str(map_dir)},
+        "eval": {"num_scenarios": 2},
+    }
+    ev = BehaviorClassEvaluator("test_class", config, train_config={})
+
+    overrides = ev.env_overrides()
+    sampled = overrides["map_dir"]
+    assert sampled != str(map_dir), "sampling should redirect to a tmp dir"
+    assert os.path.isdir(sampled)
+    assert len([f for f in os.listdir(sampled) if f.endswith(".bin")]) == 2
+
+    ev.cleanup()
+    assert not os.path.exists(sampled), "tmp dir should be gone after cleanup"
+    assert ev._sampled_dir is None
+
+
+def test_rollout_zeros_lstm_state_per_agent_on_done(monkeypatch):
+    """Per-agent LSTM reset on terminations or truncations. Either signal
+    means 'episode over, env reset' — the agent's next obs is from a fresh
+    scenario, and stale recurrent memory would bias the policy."""
+    import numpy as np
+    import torch
+
+    import pufferlib.pytorch
+    from pufferlib.ocean.benchmark.evaluators.base import Evaluator
+
+    state = {"lstm_h": torch.ones(4, 8), "lstm_c": torch.ones(4, 8)}
+
+    class _Ev(Evaluator):
+        type_name = "_lstm_done"
+
+        def _initial_reset(self, vecenv, args):
+            return vecenv.reset_obs
+
+        def _init_lstm_state(self, num_agents, policy, device, args):
+            return state
+
+        def _should_stop(self, args, infos_collected, steps):
+            return steps >= 1
+
+    class _Vec:
+        observation_space = type("S", (), {"shape": (4, 6)})()
+        action_space = type("A", (), {"shape": (4,), "low": -1.0, "high": 1.0})()
+        reset_obs = np.zeros((4, 6), dtype=np.float32)
+
+        def step(self, action):
+            # Agents 0,2 truncated; 1 terminated; 3 alive.
+            return self.reset_obs, np.zeros(4), np.array([0, 1, 0, 0]), np.array([1, 0, 1, 0]), []
+
+    class _Policy:
+        def forward_eval(self, ob, state):
+            return torch.zeros(ob.shape[0], 1), None
+
+    # Bypass sample_logits's distribution-shape gymnastics — return a
+    # placeholder action; we only care about the post-step state masking.
+    monkeypatch.setattr(
+        pufferlib.pytorch,
+        "sample_logits",
+        lambda logits, deterministic=True: (torch.zeros(4, dtype=torch.long), None, None),
+    )
+
+    args = {"train": {"device": "cpu", "use_rnn": True}}
+    _Ev("done_test", {}, args)._run_rollout_loop(_Vec(), _Policy(), args)
+
+    # Done agents (0, 1, 2) zeroed; alive agent (3) untouched.
+    assert state["lstm_h"][0].sum().item() == 0
+    assert state["lstm_h"][1].sum().item() == 0
+    assert state["lstm_h"][2].sum().item() == 0
+    assert state["lstm_h"][3].sum().item() == 8
+    assert state["lstm_c"][3].sum().item() == 8
+
+
+def test_rollout_records_eval_seconds():
+    """Every rollout's metrics dict should include `eval_seconds` so wandb
+    panels show wall-clock cost per evaluator."""
+    import time as _time
+
+    class _Stub(Evaluator):
+        type_name = "_stub_timing"
+
+        def _run_rollout_loop(self, vecenv, policy, args):
+            _time.sleep(0.02)  # forced floor so the recorded time is > 0
+            return {"some_metric": 1.5}
+
+    s = _Stub("test", {}, {})
+    result = s.rollout(vecenv=None, policy=None, args={})
+    assert "eval_seconds" in result.metrics
+    assert result.metrics["eval_seconds"] >= 0.02
+    assert result.metrics["some_metric"] == 1.5
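+
+
+# Minimal sketch of the per-agent reset the LSTM test above pins down.
+# Assumes a (num_agents, hidden) layout for the state tensors; this is a
+# hypothetical helper, not the shipped rollout loop.
+def _sketch_zero_done_lstm_state(state, terminals, truncations):
+    import numpy as np
+    import torch
+
+    # Either signal means the episode is over: wipe that agent's memory.
+    done = torch.as_tensor(np.asarray(terminals) + np.asarray(truncations) > 0)
+    for key in ("lstm_h", "lstm_c"):
+        state[key][done] = 0.0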
+
+
+def test_eval_args_compose_train_section_and_clean_macro():
+    """_build_eval_args must fold train_config['env'] (baseline) +
+    section overrides + clean macro correctly. Section beats baseline,
+    explicit beats clean macro, baseline survives when not overridden."""
+    train_config = {
+        "env": {
+            "lane_segment_dropout": 0.5,  # training perturbation
+            "scenario_length": 91,
+            "num_agents": 1024,  # only present in train baseline
+        },
+        "train": {"seed": 42, "device": "cpu"},
+        "eval": {
+            "validation": {
+                "type": "multi_scenario",
+                "interval": 25,
+                "env.scenario_length": 201,  # section overrides baseline
+                # clean=true (default) → lane_segment_dropout zeroed by macro
+                # num_agents not specified → falls through to train baseline
+            },
+        },
+    }
+    mgr = EvalManager.from_config(train_config)
+    ev = mgr.evaluators[0]
+    args = mgr._build_eval_args(ev, env_name="puffer_drive", global_step=0)
+
+    assert args["env"]["scenario_length"] == 201, "section override wins"
+    assert args["env"]["lane_segment_dropout"] == 0.0, "clean macro applied"
+    assert args["env"]["num_agents"] == 1024, "train baseline preserved"
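+
+
+# Minimal sketch of the three-layer fold the test above encodes
+# (hypothetical helper, not the shipped _build_eval_args): train baseline,
+# then the clean macro, then the section's explicit env overrides.
+def _sketch_compose_eval_env(train_env, section_env, clean=True):
+    env = dict(train_env)  # baseline survives unless overridden
+    if clean:
+        env.update(CLEAN_EVAL_OVERRIDES)  # macro zeroes perturbations
+    env.update(section_env)  # explicit section keys beat everything
+    return env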