Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions rl_code/Main.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,11 @@
gate_stats = Utility.parse_gate_stats(msgs[7])

############################## gsp REWARD ##############################################
gsp_reward, label = calculate_gsp_reward(
config['GSP'],
old_cyl_ang,
obj_stats[5],
next_heading_gsp,
gsp_reward, label, gsp_squared_error = calculate_gsp_reward(
config['GSP'],
old_cyl_ang,
obj_stats[5],
next_heading_gsp,
Utility.params['num_robots']
)
# print('[MAIN] GSP Reward', gsp_reward)
Expand Down Expand Up @@ -501,10 +501,25 @@
if train_mode and config['LEARNING_SCHEME'] != 'None':
if time_steps % learn_every == 0:
if args.independent_learning:
# Aggregate GSP losses across per-robot models to a single
# scalar per learn tick. Otherwise the 1D gsp_loss dataset
# would have (num_learn_steps × num_robots) entries in
# independent mode vs. num_learn_steps in shared mode,
# breaking cross-mode comparability of the
# information-collapse diagnostic.
for i in range(Utility.params['num_robots']):
loss = models[i].learn()
gsp_losses = [
m.last_gsp_loss for m in models
if getattr(m, "last_gsp_loss", None) is not None
]
if gsp_losses:
hdf5_writer.record_gsp_loss(float(np.mean(gsp_losses)))
else:
loss = model.learn()
gsp_step_loss = getattr(model, "last_gsp_loss", None)
if gsp_step_loss is not None:
hdf5_writer.record_gsp_loss(gsp_step_loss)
else:
loss = 0
else:
Expand Down Expand Up @@ -546,11 +561,16 @@
else:
tmp_epsilon = model.epsilon

# gsp_target: broadcast the scalar payload delta-theta label to per-robot list
# so it aligns with the (timesteps × robots) HDF5 schema. Needed for the
# information-collapse diagnostic (gsp_output_std, gsp_pred_target_corr).
gsp_target_per_robot = [float(label)] * Utility.params['num_robots']
hdf5_writer.writerow(r, tmp_epsilon, reached_goal, loss, force_mags, force_angs,
[average_force_mag, math.degrees(average_force_ang)], obj_stats[0], obj_stats[1],
obj_stats[5], gate, obstacles, gsp_reward, next_heading_gsp,
time.time() - episode_start_time, robot_x_pos, robot_y_pos, robot_angle,
robot_failures, com_X_poses=com_X_poses, com_Y_poses=com_Y_poses)
robot_failures, com_X_poses=com_X_poses, com_Y_poses=com_Y_poses,
gsp_target=gsp_target_per_robot, gsp_squared_error=gsp_squared_error)

if episode_done:
if args.independent_learning:
Expand Down
27 changes: 12 additions & 15 deletions rl_code/src/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ def angle_normalize_signed_deg(a):
return a

def calculate_gsp_reward(GSP, old_cyl_ang, cyl_ang, next_heading_gsp, num_robots):
"""Return (clipped_rewards, label, squared_errors) per robot.

The clipped reward saturates at -2 and hides the magnitude of large prediction errors.
squared_errors carries the raw (diff - prediction)^2 per robot — needed for the
information-collapse diagnostic (paper outline: "Revamped Reward structure for GSP
to prevent information collapse").
"""
gsp_reward = []
squared_errors = []
label = 0
if GSP:
old_cyl_ang = angle_normalize_unsigned_deg(old_cyl_ang)
Expand All @@ -28,27 +36,16 @@ def calculate_gsp_reward(GSP, old_cyl_ang, cyl_ang, next_heading_gsp, num_robots
# Max rotation is 0.09 rad/step so we can multiply by 10 to get within range of -1, 1
diff = np.clip(diff*100, -1, 1)
label=diff
x1 = math.cos(diff)
y1 = math.sin(diff)
for i in range(num_robots):
# x2 = math.cos(next_heading_gsp[i])
# y2 = math.sin(next_heading_gsp[i])
# error = np.dot([x1, y1], [x2, y2])
# gsp_reward.append(-1 + error)
# print('GSP:', next_heading_gsp[i])
# print(f'Diff: {diff:.2f}, next_heading_gsp: {next_heading_gsp[i]:.2f}')
reward = diff - next_heading_gsp[i]
# print('reward', reward)
# norm_reward = reward / next_heading_gsp[i]
# print('norm_reward', norm_reward)
abs_reward = abs(reward)**2
# print('abs reward', abs_reward)
squared_errors.append(float(abs_reward))
gsp_reward.append(np.clip(-1*abs_reward, -2, 0))

else:
gsp_reward = [0 for i in range(num_robots)]

return gsp_reward, label
squared_errors = [0 for i in range(num_robots)]

return gsp_reward, label, squared_errors


class ZMQ_Utility:
Expand Down
70 changes: 68 additions & 2 deletions rl_code/src/hdf5_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def _reset(self):
self.robot_failures = []
self.com_X_pos = []
self.com_Y_pos = []
# GSP information-collapse diagnostics: predicted delta-theta target and squared error
# per step, plus GSP-specific training loss per learning step.
self.gsp_target = []
self.gsp_squared_error = []
self.gsp_loss = []

def writerow(
self, rewards, epsilons, terminations, losses,
Expand All @@ -65,6 +70,7 @@ def writerow(
gsp_rewards, gsp_headings,
run_times, robots_x_poses, robots_y_poses, robot_angles,
robot_failure, com_X_poses=0, com_Y_poses=0,
gsp_target=None, gsp_squared_error=None,
):
"""Accumulate one timestep of data. Same signature as data_logger.writerow."""
self.reward.append(rewards)
Expand All @@ -90,6 +96,17 @@ def writerow(
self.robot_failures.append(robot_failure)
self.com_X_pos.append(com_X_poses)
self.com_Y_pos.append(com_Y_poses)
if gsp_target is not None:
self.gsp_target.append(gsp_target)
if gsp_squared_error is not None:
self.gsp_squared_error.append(gsp_squared_error)

def record_gsp_loss(self, loss_value: float) -> None:
"""Record one GSP prediction network training loss sample.

Called per GSP learning step (cadence differs from per-timestep writerow).
"""
self.gsp_loss.append(float(loss_value))

def write_episode(self, episode_num: int) -> dict:
"""Write accumulated data to HDF5 and return summary dict.
Expand All @@ -105,7 +122,7 @@ def write_episode(self, episode_num: int) -> dict:
grp = h5f.create_group(group_name)

# Store 2D arrays (timesteps × robots)
for key, data in [
twod_specs = [
("reward", self.reward),
("gsp_reward", self.gsp_reward),
("force_magnitude", self.force_magnitude),
Expand All @@ -115,7 +132,12 @@ def write_episode(self, episode_num: int) -> dict:
("robot_angle", self.robot_angle),
("robot_failure", self.robot_failures),
("gsp_heading", self.gsp_heading),
]:
]
if self.gsp_target:
twod_specs.append(("gsp_target", self.gsp_target))
if self.gsp_squared_error:
twod_specs.append(("gsp_squared_error", self.gsp_squared_error))
for key, data in twod_specs:
arr = np.array(data, dtype=np.float32)
if arr.size > 0:
grp.create_dataset(key, data=arr, compression="gzip", compression_opts=4)
Expand All @@ -135,6 +157,13 @@ def write_episode(self, episode_num: int) -> dict:
if arr.size > 0:
grp.create_dataset(key, data=arr, compression="gzip", compression_opts=4)

# GSP-specific training loss — 1D but recorded at a different cadence than writerow,
# so it lives outside the timestep-indexed 1D block above.
if self.gsp_loss:
gsp_loss_arr = np.array(self.gsp_loss, dtype=np.float32)
grp.create_dataset("gsp_loss", data=gsp_loss_arr,
compression="gzip", compression_opts=4)

# Termination as bool
term_arr = np.array(self.termination, dtype=bool)
if term_arr.size > 0:
Expand Down Expand Up @@ -164,6 +193,43 @@ def write_episode(self, episode_num: int) -> dict:
grp.attrs["reward_per_robot"] = reward_per_robot
grp.attrs["gsp_reward_per_robot"] = gsp_per_robot

# Information-collapse summary attrs. Computed only when both prediction
# (gsp_heading) and target (gsp_target) are present. The caller contract is
# that gsp_target must be passed on every writerow call within an episode once
# it's been passed on any — i.e. it tracks gsp_heading 1:1. Enforce here rather
# than silently zipping misaligned buffers. Use NaN-aware aggregation so a
# single physics glitch (prediction -> NaN) does not poison the summary.
# Undefined correlations (zero-variance in predictions or targets) are written
# as NaN so downstream analysis can distinguish "undefined" from "measured zero".
if self.gsp_target and self.gsp_heading:
if len(self.gsp_target) != len(self.gsp_heading):
raise ValueError(
f"gsp_target buffer length {len(self.gsp_target)} does not match "
f"gsp_heading buffer length {len(self.gsp_heading)}; gsp_target must "
"be passed on every writerow call within an episode once it's been "
"passed on any call."
)
pred_arr = np.array(self.gsp_heading, dtype=np.float64).ravel()
target_arr = np.array(self.gsp_target, dtype=np.float64).ravel()
if pred_arr.size > 0:
pred_std = float(np.nanstd(pred_arr))
target_std = float(np.nanstd(target_arr))
# Tolerance guard: np.nanstd of a constant returns ~1e-18 due to
# sum-of-squares fuzz, which passes a strict `> 0` check and lets
# corrcoef return a garbage value. The observables here are radians
# clipped to [-1, 1], so 1e-12 is well below any physical variation.
STD_TOL = 1e-12
grp.attrs["gsp_output_std"] = pred_std
if pred_arr.size > 1 and pred_std > STD_TOL and target_std > STD_TOL:
mask = np.isfinite(pred_arr) & np.isfinite(target_arr)
if mask.sum() > 1:
corr = float(np.corrcoef(pred_arr[mask], target_arr[mask])[0, 1])
else:
corr = float("nan")
else:
corr = float("nan")
grp.attrs["gsp_pred_target_corr"] = corr

# Reset for next episode
summary = {
"episode_num": episode_num,
Expand Down
Loading
Loading