Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ jobs:
sed -i 's|path = "../GSP-RL"|path = "GSP-RL"|' pyproject.toml
poetry lock
poetry install --no-interaction
poetry run pip install h5py

- name: Run unit tests
run: |
Expand Down
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ readme = "README.md"
python = "^3.10"
gsp-rl = {path = "../GSP-RL", develop = true}

[tool.poetry.extras]
hdf5 = ["h5py"]

[tool.poetry.dependencies.h5py]
version = "^3.0"
optional = true

[tool.poetry.group.dev.dependencies]
pytest = "^8.1.1"

Expand Down
51 changes: 8 additions & 43 deletions rl_code/Main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#import python_code.Agent as Agent
import src.agent as Agent
from src.env import calculate_gsp_reward, ZMQ_Utility
from src.exp_data_structures import data_logger
from src.hdf5_logger import HDF5Logger
from src.zmq_diagnostics import DiagnosticSocket
from src.diagnostics import ExperimentLogger

Expand Down Expand Up @@ -87,6 +87,10 @@
# Path to save data
data_file_path = recording_path + '/Data/'

# Initialize HDF5 logger (one per experiment)
hdf5_path = os.path.join(recording_path, os.path.basename(recording_path) + ".h5")
hdf5_writer = HDF5Logger(hdf5_path)

if args.share_prox_values:
num_obs = Utility.params['num_obs'] +Utility.params['num_robots'] #need to account for num_robots extra observations
elif args.global_knowledge:
Expand Down Expand Up @@ -542,7 +546,7 @@
else:
tmp_epsilon = model.epsilon

data_writer.writerow(r, tmp_epsilon, reached_goal, loss, force_mags, force_angs,
hdf5_writer.writerow(r, tmp_epsilon, reached_goal, loss, force_mags, force_angs,
[average_force_mag, math.degrees(average_force_ang)], obj_stats[0], obj_stats[1],
obj_stats[5], gate, obstacles, gsp_reward, next_heading_gsp,
time.time() - episode_start_time, robot_x_pos, robot_y_pos, robot_angle,
Expand All @@ -557,51 +561,12 @@
if hasattr(model, 'reset_hidden_states'):
model.reset_hidden_states()
run_time = time.time() - episode_start_time
data_writer.write_to_file()
if HAS_HDF5:
hdf5_writer.write_episode(ep_counter)
log.info(
"Episode %d done: success=%s duration=%.1fs timesteps=%d",
ep_counter, reached_goal, run_time, time_steps,
)
# Log to registry if available
try:
import sys as _sys
_stelaris_root = os.environ.get("STELARIS_ROOT",
os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "..", "..", "..")))
if _stelaris_root not in _sys.path:
_sys.path.insert(0, _stelaris_root)
from tools.registry.client import RegistryClient as _RC
_reg_db = os.environ.get("STELARIS_DB",
os.path.join(_stelaris_root, "data", "registry.db"))
if os.path.exists(_reg_db):
_reg = _RC(_reg_db)
_exp_id = f"{os.path.basename(recording_path)}_{config.get('SEED', 0)}"
# Ensure experiment exists (may already be created by runner)
if _reg.get_experiment(_exp_id) is None:
_reg.create_experiment(
id=_exp_id, name=os.path.basename(recording_path),
algorithm=config.get("LEARNING_SCHEME", "DQN"),
coordination="IC", environment="unknown",
num_robots=int(config.get("NUM_ROBOTS", 4)),
num_obstacles=int(config.get("NUM_OBSTACLES", 0)),
use_gate=bool(config.get("USE_GATE", 0)),
use_prisms=bool(config.get("USE_PRISMS", 0)),
num_episodes=int(config.get("NUM_EPISODES", 500)),
seed=int(config.get("SEED", 0)),
port=int(config.get("PORT", 55555)),
status="running",
)
_reg.log_episode(
experiment_id=_exp_id,
episode_num=ep_counter,
total_reward=float(running_reward if not args.independent_learning else np.mean(running_reward)),
success=bool(reached_goal),
duration_s=run_time,
timesteps=time_steps,
epsilon=float(tmp_epsilon),
)
_reg.close()
except Exception:
pass # Registry is optional
if not args.no_print:
print('[RUN TIME] %.2f' % run_time)
if args.independent_learning:
Expand Down
178 changes: 178 additions & 0 deletions rl_code/src/hdf5_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""HDF5-based episode data logger — drop-in replacement for pkl data_logger.

Main.py creates one HDF5Logger per experiment. Each episode is written
as a group in the experiment's HDF5 file. No pkl files created.

Usage:
logger = HDF5Logger("/path/to/experiment.h5")

# Per timestep (same API as data_logger.writerow)
logger.writerow(rewards, epsilon, termination, loss, ...)

# End of episode
logger.write_episode(episode_num)

# Notifying the ingestion worker is handled externally;
# HDF5Logger itself does not provide a notify() method.
"""

import os
from typing import Optional

import h5py
import numpy as np

# Notification handled by ingestion worker (optional, external)


class HDF5Logger:
    """Accumulate per-timestep data and write it to HDF5 at episode boundaries.

    Each call to :meth:`writerow` appends one timestep to in-memory buffers.
    :meth:`write_episode` stores the buffered data as one group named
    ``episode_NNNN`` in the file at ``hdf5_path`` and then clears the buffers.
    """

    def __init__(self, hdf5_path: str):
        """Remember the target file and make sure its parent directory exists.

        Args:
            hdf5_path: Path of the HDF5 file that episodes are appended to.
                A bare filename (no directory component) is accepted and
                refers to the current working directory.
        """
        self.hdf5_path = hdf5_path
        parent_dir = os.path.dirname(hdf5_path)
        # os.makedirs("") raises FileNotFoundError, so only create the parent
        # directory when the path actually has one.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self._reset()

    def _reset(self):
        """Clear buffers for a new episode."""
        self.reward = []
        self.epsilon = []
        self.termination = []
        self.loss = []
        self.force_magnitude = []
        self.force_angle = []
        self.average_force_vector = []
        self.cyl_x_pos = []
        self.cyl_y_pos = []
        self.cyl_angle = []
        self.gate_stats = []
        self.obstacle_stats = []
        self.gsp_reward = []
        self.gsp_heading = []
        self.run_time = []
        self.robots_x_pos = []
        self.robots_y_pos = []
        self.robot_angle = []
        self.robot_failures = []
        self.com_X_pos = []
        self.com_Y_pos = []

    def writerow(
        self, rewards, epsilons, terminations, losses,
        force_magnitudes, force_angles, average_force_vectors,
        cyl_x_poses, cyl_y_poses, cyl_angles,
        gate_stats, obstacle_stats,
        gsp_rewards, gsp_headings,
        run_times, robots_x_poses, robots_y_poses, robot_angles,
        robot_failure, com_X_poses=0, com_Y_poses=0,
    ):
        """Accumulate one timestep of data.

        Every argument is appended unchanged to the buffer of the matching
        name, except ``gsp_headings``: a NumPy array is converted to a plain
        list before being buffered.
        """
        self.reward.append(rewards)
        self.epsilon.append(epsilons)
        self.termination.append(terminations)
        self.loss.append(losses)
        self.force_magnitude.append(force_magnitudes)
        self.force_angle.append(force_angles)
        self.average_force_vector.append(average_force_vectors)
        self.cyl_x_pos.append(cyl_x_poses)
        self.cyl_y_pos.append(cyl_y_poses)
        self.cyl_angle.append(cyl_angles)
        self.gate_stats.append(gate_stats)
        self.obstacle_stats.append(obstacle_stats)
        self.gsp_reward.append(gsp_rewards)
        if isinstance(gsp_headings, np.ndarray):
            gsp_headings = gsp_headings.tolist()
        self.gsp_heading.append(gsp_headings)
        self.run_time.append(run_times)
        self.robots_x_pos.append(robots_x_poses)
        self.robots_y_pos.append(robots_y_poses)
        self.robot_angle.append(robot_angles)
        self.robot_failures.append(robot_failure)
        self.com_X_pos.append(com_X_poses)
        self.com_Y_pos.append(com_Y_poses)

    def write_episode(self, episode_num: int) -> dict:
        """Write the buffered episode to HDF5, clear the buffers, return a summary.

        The episode is stored in a group named ``episode_{episode_num:04d}``;
        an existing group with that name is replaced. Non-empty buffers become
        gzip-compressed float32 datasets, ``termination`` becomes a bool
        dataset, and the summary values are stored as group attributes.

        This method only writes the file; it does not notify any other
        process.

        Args:
            episode_num: Episode index used for the group name and the
                ``episode_num`` attribute.

        Returns:
            Dict with keys ``episode_num``, ``timesteps``, ``success``,
            ``reward_per_robot`` and ``gsp_reward_per_robot``.
        """
        group_name = f"episode_{episode_num:04d}"

        # Convert every buffer before touching the file, so a conversion
        # error cannot leave the file with the old group deleted and the new
        # one only partially written.
        # NOTE(review): average_force_vector, gate_stats and obstacle_stats
        # are buffered by writerow() but are not written to the file here;
        # confirm that this is intentional.
        float_datasets = [
            (key, np.array(buffer, dtype=np.float32))
            for key, buffer in (
                # Per-timestep values that may carry one entry per robot.
                ("reward", self.reward),
                ("gsp_reward", self.gsp_reward),
                ("force_magnitude", self.force_magnitude),
                ("force_angle", self.force_angle),
                ("robot_x_pos", self.robots_x_pos),
                ("robot_y_pos", self.robots_y_pos),
                ("robot_angle", self.robot_angle),
                ("robot_failure", self.robot_failures),
                ("gsp_heading", self.gsp_heading),
                # One value per timestep.
                ("epsilon", self.epsilon),
                ("loss", self.loss),
                ("cyl_x_pos", self.cyl_x_pos),
                ("cyl_y_pos", self.cyl_y_pos),
                ("cyl_angle", self.cyl_angle),
                ("run_time", self.run_time),
                ("comX", self.com_X_pos),
                ("comY", self.com_Y_pos),
            )
        ]
        term_arr = np.array(self.termination, dtype=bool)

        # Summary values stored as attributes and returned to the caller.
        rewards = np.array(self.reward, dtype=np.float32)
        gsp_rewards = np.array(self.gsp_reward, dtype=np.float32)
        timesteps = len(self.reward)
        success = bool(np.any(term_arr)) if term_arr.size > 0 else False

        if rewards.ndim == 2:
            reward_per_robot = np.sum(rewards, axis=0).tolist()
        elif rewards.size > 0:
            reward_per_robot = [float(np.sum(rewards))]
        else:
            reward_per_robot = []

        # Unlike rewards, GSP rewards are only summarised when they are 2-D.
        if gsp_rewards.ndim == 2 and gsp_rewards.size > 0:
            gsp_per_robot = np.sum(gsp_rewards, axis=0).tolist()
        else:
            gsp_per_robot = []

        summary = {
            "episode_num": episode_num,
            "timesteps": timesteps,
            "success": success,
            "reward_per_robot": reward_per_robot,
            "gsp_reward_per_robot": gsp_per_robot,
        }

        with h5py.File(self.hdf5_path, "a") as h5f:
            # Re-writing an episode replaces the previous group.
            if group_name in h5f:
                del h5f[group_name]
            grp = h5f.create_group(group_name)

            for key, arr in float_datasets:
                # Empty buffers are skipped rather than stored as empty datasets.
                if arr.size > 0:
                    grp.create_dataset(
                        key, data=arr, compression="gzip", compression_opts=4
                    )

            if term_arr.size > 0:
                grp.create_dataset("termination", data=term_arr)

            for attr_name, attr_value in summary.items():
                grp.attrs[attr_name] = attr_value

        # Reset for next episode
        self._reset()
        return summary
18 changes: 15 additions & 3 deletions rl_code/src/plotting/make_cylinder_trajectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
import matplotlib.pyplot as plt
import pickle
import h5py

def angle_normalize_unsigned_deg(a):
while a < 0: a += 360
Expand Down Expand Up @@ -40,8 +41,18 @@ def angle_normalize_signed_deg(a):
print('. . . Loading Model Data')

file_names = []
for file in os.listdir(data_path):
file_names.append(file)
# Try HDF5 first
exp_name = os.path.basename(args.data_path.rstrip('/'))
h5_path = os.path.join(args.data_path, exp_name + '.h5')
use_hdf5 = os.path.exists(h5_path)
if use_hdf5:
h5_file = h5py.File(h5_path, 'r')
file_names = sorted([k for k in h5_file.keys() if k.startswith('episode')])
print(f'Loading from HDF5: {len(file_names)} episodes')
else:
for file in os.listdir(data_path):
file_names.append(file)
print(f'Loading from pkl: {len(file_names)} files')

df_list = []
for ep in range(len(file_names)-1):
Expand All @@ -52,7 +63,8 @@ def angle_normalize_signed_deg(a):

cyl_heading_diff = []
cyl_angle = data['cyl_angle']
gsp = data['gsp_heading']
gsp_raw = data["gsp_heading"]
gsp = [g[0] if isinstance(g, list) else g for g in gsp_raw]
predicted_cyl_heading = []
for i in range(len(data['cyl_angle'])-1):
predicted_cyl_heading.append(cyl_angle[i] + math.degrees(gsp[i+1]/10))
Expand Down
Loading
Loading