From d66f8da374d4d1e94c646b7e941ea42ad2ea2362 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Sat, 18 Apr 2026 09:42:08 -0400 Subject: [PATCH 01/12] Adding SharedMapCache --- pufferlib/ocean/drive/binding.c | 230 ++++++++++++++------ pufferlib/ocean/drive/drive.h | 359 ++++++++++++++++++++++++++++++-- pufferlib/ocean/drive/drive.py | 8 + 3 files changed, 516 insertions(+), 81 deletions(-) diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index b2a4c20b1..83d2ae971 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -1,3 +1,4 @@ +#include #include "drive.h" #define Env Drive #define MY_SHARED @@ -5,6 +6,66 @@ #define MY_GET #include "../env_binding.h" +// Process-local map cache: indexed by map_id, populated by my_shared(), used by my_init(). +static SharedMapData **g_map_cache = NULL; +static int g_map_cache_size = 0; +static pid_t g_map_cache_pid = 0; +// Cache key: params that affect SharedMapData content (obs dists determine vision_range) +static float g_cache_road_obs_front_dist = 0; +static float g_cache_road_obs_behind_dist = 0; +static float g_cache_road_obs_side_dist = 0; +static char **g_cache_map_paths = NULL; + +static void reset_cache_globals(void) { + g_map_cache = NULL; + g_map_cache_size = 0; + g_map_cache_pid = 0; + g_cache_road_obs_front_dist = 0; + g_cache_road_obs_behind_dist = 0; + g_cache_road_obs_side_dist = 0; + g_cache_map_paths = NULL; +} + +static void release_map_cache_internal(void) { + if (g_map_cache == NULL) + return; + // After fork, child inherits g_map_cache pointers via copy-on-write. + // We must NOT free them — they belong to the parent's address space. + // Discard them and let the child rebuild its own cache on the next call. + if (g_map_cache_pid != 0 && g_map_cache_pid != getpid()) { + reset_cache_globals(); + return; + } + // Refuse to release if any envs still hold a reference + for (int i = 0; i < g_map_cache_size; i++) { + if (g_map_cache[i] != NULL && g_map_cache[i]->ref_count > 0) { + fprintf(stderr, + "ERROR: cannot release map cache — entry %d still has %d live env(s). " + "Close all Drive instances before changing map config.\n", + i, g_map_cache[i]->ref_count); + return; + } + } + for (int i = 0; i < g_map_cache_size; i++) { + if (g_map_cache[i] != NULL) + free_shared_map_data(g_map_cache[i]); + } + free(g_map_cache); + if (g_cache_map_paths != NULL) { + for (int i = 0; i < g_map_cache_size; i++) + free(g_cache_map_paths[i]); + free(g_cache_map_paths); + } + reset_cache_globals(); +} + +static PyObject *release_map_cache_py(PyObject *self, PyObject *args) { + release_map_cache_internal(); + Py_RETURN_NONE; +} + +#define MY_METHODS {"release_map_cache", release_map_cache_py, METH_VARARGS, "Release the shared map data cache"} + static int my_put(Env *env, PyObject *args, PyObject *kwargs) { PyObject *obs = PyDict_GetItemString(kwargs, "observations"); if (!PyObject_TypeCheck(obs, &PyArray_Type)) { @@ -1544,6 +1605,9 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { int max_agents_per_env = unpack(kwargs, "max_agents_per_env"); float goal_radius = (float)unpack(kwargs, "goal_radius"); int num_eval_scenarios = unpack(kwargs, "num_eval_scenarios"); + float road_obs_front_dist = (float)unpack(kwargs, "road_obs_front_dist"); + float road_obs_behind_dist = (float)unpack(kwargs, "road_obs_behind_dist"); + float road_obs_side_dist = (float)unpack(kwargs, "road_obs_side_dist"); if (min_agents_per_env <= 0 || max_agents_per_env <= 0) { PyErr_SetString(PyExc_ValueError, "min_agents_per_env and max_agents_per_env must be > 0"); return NULL; @@ -1559,7 +1623,44 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { srand(seed); - // GIGAFLOW mode: use random sampling for agent counts per env + // Reuse the existing cache if this process created it with matching config. + // Cache key: PID, num_maps, map file paths, obs dist params (determine vision_range). + int reuse_cache = + (g_map_cache != NULL && g_map_cache_pid == getpid() && g_map_cache_size == num_maps && + g_cache_road_obs_front_dist == road_obs_front_dist && g_cache_road_obs_behind_dist == road_obs_behind_dist && + g_cache_road_obs_side_dist == road_obs_side_dist); + if (reuse_cache && g_cache_map_paths != NULL) { + for (int i = 0; i < num_maps; i++) { + const char *path = PyUnicode_AsUTF8(PyList_GetItem(map_files, i)); + if (g_cache_map_paths[i] == NULL || strcmp(g_cache_map_paths[i], path) != 0) { + reuse_cache = 0; + break; + } + } + } + if (!reuse_cache) { + release_map_cache_internal(); + if (g_map_cache != NULL) { + PyErr_SetString(PyExc_RuntimeError, + "Cannot change map cache config while Drive environments are still open. " + "Call close() on all Drive instances first."); + return NULL; + } + g_map_cache_size = num_maps; + g_map_cache = (SharedMapData **)calloc(num_maps, sizeof(SharedMapData *)); + g_map_cache_pid = getpid(); + g_cache_road_obs_front_dist = road_obs_front_dist; + g_cache_road_obs_behind_dist = road_obs_behind_dist; + g_cache_road_obs_side_dist = road_obs_side_dist; + g_cache_map_paths = (char **)calloc(num_maps, sizeof(char *)); + for (int i = 0; i < num_maps; i++) { + const char *path = PyUnicode_AsUTF8(PyList_GetItem(map_files, i)); + g_cache_map_paths[i] = strdup(path); + } + } + + // GIGAFLOW mode: agent counts are numeric, no binary loading needed for counting. + // We do lazily populate the cache so my_init can call init_from_shared. if (simulation_mode == SIMULATION_GIGAFLOW) { if (eval_mode) { // Eval mode: fixed agent count, sequential map cycling @@ -1573,10 +1674,17 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { int offset = 0; for (int i = 0; i < env_count; i++) { + int map_id_g = (s_map_counter + i) % num_maps; PyList_SetItem(agent_offsets, i, PyLong_FromLong(offset)); - PyList_SetItem(map_ids_list, i, PyLong_FromLong((s_map_counter + i) % num_maps)); + PyList_SetItem(map_ids_list, i, PyLong_FromLong(map_id_g)); int remaining = num_agents - offset; offset += (remaining < agents_per_env) ? remaining : agents_per_env; + // Lazily populate cache for assigned map + if (g_map_cache[map_id_g] == NULL) { + const char *map_file_path = PyUnicode_AsUTF8(PyList_GetItem(map_files, map_id_g)); + g_map_cache[map_id_g] = create_shared_map_data(map_file_path, road_obs_front_dist, + road_obs_behind_dist, road_obs_side_dist); + } } PyList_SetItem(agent_offsets, env_count, PyLong_FromLong(offset)); @@ -1597,23 +1705,13 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { if (remaining <= max_agents_per_env) { count = remaining; } else { - // 1. We must leave at least min_agents_per_env for the future. int absolute_max_allowed = remaining - min_agents_per_env; - - // 2. We cannot take more than max_agents_per_env right now. int current_upper_bound = (absolute_max_allowed < max_agents_per_env) ? absolute_max_allowed : max_agents_per_env; - - // 3. We must take at least min_agents_per_env right now. int current_lower_bound = min_agents_per_env; - - // Safety check: if constraints are tight, lower might equal upper. - // If absolute_max_allowed < min_lower_bound for example leading to - // current_upper_bound < current_lower_bound if (current_upper_bound <= current_lower_bound) { count = current_lower_bound; } else { - // Now the range is guaranteed to be positive. int range = current_upper_bound - current_lower_bound + 1; count = current_lower_bound + (rand() % range); } @@ -1628,9 +1726,16 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { int offset = 0; for (int i = 0; i < env_count; i++) { + int map_id_g = rand() % num_maps; PyList_SetItem(agent_offsets, i, PyLong_FromLong(offset)); - PyList_SetItem(map_ids_list, i, PyLong_FromLong(rand() % num_maps)); + PyList_SetItem(map_ids_list, i, PyLong_FromLong(map_id_g)); offset += agent_counts[i]; + // Lazily populate cache for assigned map + if (g_map_cache[map_id_g] == NULL) { + const char *map_file_path = PyUnicode_AsUTF8(PyList_GetItem(map_files, map_id_g)); + g_map_cache[map_id_g] = create_shared_map_data(map_file_path, road_obs_front_dist, road_obs_behind_dist, + road_obs_side_dist); + } } PyList_SetItem(agent_offsets, env_count, PyLong_FromLong(num_agents)); @@ -1643,7 +1748,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { return tuple; } - // REPLAY mode - existing logic with max_agents_per_env cap + // REPLAY mode: use SharedMapData cache for agent counting (avoids per-env binary load) int total_agent_count = 0; int map_id = 0; int env_count = 0; @@ -1664,62 +1769,56 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { if (eval_mode) { map_id = s_map_counter % num_maps; - s_map_counter += 1; // This increments towards end_map_index + s_map_counter += 1; } else { map_id = rand() % num_maps; } - const char *map_file = PyUnicode_AsUTF8(PyList_GetItem(map_files, map_id)); - - Drive *env = calloc(1, sizeof(Drive)); - env->init_mode = init_mode; - env->control_mode = control_mode; - env->simulation_mode = simulation_mode; - env->init_steps = init_steps; - env->num_max_agents = max_agents_per_env; - env->goal_radius = goal_radius; - load_map_binary(map_file, env); - - set_active_agents(env); - - // Skip map if it doesn't contain any controllable agents - if (env->active_agent_count == 0) { + // Lazily populate the shared map cache for this map_id + if (g_map_cache[map_id] == NULL) { + const char *map_file_path = PyUnicode_AsUTF8(PyList_GetItem(map_files, map_id)); + g_map_cache[map_id] = + create_shared_map_data(map_file_path, road_obs_front_dist, road_obs_behind_dist, road_obs_side_dist); + } + SharedMapData *shared = g_map_cache[map_id]; + + // Count active agents using a lightweight temp env (no binary reload) + Drive temp_env = {0}; + temp_env.init_mode = init_mode; + temp_env.control_mode = control_mode; + temp_env.simulation_mode = simulation_mode; + temp_env.init_steps = init_steps; + temp_env.num_max_agents = max_agents_per_env; + temp_env.goal_radius = goal_radius; + temp_env.agents = shared->template_agents; + temp_env.num_total_agents = shared->num_total_agents; + temp_env.grid_map = shared->grid_map; + set_active_agents(&temp_env); + int active_count = temp_env.active_agent_count; + free(temp_env.active_agent_indices); + free(temp_env.static_agent_indices); + free(temp_env.expert_static_agent_indices); + + // Skip map if it has no controllable agents + if (active_count == 0) { maps_checked++; - for (int j = 0; j < env->num_total_agents; j++) - free_agent(&env->agents[j]); - for (int j = 0; j < env->num_road_elements; j++) - free_road_element(&env->road_elements[j]); - for (int j = 0; j < env->num_traffic_elements; j++) - free_traffic_element(&env->traffic_elements[j]); - free(env->agents); - free(env->road_elements); - free(env->traffic_elements); - free(env->active_agent_indices); - free(env->static_agent_indices); - free(env->expert_static_agent_indices); - free(env); + if (maps_checked >= num_maps) { + Py_DECREF(agent_offsets); + Py_DECREF(map_ids); + char error_msg[256]; + snprintf(error_msg, sizeof(error_msg), + "No maps with controllable agents found after checking all %d maps.", num_maps); + PyErr_SetString(PyExc_ValueError, error_msg); + return NULL; + } continue; } - // Store map_id + // Store map_id and agent offset PyList_SetItem(map_ids, env_count, PyLong_FromLong(map_id)); - // Store agent offset PyList_SetItem(agent_offsets, env_count, PyLong_FromLong(total_agent_count)); - total_agent_count += env->active_agent_count; + total_agent_count += active_count; env_count++; - for (int j = 0; j < env->num_total_agents; j++) - free_agent(&env->agents[j]); - for (int j = 0; j < env->num_road_elements; j++) - free_road_element(&env->road_elements[j]); - for (int j = 0; j < env->num_traffic_elements; j++) - free_traffic_element(&env->traffic_elements[j]); - free(env->agents); - free(env->road_elements); - free(env->traffic_elements); - free(env->active_agent_indices); - free(env->static_agent_indices); - free(env->expert_static_agent_indices); - free(env); } if (total_agent_count >= num_agents) { @@ -1810,6 +1909,19 @@ static int my_init(Env *env, PyObject *args, PyObject *kwargs) { env->phantom_braking_trigger_prob = (float)unpack(kwargs, "phantom_braking_trigger_prob"); env->phantom_braking_duration = (int)unpack(kwargs, "phantom_braking_duration"); + // Use shared map cache if map_id is provided and cache entry exists + PyObject *map_id_obj = kwargs ? PyDict_GetItemString(kwargs, "map_id") : NULL; + if (map_id_obj != NULL && g_map_cache != NULL) { + int map_id = (int)PyLong_AsLong(map_id_obj); + if (map_id >= 0 && map_id < g_map_cache_size && g_map_cache[map_id] != NULL) { + init_from_shared(env, g_map_cache[map_id]); + return 0; + } + // Cache miss: warn and fall through to disk loading + fprintf(stderr, "WARNING: map_id=%d provided but shared map cache miss — loading from disk\n", map_id); + } + + // Fallback: load map from disk (standalone use, tests, or cache miss) init(env); return 0; } diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index af51ec260..64475b88a 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -248,6 +248,48 @@ struct GridMap { int num_drivable_grid_cell; }; +// Shared map data: loaded once per unique map file, referenced by multiple Drive envs. +// Road geometry, grid map, and spatial index are read-only after create_shared_map_data(). +// Reference-counted: freed by free_shared_map_data() when no longer needed. +typedef struct SharedMapData SharedMapData; +struct SharedMapData { + // Road geometry (read-only, never modified after load) + RoadMapElement *road_elements; + int num_road_elements; + + // Spatial index (read-only after init_grid_map + cache_neighbor_offsets) + GridMap *grid_map; + int *neighbor_offsets; + + // Lane graph (read-only after load) + struct LaneGraph lane_graph; + + // Scenario metadata + char scenario_id[128]; + char dataset_name[32]; + int log_length; + float log_dt; + float world_mean_x; + float world_mean_y; + + // Template traffic elements (always cloned per-env so GIGAFLOW can rewrite states) + TrafficControlElement *template_traffic_elements; + int num_traffic_elements; + + // Template agents (deep-cloned per-env for REPLAY; unused placeholder for GIGAFLOW) + Agent *template_agents; + int num_total_agents; + int num_objects; + + // Small per-scenario arrays (copied into each env, not shared) + int *objects_of_interest; + int num_objects_of_interest; + int *tracks_to_predict; + int num_tracks_to_predict; + + int ref_count; +}; + struct Drive { Client *client; // Render mode: RENDER_WINDOW (0) for interactive viewer, RENDER_HEADLESS (1) @@ -367,6 +409,9 @@ struct Drive { float road_obs_front_dist; float road_obs_behind_dist; float road_obs_side_dist; + + // NULL if this env owns its map data; non-NULL if using shared map cache. + SharedMapData *shared_map; }; // ======================================== @@ -3288,9 +3333,266 @@ void remove_bad_trajectories(Drive *env) { env->timestep = 0; } +// ======================================== +// Shared Map Cache Functions +// ======================================== + +void free_shared_map_data(SharedMapData *shared) { + if (shared == NULL) + return; + + for (int i = 0; i < shared->num_road_elements; i++) + free_road_element(&shared->road_elements[i]); + free(shared->road_elements); + + for (int i = 0; i < shared->num_traffic_elements; i++) + free_traffic_element(&shared->template_traffic_elements[i]); + free(shared->template_traffic_elements); + + int grid_cell_count = shared->grid_map->grid_cols * shared->grid_map->grid_rows; + for (int i = 0; i < grid_cell_count; i++) + free(shared->grid_map->cells[i]); + free(shared->grid_map->cells); + free(shared->grid_map->cell_entities_count); + free(shared->grid_map->grid_index_drivable); + free(shared->neighbor_offsets); + for (int i = 0; i < grid_cell_count; i++) + free(shared->grid_map->neighbor_cache_entities[i]); + free(shared->grid_map->neighbor_cache_entities); + free(shared->grid_map->neighbor_cache_count); + free(shared->grid_map); + + free_lane_graph(&shared->lane_graph); + + for (int i = 0; i < shared->num_total_agents; i++) + free_agent(&shared->template_agents[i]); + free(shared->template_agents); + + free(shared->objects_of_interest); + free(shared->tracks_to_predict); + + free(shared); +} + +SharedMapData *create_shared_map_data(const char *map_file_path, float road_obs_front_dist, float road_obs_behind_dist, + float road_obs_side_dist) { + SharedMapData *shared = (SharedMapData *)calloc(1, sizeof(SharedMapData)); + + Drive temp = {0}; + temp.road_obs_front_dist = road_obs_front_dist; + temp.road_obs_behind_dist = road_obs_behind_dist; + temp.road_obs_side_dist = road_obs_side_dist; + + load_map_binary(map_file_path, &temp); + + // Transfer ownership from temp to shared + shared->road_elements = temp.road_elements; + shared->num_road_elements = temp.num_road_elements; + shared->template_traffic_elements = temp.traffic_elements; + shared->num_traffic_elements = temp.num_traffic_elements; + shared->template_agents = temp.agents; + shared->num_total_agents = temp.num_total_agents; + shared->num_objects = temp.num_objects; + shared->lane_graph = temp.lane_graph; + memcpy(shared->scenario_id, temp.scenario_id, sizeof(temp.scenario_id)); + memcpy(shared->dataset_name, temp.dataset_name, sizeof(temp.dataset_name)); + shared->log_length = temp.log_length; + shared->log_dt = temp.log_dt; + shared->world_mean_x = temp.world_mean_x; + shared->world_mean_y = temp.world_mean_y; + shared->objects_of_interest = temp.objects_of_interest; + shared->num_objects_of_interest = temp.num_objects_of_interest; + shared->tracks_to_predict = temp.tracks_to_predict; + shared->num_tracks_to_predict = temp.num_tracks_to_predict; + + // Build grid map and spatial index (temp.road_elements is now shared->road_elements) + init_grid_map(&temp); + int vision_half_range = + (int)ceilf(fmaxf(fmaxf(road_obs_front_dist, road_obs_behind_dist), road_obs_side_dist) / GRID_CELL_SIZE); + temp.grid_map->vision_range = 2 * vision_half_range + 1; + init_neighbor_offsets(&temp); + cache_neighbor_offsets(&temp); + shared->grid_map = temp.grid_map; + shared->neighbor_offsets = temp.neighbor_offsets; + + shared->ref_count = 0; + return shared; +} + +void init_from_shared(Drive *env, SharedMapData *shared) { + env->human_agent_idx = 0; + env->timestep = 0; + env->shared_map = shared; + shared->ref_count++; + + // Point to shared read-only map data + env->road_elements = shared->road_elements; + env->num_road_elements = shared->num_road_elements; + env->grid_map = shared->grid_map; + env->neighbor_offsets = shared->neighbor_offsets; + env->lane_graph = shared->lane_graph; // struct copy; pointers owned by SharedMapData + env->world_mean_x = shared->world_mean_x; + env->world_mean_y = shared->world_mean_y; + env->log_length = shared->log_length; + env->log_dt = shared->log_dt; + memcpy(env->scenario_id, shared->scenario_id, sizeof(env->scenario_id)); + memcpy(env->dataset_name, shared->dataset_name, sizeof(env->dataset_name)); + env->num_total_agents = shared->num_total_agents; + env->num_objects = shared->num_objects; + + // Deep-copy small per-scenario arrays so c_close can always free them + if (shared->num_objects_of_interest > 0) { + env->objects_of_interest = (int *)malloc(shared->num_objects_of_interest * sizeof(int)); + memcpy(env->objects_of_interest, shared->objects_of_interest, shared->num_objects_of_interest * sizeof(int)); + } else { + env->objects_of_interest = NULL; + } + env->num_objects_of_interest = shared->num_objects_of_interest; + + if (shared->num_tracks_to_predict > 0) { + env->tracks_to_predict = (int *)malloc(shared->num_tracks_to_predict * sizeof(int)); + memcpy(env->tracks_to_predict, shared->tracks_to_predict, shared->num_tracks_to_predict * sizeof(int)); + } else { + env->tracks_to_predict = NULL; + } + env->num_tracks_to_predict = shared->num_tracks_to_predict; + + // Clone traffic elements per-env (GIGAFLOW rewrites states; REPLAY just reads them) + env->num_traffic_elements = shared->num_traffic_elements; + if (shared->num_traffic_elements > 0) { + env->traffic_elements = + (TrafficControlElement *)calloc(shared->num_traffic_elements, sizeof(TrafficControlElement)); + for (int i = 0; i < shared->num_traffic_elements; i++) { + TrafficControlElement *src = &shared->template_traffic_elements[i]; + TrafficControlElement *dst = &env->traffic_elements[i]; + *dst = *src; + if (src->num_controlled_lanes > 0) { + dst->controlled_lanes = (int *)malloc(src->num_controlled_lanes * sizeof(int)); + memcpy(dst->controlled_lanes, src->controlled_lanes, src->num_controlled_lanes * sizeof(int)); + } else { + dst->controlled_lanes = NULL; + } + if (src->state_length > 0) { + dst->states = (int *)malloc(src->state_length * sizeof(int)); + memcpy(dst->states, src->states, src->state_length * sizeof(int)); + } else { + dst->states = NULL; + } + } + } else { + env->traffic_elements = NULL; + } + + // Clone agents for REPLAY (each env needs independent mutable sim state). + // For GIGAFLOW, set_active_agents will allocate agents from scratch via spawn_agent. + if (env->simulation_mode == SIMULATION_REPLAY && shared->num_total_agents > 0) { + env->agents = (Agent *)calloc(shared->num_total_agents, sizeof(Agent)); + for (int i = 0; i < shared->num_total_agents; i++) { + Agent *src = &shared->template_agents[i]; + Agent *dst = &env->agents[i]; + *dst = *src; + int tlen = src->trajectory_length; + dst->log_trajectory_x = (float *)malloc(tlen * sizeof(float)); + dst->log_trajectory_y = (float *)malloc(tlen * sizeof(float)); + dst->log_trajectory_z = (float *)malloc(tlen * sizeof(float)); + dst->log_heading = (float *)malloc(tlen * sizeof(float)); + dst->log_velocity_x = (float *)malloc(tlen * sizeof(float)); + dst->log_velocity_y = (float *)malloc(tlen * sizeof(float)); + dst->log_length = (float *)malloc(tlen * sizeof(float)); + dst->log_width = (float *)malloc(tlen * sizeof(float)); + dst->log_height = (float *)malloc(tlen * sizeof(float)); + dst->log_valid = (int *)malloc(tlen * sizeof(int)); + memcpy(dst->log_trajectory_x, src->log_trajectory_x, tlen * sizeof(float)); + memcpy(dst->log_trajectory_y, src->log_trajectory_y, tlen * sizeof(float)); + memcpy(dst->log_trajectory_z, src->log_trajectory_z, tlen * sizeof(float)); + memcpy(dst->log_heading, src->log_heading, tlen * sizeof(float)); + memcpy(dst->log_velocity_x, src->log_velocity_x, tlen * sizeof(float)); + memcpy(dst->log_velocity_y, src->log_velocity_y, tlen * sizeof(float)); + memcpy(dst->log_length, src->log_length, tlen * sizeof(float)); + memcpy(dst->log_width, src->log_width, tlen * sizeof(float)); + memcpy(dst->log_height, src->log_height, tlen * sizeof(float)); + memcpy(dst->log_valid, src->log_valid, tlen * sizeof(int)); + if (src->route_length > 0) { + dst->route = (int *)malloc(src->route_length * sizeof(int)); + memcpy(dst->route, src->route, src->route_length * sizeof(int)); + } else { + dst->route = NULL; + } + dst->path = NULL; // path is computed at runtime, not from template + } + } else { + env->agents = NULL; + env->num_total_agents = 0; + } + + // Per-env observation flag (depends on per-env obs segment count params) + env->road_dropout_enabled = (env->obs_lane_segment_count < env->max_lane_segment_observations) || + (env->obs_boundary_segment_count < env->max_boundary_segment_observations); + + // Run per-env init logic (agent selection, start positions, etc.) + env->logs_capacity = 0; + set_active_agents(env); + env->logs_capacity = env->active_agent_count; + if (env->simulation_mode == SIMULATION_REPLAY) { + remove_bad_trajectories(env); + } + set_start_position(env); + if (env->simulation_mode == SIMULATION_GIGAFLOW) { + int steps = env->scenario_length; + if (steps > 0) { + for (int i = 0; i < env->num_traffic_elements; i++) { + TrafficControlElement *traffic = &env->traffic_elements[i]; + if (traffic->type != TRAFFIC_CONTROL_TYPE_TRAFFIC_LIGHT) + continue; + if (traffic->states && traffic->state_length != steps) { + free(traffic->states); + traffic->states = NULL; + } + if (traffic->states == NULL) { + traffic->states = (int *)malloc(steps * sizeof(int)); + if (traffic->states == NULL) { + traffic->state_length = 0; + continue; + } + } + traffic->state_length = steps; + } + } + generate_traffic_light_states(env); + } + env->logs = (Log *)calloc(env->active_agent_count, sizeof(Log)); + + if (env->simulation_mode == SIMULATION_REPLAY) { + for (int i = 0; i < env->active_agent_count; i++) { + int agent_idx = env->active_agent_indices[i]; + Agent *agent = &env->agents[agent_idx]; + int start = env->init_steps > 0 ? env->init_steps : 0; + int remaining = agent->trajectory_length - 1 - start; + if (remaining < 1) + remaining = 1; + int num_wp = env->num_target_waypoints; + if (num_wp > MAX_TARGET_WAYPOINTS) + num_wp = MAX_TARGET_WAYPOINTS; + for (int g = 0; g < num_wp; g++) { + int t = start + (g + 1) * remaining / num_wp; + if (t >= agent->trajectory_length) + t = agent->trajectory_length - 1; + agent->goal_positions_x[g] = agent->log_trajectory_x[t]; + agent->goal_positions_y[g] = agent->log_trajectory_y[t]; + agent->goal_positions_z[g] = agent->log_trajectory_z[t]; + } + agent->current_goal_idx = 0; + agent->goal_position_x = agent->goal_positions_x[0]; + agent->goal_position_y = agent->goal_positions_y[0]; + agent->goal_position_z = agent->goal_positions_z[0]; + } + } +} + void init(Drive *env) { env->human_agent_idx = 0; env->timestep = 0; + env->shared_map = NULL; load_map_binary(env->map_name, env); env->road_dropout_enabled = (env->obs_lane_segment_count < env->max_lane_segment_observations) || (env->obs_boundary_segment_count < env->max_boundary_segment_observations); @@ -3365,38 +3667,51 @@ void init(Drive *env) { } void c_close(Drive *env) { + // Per-env agent data (always owned by the env) for (int i = 0; i < env->num_total_agents; i++) free_agent(&env->agents[i]); - for (int i = 0; i < env->num_road_elements; i++) - free_road_element(&env->road_elements[i]); + free(env->agents); + + // Traffic elements are always cloned per-env (see init_from_shared) for (int i = 0; i < env->num_traffic_elements; i++) free_traffic_element(&env->traffic_elements[i]); - free(env->agents); - free(env->road_elements); free(env->traffic_elements); - free(env->active_agent_indices); - free(env->logs); - // GridMap cleanup - int grid_cell_count = env->grid_map->grid_cols * env->grid_map->grid_rows; - for (int grid_index = 0; grid_index < grid_cell_count; grid_index++) { - free(env->grid_map->cells[grid_index]); - } - free(env->grid_map->cells); - free(env->grid_map->cell_entities_count); - free(env->grid_map->grid_index_drivable); - free(env->neighbor_offsets); - for (int i = 0; i < grid_cell_count; i++) { - free(env->grid_map->neighbor_cache_entities[i]); - } - free(env->grid_map->neighbor_cache_entities); - free(env->grid_map->neighbor_cache_count); - free(env->grid_map); + free(env->active_agent_indices); free(env->static_agent_indices); free(env->expert_static_agent_indices); + + // objects_of_interest and tracks_to_predict are always per-env copies free(env->objects_of_interest); free(env->tracks_to_predict); - free_lane_graph(&env->lane_graph); + free(env->logs); + + if (env->shared_map != NULL) { + // road_elements, grid_map, neighbor_offsets, lane_graph are owned by SharedMapData + env->shared_map->ref_count--; + env->shared_map = NULL; + } else { + // This env owns all map data — free it + for (int i = 0; i < env->num_road_elements; i++) + free_road_element(&env->road_elements[i]); + free(env->road_elements); + + int grid_cell_count = env->grid_map->grid_cols * env->grid_map->grid_rows; + for (int grid_index = 0; grid_index < grid_cell_count; grid_index++) + free(env->grid_map->cells[grid_index]); + free(env->grid_map->cells); + free(env->grid_map->cell_entities_count); + free(env->grid_map->grid_index_drivable); + free(env->neighbor_offsets); + for (int i = 0; i < grid_cell_count; i++) + free(env->grid_map->neighbor_cache_entities[i]); + free(env->grid_map->neighbor_cache_entities); + free(env->grid_map->neighbor_cache_count); + free(env->grid_map); + + free_lane_graph(&env->lane_graph); + } + free(env->map_name); free(env->ini_file); } diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 7dc537ca5..946458f12 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -305,6 +305,9 @@ def __init__( max_agents_per_env=self.max_agents_per_env, num_eval_scenarios=self.current_num_eval_scenarios, # Use the dynamic size here goal_radius=self.goal_radius, + road_obs_front_dist=self.road_obs_front_dist, + road_obs_behind_dist=self.road_obs_behind_dist, + road_obs_side_dist=self.road_obs_side_dist, ) # In eval mode, don't wrap counter - allows termination condition to work correctly self.starting_map_counter = self.starting_map_counter + num_envs @@ -327,6 +330,7 @@ def __init__( self.masks[cur:nxt], self.random_seed, **self._env_init_kwargs(self.map_files[map_ids[i]], nxt - cur), + map_id=map_ids[i], ) env_ids.append(env_id) @@ -458,6 +462,9 @@ def step(self, actions): min_agents_per_env=self.min_agents_per_env, max_agents_per_env=self.max_agents_per_env, num_eval_scenarios=self.current_num_eval_scenarios, # Use the dynamic size here + road_obs_front_dist=self.road_obs_front_dist, + road_obs_behind_dist=self.road_obs_behind_dist, + road_obs_side_dist=self.road_obs_side_dist, ) # In eval mode, don't wrap counter - allows termination condition to work correctly @@ -474,6 +481,7 @@ def step(self, actions): self.truncations[cur:nxt], self.masks[cur:nxt], self.random_seed, + map_id=map_ids[i], **self._env_init_kwargs(self.map_files[map_ids[i]], nxt - cur), ) env_ids.append(env_id) From b5913cfb63d340403b219c7309bb2c02d3b7252a Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Sat, 18 Apr 2026 10:15:42 -0400 Subject: [PATCH 02/12] Making in-process rendering --- pufferlib/ocean/drive/drive.py | 17 ++- pufferlib/pufferl.py | 223 ++++++++++++++++++++++++++------- 2 files changed, 192 insertions(+), 48 deletions(-) diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 946458f12..2d14e33e5 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -6,9 +6,16 @@ import struct import os import pufferlib +from enum import IntEnum from pufferlib.ocean.drive import binding +class RenderView(IntEnum): + FULL_SIM_STATE = 0 # Fixed bird's-eye-ish perspective (legacy default) + BEV_AGENT_OBS = 1 # Top-down ortho centered on ego agent at vision-range zoom + TOPDOWN_SIM = 2 # Full-map orthographic top-down + + def compute_effective_road_obs_count(max_count, dropout): if max_count <= 0: return 0 @@ -584,16 +591,16 @@ def get_road_edge_polylines(self): return polylines - def render(self, env_idx=0, view_mode=0): - # view_mode: 0=default fixed perspective, 1=BEV ego-centered ortho. + def render(self, view_mode=RenderView.FULL_SIM_STATE, env_id=0): + # view_mode: 0=default fixed perspective, 1=BEV ego-centered ortho, 2=full-map topdown. # See VIEW_MODE_* defines in pufferlib/ocean/drive/render.h. - binding.vec_render(self.c_envs, view_mode, env_idx) + binding.vec_render(self.c_envs, int(view_mode), env_id) - def set_video_suffix(self, suffix, env_idx=0): + def set_video_suffix(self, suffix, env_id=0): # Append `suffix` to the next mp4 filename for the given env. # Must be called BEFORE the first render of a rollout because # make_client reads env->video_suffix when forking ffmpeg. - binding.vec_set_video_suffix(self.c_envs, suffix, env_idx) + binding.vec_set_video_suffix(self.c_envs, suffix, env_id) def close_client(self, env_idx=0): # Tear down the render Client for one env without destroying the env. diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 33d1f438e..a053a211d 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -144,9 +144,6 @@ def __init__(self, config, vecenv, policy, logger=None): self.render = config["render"] self.render_interval = config["render_interval"] - if self.render: - ensure_drive_binary() - # LSTM if config["use_rnn"]: n = vecenv.agents_per_batch @@ -430,32 +427,63 @@ def train(self): self.msg = f"Checkpoint saved at update {self.epoch}" if self.render and self.epoch % self.render_interval == 0: - model_dir = os.path.join(self.config["data_dir"], f"{self.config['env']}_{self.logger.run_id}") - model_files = glob.glob(os.path.join(model_dir, "models", "model_*.pt")) - - if model_files: - # Take the latest checkpoint - latest_cpt = max(model_files, key=os.path.getctime) - bin_path = f"{model_dir}.bin" - - # Export to .bin for rendering with raylib + render_simulation_mode = self.config["eval"].get("multi_scenario_simulation_mode", "gigaflow") + num_agents_render = self.config["eval"]["num_agents"] + render_map_dir = self.config["eval"]["map_dir"] + render_overrides = build_eval_overrides( + simulation_mode=render_simulation_mode, + num_agents=num_agents_render, + num_scenarios=self.config["eval"].get("multi_scenario_num_scenarios", 4), + map_dir=render_map_dir, + num_carla_maps=self.config["eval"].get("num_carla_maps", 8), + ) + render_args = load_eval_multi_scenarios_config( + env_name=self.config["env"], + model_path=None, + eval_overrides=render_overrides, + ) + experiment_name = f"{self.config['env']}_{self.logger.run_id}" + render_args["global_step"] = self.global_step + render_args["num_scenarios"] = self.config["eval"].get("multi_scenario_num_scenarios", 4) + render_args["eval_simulation"] = render_simulation_mode + render_args["render"] = True + render_args["render_obs"] = False + render_args["inline_eval"] = True + render_args["load_model_path"] = os.path.join( + self.config["data_dir"], experiment_name, "models", f"inline_epoch_{self.epoch}.pt" + ) + render_args["eval_results_dir"] = os.path.join( + self.config["data_dir"], + experiment_name, + "renders", + f"epoch_{self.epoch:08d}", + ) + backend_name = self.config["eval"].get("multi_scenario_render_backend", "egl") + _bev_views = ( + [(0, "", "sim_state"), (1, "_bev", "bev")] if backend_name == "egl" else [(0, "", "sim_state")] + ) + for _vmode, _vsuffix, _vlabel in _bev_views: try: - export_args = {"env_name": self.config["env"], "load_model_path": latest_cpt, **self.config} - - export( - args=export_args, + eval_multi_scenarios_render( env_name=self.config["env"], - vecenv=self.vecenv, + args=dict(render_args), + vecenv=None, policy=self.uncompiled_policy, - path=bin_path, - silent=True, + logger=self.logger, + metric_prefix=f"render_{_vlabel}", + quiet=True, + render_backend=backend_name, + view_mode=_vmode, + video_suffix=_vsuffix, + log_view_label=_vlabel, + render_max_steps=(self.config["eval"].get("render_max_steps", 50) or None), ) - pufferlib.utils.render_videos( - self.config, self.vecenv, self.logger, self.epoch, self.global_step, bin_path - ) - except Exception as e: - print(f"Failed to export model weights: {e}") + import traceback + + print(f"\n⚠️ render failed (view={_vlabel}) at epoch {self.epoch}: {type(e).__name__}: {e}") + traceback.print_exc() + print("Training continues.") if self.config["eval"]["wosac_realism_eval"] and ( self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training @@ -2701,29 +2729,136 @@ def export(args=None, env_name=None, vecenv=None, policy=None, path=None, silent print(f"Saved {len(weights)} weights to {path}") -def ensure_drive_binary(): - """Delete existing visualize binary and rebuild it. This ensures the - binary is always up-to-date with the latest code changes. +def render(env_name, args=None): + """Render rollouts for a batch of maps using the in-process c_render pipeline. + + Each map is loaded as a separate environment with render_mode="headless" + (EGL/ffmpeg). A policy rollout is run for max_frames steps, producing one + mp4 per map in output_dir. + + Requires a [render] section in the config with: + map_dir, output_dir, num_maps, view_mode, max_frames """ - if os.path.exists("./visualize"): - print("Removing existing visualize binary...") - os.remove("./visualize") + import glob as _glob + import shutil + import tempfile + from pufferlib.ocean.drive.drive import RenderView + from pufferlib.ocean.drive.rollout import RenderContext, rollout_loop + + args = args or load_config(env_name) + render_configs = args.get("render", {}) - print("Building visualize binary...") try: - result = subprocess.run( - ["bash", "scripts/build_ocean.sh", "visualize", "local"], capture_output=True, text=True, timeout=300 + map_dir = render_configs["map_dir"] + num_maps = render_configs.get("num_maps", 1) + view_mode_str = str(render_configs.get("view_mode", "sim_state")).lower().strip('"').strip("'") + max_frames = render_configs.get("max_frames", 91) + output_dir = render_configs["output_dir"] + render_init_mode = ( + str(render_configs["init_mode"]).strip('"').strip("'") if "init_mode" in render_configs else None + ) + render_control_mode = ( + str(render_configs["control_mode"]).strip('"').strip("'") if "control_mode" in render_configs else None + ) + except KeyError as e: + raise pufferlib.APIUsageError(f"Missing render config: {e}") + + _VIEW_MODE_MAP = { + "sim_state": [RenderView.FULL_SIM_STATE], + "topdown": [RenderView.TOPDOWN_SIM], + "bev": [RenderView.BEV_AGENT_OBS], + "both": [RenderView.FULL_SIM_STATE, RenderView.BEV_AGENT_OBS], + "all": [RenderView.FULL_SIM_STATE, RenderView.BEV_AGENT_OBS, RenderView.TOPDOWN_SIM], + } + view_modes = _VIEW_MODE_MAP.get(view_mode_str) + if view_modes is None: + raise pufferlib.APIUsageError( + f"Unknown view_mode '{view_mode_str}'. Choose from: sim_state, topdown, bev, both, all" ) - if result.returncode == 0: - print("Successfully built visualize binary") - else: - print(f"Build failed: {result.stderr}") - raise RuntimeError("Failed to build visualize binary for rendering") - except subprocess.TimeoutExpired: - raise RuntimeError("Build timed out") - except Exception as e: - raise RuntimeError(f"Build error: {e}") + bin_files = sorted(f for f in os.listdir(map_dir) if f.endswith(".bin")) + if num_maps > len(bin_files): + num_maps = len(bin_files) + render_maps = [os.path.join(map_dir, f) for f in bin_files[:num_maps]] + + os.makedirs(output_dir, exist_ok=True) + + configured_device = args["train"]["device"] + if configured_device == "cuda" and not torch.cuda.is_available(): + print("Warning: CUDA not available, falling back to CPU for render.") + configured_device = "cpu" + args["train"]["device"] = configured_device + device = configured_device + + _VIEW_SUFFIX = { + RenderView.FULL_SIM_STATE: "sim_state", + RenderView.BEV_AGENT_OBS: "bev", + RenderView.TOPDOWN_SIM: "topdown", + } + + def render_one_map(map_path): + map_name = os.path.splitext(os.path.basename(map_path))[0] + multi = len(view_modes) > 1 + + for view_mode in view_modes: + with tempfile.TemporaryDirectory() as tmp_map_dir: + tmp_bin = os.path.join(tmp_map_dir, os.path.basename(map_path)) + shutil.copy2(map_path, tmp_bin) + + env_overrides = { + **args["env"], + "num_maps": 1, + "map_dir": tmp_map_dir, + "render_mode": "headless", + } + if render_init_mode is not None: + env_overrides["init_mode"] = render_init_mode + if render_control_mode is not None: + env_overrides["control_mode"] = render_control_mode + + map_args = { + **args, + "env": env_overrides, + "vec": {"backend": "Serial", "num_envs": 1}, + } + + env = load_env(env_name, map_args) + policy = load_policy(map_args, env, env_name) + policy.eval() + + suffix = f"_{_VIEW_SUFFIX[view_mode]}" if multi else "" + before = set(_glob.glob(os.path.join(os.getcwd(), "*.mp4"))) + + rollout_loop( + policy=policy, + env=env, + device=device, + use_rnn=map_args["train"]["use_rnn"], + max_steps=max_frames, + render_ctx=RenderContext( + view_mode=view_mode, + env_id=0, + video_suffix=suffix, + ), + ) + + env.close() + + after = set(_glob.glob(os.path.join(os.getcwd(), "*.mp4"))) + new_mp4s = after - before + if new_mp4s: + for src in sorted(new_mp4s): + dst = os.path.join(output_dir, os.path.basename(src)) + shutil.move(src, dst) + print(f" Saved {dst}") + else: + print(f" Warning: no mp4 produced for {map_name} view={_VIEW_SUFFIX[view_mode]}") + + if render_maps: + print(f"Rendering {len(render_maps)} map(s) from {map_dir} → {output_dir} ...") + for map_path in render_maps: + render_one_map(map_path) + print(f"Done. Videos written to {output_dir}") def autotune(args=None, env_name=None, vecenv=None, policy=None): @@ -2909,7 +3044,7 @@ def puffer_type(value): def main(): - err = "Usage: puffer [train, eval, sweep, controlled_exp, autotune, profile, export] [env_name] [optional args]. --help for more info" + err = "Usage: puffer [train, eval, render, sweep, controlled_exp, autotune, profile, export] [env_name] [optional args]. --help for more info" if len(sys.argv) < 3: raise pufferlib.APIUsageError(err) @@ -2919,6 +3054,8 @@ def main(): train(env_name=env_name) elif mode == "eval": eval(env_name=env_name) + elif mode == "render": + render(env_name=env_name) elif mode == "eval_multi_scenarios": eval_multi_scenarios(env_name=env_name) elif mode == "eval_multi_scenarios_render": From 62625fab8c701df5db5d012202290d0f8e9d6e15 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Sat, 18 Apr 2026 10:24:09 -0400 Subject: [PATCH 03/12] Making in-process rendering correct --- pufferlib/ocean/drive/drive.py | 4 ++-- pufferlib/pufferl.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 2d14e33e5..26f8e8585 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -602,10 +602,10 @@ def set_video_suffix(self, suffix, env_id=0): # make_client reads env->video_suffix when forking ffmpeg. binding.vec_set_video_suffix(self.c_envs, suffix, env_id) - def close_client(self, env_idx=0): + def close_client(self, env_id=0): # Tear down the render Client for one env without destroying the env. # Flushes ffmpeg + PBOs on the headless path so the mp4 is fully written. - binding.vec_close_client(self.c_envs, env_idx) + binding.vec_close_client(self.c_envs, env_id) def close(self): binding.vec_close(self.c_envs) diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index a053a211d..78f27e5a4 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -2384,7 +2384,7 @@ def eval_multi_scenarios_render( _internal_num_envs = getattr(_target_env_pre, "num_envs", 1) for _e in range(_internal_num_envs): try: - _target_env_pre.set_video_suffix(video_suffix, env_idx=_e) + _target_env_pre.set_video_suffix(video_suffix, env_id=_e) except Exception: break @@ -2485,7 +2485,7 @@ def eval_multi_scenarios_render( # GPU context) and close_client at scenario end flushes the # trailing PBO frame. for e in range(num_envs_in_batch): - target_env.render(env_idx=e, view_mode=view_mode) + target_env.render(env_id=e, view_mode=view_mode) # Serial backend returns infos as single list (infos[0] is the env's info list) if infos and infos[0]: @@ -2533,7 +2533,7 @@ def eval_multi_scenarios_render( for e in range(num_envs_in_batch): _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) calling\n") _sys_cc.stderr.flush() - target_env.close_client(env_idx=e) + target_env.close_client(env_id=e) _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) returned\n") _sys_cc.stderr.flush() From f5a40488f0b0c2490e0e3e01c18a74c46a196937 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Sat, 18 Apr 2026 10:45:58 -0400 Subject: [PATCH 04/12] Small nitpic change for correct folder allocation --- pufferlib/pufferl.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 78f27e5a4..b63ae4b6f 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -2587,7 +2587,11 @@ def eval_multi_scenarios_render( try: import wandb - mp4_paths = sorted(os.path.join(mp4_folder, f) for f in os.listdir(mp4_folder) if f.endswith(".mp4")) + mp4_paths = sorted( + os.path.join(mp4_folder, f) + for f in os.listdir(mp4_folder) + if f.endswith(".mp4") and os.path.splitext(f)[0].endswith(video_suffix) + ) if mp4_paths: video_log = { f"{_upload_prefix}/{os.path.splitext(os.path.basename(p))[0]}": wandb.Video(p, fps=30, format="mp4") From 958b2ceab2fe75344fa56232d200e36c455c6279 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Sat, 18 Apr 2026 20:44:58 -0400 Subject: [PATCH 05/12] Fixing issue with cache miss leading to renering failing --- pufferlib/config/ocean/drive.ini | 3 +++ pufferlib/ocean/drive/binding.c | 2 +- pufferlib/pufferl.py | 32 ++++++++++++++++++++++++++------ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index e71db0260..98723446e 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -207,6 +207,9 @@ multi_scenario_render_backend = egl ; Frequency of evaluation during training (in epochs) eval_interval = 25 num_agents = 512 +; Number of agents per environment in gigaflow eval mode +min_agents_per_env = 50 +max_agents_per_env = 50 ; Batch size for eval_multi_scenarios (number of scenarios per batch) ; Path to dataset used for evaluation map_dir = "pufferlib/resources/drive/binaries/carla_py123d" diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 83d2ae971..d8fff3161 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -59,7 +59,7 @@ static void release_map_cache_internal(void) { reset_cache_globals(); } -static PyObject *release_map_cache_py(PyObject *self, PyObject *args) { +static PyObject *release_map_cache_py(PyObject *self __attribute__((unused)), PyObject *args __attribute__((unused))) { release_map_cache_internal(); Py_RETURN_NONE; } diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index b63ae4b6f..572b37fbe 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -435,7 +435,9 @@ def train(self): num_agents=num_agents_render, num_scenarios=self.config["eval"].get("multi_scenario_num_scenarios", 4), map_dir=render_map_dir, - num_carla_maps=self.config["eval"].get("num_carla_maps", 8), + num_carla_maps=self.config["env"].get("num_maps", 8), + min_agents_per_env=self.config["eval"].get("min_agents_per_env", 50), + max_agents_per_env=self.config["eval"].get("max_agents_per_env", 50), ) render_args = load_eval_multi_scenarios_config( env_name=self.config["env"], @@ -535,7 +537,9 @@ def train(self): num_agents=num_agents_eval, num_scenarios=self.config["eval"]["multi_scenario_num_scenarios"], map_dir=map_dir, - num_carla_maps=self.config["eval"].get("num_carla_maps", 8), + num_carla_maps=self.config["env"].get("num_maps", 8), + min_agents_per_env=self.config["eval"].get("min_agents_per_env", 50), + max_agents_per_env=self.config["eval"].get("max_agents_per_env", 50), ) # Build eval args by applying overrides to training config @@ -593,7 +597,9 @@ def train(self): num_agents=num_agents_render, num_scenarios=self.config["eval"]["multi_scenario_num_scenarios"], map_dir=render_map_dir, - num_carla_maps=self.config["eval"].get("num_carla_maps", 8), + num_carla_maps=self.config["env"].get("num_maps", 8), + min_agents_per_env=self.config["eval"].get("min_agents_per_env", 50), + max_agents_per_env=self.config["eval"].get("max_agents_per_env", 50), ) render_args = load_eval_multi_scenarios_config( @@ -1892,13 +1898,23 @@ def load_eval_multi_scenarios_config(env_name, model_path=None, eval_overrides=N return args -def build_eval_overrides(simulation_mode, num_agents, num_scenarios, map_dir=None, num_carla_maps=8): +def build_eval_overrides( + simulation_mode, + num_agents, + num_scenarios, + map_dir=None, + num_carla_maps=8, + min_agents_per_env=50, + max_agents_per_env=50, +): """Build evaluation overrides for a given simulation mode. Args: simulation_mode: "gigaflow" or "replay" num_agents: agent slot budget for evaluation map_dir: replay dataset directory, required for replay mode + min_agents_per_env: minimum agents per env (gigaflow only) + max_agents_per_env: maximum agents per env (gigaflow only) """ # Common reward coefficients (same for both modes) common_env = { @@ -1931,8 +1947,8 @@ def build_eval_overrides(simulation_mode, num_agents, num_scenarios, map_dir=Non "env": { **common_env, "simulation_mode": "gigaflow", - "min_agents_per_env": 50, - "max_agents_per_env": 50, + "min_agents_per_env": min_agents_per_env, + "max_agents_per_env": max_agents_per_env, "resample_frequency": 3000, "scenario_length": 3000, # Point at the py123d-converted CARLA towns added to this branch. @@ -2131,6 +2147,8 @@ def eval_multi_scenarios( num_scenarios=tmp_args["num_scenarios"], map_dir=map_dir, num_carla_maps=tmp_args.get("num_carla_maps", 8), + min_agents_per_env=tmp_args["eval"].get("min_agents_per_env", 50), + max_agents_per_env=tmp_args["eval"].get("max_agents_per_env", 50), ) args = load_eval_multi_scenarios_config(env_name, model_path, eval_overrides) @@ -2296,6 +2314,8 @@ def eval_multi_scenarios_render( num_scenarios=tmp_args["num_scenarios"], map_dir=map_dir, num_carla_maps=tmp_args.get("num_carla_maps", 8), + min_agents_per_env=tmp_args["eval"].get("min_agents_per_env", 50), + max_agents_per_env=tmp_args["eval"].get("max_agents_per_env", 50), ) args = load_eval_multi_scenarios_config(env_name, model_path, eval_overrides) From a1fd594f63a596bb56cf69966649dd37bead3e36 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Sun, 19 Apr 2026 23:52:40 -0400 Subject: [PATCH 06/12] Small fix --- pufferlib/pufferl.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 572b37fbe..98fb64350 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -435,7 +435,7 @@ def train(self): num_agents=num_agents_render, num_scenarios=self.config["eval"].get("multi_scenario_num_scenarios", 4), map_dir=render_map_dir, - num_carla_maps=self.config["env"].get("num_maps", 8), + num_carla_maps=self.config["env_config"].get("num_maps", 8), min_agents_per_env=self.config["eval"].get("min_agents_per_env", 50), max_agents_per_env=self.config["eval"].get("max_agents_per_env", 50), ) @@ -537,7 +537,7 @@ def train(self): num_agents=num_agents_eval, num_scenarios=self.config["eval"]["multi_scenario_num_scenarios"], map_dir=map_dir, - num_carla_maps=self.config["env"].get("num_maps", 8), + num_carla_maps=self.config["env_config"].get("num_maps", 8), min_agents_per_env=self.config["eval"].get("min_agents_per_env", 50), max_agents_per_env=self.config["eval"].get("max_agents_per_env", 50), ) @@ -597,7 +597,7 @@ def train(self): num_agents=num_agents_render, num_scenarios=self.config["eval"]["multi_scenario_num_scenarios"], map_dir=render_map_dir, - num_carla_maps=self.config["env"].get("num_maps", 8), + num_carla_maps=self.config["env_config"].get("num_maps", 8), min_agents_per_env=self.config["eval"].get("min_agents_per_env", 50), max_agents_per_env=self.config["eval"].get("max_agents_per_env", 50), ) @@ -1641,6 +1641,7 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None, early_stop train_config = dict( **args["train"], env=env_name, + env_config=args.get("env", {}), eval=args.get("eval", {}), driving_behaviours_eval=args.get("driving_behaviours_eval"), ) From a2568c33c8aaa33c750d994b559be17a6fbed106 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Sun, 19 Apr 2026 23:55:12 -0400 Subject: [PATCH 07/12] Small fix --- pufferlib/ocean/drive/rollout.py | 105 +++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 pufferlib/ocean/drive/rollout.py diff --git a/pufferlib/ocean/drive/rollout.py b/pufferlib/ocean/drive/rollout.py new file mode 100644 index 000000000..8de059fd5 --- /dev/null +++ b/pufferlib/ocean/drive/rollout.py @@ -0,0 +1,105 @@ +"""Shared rollout loop for Drive evaluation and rendering. + +Single source of truth for the forward-sample-step-break cycle. Used by: + - ``pufferl.render`` — offline batch rendering, one video per map + - ``eval_multi_scenarios_render`` — inline training render path + +Callers pass a ``RenderContext`` to turn on rendering; pass ``None`` for a +pure stats rollout. +""" + +from dataclasses import dataclass +from typing import Optional + +import numpy as np +import torch + +import pufferlib.pytorch + + +@dataclass +class RenderContext: + """Enables rendering inside :func:`rollout_loop`. + + Attributes: + view_mode: ``RenderView`` enum value passed to ``driver.render``. + env_id: which sub-env in the vecenv to record from (default 0). + video_suffix: appended to the mp4 filename; applied once before the + first render via ``set_video_suffix`` so multi-view rollouts don't + collide on output paths. + """ + + view_mode: int + env_id: int = 0 + video_suffix: str = "" + + +def rollout_loop( + policy, + env, + device, + use_rnn: bool, + max_steps: Optional[int] = None, + render_ctx: Optional[RenderContext] = None, + per_env_logs: bool = False, +): + """Run a single policy rollout in a Drive vecenv. + + Args: + policy: the policy to run. Caller is responsible for calling ``.eval()``. + env: a ``PufferEnv``-compatible vecenv wrapping one or more Drive sub-envs. + device: torch device for observation / state tensors. + use_rnn: whether to allocate and carry LSTM hidden state. + max_steps: loop iteration cap. Defaults to ``env.driver_env.episode_length``. + render_ctx: if set, render the specified env/view every step before + sampling actions. + per_env_logs: passed through to ``env.step`` for unaggregated per-env + logs (only supported on PufferEnv native backend). + + Returns: + The last ``info`` returned by ``env.step``. + """ + driver = env.driver_env + num_agents = env.observation_space.shape[0] + + if render_ctx is not None: + driver.set_video_suffix(render_ctx.video_suffix, env_id=render_ctx.env_id) + + obs, _ = env.reset() + + state = {} + if use_rnn: + state = dict( + lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), + lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), + ) + + if max_steps is None: + max_steps = driver.episode_length + + info = [] + for _ in range(max_steps): + if render_ctx is not None: + driver.render( + view_mode=render_ctx.view_mode, + env_id=render_ctx.env_id, + ) + + with torch.no_grad(): + ob_t = torch.as_tensor(obs).to(device) + logits, _ = policy.forward_eval(ob_t, state) + action, _, _ = pufferlib.pytorch.sample_logits(logits) + action_np = action.cpu().numpy().reshape(env.action_space.shape) + + if isinstance(logits, torch.distributions.Normal): + action_np = np.clip(action_np, env.action_space.low, env.action_space.high) + + if per_env_logs: + obs, _, _, truncs, info = env.step(action_np, per_env_logs=True) + else: + obs, _, _, truncs, info = env.step(action_np) + + if truncs.all(): + break + + return info From 6a2e50e9878fa4c27e0e5066643749d4e89d4c07 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Wed, 22 Apr 2026 22:14:32 -0400 Subject: [PATCH 08/12] Supporting renders with cache-miss --- pufferlib/config/ocean/drive.ini | 18 +++++++++--------- pufferlib/ocean/drive/binding.c | 24 +++++++----------------- pufferlib/ocean/drive/drive.h | 3 +++ pufferlib/pufferl.py | 32 +++++++++++++++++++++++++++++--- 4 files changed, 48 insertions(+), 29 deletions(-) diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index 98723446e..e96ec1980 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -39,7 +39,7 @@ simulation_mode = "gigaflow" num_agents = 1024 ; GIGAFLOW-specific: minimum/maximum number of agents per environment min_agents_per_env = 1 -max_agents_per_env = 80 +max_agents_per_env = 1 ; Actions type - options: "discrete", "continuous" action_type = "discrete" ; Dynamics model - options: "classic", "jerk" @@ -104,9 +104,9 @@ reward_ade = 0.0 ; --- Map --- ; Path to map used for training -map_dir = "pufferlib/resources/drive/binaries/carla_py123d" +map_dir = "pufferlib/resources/drive/binaries/carla_copy" ; Number of maps to load from map_dir -num_maps = 8 +num_maps = 1 ; --- Observation limits --- max_lane_segment_observations = 80 @@ -148,7 +148,7 @@ road_obs_side_dist = 30.0 [train] total_timesteps = 10_000_000_000 -checkpoint_interval = 50 +checkpoint_interval = 25 anneal_lr = True ; batch_size = num_workers * num_agents * bptt_horizon batch_size = auto @@ -200,7 +200,7 @@ multi_scenario_eval = False multi_scenario_render = True ; Epoch interval between render runs. Independent of eval_interval so metric ; eval can run on a tighter schedule than the more expensive render. -multi_scenario_render_interval = 250 +multi_scenario_render_interval = 25 ; Render backend for multi_scenario_render: "html" (CPU, viz.generate_interactive_replay) ; or "egl" (C-side render.h → EGL → PBO → ffmpeg libx264, one mp4 per scenario). multi_scenario_render_backend = egl @@ -208,8 +208,8 @@ multi_scenario_render_backend = egl eval_interval = 25 num_agents = 512 ; Number of agents per environment in gigaflow eval mode -min_agents_per_env = 50 -max_agents_per_env = 50 +min_agents_per_env = 1 +max_agents_per_env = 1 ; Batch size for eval_multi_scenarios (number of scenarios per batch) ; Path to dataset used for evaluation map_dir = "pufferlib/resources/drive/binaries/carla_py123d" @@ -220,7 +220,7 @@ map_dir = "pufferlib/resources/drive/binaries/carla_py123d" ; trajectory-bearing .bin files in pufferlib/resources/drive/binaries/womd) multi_scenario_simulation_mode = "gigaflow" ; Total number of scenarios to evaluate -multi_scenario_num_scenarios = 250 +multi_scenario_num_scenarios = 8 ; Cap the render rollout at this many steps. render_max_steps = 200 backend = PufferEnv @@ -242,7 +242,7 @@ human_replay_num_agents = 64 ; This equals the number of scenarios, since we con ; Evaluating different driving behaviours learned by the policy driving_behaviours_eval = True driving_behaviours_eval_config = "pufferlib/config/ocean/driving_behaviours_eval.ini" -driving_behaviours_eval_interval = 250 +driving_behaviours_eval_interval = 30 render_driving_behaviours = True ; [sweep.train.learning_rate] diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index d8fff3161..56651c71d 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -36,18 +36,14 @@ static void release_map_cache_internal(void) { reset_cache_globals(); return; } - // Refuse to release if any envs still hold a reference + // Entries with live refs are detached: c_close will free them when ref_count reaches 0. + // Entries with no live refs are freed immediately. for (int i = 0; i < g_map_cache_size; i++) { - if (g_map_cache[i] != NULL && g_map_cache[i]->ref_count > 0) { - fprintf(stderr, - "ERROR: cannot release map cache — entry %d still has %d live env(s). " - "Close all Drive instances before changing map config.\n", - i, g_map_cache[i]->ref_count); - return; - } - } - for (int i = 0; i < g_map_cache_size; i++) { - if (g_map_cache[i] != NULL) + if (g_map_cache[i] == NULL) + continue; + if (g_map_cache[i]->ref_count > 0) + g_map_cache[i]->detached = 1; + else free_shared_map_data(g_map_cache[i]); } free(g_map_cache); @@ -1640,12 +1636,6 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { } if (!reuse_cache) { release_map_cache_internal(); - if (g_map_cache != NULL) { - PyErr_SetString(PyExc_RuntimeError, - "Cannot change map cache config while Drive environments are still open. " - "Call close() on all Drive instances first."); - return NULL; - } g_map_cache_size = num_maps; g_map_cache = (SharedMapData **)calloc(num_maps, sizeof(SharedMapData *)); g_map_cache_pid = getpid(); diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 64475b88a..09c594678 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -288,6 +288,7 @@ struct SharedMapData { int num_tracks_to_predict; int ref_count; + int detached; // set when released from g_map_cache while refs still live; c_close frees when ref_count reaches 0 }; struct Drive { @@ -3689,6 +3690,8 @@ void c_close(Drive *env) { if (env->shared_map != NULL) { // road_elements, grid_map, neighbor_offsets, lane_graph are owned by SharedMapData env->shared_map->ref_count--; + if (env->shared_map->ref_count == 0 && env->shared_map->detached) + free_shared_map_data(env->shared_map); env->shared_map = NULL; } else { // This env owns all map data — free it diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 98fb64350..a81594503 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -2414,7 +2414,16 @@ def eval_multi_scenarios_render( # Serial/Multiprocessing: need vecenv.envs[0] to reach the underlying env. target_env = vecenv if not hasattr(vecenv, "envs") else vecenv.envs[0] - with tqdm(total=num_scenarios, desc="Processing scenarios", disable=quiet) as pbar: + # Wrap rollout + post-processing in try/finally so vecenv.close() is + # guaranteed to run even if an exception fires mid-rollout. + # Without this, a crash inside the step loop leaves Drive C envs alive + # (ref_count > 0), which causes the next binding.shared() call with a + # different map_dir to raise: + # RuntimeError: Cannot change map cache config while Drive environments + # are still open. Call close() on all Drive instances first. + _rollout_exc = None + try: + with tqdm(total=num_scenarios, desc="Processing scenarios", disable=quiet) as pbar: while scenarios_processed < num_scenarios: ob, _ = vecenv.reset() @@ -2561,6 +2570,9 @@ def eval_multi_scenarios_render( scenarios_processed += num_envs_in_batch pbar.update(num_envs_in_batch) + except Exception as _rollout_exc_caught: + _rollout_exc = _rollout_exc_caught + import sys as _sys_instr _sys_instr.stderr.write("[render-instr] rollout loop done\n") @@ -2627,8 +2639,22 @@ def eval_multi_scenarios_render( if not quiet: print(f"Failed to upload render mp4s to wandb: {e}") - # Close vectorized environment to avoid file descriptor leaks - vecenv.close() + # Close vectorized environment to avoid file descriptor leaks. + # IMPORTANT: always run even if an exception occurred earlier in the rollout + # loop — skipping this leaves Drive C envs alive (ref_count > 0), which + # causes the next binding.shared() call with a different map_dir to raise + # RuntimeError("Cannot change map cache config while Drive environments are + # still open"). + try: + vecenv.close() + except Exception as _close_exc: + import sys as _sys_close + _sys_close.stderr.write(f"[render-instr] vecenv.close() raised: {_close_exc}\n") + _sys_close.stderr.flush() + + # Re-raise any rollout exception now that the cache is clean + if _rollout_exc is not None: + raise _rollout_exc def sweep(args=None, env_name=None): From 88f99b00ea8e1301a8125c51ff8eec5272478bf2 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Thu, 23 Apr 2026 00:22:58 -0400 Subject: [PATCH 09/12] Further updates --- pufferlib/config/ocean/drive.ini | 14 +- pufferlib/pufferl.py | 401 +++++++++++-------------------- 2 files changed, 141 insertions(+), 274 deletions(-) diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index e96ec1980..5029ab197 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -39,7 +39,7 @@ simulation_mode = "gigaflow" num_agents = 1024 ; GIGAFLOW-specific: minimum/maximum number of agents per environment min_agents_per_env = 1 -max_agents_per_env = 1 +max_agents_per_env = 80 ; Actions type - options: "discrete", "continuous" action_type = "discrete" ; Dynamics model - options: "classic", "jerk" @@ -104,9 +104,9 @@ reward_ade = 0.0 ; --- Map --- ; Path to map used for training -map_dir = "pufferlib/resources/drive/binaries/carla_copy" +map_dir = "pufferlib/resources/drive/binaries/carla_py123d" ; Number of maps to load from map_dir -num_maps = 1 +num_maps = 8 ; --- Observation limits --- max_lane_segment_observations = 80 @@ -148,7 +148,7 @@ road_obs_side_dist = 30.0 [train] total_timesteps = 10_000_000_000 -checkpoint_interval = 25 +checkpoint_interval = 50 anneal_lr = True ; batch_size = num_workers * num_agents * bptt_horizon batch_size = auto @@ -200,7 +200,7 @@ multi_scenario_eval = False multi_scenario_render = True ; Epoch interval between render runs. Independent of eval_interval so metric ; eval can run on a tighter schedule than the more expensive render. -multi_scenario_render_interval = 25 +multi_scenario_render_interval = 250 ; Render backend for multi_scenario_render: "html" (CPU, viz.generate_interactive_replay) ; or "egl" (C-side render.h → EGL → PBO → ffmpeg libx264, one mp4 per scenario). multi_scenario_render_backend = egl @@ -220,7 +220,7 @@ map_dir = "pufferlib/resources/drive/binaries/carla_py123d" ; trajectory-bearing .bin files in pufferlib/resources/drive/binaries/womd) multi_scenario_simulation_mode = "gigaflow" ; Total number of scenarios to evaluate -multi_scenario_num_scenarios = 8 +multi_scenario_num_scenarios = 250 ; Cap the render rollout at this many steps. render_max_steps = 200 backend = PufferEnv @@ -242,7 +242,7 @@ human_replay_num_agents = 64 ; This equals the number of scenarios, since we con ; Evaluating different driving behaviours learned by the policy driving_behaviours_eval = True driving_behaviours_eval_config = "pufferlib/config/ocean/driving_behaviours_eval.ini" -driving_behaviours_eval_interval = 30 +driving_behaviours_eval_interval = 250 render_driving_behaviours = True ; [sweep.train.learning_rate] diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index a81594503..7f36fe271 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -2423,152 +2423,152 @@ def eval_multi_scenarios_render( # are still open. Call close() on all Drive instances first. _rollout_exc = None try: - with tqdm(total=num_scenarios, desc="Processing scenarios", disable=quiet) as pbar: - while scenarios_processed < num_scenarios: - ob, _ = vecenv.reset() + with tqdm(total=num_scenarios, desc="Processing scenarios", disable=quiet) as pbar: + while scenarios_processed < num_scenarios: + ob, _ = vecenv.reset() - # Get initial states for all environments in the batch - scenarios = vecenv.get_state() - num_envs_in_batch = len(scenarios) - batch_start = scenarios_processed + # Get initial states for all environments in the batch + scenarios = vecenv.get_state() + num_envs_in_batch = len(scenarios) + batch_start = scenarios_processed - # Prepare batch_size_eval for the resample that fires at end of the step loop. - # That resample will load the NEXT batch, so cap it at remaining_after_this. - remaining_after_this = num_scenarios - scenarios_processed - num_envs_in_batch - target_env.batch_size_eval = max(1, remaining_after_this) + # Prepare batch_size_eval for the resample that fires at end of the step loop. + # That resample will load the NEXT batch, so cap it at remaining_after_this. + remaining_after_this = num_scenarios - scenarios_processed - num_envs_in_batch + target_env.batch_size_eval = max(1, remaining_after_this) - map_names = [] - for env_idx in range(num_envs_in_batch): - map_names.append(scenarios[env_idx]["map_name"].split("/")[-1].split(".")[0]) + map_names = [] + for env_idx in range(num_envs_in_batch): + map_names.append(scenarios[env_idx]["map_name"].split("/")[-1].split(".")[0]) - # Reset LSTM - if args["train"]["use_rnn"]: - state = dict( - lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), - lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), - ) + # Reset LSTM + if args["train"]["use_rnn"]: + state = dict( + lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), + lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), + ) - # Initialize histories as lists of lists (one list per environment). - # Only needed for the HTML replay path — EGL writes mp4 frames - # directly to ffmpeg via c_render each step. - if html_mode: - agent_histories = [[] for _ in range(num_envs_in_batch)] - traffic_histories = [[] for _ in range(num_envs_in_batch)] - trajectory_histories = [[] for _ in range(num_envs_in_batch)] - all_agents_obs_histories = [[] for _ in range(num_envs_in_batch)] - - _render_steps = args["env"]["scenario_length"] - if render_max_steps is not None: - _render_steps = min(_render_steps, render_max_steps) - for t in range(_render_steps): + # Initialize histories as lists of lists (one list per environment). + # Only needed for the HTML replay path — EGL writes mp4 frames + # directly to ffmpeg via c_render each step. if html_mode: - current_scenarios = vecenv.get_state() - start_obs_index = 0 - - # Loop through every environment in the batch to record its history - for env_idx in range(num_envs_in_batch): - env_scenario = current_scenarios[env_idx] - - agent_histories[env_idx].append( - pufferlib.viz.fill_agents_state( - env_scenario, use_trajectory="trajectory" in args["env"]["action_type"] - ) - ) - traffic_histories[env_idx].append(pufferlib.viz.fill_traffics_state(env_scenario, t)) - - if "trajectory" in args["env"]["action_type"]: - trajectory_histories[env_idx].append(pufferlib.viz.fill_trajectories(env_scenario, t)) - - # Collect observation dictionaries for ALL active agents in THIS environment at timestep t - if args["render_obs"]: - step_obs_dict = {} - if env_idx > 0: - start_obs_index += current_scenarios[env_idx - 1]["active_agent_count"] - for agent_idx in range(env_scenario["active_agent_count"]): - agent_id = env_scenario["active_agent_indices"][agent_idx] - step_obs_dict[int(agent_id)] = pufferlib.viz.extract_obs_frame( - ob, - env_scenario, - args, - timestep=t, - obs_index=start_obs_index + agent_idx, - agent_idx=agent_idx, - head_north=True, + agent_histories = [[] for _ in range(num_envs_in_batch)] + traffic_histories = [[] for _ in range(num_envs_in_batch)] + trajectory_histories = [[] for _ in range(num_envs_in_batch)] + all_agents_obs_histories = [[] for _ in range(num_envs_in_batch)] + + _render_steps = args["env"]["scenario_length"] + if render_max_steps is not None: + _render_steps = min(_render_steps, render_max_steps) + for t in range(_render_steps): + if html_mode: + current_scenarios = vecenv.get_state() + start_obs_index = 0 + + # Loop through every environment in the batch to record its history + for env_idx in range(num_envs_in_batch): + env_scenario = current_scenarios[env_idx] + + agent_histories[env_idx].append( + pufferlib.viz.fill_agents_state( + env_scenario, use_trajectory="trajectory" in args["env"]["action_type"] ) - all_agents_obs_histories[env_idx].append(step_obs_dict) - - with torch.no_grad(): - ob = torch.as_tensor(ob).to(device) - logits, _ = policy.forward_eval(ob, state) - action, _, _ = pufferlib.pytorch.sample_logits(logits, deterministic=True) - action = action.cpu().numpy().reshape(vecenv.action_space.shape) + ) + traffic_histories[env_idx].append(pufferlib.viz.fill_traffics_state(env_scenario, t)) + + if "trajectory" in args["env"]["action_type"]: + trajectory_histories[env_idx].append(pufferlib.viz.fill_trajectories(env_scenario, t)) + + # Collect observation dictionaries for ALL active agents in THIS environment at timestep t + if args["render_obs"]: + step_obs_dict = {} + if env_idx > 0: + start_obs_index += current_scenarios[env_idx - 1]["active_agent_count"] + for agent_idx in range(env_scenario["active_agent_count"]): + agent_id = env_scenario["active_agent_indices"][agent_idx] + step_obs_dict[int(agent_id)] = pufferlib.viz.extract_obs_frame( + ob, + env_scenario, + args, + timestep=t, + obs_index=start_obs_index + agent_idx, + agent_idx=agent_idx, + head_north=True, + ) + all_agents_obs_histories[env_idx].append(step_obs_dict) + + with torch.no_grad(): + ob = torch.as_tensor(ob).to(device) + logits, _ = policy.forward_eval(ob, state) + action, _, _ = pufferlib.pytorch.sample_logits(logits, deterministic=True) + action = action.cpu().numpy().reshape(vecenv.action_space.shape) + + if isinstance(logits, torch.distributions.Normal): + action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) + + ob, _, _, _, infos = vecenv.step(action) + + if egl_mode: + # Flush one frame per env through c_render → client_record_frame + # → PBO async readback → writev → ffmpeg pipe. make_client is + # called lazily on the first render per env (sets up ffmpeg + + # GPU context) and close_client at scenario end flushes the + # trailing PBO frame. + for e in range(num_envs_in_batch): + target_env.render(env_id=e, view_mode=view_mode) + + # Serial backend returns infos as single list (infos[0] is the env's info list) + if infos and infos[0]: + for env_idx, summary in enumerate(infos[0]): + env_map_name = summary["map_name"].split("/")[-1].split(".")[0] + summary["episode_id"] = batch_start + env_idx + summary["env_id"] = env_idx + summary["map_name"] = env_map_name - if isinstance(logits, torch.distributions.Normal): - action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) + for k, v in summary.items(): + if k not in global_infos: + global_infos[k] = [] + global_infos[k].append(v) - ob, _, _, _, infos = vecenv.step(action) + if html_mode: + # Loop through every environment to generate its specific HTML replay + for env_idx in range(num_envs_in_batch): + global_episode_id = batch_start + env_idx + # Ensure we don't render padding environments if num_scenarios isn't perfectly divisible by batch_size + if global_episode_id >= num_scenarios: + break + env_map_name = map_names[env_idx] + + pufferlib.viz.generate_interactive_replay( + current_scenarios[env_idx], + agent_histories[env_idx], + traffic_histories[env_idx], + trajectory_histories[env_idx], + all_agents_obs_histories[env_idx], + f"{gif_folder}/{env_map_name}_{global_episode_id:03d}.html", + head_north=True, + ) if egl_mode: - # Flush one frame per env through c_render → client_record_frame - # → PBO async readback → writev → ffmpeg pipe. make_client is - # called lazily on the first render per env (sets up ffmpeg + - # GPU context) and close_client at scenario end flushes the - # trailing PBO frame. - for e in range(num_envs_in_batch): - target_env.render(env_id=e, view_mode=view_mode) - - # Serial backend returns infos as single list (infos[0] is the env's info list) - if infos and infos[0]: - for env_idx, summary in enumerate(infos[0]): - env_map_name = summary["map_name"].split("/")[-1].split(".")[0] - summary["episode_id"] = batch_start + env_idx - summary["env_id"] = env_idx - summary["map_name"] = env_map_name - - for k, v in summary.items(): - if k not in global_infos: - global_infos[k] = [] - global_infos[k].append(v) - - if html_mode: - # Loop through every environment to generate its specific HTML replay - for env_idx in range(num_envs_in_batch): - global_episode_id = batch_start + env_idx - # Ensure we don't render padding environments if num_scenarios isn't perfectly divisible by batch_size - if global_episode_id >= num_scenarios: - break - env_map_name = map_names[env_idx] - - pufferlib.viz.generate_interactive_replay( - current_scenarios[env_idx], - agent_histories[env_idx], - traffic_histories[env_idx], - trajectory_histories[env_idx], - all_agents_obs_histories[env_idx], - f"{gif_folder}/{env_map_name}_{global_episode_id:03d}.html", - head_north=True, + # Close every env's Client so ffmpeg gets EOF on its input pipe, + # the trailing PBO frame is flushed, and libx264 writes the mp4 + # trailer. Without this, the mp4 files are either empty or one + # frame short. + import sys as _sys_cc + + _sys_cc.stderr.write( + f"[render-instr] starting close_client loop num_envs_in_batch={num_envs_in_batch}\n" ) - - if egl_mode: - # Close every env's Client so ffmpeg gets EOF on its input pipe, - # the trailing PBO frame is flushed, and libx264 writes the mp4 - # trailer. Without this, the mp4 files are either empty or one - # frame short. - import sys as _sys_cc - - _sys_cc.stderr.write( - f"[render-instr] starting close_client loop num_envs_in_batch={num_envs_in_batch}\n" - ) - _sys_cc.stderr.flush() - for e in range(num_envs_in_batch): - _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) calling\n") - _sys_cc.stderr.flush() - target_env.close_client(env_id=e) - _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) returned\n") _sys_cc.stderr.flush() + for e in range(num_envs_in_batch): + _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) calling\n") + _sys_cc.stderr.flush() + target_env.close_client(env_id=e) + _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) returned\n") + _sys_cc.stderr.flush() - scenarios_processed += num_envs_in_batch - pbar.update(num_envs_in_batch) + scenarios_processed += num_envs_in_batch + pbar.update(num_envs_in_batch) except Exception as _rollout_exc_caught: _rollout_exc = _rollout_exc_caught @@ -2649,6 +2649,7 @@ def eval_multi_scenarios_render( vecenv.close() except Exception as _close_exc: import sys as _sys_close + _sys_close.stderr.write(f"[render-instr] vecenv.close() raised: {_close_exc}\n") _sys_close.stderr.flush() @@ -2780,138 +2781,6 @@ def export(args=None, env_name=None, vecenv=None, policy=None, path=None, silent print(f"Saved {len(weights)} weights to {path}") -def render(env_name, args=None): - """Render rollouts for a batch of maps using the in-process c_render pipeline. - - Each map is loaded as a separate environment with render_mode="headless" - (EGL/ffmpeg). A policy rollout is run for max_frames steps, producing one - mp4 per map in output_dir. - - Requires a [render] section in the config with: - map_dir, output_dir, num_maps, view_mode, max_frames - """ - import glob as _glob - import shutil - import tempfile - from pufferlib.ocean.drive.drive import RenderView - from pufferlib.ocean.drive.rollout import RenderContext, rollout_loop - - args = args or load_config(env_name) - render_configs = args.get("render", {}) - - try: - map_dir = render_configs["map_dir"] - num_maps = render_configs.get("num_maps", 1) - view_mode_str = str(render_configs.get("view_mode", "sim_state")).lower().strip('"').strip("'") - max_frames = render_configs.get("max_frames", 91) - output_dir = render_configs["output_dir"] - render_init_mode = ( - str(render_configs["init_mode"]).strip('"').strip("'") if "init_mode" in render_configs else None - ) - render_control_mode = ( - str(render_configs["control_mode"]).strip('"').strip("'") if "control_mode" in render_configs else None - ) - except KeyError as e: - raise pufferlib.APIUsageError(f"Missing render config: {e}") - - _VIEW_MODE_MAP = { - "sim_state": [RenderView.FULL_SIM_STATE], - "topdown": [RenderView.TOPDOWN_SIM], - "bev": [RenderView.BEV_AGENT_OBS], - "both": [RenderView.FULL_SIM_STATE, RenderView.BEV_AGENT_OBS], - "all": [RenderView.FULL_SIM_STATE, RenderView.BEV_AGENT_OBS, RenderView.TOPDOWN_SIM], - } - view_modes = _VIEW_MODE_MAP.get(view_mode_str) - if view_modes is None: - raise pufferlib.APIUsageError( - f"Unknown view_mode '{view_mode_str}'. Choose from: sim_state, topdown, bev, both, all" - ) - - bin_files = sorted(f for f in os.listdir(map_dir) if f.endswith(".bin")) - if num_maps > len(bin_files): - num_maps = len(bin_files) - render_maps = [os.path.join(map_dir, f) for f in bin_files[:num_maps]] - - os.makedirs(output_dir, exist_ok=True) - - configured_device = args["train"]["device"] - if configured_device == "cuda" and not torch.cuda.is_available(): - print("Warning: CUDA not available, falling back to CPU for render.") - configured_device = "cpu" - args["train"]["device"] = configured_device - device = configured_device - - _VIEW_SUFFIX = { - RenderView.FULL_SIM_STATE: "sim_state", - RenderView.BEV_AGENT_OBS: "bev", - RenderView.TOPDOWN_SIM: "topdown", - } - - def render_one_map(map_path): - map_name = os.path.splitext(os.path.basename(map_path))[0] - multi = len(view_modes) > 1 - - for view_mode in view_modes: - with tempfile.TemporaryDirectory() as tmp_map_dir: - tmp_bin = os.path.join(tmp_map_dir, os.path.basename(map_path)) - shutil.copy2(map_path, tmp_bin) - - env_overrides = { - **args["env"], - "num_maps": 1, - "map_dir": tmp_map_dir, - "render_mode": "headless", - } - if render_init_mode is not None: - env_overrides["init_mode"] = render_init_mode - if render_control_mode is not None: - env_overrides["control_mode"] = render_control_mode - - map_args = { - **args, - "env": env_overrides, - "vec": {"backend": "Serial", "num_envs": 1}, - } - - env = load_env(env_name, map_args) - policy = load_policy(map_args, env, env_name) - policy.eval() - - suffix = f"_{_VIEW_SUFFIX[view_mode]}" if multi else "" - before = set(_glob.glob(os.path.join(os.getcwd(), "*.mp4"))) - - rollout_loop( - policy=policy, - env=env, - device=device, - use_rnn=map_args["train"]["use_rnn"], - max_steps=max_frames, - render_ctx=RenderContext( - view_mode=view_mode, - env_id=0, - video_suffix=suffix, - ), - ) - - env.close() - - after = set(_glob.glob(os.path.join(os.getcwd(), "*.mp4"))) - new_mp4s = after - before - if new_mp4s: - for src in sorted(new_mp4s): - dst = os.path.join(output_dir, os.path.basename(src)) - shutil.move(src, dst) - print(f" Saved {dst}") - else: - print(f" Warning: no mp4 produced for {map_name} view={_VIEW_SUFFIX[view_mode]}") - - if render_maps: - print(f"Rendering {len(render_maps)} map(s) from {map_dir} → {output_dir} ...") - for map_path in render_maps: - render_one_map(map_path) - print(f"Done. Videos written to {output_dir}") - - def autotune(args=None, env_name=None, vecenv=None, policy=None): package = args["package"] module_name = "pufferlib.ocean" if package == "ocean" else f"pufferlib.environments.{package}" @@ -3095,7 +2964,7 @@ def puffer_type(value): def main(): - err = "Usage: puffer [train, eval, render, sweep, controlled_exp, autotune, profile, export] [env_name] [optional args]. --help for more info" + err = "Usage: puffer [train, eval, sweep, controlled_exp, autotune, profile, export] [env_name] [optional args]. --help for more info" if len(sys.argv) < 3: raise pufferlib.APIUsageError(err) @@ -3105,8 +2974,6 @@ def main(): train(env_name=env_name) elif mode == "eval": eval(env_name=env_name) - elif mode == "render": - render(env_name=env_name) elif mode == "eval_multi_scenarios": eval_multi_scenarios(env_name=env_name) elif mode == "eval_multi_scenarios_render": From 39f51bde377ed8453c698dd09b3b089ddd318001 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Thu, 23 Apr 2026 19:42:37 -0400 Subject: [PATCH 10/12] Fixing bugs and replay mode caching issues --- pufferlib/ocean/drive/binding.c | 11 +++++++++-- pufferlib/ocean/drive/drive.py | 14 ++++---------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 56651c71d..78d64724c 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -1772,7 +1772,13 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { } SharedMapData *shared = g_map_cache[map_id]; - // Count active agents using a lightweight temp env (no binary reload) + // Count active agents using a lightweight temp env (no binary reload). + // Shallow-copy the Agent structs so set_active_agents cannot mutate + // shared->template_agents (it writes active_agent and mark_as_expert). + // Do NOT call free_agent() on the copies — trajectory/route pointers + // inside still belong to shared and must not be freed here. + Agent *temp_agents = (Agent *)malloc(shared->num_total_agents * sizeof(Agent)); + memcpy(temp_agents, shared->template_agents, shared->num_total_agents * sizeof(Agent)); Drive temp_env = {0}; temp_env.init_mode = init_mode; temp_env.control_mode = control_mode; @@ -1780,7 +1786,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { temp_env.init_steps = init_steps; temp_env.num_max_agents = max_agents_per_env; temp_env.goal_radius = goal_radius; - temp_env.agents = shared->template_agents; + temp_env.agents = temp_agents; temp_env.num_total_agents = shared->num_total_agents; temp_env.grid_map = shared->grid_map; set_active_agents(&temp_env); @@ -1788,6 +1794,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { free(temp_env.active_agent_indices); free(temp_env.static_agent_indices); free(temp_env.expert_static_agent_indices); + free(temp_agents); // Skip map if it has no controllable agents if (active_count == 0) { diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 26f8e8585..43f9b3509 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -6,16 +6,9 @@ import struct import os import pufferlib -from enum import IntEnum from pufferlib.ocean.drive import binding -class RenderView(IntEnum): - FULL_SIM_STATE = 0 # Fixed bird's-eye-ish perspective (legacy default) - BEV_AGENT_OBS = 1 # Top-down ortho centered on ego agent at vision-range zoom - TOPDOWN_SIM = 2 # Full-map orthographic top-down - - def compute_effective_road_obs_count(max_count, dropout): if max_count <= 0: return 0 @@ -466,6 +459,7 @@ def step(self, actions): init_steps=self.init_steps, map_files=self.map_files, seed=self.random_seed, + goal_radius=self.goal_radius, min_agents_per_env=self.min_agents_per_env, max_agents_per_env=self.max_agents_per_env, num_eval_scenarios=self.current_num_eval_scenarios, # Use the dynamic size here @@ -591,10 +585,10 @@ def get_road_edge_polylines(self): return polylines - def render(self, view_mode=RenderView.FULL_SIM_STATE, env_id=0): - # view_mode: 0=default fixed perspective, 1=BEV ego-centered ortho, 2=full-map topdown. + def render(self, view_mode=0, env_id=0): + # view_mode: 0=default fixed perspective, 1=BEV ego-centered ortho. # See VIEW_MODE_* defines in pufferlib/ocean/drive/render.h. - binding.vec_render(self.c_envs, int(view_mode), env_id) + binding.vec_render(self.c_envs, view_mode, env_id) def set_video_suffix(self, suffix, env_id=0): # Append `suffix` to the next mp4 filename for the given env. From 5a9d0ab4c169ab7037f1f0b550433587af9b60e7 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Thu, 23 Apr 2026 20:39:55 -0400 Subject: [PATCH 11/12] Resetting replay mode renders --- pufferlib/ocean/drive/binding.c | 93 ++++++++++++++++----------------- 1 file changed, 46 insertions(+), 47 deletions(-) diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 78d64724c..2b6fb63d1 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -1738,7 +1738,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { return tuple; } - // REPLAY mode: use SharedMapData cache for agent counting (avoids per-env binary load) + // REPLAY mode - existing logic with max_agents_per_env cap int total_agent_count = 0; int map_id = 0; int env_count = 0; @@ -1759,63 +1759,62 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { if (eval_mode) { map_id = s_map_counter % num_maps; - s_map_counter += 1; + s_map_counter += 1; // This increments towards end_map_index } else { map_id = rand() % num_maps; } - // Lazily populate the shared map cache for this map_id - if (g_map_cache[map_id] == NULL) { - const char *map_file_path = PyUnicode_AsUTF8(PyList_GetItem(map_files, map_id)); - g_map_cache[map_id] = - create_shared_map_data(map_file_path, road_obs_front_dist, road_obs_behind_dist, road_obs_side_dist); - } - SharedMapData *shared = g_map_cache[map_id]; - - // Count active agents using a lightweight temp env (no binary reload). - // Shallow-copy the Agent structs so set_active_agents cannot mutate - // shared->template_agents (it writes active_agent and mark_as_expert). - // Do NOT call free_agent() on the copies — trajectory/route pointers - // inside still belong to shared and must not be freed here. - Agent *temp_agents = (Agent *)malloc(shared->num_total_agents * sizeof(Agent)); - memcpy(temp_agents, shared->template_agents, shared->num_total_agents * sizeof(Agent)); - Drive temp_env = {0}; - temp_env.init_mode = init_mode; - temp_env.control_mode = control_mode; - temp_env.simulation_mode = simulation_mode; - temp_env.init_steps = init_steps; - temp_env.num_max_agents = max_agents_per_env; - temp_env.goal_radius = goal_radius; - temp_env.agents = temp_agents; - temp_env.num_total_agents = shared->num_total_agents; - temp_env.grid_map = shared->grid_map; - set_active_agents(&temp_env); - int active_count = temp_env.active_agent_count; - free(temp_env.active_agent_indices); - free(temp_env.static_agent_indices); - free(temp_env.expert_static_agent_indices); - free(temp_agents); - - // Skip map if it has no controllable agents - if (active_count == 0) { + const char *map_file = PyUnicode_AsUTF8(PyList_GetItem(map_files, map_id)); + + Drive *env = calloc(1, sizeof(Drive)); + env->init_mode = init_mode; + env->control_mode = control_mode; + env->simulation_mode = simulation_mode; + env->init_steps = init_steps; + env->num_max_agents = max_agents_per_env; + env->goal_radius = goal_radius; + load_map_binary(map_file, env); + + set_active_agents(env); + + // Skip map if it doesn't contain any controllable agents + if (env->active_agent_count == 0) { maps_checked++; - if (maps_checked >= num_maps) { - Py_DECREF(agent_offsets); - Py_DECREF(map_ids); - char error_msg[256]; - snprintf(error_msg, sizeof(error_msg), - "No maps with controllable agents found after checking all %d maps.", num_maps); - PyErr_SetString(PyExc_ValueError, error_msg); - return NULL; - } + for (int j = 0; j < env->num_total_agents; j++) + free_agent(&env->agents[j]); + for (int j = 0; j < env->num_road_elements; j++) + free_road_element(&env->road_elements[j]); + for (int j = 0; j < env->num_traffic_elements; j++) + free_traffic_element(&env->traffic_elements[j]); + free(env->agents); + free(env->road_elements); + free(env->traffic_elements); + free(env->active_agent_indices); + free(env->static_agent_indices); + free(env->expert_static_agent_indices); + free(env); continue; } - // Store map_id and agent offset + // Store map_id PyList_SetItem(map_ids, env_count, PyLong_FromLong(map_id)); + // Store agent offset PyList_SetItem(agent_offsets, env_count, PyLong_FromLong(total_agent_count)); - total_agent_count += active_count; + total_agent_count += env->active_agent_count; env_count++; + for (int j = 0; j < env->num_total_agents; j++) + free_agent(&env->agents[j]); + for (int j = 0; j < env->num_road_elements; j++) + free_road_element(&env->road_elements[j]); + for (int j = 0; j < env->num_traffic_elements; j++) + free_traffic_element(&env->traffic_elements[j]); + free(env->agents); + free(env->road_elements); + free(env->traffic_elements); + free(env->active_agent_indices); + free(env->static_agent_indices); + free(env->expert_static_agent_indices); + free(env); } if (total_agent_count >= num_agents) { From 9be8462be52c548217ccee01c106beeffe516940 Mon Sep 17 00:00:00 2001 From: Aditya Gupta Date: Thu, 23 Apr 2026 21:18:35 -0400 Subject: [PATCH 12/12] Remove rollout.py --- pufferlib/ocean/drive/rollout.py | 105 ------------------------------- 1 file changed, 105 deletions(-) delete mode 100644 pufferlib/ocean/drive/rollout.py diff --git a/pufferlib/ocean/drive/rollout.py b/pufferlib/ocean/drive/rollout.py deleted file mode 100644 index 8de059fd5..000000000 --- a/pufferlib/ocean/drive/rollout.py +++ /dev/null @@ -1,105 +0,0 @@ -"""Shared rollout loop for Drive evaluation and rendering. - -Single source of truth for the forward-sample-step-break cycle. Used by: - - ``pufferl.render`` — offline batch rendering, one video per map - - ``eval_multi_scenarios_render`` — inline training render path - -Callers pass a ``RenderContext`` to turn on rendering; pass ``None`` for a -pure stats rollout. -""" - -from dataclasses import dataclass -from typing import Optional - -import numpy as np -import torch - -import pufferlib.pytorch - - -@dataclass -class RenderContext: - """Enables rendering inside :func:`rollout_loop`. - - Attributes: - view_mode: ``RenderView`` enum value passed to ``driver.render``. - env_id: which sub-env in the vecenv to record from (default 0). - video_suffix: appended to the mp4 filename; applied once before the - first render via ``set_video_suffix`` so multi-view rollouts don't - collide on output paths. - """ - - view_mode: int - env_id: int = 0 - video_suffix: str = "" - - -def rollout_loop( - policy, - env, - device, - use_rnn: bool, - max_steps: Optional[int] = None, - render_ctx: Optional[RenderContext] = None, - per_env_logs: bool = False, -): - """Run a single policy rollout in a Drive vecenv. - - Args: - policy: the policy to run. Caller is responsible for calling ``.eval()``. - env: a ``PufferEnv``-compatible vecenv wrapping one or more Drive sub-envs. - device: torch device for observation / state tensors. - use_rnn: whether to allocate and carry LSTM hidden state. - max_steps: loop iteration cap. Defaults to ``env.driver_env.episode_length``. - render_ctx: if set, render the specified env/view every step before - sampling actions. - per_env_logs: passed through to ``env.step`` for unaggregated per-env - logs (only supported on PufferEnv native backend). - - Returns: - The last ``info`` returned by ``env.step``. - """ - driver = env.driver_env - num_agents = env.observation_space.shape[0] - - if render_ctx is not None: - driver.set_video_suffix(render_ctx.video_suffix, env_id=render_ctx.env_id) - - obs, _ = env.reset() - - state = {} - if use_rnn: - state = dict( - lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), - lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), - ) - - if max_steps is None: - max_steps = driver.episode_length - - info = [] - for _ in range(max_steps): - if render_ctx is not None: - driver.render( - view_mode=render_ctx.view_mode, - env_id=render_ctx.env_id, - ) - - with torch.no_grad(): - ob_t = torch.as_tensor(obs).to(device) - logits, _ = policy.forward_eval(ob_t, state) - action, _, _ = pufferlib.pytorch.sample_logits(logits) - action_np = action.cpu().numpy().reshape(env.action_space.shape) - - if isinstance(logits, torch.distributions.Normal): - action_np = np.clip(action_np, env.action_space.low, env.action_space.high) - - if per_env_logs: - obs, _, _, truncs, info = env.step(action_np, per_env_logs=True) - else: - obs, _, _, truncs, info = env.step(action_np) - - if truncs.all(): - break - - return info