diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index e71db0260..5029ab197 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -207,6 +207,9 @@ multi_scenario_render_backend = egl ; Frequency of evaluation during training (in epochs) eval_interval = 25 num_agents = 512 +; Number of agents per environment in gigaflow eval mode +min_agents_per_env = 1 +max_agents_per_env = 1 ; Batch size for eval_multi_scenarios (number of scenarios per batch) ; Path to dataset used for evaluation map_dir = "pufferlib/resources/drive/binaries/carla_py123d" diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index b2a4c20b1..2b6fb63d1 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -1,3 +1,4 @@ +#include <unistd.h> #include "drive.h" #define Env Drive #define MY_SHARED @@ -5,6 +6,62 @@ #define MY_GET #include "../env_binding.h" +// Process-local map cache: indexed by map_id, populated by my_shared(), used by my_init(). +static SharedMapData **g_map_cache = NULL; +static int g_map_cache_size = 0; +static pid_t g_map_cache_pid = 0; +// Cache key: params that affect SharedMapData content (obs dists determine vision_range) +static float g_cache_road_obs_front_dist = 0; +static float g_cache_road_obs_behind_dist = 0; +static float g_cache_road_obs_side_dist = 0; +static char **g_cache_map_paths = NULL; + +static void reset_cache_globals(void) { + g_map_cache = NULL; + g_map_cache_size = 0; + g_map_cache_pid = 0; + g_cache_road_obs_front_dist = 0; + g_cache_road_obs_behind_dist = 0; + g_cache_road_obs_side_dist = 0; + g_cache_map_paths = NULL; +} + +static void release_map_cache_internal(void) { + if (g_map_cache == NULL) + return; + // After fork, child inherits g_map_cache pointers via copy-on-write. + // We must NOT free them — they belong to the parent's address space. + // Discard them and let the child rebuild its own cache on the next call. 
+ if (g_map_cache_pid != 0 && g_map_cache_pid != getpid()) { + reset_cache_globals(); + return; + } + // Entries with live refs are detached: c_close will free them when ref_count reaches 0. + // Entries with no live refs are freed immediately. + for (int i = 0; i < g_map_cache_size; i++) { + if (g_map_cache[i] == NULL) + continue; + if (g_map_cache[i]->ref_count > 0) + g_map_cache[i]->detached = 1; + else + free_shared_map_data(g_map_cache[i]); + } + free(g_map_cache); + if (g_cache_map_paths != NULL) { + for (int i = 0; i < g_map_cache_size; i++) + free(g_cache_map_paths[i]); + free(g_cache_map_paths); + } + reset_cache_globals(); +} + +static PyObject *release_map_cache_py(PyObject *self __attribute__((unused)), PyObject *args __attribute__((unused))) { + release_map_cache_internal(); + Py_RETURN_NONE; +} + +#define MY_METHODS {"release_map_cache", release_map_cache_py, METH_VARARGS, "Release the shared map data cache"} + static int my_put(Env *env, PyObject *args, PyObject *kwargs) { PyObject *obs = PyDict_GetItemString(kwargs, "observations"); if (!PyObject_TypeCheck(obs, &PyArray_Type)) { @@ -1544,6 +1601,9 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { int max_agents_per_env = unpack(kwargs, "max_agents_per_env"); float goal_radius = (float)unpack(kwargs, "goal_radius"); int num_eval_scenarios = unpack(kwargs, "num_eval_scenarios"); + float road_obs_front_dist = (float)unpack(kwargs, "road_obs_front_dist"); + float road_obs_behind_dist = (float)unpack(kwargs, "road_obs_behind_dist"); + float road_obs_side_dist = (float)unpack(kwargs, "road_obs_side_dist"); if (min_agents_per_env <= 0 || max_agents_per_env <= 0) { PyErr_SetString(PyExc_ValueError, "min_agents_per_env and max_agents_per_env must be > 0"); return NULL; @@ -1559,7 +1619,38 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { srand(seed); - // GIGAFLOW mode: use random sampling for agent counts per env + // Reuse the existing cache 
if this process created it with matching config. + // Cache key: PID, num_maps, map file paths, obs dist params (determine vision_range). + int reuse_cache = + (g_map_cache != NULL && g_map_cache_pid == getpid() && g_map_cache_size == num_maps && + g_cache_road_obs_front_dist == road_obs_front_dist && g_cache_road_obs_behind_dist == road_obs_behind_dist && + g_cache_road_obs_side_dist == road_obs_side_dist); + if (reuse_cache && g_cache_map_paths != NULL) { + for (int i = 0; i < num_maps; i++) { + const char *path = PyUnicode_AsUTF8(PyList_GetItem(map_files, i)); + if (g_cache_map_paths[i] == NULL || strcmp(g_cache_map_paths[i], path) != 0) { + reuse_cache = 0; + break; + } + } + } + if (!reuse_cache) { + release_map_cache_internal(); + g_map_cache_size = num_maps; + g_map_cache = (SharedMapData **)calloc(num_maps, sizeof(SharedMapData *)); + g_map_cache_pid = getpid(); + g_cache_road_obs_front_dist = road_obs_front_dist; + g_cache_road_obs_behind_dist = road_obs_behind_dist; + g_cache_road_obs_side_dist = road_obs_side_dist; + g_cache_map_paths = (char **)calloc(num_maps, sizeof(char *)); + for (int i = 0; i < num_maps; i++) { + const char *path = PyUnicode_AsUTF8(PyList_GetItem(map_files, i)); + g_cache_map_paths[i] = strdup(path); + } + } + + // GIGAFLOW mode: agent counts are numeric, no binary loading needed for counting. + // We do lazily populate the cache so my_init can call init_from_shared. 
if (simulation_mode == SIMULATION_GIGAFLOW) { if (eval_mode) { // Eval mode: fixed agent count, sequential map cycling @@ -1573,10 +1664,17 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { int offset = 0; for (int i = 0; i < env_count; i++) { + int map_id_g = (s_map_counter + i) % num_maps; PyList_SetItem(agent_offsets, i, PyLong_FromLong(offset)); - PyList_SetItem(map_ids_list, i, PyLong_FromLong((s_map_counter + i) % num_maps)); + PyList_SetItem(map_ids_list, i, PyLong_FromLong(map_id_g)); int remaining = num_agents - offset; offset += (remaining < agents_per_env) ? remaining : agents_per_env; + // Lazily populate cache for assigned map + if (g_map_cache[map_id_g] == NULL) { + const char *map_file_path = PyUnicode_AsUTF8(PyList_GetItem(map_files, map_id_g)); + g_map_cache[map_id_g] = create_shared_map_data(map_file_path, road_obs_front_dist, + road_obs_behind_dist, road_obs_side_dist); + } } PyList_SetItem(agent_offsets, env_count, PyLong_FromLong(offset)); @@ -1597,23 +1695,13 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { if (remaining <= max_agents_per_env) { count = remaining; } else { - // 1. We must leave at least min_agents_per_env for the future. int absolute_max_allowed = remaining - min_agents_per_env; - - // 2. We cannot take more than max_agents_per_env right now. int current_upper_bound = (absolute_max_allowed < max_agents_per_env) ? absolute_max_allowed : max_agents_per_env; - - // 3. We must take at least min_agents_per_env right now. int current_lower_bound = min_agents_per_env; - - // Safety check: if constraints are tight, lower might equal upper. - // If absolute_max_allowed < min_lower_bound for example leading to - // current_upper_bound < current_lower_bound if (current_upper_bound <= current_lower_bound) { count = current_lower_bound; } else { - // Now the range is guaranteed to be positive. 
int range = current_upper_bound - current_lower_bound + 1; count = current_lower_bound + (rand() % range); } @@ -1628,9 +1716,16 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { int offset = 0; for (int i = 0; i < env_count; i++) { + int map_id_g = rand() % num_maps; PyList_SetItem(agent_offsets, i, PyLong_FromLong(offset)); - PyList_SetItem(map_ids_list, i, PyLong_FromLong(rand() % num_maps)); + PyList_SetItem(map_ids_list, i, PyLong_FromLong(map_id_g)); offset += agent_counts[i]; + // Lazily populate cache for assigned map + if (g_map_cache[map_id_g] == NULL) { + const char *map_file_path = PyUnicode_AsUTF8(PyList_GetItem(map_files, map_id_g)); + g_map_cache[map_id_g] = create_shared_map_data(map_file_path, road_obs_front_dist, road_obs_behind_dist, + road_obs_side_dist); + } } PyList_SetItem(agent_offsets, env_count, PyLong_FromLong(num_agents)); @@ -1810,6 +1905,19 @@ static int my_init(Env *env, PyObject *args, PyObject *kwargs) { env->phantom_braking_trigger_prob = (float)unpack(kwargs, "phantom_braking_trigger_prob"); env->phantom_braking_duration = (int)unpack(kwargs, "phantom_braking_duration"); + // Use shared map cache if map_id is provided and cache entry exists + PyObject *map_id_obj = kwargs ? 
PyDict_GetItemString(kwargs, "map_id") : NULL; + if (map_id_obj != NULL && g_map_cache != NULL) { + int map_id = (int)PyLong_AsLong(map_id_obj); + if (map_id >= 0 && map_id < g_map_cache_size && g_map_cache[map_id] != NULL) { + init_from_shared(env, g_map_cache[map_id]); + return 0; + } + // Cache miss: warn and fall through to disk loading + fprintf(stderr, "WARNING: map_id=%d provided but shared map cache miss — loading from disk\n", map_id); + } + + // Fallback: load map from disk (standalone use, tests, or cache miss) init(env); return 0; } diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index af51ec260..09c594678 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -248,6 +248,49 @@ struct GridMap { int num_drivable_grid_cell; }; +// Shared map data: loaded once per unique map file, referenced by multiple Drive envs. +// Road geometry, grid map, and spatial index are read-only after create_shared_map_data(). +// Reference-counted: freed by free_shared_map_data() when no longer needed. 
+typedef struct SharedMapData SharedMapData; +struct SharedMapData { + // Road geometry (read-only, never modified after load) + RoadMapElement *road_elements; + int num_road_elements; + + // Spatial index (read-only after init_grid_map + cache_neighbor_offsets) + GridMap *grid_map; + int *neighbor_offsets; + + // Lane graph (read-only after load) + struct LaneGraph lane_graph; + + // Scenario metadata + char scenario_id[128]; + char dataset_name[32]; + int log_length; + float log_dt; + float world_mean_x; + float world_mean_y; + + // Template traffic elements (always cloned per-env so GIGAFLOW can rewrite states) + TrafficControlElement *template_traffic_elements; + int num_traffic_elements; + + // Template agents (deep-cloned per-env for REPLAY; unused placeholder for GIGAFLOW) + Agent *template_agents; + int num_total_agents; + int num_objects; + + // Small per-scenario arrays (copied into each env, not shared) + int *objects_of_interest; + int num_objects_of_interest; + int *tracks_to_predict; + int num_tracks_to_predict; + + int ref_count; + int detached; // set when released from g_map_cache while refs still live; c_close frees when ref_count reaches 0 +}; + struct Drive { Client *client; // Render mode: RENDER_WINDOW (0) for interactive viewer, RENDER_HEADLESS (1) @@ -367,6 +410,9 @@ struct Drive { float road_obs_front_dist; float road_obs_behind_dist; float road_obs_side_dist; + + // NULL if this env owns its map data; non-NULL if using shared map cache. 
+ SharedMapData *shared_map; }; // ======================================== @@ -3288,9 +3334,266 @@ void remove_bad_trajectories(Drive *env) { env->timestep = 0; } +// ======================================== +// Shared Map Cache Functions +// ======================================== + +void free_shared_map_data(SharedMapData *shared) { + if (shared == NULL) + return; + + for (int i = 0; i < shared->num_road_elements; i++) + free_road_element(&shared->road_elements[i]); + free(shared->road_elements); + + for (int i = 0; i < shared->num_traffic_elements; i++) + free_traffic_element(&shared->template_traffic_elements[i]); + free(shared->template_traffic_elements); + + int grid_cell_count = shared->grid_map->grid_cols * shared->grid_map->grid_rows; + for (int i = 0; i < grid_cell_count; i++) + free(shared->grid_map->cells[i]); + free(shared->grid_map->cells); + free(shared->grid_map->cell_entities_count); + free(shared->grid_map->grid_index_drivable); + free(shared->neighbor_offsets); + for (int i = 0; i < grid_cell_count; i++) + free(shared->grid_map->neighbor_cache_entities[i]); + free(shared->grid_map->neighbor_cache_entities); + free(shared->grid_map->neighbor_cache_count); + free(shared->grid_map); + + free_lane_graph(&shared->lane_graph); + + for (int i = 0; i < shared->num_total_agents; i++) + free_agent(&shared->template_agents[i]); + free(shared->template_agents); + + free(shared->objects_of_interest); + free(shared->tracks_to_predict); + + free(shared); +} + +SharedMapData *create_shared_map_data(const char *map_file_path, float road_obs_front_dist, float road_obs_behind_dist, + float road_obs_side_dist) { + SharedMapData *shared = (SharedMapData *)calloc(1, sizeof(SharedMapData)); + + Drive temp = {0}; + temp.road_obs_front_dist = road_obs_front_dist; + temp.road_obs_behind_dist = road_obs_behind_dist; + temp.road_obs_side_dist = road_obs_side_dist; + + load_map_binary(map_file_path, &temp); + + // Transfer ownership from temp to shared + 
shared->road_elements = temp.road_elements; + shared->num_road_elements = temp.num_road_elements; + shared->template_traffic_elements = temp.traffic_elements; + shared->num_traffic_elements = temp.num_traffic_elements; + shared->template_agents = temp.agents; + shared->num_total_agents = temp.num_total_agents; + shared->num_objects = temp.num_objects; + shared->lane_graph = temp.lane_graph; + memcpy(shared->scenario_id, temp.scenario_id, sizeof(temp.scenario_id)); + memcpy(shared->dataset_name, temp.dataset_name, sizeof(temp.dataset_name)); + shared->log_length = temp.log_length; + shared->log_dt = temp.log_dt; + shared->world_mean_x = temp.world_mean_x; + shared->world_mean_y = temp.world_mean_y; + shared->objects_of_interest = temp.objects_of_interest; + shared->num_objects_of_interest = temp.num_objects_of_interest; + shared->tracks_to_predict = temp.tracks_to_predict; + shared->num_tracks_to_predict = temp.num_tracks_to_predict; + + // Build grid map and spatial index (temp.road_elements is now shared->road_elements) + init_grid_map(&temp); + int vision_half_range = + (int)ceilf(fmaxf(fmaxf(road_obs_front_dist, road_obs_behind_dist), road_obs_side_dist) / GRID_CELL_SIZE); + temp.grid_map->vision_range = 2 * vision_half_range + 1; + init_neighbor_offsets(&temp); + cache_neighbor_offsets(&temp); + shared->grid_map = temp.grid_map; + shared->neighbor_offsets = temp.neighbor_offsets; + + shared->ref_count = 0; + return shared; +} + +void init_from_shared(Drive *env, SharedMapData *shared) { + env->human_agent_idx = 0; + env->timestep = 0; + env->shared_map = shared; + shared->ref_count++; + + // Point to shared read-only map data + env->road_elements = shared->road_elements; + env->num_road_elements = shared->num_road_elements; + env->grid_map = shared->grid_map; + env->neighbor_offsets = shared->neighbor_offsets; + env->lane_graph = shared->lane_graph; // struct copy; pointers owned by SharedMapData + env->world_mean_x = shared->world_mean_x; + env->world_mean_y = 
shared->world_mean_y; + env->log_length = shared->log_length; + env->log_dt = shared->log_dt; + memcpy(env->scenario_id, shared->scenario_id, sizeof(env->scenario_id)); + memcpy(env->dataset_name, shared->dataset_name, sizeof(env->dataset_name)); + env->num_total_agents = shared->num_total_agents; + env->num_objects = shared->num_objects; + + // Deep-copy small per-scenario arrays so c_close can always free them + if (shared->num_objects_of_interest > 0) { + env->objects_of_interest = (int *)malloc(shared->num_objects_of_interest * sizeof(int)); + memcpy(env->objects_of_interest, shared->objects_of_interest, shared->num_objects_of_interest * sizeof(int)); + } else { + env->objects_of_interest = NULL; + } + env->num_objects_of_interest = shared->num_objects_of_interest; + + if (shared->num_tracks_to_predict > 0) { + env->tracks_to_predict = (int *)malloc(shared->num_tracks_to_predict * sizeof(int)); + memcpy(env->tracks_to_predict, shared->tracks_to_predict, shared->num_tracks_to_predict * sizeof(int)); + } else { + env->tracks_to_predict = NULL; + } + env->num_tracks_to_predict = shared->num_tracks_to_predict; + + // Clone traffic elements per-env (GIGAFLOW rewrites states; REPLAY just reads them) + env->num_traffic_elements = shared->num_traffic_elements; + if (shared->num_traffic_elements > 0) { + env->traffic_elements = + (TrafficControlElement *)calloc(shared->num_traffic_elements, sizeof(TrafficControlElement)); + for (int i = 0; i < shared->num_traffic_elements; i++) { + TrafficControlElement *src = &shared->template_traffic_elements[i]; + TrafficControlElement *dst = &env->traffic_elements[i]; + *dst = *src; + if (src->num_controlled_lanes > 0) { + dst->controlled_lanes = (int *)malloc(src->num_controlled_lanes * sizeof(int)); + memcpy(dst->controlled_lanes, src->controlled_lanes, src->num_controlled_lanes * sizeof(int)); + } else { + dst->controlled_lanes = NULL; + } + if (src->state_length > 0) { + dst->states = (int *)malloc(src->state_length * 
sizeof(int)); + memcpy(dst->states, src->states, src->state_length * sizeof(int)); + } else { + dst->states = NULL; + } + } + } else { + env->traffic_elements = NULL; + } + + // Clone agents for REPLAY (each env needs independent mutable sim state). + // For GIGAFLOW, set_active_agents will allocate agents from scratch via spawn_agent. + if (env->simulation_mode == SIMULATION_REPLAY && shared->num_total_agents > 0) { + env->agents = (Agent *)calloc(shared->num_total_agents, sizeof(Agent)); + for (int i = 0; i < shared->num_total_agents; i++) { + Agent *src = &shared->template_agents[i]; + Agent *dst = &env->agents[i]; + *dst = *src; + int tlen = src->trajectory_length; + dst->log_trajectory_x = (float *)malloc(tlen * sizeof(float)); + dst->log_trajectory_y = (float *)malloc(tlen * sizeof(float)); + dst->log_trajectory_z = (float *)malloc(tlen * sizeof(float)); + dst->log_heading = (float *)malloc(tlen * sizeof(float)); + dst->log_velocity_x = (float *)malloc(tlen * sizeof(float)); + dst->log_velocity_y = (float *)malloc(tlen * sizeof(float)); + dst->log_length = (float *)malloc(tlen * sizeof(float)); + dst->log_width = (float *)malloc(tlen * sizeof(float)); + dst->log_height = (float *)malloc(tlen * sizeof(float)); + dst->log_valid = (int *)malloc(tlen * sizeof(int)); + memcpy(dst->log_trajectory_x, src->log_trajectory_x, tlen * sizeof(float)); + memcpy(dst->log_trajectory_y, src->log_trajectory_y, tlen * sizeof(float)); + memcpy(dst->log_trajectory_z, src->log_trajectory_z, tlen * sizeof(float)); + memcpy(dst->log_heading, src->log_heading, tlen * sizeof(float)); + memcpy(dst->log_velocity_x, src->log_velocity_x, tlen * sizeof(float)); + memcpy(dst->log_velocity_y, src->log_velocity_y, tlen * sizeof(float)); + memcpy(dst->log_length, src->log_length, tlen * sizeof(float)); + memcpy(dst->log_width, src->log_width, tlen * sizeof(float)); + memcpy(dst->log_height, src->log_height, tlen * sizeof(float)); + memcpy(dst->log_valid, src->log_valid, tlen * sizeof(int)); + 
if (src->route_length > 0) { + dst->route = (int *)malloc(src->route_length * sizeof(int)); + memcpy(dst->route, src->route, src->route_length * sizeof(int)); + } else { + dst->route = NULL; + } + dst->path = NULL; // path is computed at runtime, not from template + } + } else { + env->agents = NULL; + env->num_total_agents = 0; + } + + // Per-env observation flag (depends on per-env obs segment count params) + env->road_dropout_enabled = (env->obs_lane_segment_count < env->max_lane_segment_observations) || + (env->obs_boundary_segment_count < env->max_boundary_segment_observations); + + // Run per-env init logic (agent selection, start positions, etc.) + env->logs_capacity = 0; + set_active_agents(env); + env->logs_capacity = env->active_agent_count; + if (env->simulation_mode == SIMULATION_REPLAY) { + remove_bad_trajectories(env); + } + set_start_position(env); + if (env->simulation_mode == SIMULATION_GIGAFLOW) { + int steps = env->scenario_length; + if (steps > 0) { + for (int i = 0; i < env->num_traffic_elements; i++) { + TrafficControlElement *traffic = &env->traffic_elements[i]; + if (traffic->type != TRAFFIC_CONTROL_TYPE_TRAFFIC_LIGHT) + continue; + if (traffic->states && traffic->state_length != steps) { + free(traffic->states); + traffic->states = NULL; + } + if (traffic->states == NULL) { + traffic->states = (int *)malloc(steps * sizeof(int)); + if (traffic->states == NULL) { + traffic->state_length = 0; + continue; + } + } + traffic->state_length = steps; + } + } + generate_traffic_light_states(env); + } + env->logs = (Log *)calloc(env->active_agent_count, sizeof(Log)); + + if (env->simulation_mode == SIMULATION_REPLAY) { + for (int i = 0; i < env->active_agent_count; i++) { + int agent_idx = env->active_agent_indices[i]; + Agent *agent = &env->agents[agent_idx]; + int start = env->init_steps > 0 ? 
env->init_steps : 0; + int remaining = agent->trajectory_length - 1 - start; + if (remaining < 1) + remaining = 1; + int num_wp = env->num_target_waypoints; + if (num_wp > MAX_TARGET_WAYPOINTS) + num_wp = MAX_TARGET_WAYPOINTS; + for (int g = 0; g < num_wp; g++) { + int t = start + (g + 1) * remaining / num_wp; + if (t >= agent->trajectory_length) + t = agent->trajectory_length - 1; + agent->goal_positions_x[g] = agent->log_trajectory_x[t]; + agent->goal_positions_y[g] = agent->log_trajectory_y[t]; + agent->goal_positions_z[g] = agent->log_trajectory_z[t]; + } + agent->current_goal_idx = 0; + agent->goal_position_x = agent->goal_positions_x[0]; + agent->goal_position_y = agent->goal_positions_y[0]; + agent->goal_position_z = agent->goal_positions_z[0]; + } + } +} + void init(Drive *env) { env->human_agent_idx = 0; env->timestep = 0; + env->shared_map = NULL; load_map_binary(env->map_name, env); env->road_dropout_enabled = (env->obs_lane_segment_count < env->max_lane_segment_observations) || (env->obs_boundary_segment_count < env->max_boundary_segment_observations); @@ -3365,38 +3668,53 @@ void init(Drive *env) { } void c_close(Drive *env) { + // Per-env agent data (always owned by the env) for (int i = 0; i < env->num_total_agents; i++) free_agent(&env->agents[i]); - for (int i = 0; i < env->num_road_elements; i++) - free_road_element(&env->road_elements[i]); + free(env->agents); + + // Traffic elements are always cloned per-env (see init_from_shared) for (int i = 0; i < env->num_traffic_elements; i++) free_traffic_element(&env->traffic_elements[i]); - free(env->agents); - free(env->road_elements); free(env->traffic_elements); - free(env->active_agent_indices); - free(env->logs); - // GridMap cleanup - int grid_cell_count = env->grid_map->grid_cols * env->grid_map->grid_rows; - for (int grid_index = 0; grid_index < grid_cell_count; grid_index++) { - free(env->grid_map->cells[grid_index]); - } - free(env->grid_map->cells); - free(env->grid_map->cell_entities_count); 
- free(env->grid_map->grid_index_drivable); - free(env->neighbor_offsets); - for (int i = 0; i < grid_cell_count; i++) { - free(env->grid_map->neighbor_cache_entities[i]); - } - free(env->grid_map->neighbor_cache_entities); - free(env->grid_map->neighbor_cache_count); - free(env->grid_map); + free(env->active_agent_indices); free(env->static_agent_indices); free(env->expert_static_agent_indices); + + // objects_of_interest and tracks_to_predict are always per-env copies free(env->objects_of_interest); free(env->tracks_to_predict); - free_lane_graph(&env->lane_graph); + free(env->logs); + + if (env->shared_map != NULL) { + // road_elements, grid_map, neighbor_offsets, lane_graph are owned by SharedMapData + env->shared_map->ref_count--; + if (env->shared_map->ref_count == 0 && env->shared_map->detached) + free_shared_map_data(env->shared_map); + env->shared_map = NULL; + } else { + // This env owns all map data — free it + for (int i = 0; i < env->num_road_elements; i++) + free_road_element(&env->road_elements[i]); + free(env->road_elements); + + int grid_cell_count = env->grid_map->grid_cols * env->grid_map->grid_rows; + for (int grid_index = 0; grid_index < grid_cell_count; grid_index++) + free(env->grid_map->cells[grid_index]); + free(env->grid_map->cells); + free(env->grid_map->cell_entities_count); + free(env->grid_map->grid_index_drivable); + free(env->neighbor_offsets); + for (int i = 0; i < grid_cell_count; i++) + free(env->grid_map->neighbor_cache_entities[i]); + free(env->grid_map->neighbor_cache_entities); + free(env->grid_map->neighbor_cache_count); + free(env->grid_map); + + free_lane_graph(&env->lane_graph); + } + free(env->map_name); free(env->ini_file); } diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 7dc537ca5..43f9b3509 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -305,6 +305,9 @@ def __init__( max_agents_per_env=self.max_agents_per_env, 
num_eval_scenarios=self.current_num_eval_scenarios, # Use the dynamic size here goal_radius=self.goal_radius, + road_obs_front_dist=self.road_obs_front_dist, + road_obs_behind_dist=self.road_obs_behind_dist, + road_obs_side_dist=self.road_obs_side_dist, ) # In eval mode, don't wrap counter - allows termination condition to work correctly self.starting_map_counter = self.starting_map_counter + num_envs @@ -327,6 +330,7 @@ def __init__( self.masks[cur:nxt], self.random_seed, **self._env_init_kwargs(self.map_files[map_ids[i]], nxt - cur), + map_id=map_ids[i], ) env_ids.append(env_id) @@ -455,9 +459,13 @@ def step(self, actions): init_steps=self.init_steps, map_files=self.map_files, seed=self.random_seed, + goal_radius=self.goal_radius, min_agents_per_env=self.min_agents_per_env, max_agents_per_env=self.max_agents_per_env, num_eval_scenarios=self.current_num_eval_scenarios, # Use the dynamic size here + road_obs_front_dist=self.road_obs_front_dist, + road_obs_behind_dist=self.road_obs_behind_dist, + road_obs_side_dist=self.road_obs_side_dist, ) # In eval mode, don't wrap counter - allows termination condition to work correctly @@ -474,6 +482,7 @@ def step(self, actions): self.truncations[cur:nxt], self.masks[cur:nxt], self.random_seed, + map_id=map_ids[i], **self._env_init_kwargs(self.map_files[map_ids[i]], nxt - cur), ) env_ids.append(env_id) @@ -576,21 +585,21 @@ def get_road_edge_polylines(self): return polylines - def render(self, env_idx=0, view_mode=0): + def render(self, view_mode=0, env_id=0): # view_mode: 0=default fixed perspective, 1=BEV ego-centered ortho. # See VIEW_MODE_* defines in pufferlib/ocean/drive/render.h. - binding.vec_render(self.c_envs, view_mode, env_idx) + binding.vec_render(self.c_envs, view_mode, env_id) - def set_video_suffix(self, suffix, env_idx=0): + def set_video_suffix(self, suffix, env_id=0): # Append `suffix` to the next mp4 filename for the given env. 
# Must be called BEFORE the first render of a rollout because # make_client reads env->video_suffix when forking ffmpeg. - binding.vec_set_video_suffix(self.c_envs, suffix, env_idx) + binding.vec_set_video_suffix(self.c_envs, suffix, env_id) - def close_client(self, env_idx=0): + def close_client(self, env_id=0): # Tear down the render Client for one env without destroying the env. # Flushes ffmpeg + PBOs on the headless path so the mp4 is fully written. - binding.vec_close_client(self.c_envs, env_idx) + binding.vec_close_client(self.c_envs, env_id) def close(self): binding.vec_close(self.c_envs) diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 33d1f438e..7f36fe271 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -144,9 +144,6 @@ def __init__(self, config, vecenv, policy, logger=None): self.render = config["render"] self.render_interval = config["render_interval"] - if self.render: - ensure_drive_binary() - # LSTM if config["use_rnn"]: n = vecenv.agents_per_batch @@ -430,32 +427,65 @@ def train(self): self.msg = f"Checkpoint saved at update {self.epoch}" if self.render and self.epoch % self.render_interval == 0: - model_dir = os.path.join(self.config["data_dir"], f"{self.config['env']}_{self.logger.run_id}") - model_files = glob.glob(os.path.join(model_dir, "models", "model_*.pt")) - - if model_files: - # Take the latest checkpoint - latest_cpt = max(model_files, key=os.path.getctime) - bin_path = f"{model_dir}.bin" - - # Export to .bin for rendering with raylib + render_simulation_mode = self.config["eval"].get("multi_scenario_simulation_mode", "gigaflow") + num_agents_render = self.config["eval"]["num_agents"] + render_map_dir = self.config["eval"]["map_dir"] + render_overrides = build_eval_overrides( + simulation_mode=render_simulation_mode, + num_agents=num_agents_render, + num_scenarios=self.config["eval"].get("multi_scenario_num_scenarios", 4), + map_dir=render_map_dir, + num_carla_maps=self.config["env_config"].get("num_maps", 
8), + min_agents_per_env=self.config["eval"].get("min_agents_per_env", 50), + max_agents_per_env=self.config["eval"].get("max_agents_per_env", 50), + ) + render_args = load_eval_multi_scenarios_config( + env_name=self.config["env"], + model_path=None, + eval_overrides=render_overrides, + ) + experiment_name = f"{self.config['env']}_{self.logger.run_id}" + render_args["global_step"] = self.global_step + render_args["num_scenarios"] = self.config["eval"].get("multi_scenario_num_scenarios", 4) + render_args["eval_simulation"] = render_simulation_mode + render_args["render"] = True + render_args["render_obs"] = False + render_args["inline_eval"] = True + render_args["load_model_path"] = os.path.join( + self.config["data_dir"], experiment_name, "models", f"inline_epoch_{self.epoch}.pt" + ) + render_args["eval_results_dir"] = os.path.join( + self.config["data_dir"], + experiment_name, + "renders", + f"epoch_{self.epoch:08d}", + ) + backend_name = self.config["eval"].get("multi_scenario_render_backend", "egl") + _bev_views = ( + [(0, "", "sim_state"), (1, "_bev", "bev")] if backend_name == "egl" else [(0, "", "sim_state")] + ) + for _vmode, _vsuffix, _vlabel in _bev_views: try: - export_args = {"env_name": self.config["env"], "load_model_path": latest_cpt, **self.config} - - export( - args=export_args, + eval_multi_scenarios_render( env_name=self.config["env"], - vecenv=self.vecenv, + args=dict(render_args), + vecenv=None, policy=self.uncompiled_policy, - path=bin_path, - silent=True, - ) - pufferlib.utils.render_videos( - self.config, self.vecenv, self.logger, self.epoch, self.global_step, bin_path + logger=self.logger, + metric_prefix=f"render_{_vlabel}", + quiet=True, + render_backend=backend_name, + view_mode=_vmode, + video_suffix=_vsuffix, + log_view_label=_vlabel, + render_max_steps=(self.config["eval"].get("render_max_steps", 50) or None), ) - except Exception as e: - print(f"Failed to export model weights: {e}") + import traceback + + print(f"\n⚠️ render failed 
(view={_vlabel}) at epoch {self.epoch}: {type(e).__name__}: {e}") + traceback.print_exc() + print("Training continues.") if self.config["eval"]["wosac_realism_eval"] and ( self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training @@ -507,7 +537,9 @@ def train(self): num_agents=num_agents_eval, num_scenarios=self.config["eval"]["multi_scenario_num_scenarios"], map_dir=map_dir, - num_carla_maps=self.config["eval"].get("num_carla_maps", 8), + num_carla_maps=self.config["env_config"].get("num_maps", 8), + min_agents_per_env=self.config["eval"].get("min_agents_per_env", 50), + max_agents_per_env=self.config["eval"].get("max_agents_per_env", 50), ) # Build eval args by applying overrides to training config @@ -565,7 +597,9 @@ def train(self): num_agents=num_agents_render, num_scenarios=self.config["eval"]["multi_scenario_num_scenarios"], map_dir=render_map_dir, - num_carla_maps=self.config["eval"].get("num_carla_maps", 8), + num_carla_maps=self.config["env_config"].get("num_maps", 8), + min_agents_per_env=self.config["eval"].get("min_agents_per_env", 50), + max_agents_per_env=self.config["eval"].get("max_agents_per_env", 50), ) render_args = load_eval_multi_scenarios_config( @@ -1607,6 +1641,7 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None, early_stop train_config = dict( **args["train"], env=env_name, + env_config=args.get("env", {}), eval=args.get("eval", {}), driving_behaviours_eval=args.get("driving_behaviours_eval"), ) @@ -1864,13 +1899,23 @@ def load_eval_multi_scenarios_config(env_name, model_path=None, eval_overrides=N return args -def build_eval_overrides(simulation_mode, num_agents, num_scenarios, map_dir=None, num_carla_maps=8): +def build_eval_overrides( + simulation_mode, + num_agents, + num_scenarios, + map_dir=None, + num_carla_maps=8, + min_agents_per_env=50, + max_agents_per_env=50, +): """Build evaluation overrides for a given simulation mode. 
Args: simulation_mode: "gigaflow" or "replay" num_agents: agent slot budget for evaluation map_dir: replay dataset directory, required for replay mode + min_agents_per_env: minimum agents per env (gigaflow only) + max_agents_per_env: maximum agents per env (gigaflow only) """ # Common reward coefficients (same for both modes) common_env = { @@ -1903,8 +1948,8 @@ def build_eval_overrides(simulation_mode, num_agents, num_scenarios, map_dir=Non "env": { **common_env, "simulation_mode": "gigaflow", - "min_agents_per_env": 50, - "max_agents_per_env": 50, + "min_agents_per_env": min_agents_per_env, + "max_agents_per_env": max_agents_per_env, "resample_frequency": 3000, "scenario_length": 3000, # Point at the py123d-converted CARLA towns added to this branch. @@ -2103,6 +2148,8 @@ def eval_multi_scenarios( num_scenarios=tmp_args["num_scenarios"], map_dir=map_dir, num_carla_maps=tmp_args.get("num_carla_maps", 8), + min_agents_per_env=tmp_args["eval"].get("min_agents_per_env", 50), + max_agents_per_env=tmp_args["eval"].get("max_agents_per_env", 50), ) args = load_eval_multi_scenarios_config(env_name, model_path, eval_overrides) @@ -2268,6 +2315,8 @@ def eval_multi_scenarios_render( num_scenarios=tmp_args["num_scenarios"], map_dir=map_dir, num_carla_maps=tmp_args.get("num_carla_maps", 8), + min_agents_per_env=tmp_args["eval"].get("min_agents_per_env", 50), + max_agents_per_env=tmp_args["eval"].get("max_agents_per_env", 50), ) args = load_eval_multi_scenarios_config(env_name, model_path, eval_overrides) @@ -2356,7 +2405,7 @@ def eval_multi_scenarios_render( _internal_num_envs = getattr(_target_env_pre, "num_envs", 1) for _e in range(_internal_num_envs): try: - _target_env_pre.set_video_suffix(video_suffix, env_idx=_e) + _target_env_pre.set_video_suffix(video_suffix, env_id=_e) except Exception: break @@ -2365,152 +2414,164 @@ def eval_multi_scenarios_render( # Serial/Multiprocessing: need vecenv.envs[0] to reach the underlying env. 
target_env = vecenv if not hasattr(vecenv, "envs") else vecenv.envs[0] - with tqdm(total=num_scenarios, desc="Processing scenarios", disable=quiet) as pbar: - while scenarios_processed < num_scenarios: - ob, _ = vecenv.reset() + # Wrap rollout + post-processing in try/finally so vecenv.close() is + # guaranteed to run even if an exception fires mid-rollout. + # Without this, a crash inside the step loop leaves Drive C envs alive + # (ref_count > 0), which causes the next binding.shared() call with a + # different map_dir to raise: + # RuntimeError: Cannot change map cache config while Drive environments + # are still open. Call close() on all Drive instances first. + _rollout_exc = None + try: + with tqdm(total=num_scenarios, desc="Processing scenarios", disable=quiet) as pbar: + while scenarios_processed < num_scenarios: + ob, _ = vecenv.reset() - # Get initial states for all environments in the batch - scenarios = vecenv.get_state() - num_envs_in_batch = len(scenarios) - batch_start = scenarios_processed + # Get initial states for all environments in the batch + scenarios = vecenv.get_state() + num_envs_in_batch = len(scenarios) + batch_start = scenarios_processed - # Prepare batch_size_eval for the resample that fires at end of the step loop. - # That resample will load the NEXT batch, so cap it at remaining_after_this. - remaining_after_this = num_scenarios - scenarios_processed - num_envs_in_batch - target_env.batch_size_eval = max(1, remaining_after_this) + # Prepare batch_size_eval for the resample that fires at end of the step loop. + # That resample will load the NEXT batch, so cap it at remaining_after_this. 
+ remaining_after_this = num_scenarios - scenarios_processed - num_envs_in_batch + target_env.batch_size_eval = max(1, remaining_after_this) - map_names = [] - for env_idx in range(num_envs_in_batch): - map_names.append(scenarios[env_idx]["map_name"].split("/")[-1].split(".")[0]) + map_names = [] + for env_idx in range(num_envs_in_batch): + map_names.append(scenarios[env_idx]["map_name"].split("/")[-1].split(".")[0]) - # Reset LSTM - if args["train"]["use_rnn"]: - state = dict( - lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), - lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), - ) + # Reset LSTM + if args["train"]["use_rnn"]: + state = dict( + lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), + lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), + ) - # Initialize histories as lists of lists (one list per environment). - # Only needed for the HTML replay path — EGL writes mp4 frames - # directly to ffmpeg via c_render each step. - if html_mode: - agent_histories = [[] for _ in range(num_envs_in_batch)] - traffic_histories = [[] for _ in range(num_envs_in_batch)] - trajectory_histories = [[] for _ in range(num_envs_in_batch)] - all_agents_obs_histories = [[] for _ in range(num_envs_in_batch)] - - _render_steps = args["env"]["scenario_length"] - if render_max_steps is not None: - _render_steps = min(_render_steps, render_max_steps) - for t in range(_render_steps): + # Initialize histories as lists of lists (one list per environment). + # Only needed for the HTML replay path — EGL writes mp4 frames + # directly to ffmpeg via c_render each step. 
if html_mode: - current_scenarios = vecenv.get_state() - start_obs_index = 0 - - # Loop through every environment in the batch to record its history - for env_idx in range(num_envs_in_batch): - env_scenario = current_scenarios[env_idx] - - agent_histories[env_idx].append( - pufferlib.viz.fill_agents_state( - env_scenario, use_trajectory="trajectory" in args["env"]["action_type"] - ) - ) - traffic_histories[env_idx].append(pufferlib.viz.fill_traffics_state(env_scenario, t)) - - if "trajectory" in args["env"]["action_type"]: - trajectory_histories[env_idx].append(pufferlib.viz.fill_trajectories(env_scenario, t)) - - # Collect observation dictionaries for ALL active agents in THIS environment at timestep t - if args["render_obs"]: - step_obs_dict = {} - if env_idx > 0: - start_obs_index += current_scenarios[env_idx - 1]["active_agent_count"] - for agent_idx in range(env_scenario["active_agent_count"]): - agent_id = env_scenario["active_agent_indices"][agent_idx] - step_obs_dict[int(agent_id)] = pufferlib.viz.extract_obs_frame( - ob, - env_scenario, - args, - timestep=t, - obs_index=start_obs_index + agent_idx, - agent_idx=agent_idx, - head_north=True, + agent_histories = [[] for _ in range(num_envs_in_batch)] + traffic_histories = [[] for _ in range(num_envs_in_batch)] + trajectory_histories = [[] for _ in range(num_envs_in_batch)] + all_agents_obs_histories = [[] for _ in range(num_envs_in_batch)] + + _render_steps = args["env"]["scenario_length"] + if render_max_steps is not None: + _render_steps = min(_render_steps, render_max_steps) + for t in range(_render_steps): + if html_mode: + current_scenarios = vecenv.get_state() + start_obs_index = 0 + + # Loop through every environment in the batch to record its history + for env_idx in range(num_envs_in_batch): + env_scenario = current_scenarios[env_idx] + + agent_histories[env_idx].append( + pufferlib.viz.fill_agents_state( + env_scenario, use_trajectory="trajectory" in args["env"]["action_type"] ) - 
all_agents_obs_histories[env_idx].append(step_obs_dict) - - with torch.no_grad(): - ob = torch.as_tensor(ob).to(device) - logits, _ = policy.forward_eval(ob, state) - action, _, _ = pufferlib.pytorch.sample_logits(logits, deterministic=True) - action = action.cpu().numpy().reshape(vecenv.action_space.shape) + ) + traffic_histories[env_idx].append(pufferlib.viz.fill_traffics_state(env_scenario, t)) + + if "trajectory" in args["env"]["action_type"]: + trajectory_histories[env_idx].append(pufferlib.viz.fill_trajectories(env_scenario, t)) + + # Collect observation dictionaries for ALL active agents in THIS environment at timestep t + if args["render_obs"]: + step_obs_dict = {} + if env_idx > 0: + start_obs_index += current_scenarios[env_idx - 1]["active_agent_count"] + for agent_idx in range(env_scenario["active_agent_count"]): + agent_id = env_scenario["active_agent_indices"][agent_idx] + step_obs_dict[int(agent_id)] = pufferlib.viz.extract_obs_frame( + ob, + env_scenario, + args, + timestep=t, + obs_index=start_obs_index + agent_idx, + agent_idx=agent_idx, + head_north=True, + ) + all_agents_obs_histories[env_idx].append(step_obs_dict) + + with torch.no_grad(): + ob = torch.as_tensor(ob).to(device) + logits, _ = policy.forward_eval(ob, state) + action, _, _ = pufferlib.pytorch.sample_logits(logits, deterministic=True) + action = action.cpu().numpy().reshape(vecenv.action_space.shape) + + if isinstance(logits, torch.distributions.Normal): + action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) + + ob, _, _, _, infos = vecenv.step(action) + + if egl_mode: + # Flush one frame per env through c_render → client_record_frame + # → PBO async readback → writev → ffmpeg pipe. make_client is + # called lazily on the first render per env (sets up ffmpeg + + # GPU context) and close_client at scenario end flushes the + # trailing PBO frame. 
+ for e in range(num_envs_in_batch): + target_env.render(env_id=e, view_mode=view_mode) + + # Serial backend returns infos as single list (infos[0] is the env's info list) + if infos and infos[0]: + for env_idx, summary in enumerate(infos[0]): + env_map_name = summary["map_name"].split("/")[-1].split(".")[0] + summary["episode_id"] = batch_start + env_idx + summary["env_id"] = env_idx + summary["map_name"] = env_map_name - if isinstance(logits, torch.distributions.Normal): - action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) + for k, v in summary.items(): + if k not in global_infos: + global_infos[k] = [] + global_infos[k].append(v) - ob, _, _, _, infos = vecenv.step(action) + if html_mode: + # Loop through every environment to generate its specific HTML replay + for env_idx in range(num_envs_in_batch): + global_episode_id = batch_start + env_idx + # Ensure we don't render padding environments if num_scenarios isn't perfectly divisible by batch_size + if global_episode_id >= num_scenarios: + break + env_map_name = map_names[env_idx] + + pufferlib.viz.generate_interactive_replay( + current_scenarios[env_idx], + agent_histories[env_idx], + traffic_histories[env_idx], + trajectory_histories[env_idx], + all_agents_obs_histories[env_idx], + f"{gif_folder}/{env_map_name}_{global_episode_id:03d}.html", + head_north=True, + ) if egl_mode: - # Flush one frame per env through c_render → client_record_frame - # → PBO async readback → writev → ffmpeg pipe. make_client is - # called lazily on the first render per env (sets up ffmpeg + - # GPU context) and close_client at scenario end flushes the - # trailing PBO frame. 
- for e in range(num_envs_in_batch): - target_env.render(env_idx=e, view_mode=view_mode) - - # Serial backend returns infos as single list (infos[0] is the env's info list) - if infos and infos[0]: - for env_idx, summary in enumerate(infos[0]): - env_map_name = summary["map_name"].split("/")[-1].split(".")[0] - summary["episode_id"] = batch_start + env_idx - summary["env_id"] = env_idx - summary["map_name"] = env_map_name - - for k, v in summary.items(): - if k not in global_infos: - global_infos[k] = [] - global_infos[k].append(v) - - if html_mode: - # Loop through every environment to generate its specific HTML replay - for env_idx in range(num_envs_in_batch): - global_episode_id = batch_start + env_idx - # Ensure we don't render padding environments if num_scenarios isn't perfectly divisible by batch_size - if global_episode_id >= num_scenarios: - break - env_map_name = map_names[env_idx] - - pufferlib.viz.generate_interactive_replay( - current_scenarios[env_idx], - agent_histories[env_idx], - traffic_histories[env_idx], - trajectory_histories[env_idx], - all_agents_obs_histories[env_idx], - f"{gif_folder}/{env_map_name}_{global_episode_id:03d}.html", - head_north=True, + # Close every env's Client so ffmpeg gets EOF on its input pipe, + # the trailing PBO frame is flushed, and libx264 writes the mp4 + # trailer. Without this, the mp4 files are either empty or one + # frame short. + import sys as _sys_cc + + _sys_cc.stderr.write( + f"[render-instr] starting close_client loop num_envs_in_batch={num_envs_in_batch}\n" ) - - if egl_mode: - # Close every env's Client so ffmpeg gets EOF on its input pipe, - # the trailing PBO frame is flushed, and libx264 writes the mp4 - # trailer. Without this, the mp4 files are either empty or one - # frame short. 
- import sys as _sys_cc - - _sys_cc.stderr.write( - f"[render-instr] starting close_client loop num_envs_in_batch={num_envs_in_batch}\n" - ) - _sys_cc.stderr.flush() - for e in range(num_envs_in_batch): - _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) calling\n") - _sys_cc.stderr.flush() - target_env.close_client(env_idx=e) - _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) returned\n") _sys_cc.stderr.flush() + for e in range(num_envs_in_batch): + _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) calling\n") + _sys_cc.stderr.flush() + target_env.close_client(env_id=e) + _sys_cc.stderr.write(f"[render-instr] close_client(env_idx={e}) returned\n") + _sys_cc.stderr.flush() + + scenarios_processed += num_envs_in_batch + pbar.update(num_envs_in_batch) - scenarios_processed += num_envs_in_batch - pbar.update(num_envs_in_batch) + except Exception as _rollout_exc_caught: + _rollout_exc = _rollout_exc_caught import sys as _sys_instr @@ -2559,7 +2620,11 @@ def eval_multi_scenarios_render( try: import wandb - mp4_paths = sorted(os.path.join(mp4_folder, f) for f in os.listdir(mp4_folder) if f.endswith(".mp4")) + mp4_paths = sorted( + os.path.join(mp4_folder, f) + for f in os.listdir(mp4_folder) + if f.endswith(".mp4") and os.path.splitext(f)[0].endswith(video_suffix) + ) if mp4_paths: video_log = { f"{_upload_prefix}/{os.path.splitext(os.path.basename(p))[0]}": wandb.Video(p, fps=30, format="mp4") @@ -2574,8 +2639,23 @@ def eval_multi_scenarios_render( if not quiet: print(f"Failed to upload render mp4s to wandb: {e}") - # Close vectorized environment to avoid file descriptor leaks - vecenv.close() + # Close vectorized environment to avoid file descriptor leaks. 
+ # IMPORTANT: always run even if an exception occurred earlier in the rollout + # loop — skipping this leaves Drive C envs alive (ref_count > 0), which + # causes the next binding.shared() call with a different map_dir to raise + # RuntimeError("Cannot change map cache config while Drive environments are + # still open"). + try: + vecenv.close() + except Exception as _close_exc: + import sys as _sys_close + + _sys_close.stderr.write(f"[render-instr] vecenv.close() raised: {_close_exc}\n") + _sys_close.stderr.flush() + + # Re-raise any rollout exception now that the cache is clean + if _rollout_exc is not None: + raise _rollout_exc def sweep(args=None, env_name=None): @@ -2701,31 +2781,6 @@ def export(args=None, env_name=None, vecenv=None, policy=None, path=None, silent print(f"Saved {len(weights)} weights to {path}") -def ensure_drive_binary(): - """Delete existing visualize binary and rebuild it. This ensures the - binary is always up-to-date with the latest code changes. - """ - if os.path.exists("./visualize"): - print("Removing existing visualize binary...") - os.remove("./visualize") - - print("Building visualize binary...") - try: - result = subprocess.run( - ["bash", "scripts/build_ocean.sh", "visualize", "local"], capture_output=True, text=True, timeout=300 - ) - - if result.returncode == 0: - print("Successfully built visualize binary") - else: - print(f"Build failed: {result.stderr}") - raise RuntimeError("Failed to build visualize binary for rendering") - except subprocess.TimeoutExpired: - raise RuntimeError("Build timed out") - except Exception as e: - raise RuntimeError(f"Build error: {e}") - - def autotune(args=None, env_name=None, vecenv=None, policy=None): package = args["package"] module_name = "pufferlib.ocean" if package == "ocean" else f"pufferlib.environments.{package}"