diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 4cb35e58..0b83af33 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -75,7 +75,7 @@ jobs: - uses: actions/checkout@v4 - name: Run tests run: | - pip install -e . --extra-index-url http://pyp.open3dv.site:2345/simple/ --trusted-host pyp.open3dv.site + pip install -e .[lerobot] --extra-index-url http://pyp.open3dv.site:2345/simple/ --trusted-host pyp.open3dv.site echo "Unit test Start" export HF_ENDPOINT=https://hf-mirror.com pip uninstall pymeshlab -y diff --git a/configs/agents/rl/push_cube/gym_config.json b/configs/agents/rl/push_cube/gym_config.json index 766d12c4..3e0e0445 100644 --- a/configs/agents/rl/push_cube/gym_config.json +++ b/configs/agents/rl/push_cube/gym_config.json @@ -113,9 +113,8 @@ } }, "extensions": { - "obs_mode": "state", + "action_type": "delta_qpos", "episode_length": 100, - "joint_limits": 0.5, "action_scale": 0.1, "success_threshold": 0.1 } diff --git a/configs/agents/rl/push_cube/train_config.json b/configs/agents/rl/push_cube/train_config.json index 3c3bda05..64e2ecb2 100644 --- a/configs/agents/rl/push_cube/train_config.json +++ b/configs/agents/rl/push_cube/train_config.json @@ -4,15 +4,15 @@ "gym_config": "configs/agents/rl/push_cube/gym_config.json", "seed": 42, "device": "cuda:0", - "headless": false, + "headless": true, "enable_rt": false, "gpu_id": 0, - "num_envs": 8, + "num_envs": 64, "iterations": 1000, "rollout_steps": 1024, "eval_freq": 200, "save_freq": 200, - "use_wandb": true, + "use_wandb": false, "wandb_project_name": "embodychain-push_cube", "events": { "eval": { diff --git a/docs/source/overview/gym/env.md b/docs/source/overview/gym/env.md index a15d26f9..88f9cac9 100644 --- a/docs/source/overview/gym/env.md +++ b/docs/source/overview/gym/env.md @@ -5,14 +5,19 @@ The {class}`~envs.EmbodiedEnv` is the core environment class in EmbodiChain designed for complex Embodied AI tasks. It adopts a **configuration-driven** architecture, allowing users to define robots, sensors, objects, lighting, and automated behaviors (events) purely through configuration classes, minimizing the need for boilerplate code. +For **Reinforcement Learning** tasks, EmbodiChain provides {class}`~envs.RLEnv`, a specialized subclass that extends {class}`~envs.EmbodiedEnv` with RL-specific utilities such as flexible action preprocessing, goal management, and standardized info structure. + ## Core Architecture -Unlike the standard {class}`~envs.BaseEnv`, the {class}`~envs.EmbodiedEnv` integrates several manager systems to handle the complexity of simulation: +EmbodiChain provides a hierarchy of environment classes for different task types: -* **Scene Management**: Automatically loads and manages robots, sensors, and scene objects defined in the configuration. -* **Event Manager**: Handles automated behaviors such as domain randomization, scene setup, and dynamic asset swapping. -* **Observation Manager**: Allows flexible extension of observation spaces without modifying the environment code. -* **Dataset Manager**: Built-in support for collecting demonstration data during simulation steps. +* **{class}`~envs.BaseEnv`**: Minimal environment for simple tasks with custom simulation logic. +* **{class}`~envs.EmbodiedEnv`**: Feature-rich environment for Embodied AI tasks (IL, custom control). Integrates manager systems: + * **Scene Management**: Automatically loads and manages robots, sensors, and scene objects. 
+ * **Event Manager**: Domain randomization, scene setup, and dynamic asset swapping. + * **Observation Manager**: Flexible observation space extensions. + * **Dataset Manager**: Built-in support for demonstration data collection. +* **{class}`~envs.RLEnv`**: Specialized environment for RL tasks, extending {class}`~envs.EmbodiedEnv` with action preprocessing, goal management, and standardized reward/info structure. ## Configuration System @@ -77,7 +82,7 @@ The {class}`~envs.EmbodiedEnvCfg` class exposes the following additional paramet Dataset collection settings. Defaults to None, in which case no dataset collection is performed. Please refer to the {class}`~envs.managers.DatasetManager` class for more details. * **extensions** (Union[Dict[str, Any], None]): - Task-specific extension parameters that are automatically bound to the environment instance. This allows passing custom parameters (e.g., ``episode_length``, ``obs_mode``, ``action_scale``) without modifying the base configuration class. These parameters are accessible as instance attributes after environment initialization. For example, if ``extensions = {"episode_length": 500}``, you can access it via ``self.episode_length``. Defaults to None. + Task-specific extension parameters that are automatically bound to the environment instance. This allows passing custom parameters (e.g., ``episode_length``, ``action_type``, ``action_scale``) without modifying the base configuration class. These parameters are accessible as instance attributes after environment initialization. For example, if ``extensions = {"episode_length": 500}``, you can access it via ``self.episode_length``. Defaults to None. * **filter_visual_rand** (bool): Whether to filter out visual randomization functors. Useful for debugging motion and physics issues when visual randomization interferes with the debugging process. Defaults to ``False``. @@ -108,7 +113,8 @@ class MyTaskEnvCfg(EmbodiedEnvCfg): # 4. Task Extensions extensions = { # Task-specific parameters "episode_length": 500, - "obs_mode": "state", + "action_type": "delta_qpos", + "action_scale": 0.1, } ``` @@ -165,54 +171,104 @@ The manager operates in a single mode ``"save"`` which handles both recording an The dataset manager is called automatically during {meth}`~envs.Env.step()`, ensuring all observation-action pairs are recorded without additional user code. +## Reinforcement Learning Environment + +For RL tasks, EmbodiChain provides {class}`~envs.RLEnv`, a specialized base class that extends {class}`~envs.EmbodiedEnv` with RL-specific utilities: + +* **Action Preprocessing**: Flexible action transformation supporting delta_qpos, absolute qpos, joint velocity, joint force, and end-effector pose (with IK). +* **Goal Management**: Built-in goal pose tracking and visualization with axis markers. +* **Standardized Info Structure**: Template methods for computing task-specific success/failure conditions and metrics. +* **Episode Management**: Configurable episode length and truncation logic. 
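+
+As a minimal sketch of how an action flows through {class}`~envs.RLEnv` at step time (assuming ``env`` is an already-constructed RL task instance; the joint count below is illustrative):
+
+```python
+import torch
+
+# Policy output: one action row per parallel environment.
+raw_action = torch.zeros(env.num_envs, 6, device=env.device)
+
+# Rollout code wraps the tensor in a dict keyed by the configured action type
+# (env.action_type, default "delta_qpos") and steps the environment with it.
+obs, reward, terminated, truncated, info = env.step({"delta_qpos": raw_action})
+
+# Internally, RLEnv._preprocess_action scales the action by action_scale and turns
+# delta_qpos into absolute joint positions, which EmbodiedEnv._step_action then
+# applies via robot.set_qpos() (or set_qvel() / set_qf() for other action types).
+```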
+ +### Configuration Extensions for RL + +RL environments use the ``extensions`` field to pass task-specific parameters: + +```python +extensions = { + "action_type": "delta_qpos", # Action type: delta_qpos, qpos, qvel, qf, eef_pose + "action_scale": 0.1, # Scaling factor applied to all actions + "episode_length": 100, # Maximum episode length + "success_threshold": 0.1, # Task-specific success threshold (optional) +} +``` + ## Creating a Custom Task -To create a new task, inherit from {class}`~envs.EmbodiedEnv` and implement the task-specific logic. +### For Reinforcement Learning Tasks + +Inherit from {class}`~envs.RLEnv` and implement the task-specific logic: + +```python +from embodichain.lab.gym.envs import RLEnv, EmbodiedEnvCfg +from embodichain.lab.gym.utils.registration import register_env + +@register_env("MyRLTask-v0", max_episode_steps=100) +class MyRLTaskEnv(RLEnv): + def __init__(self, cfg: MyTaskEnvCfg, **kwargs): + super().__init__(cfg, **kwargs) + + def compute_task_state(self, **kwargs): + # Required: Compute task-specific success/failure and metrics + # Returns: Tuple[success, fail, metrics] + # - success: torch.Tensor of shape (num_envs,) with boolean values + # - fail: torch.Tensor of shape (num_envs,) with boolean values + # - metrics: Dict of metric tensors for logging + + is_success = ... # Compute success condition + is_fail = torch.zeros_like(is_success) + metrics = {"distance": ..., "angle_error": ...} + + return is_success, is_fail, metrics + + def check_truncated(self, obs, info): + # Optional: Override to add custom truncation conditions + # Default: episode_length timeout + is_timeout = super().check_truncated(obs, info) + is_fallen = ... # Custom condition (e.g., robot fell) + return is_timeout | is_fallen +``` + +Configure rewards through the {class}`~envs.managers.RewardManager` in your environment config rather than overriding ``get_reward``. + +### For Imitation Learning Tasks + +Inherit from {class}`~envs.EmbodiedEnv` for IL tasks: ```python from embodichain.lab.gym.envs import EmbodiedEnv, EmbodiedEnvCfg from embodichain.lab.gym.utils.registration import register_env -@register_env("MyTask-v0", max_episode_steps=500) -class MyTaskEnv(EmbodiedEnv): +@register_env("MyILTask-v0", max_episode_steps=500) +class MyILTaskEnv(EmbodiedEnv): def __init__(self, cfg: MyTaskEnvCfg, **kwargs): super().__init__(cfg, **kwargs) def create_demo_action_list(self, *args, **kwargs): - # Optional: Implement for expert demonstration data generation (for Imitation Learning) - # This method is used to generate scripted demonstrations for IL data collection. + # Required: Generate scripted demonstrations for data collection # Must set self.action_length = len(action_list) if returning actions pass def is_task_success(self, **kwargs): - # Optional: Define success criteria (mainly for IL data collection) + # Required: Define success criteria for filtering successful episodes # Returns: torch.Tensor of shape (num_envs,) with boolean values return success_tensor - def get_reward(self, obs, action, info): - # Optional: Override for RL tasks - # Returns: torch.Tensor of shape (num_envs,) - return super().get_reward(obs, action, info) - def get_info(self, **kwargs): # Optional: Override to add custom info fields - # Should include "success" and "fail" keys for termination info = super().get_info(**kwargs) info["custom_metric"] = ... 
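+        # Extra fields added above are passed through in the info dict returned by env.step()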
return info ``` -```{note} -The {meth}`~envs.EmbodiedEnv.create_demo_action_list` method is specifically designed for expert demonstration data generation in Imitation Learning scenarios. For Reinforcement Learning tasks, you should override the {meth}`~envs.EmbodiedEnv.get_reward` method instead. -``` - For a complete example of a modular environment setup, please refer to the {ref}`tutorial_modular_env` tutorial. ## See Also - {ref}`tutorial_create_basic_env` - Creating basic environments - {ref}`tutorial_modular_env` - Advanced modular environment setup -- {doc}`/api_reference/embodichain/embodichain.lab.gym.envs` - Complete API reference for EmbodiedEnv and EmbodiedEnvCfg +- {ref}`tutorial_rl` - Reinforcement learning training guide +- {doc}`/api_reference/embodichain/embodichain.lab.gym.envs` - Complete API reference for EmbodiedEnv, RLEnv, and configurations ```{toctree} :maxdepth: 1 diff --git a/docs/source/tutorial/rl.rst b/docs/source/tutorial/rl.rst index 7be8c73c..cbc011b2 100644 --- a/docs/source/tutorial/rl.rst +++ b/docs/source/tutorial/rl.rst @@ -78,6 +78,13 @@ The ``env`` section defines the task environment: - **id**: Environment registry ID (e.g., "PushCubeRL") - **cfg**: Environment-specific configuration parameters +For RL environments (inheriting from ``RLEnv``), use the ``extensions`` field for RL-specific parameters: + +- **action_type**: Action type - "delta_qpos" (default), "qpos", "qvel", "qf", "eef_pose" +- **action_scale**: Scaling factor applied to all actions (default: 1.0) +- **episode_length**: Maximum episode length (default: 1000) +- **success_threshold**: Task-specific success threshold (optional) + Example: .. code-block:: json @@ -86,10 +93,12 @@ Example: "id": "PushCubeRL", "cfg": { "num_envs": 4, - "obs_mode": "state", - "episode_length": 100, - "action_scale": 0.1, - "success_threshold": 0.1 + "extensions": { + "action_type": "delta_qpos", + "action_scale": 0.1, + "episode_length": 100, + "success_threshold": 0.1 + } } } @@ -321,41 +330,74 @@ Adding a New Environment To add a new RL environment: -1. Create an environment class inheriting from ``EmbodiedEnv`` -2. Register it with the Gymnasium registry: +1. Create an environment class inheriting from ``RLEnv`` (which provides action preprocessing, goal management, and standardized info structure): .. code-block:: python + from embodichain.lab.gym.envs import RLEnv, EmbodiedEnvCfg from embodichain.lab.gym.utils.registration import register_env + import torch @register_env("MyTaskRL", max_episode_steps=100, override=True) - class MyTaskEnv(EmbodiedEnv): - cfg: MyTaskEnvCfg - ... + class MyTaskEnv(RLEnv): + def __init__(self, cfg: EmbodiedEnvCfg = None, **kwargs): + super().__init__(cfg, **kwargs) + + def compute_task_state(self, **kwargs): + """Compute success/failure conditions and metrics.""" + is_success = ... # Define success condition + is_fail = torch.zeros_like(is_success) + metrics = {"distance": ..., "error": ...} + return is_success, is_fail, metrics + + def check_truncated(self, obs, info): + """Optional: Add custom truncation conditions.""" + is_timeout = super().check_truncated(obs, info) + # Add custom conditions if needed + return is_timeout -3. Use the environment ID in your JSON config: +2. Configure the environment in your JSON config with RL-specific extensions: .. code-block:: json "env": { "id": "MyTaskRL", "cfg": { - ... 
+ "num_envs": 4, + "extensions": { + "action_type": "delta_qpos", + "action_scale": 0.1, + "episode_length": 100, + "success_threshold": 0.05 + } } } +The ``RLEnv`` base class provides: + +- **Action Preprocessing**: Automatically handles different action types (delta_qpos, qpos, qvel, qf, eef_pose) +- **Action Scaling**: Applies ``action_scale`` to all actions +- **Goal Management**: Built-in goal pose tracking and visualization +- **Standardized Info**: Implements ``get_info()`` using ``compute_task_state()`` template method + Best Practices ~~~~~~~~~~~~~~ -- **Device Management**: Device is single-sourced from ``runtime.cuda``. All components (trainer/algorithm/policy/env) share the same device. +- **Use RLEnv for RL Tasks**: Always inherit from ``RLEnv`` for reinforcement learning tasks. It provides action preprocessing, goal management, and standardized info structure out of the box. + +- **Action Type Configuration**: Configure ``action_type`` in the environment's ``extensions`` field. The default is "delta_qpos" (incremental joint positions). Other options: "qpos" (absolute), "qvel" (velocity), "qf" (force), "eef_pose" (end-effector pose with IK). -- **Action Scaling**: Keep action scaling in the environment, not in the policy. +- **Action Scaling**: Use ``action_scale`` in the environment's ``extensions`` field to scale actions. This is applied in ``RLEnv._preprocess_action()`` before robot control. + +- **Device Management**: Device is single-sourced from ``runtime.cuda``. All components (trainer/algorithm/policy/env) share the same device. - **Observation Format**: Environments should provide consistent observation shape/types (torch.float32) and a single ``done = terminated | truncated``. - **Algorithm Interface**: Algorithms must implement ``initialize_buffer()``, ``collect_rollout()``, and ``update()`` methods. The algorithm completely controls data collection and buffer management. -- **Reward Components**: Organize reward components in ``info["rewards"]`` dictionary and metrics in ``info["metrics"]`` dictionary. The trainer performs dense per-step logging directly from environment info. +- **Reward Configuration**: Use the ``RewardManager`` in your environment config to define reward components. Organize reward components in ``info["rewards"]`` dictionary and metrics in ``info["metrics"]`` dictionary. The trainer performs dense per-step logging directly from environment info. + +- **Template Methods**: Override ``compute_task_state()`` to define success/failure conditions and metrics. Override ``check_truncated()`` for custom truncation logic. - **Configuration**: Use JSON for all hyperparameters. This makes experiments reproducible and easy to track. 
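+
+As a rough sketch, the per-step info structure consumed by the trainer looks like the following (component names are illustrative; ``success``/``fail``/``metrics`` come from ``RLEnv.get_info()`` via ``compute_task_state()``, and ``rewards`` holds the per-component breakdown produced by the reward configuration):
+
+.. code-block:: python
+
+    import torch
+
+    num_envs = 4
+    info = {
+        "success": torch.zeros(num_envs, dtype=torch.bool),
+        "fail": torch.zeros(num_envs, dtype=torch.bool),
+        # Per-component reward terms, logged densely by the trainer.
+        "rewards": {"distance_to_target": torch.zeros(num_envs)},
+        # Task metrics, e.g. the distance_to_goal reported by PushCubeRL.
+        "metrics": {"distance_to_goal": torch.zeros(num_envs)},
+    }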
diff --git a/embodichain/agents/rl/algo/ppo.py b/embodichain/agents/rl/algo/ppo.py index 5253e897..f11fbe37 100644 --- a/embodichain/agents/rl/algo/ppo.py +++ b/embodichain/agents/rl/algo/ppo.py @@ -94,8 +94,12 @@ def collect_rollout( current_obs, deterministic=False ) + # Wrap action as dict for env processing + action_type = getattr(env, "action_type", "delta_qpos") + action_dict = {action_type: actions} + # Step environment - result = env.step(actions) + result = env.step(action_dict) next_obs, reward, terminated, truncated, env_info = result done = terminated | truncated # Light dtype normalization diff --git a/embodichain/agents/rl/train.py b/embodichain/agents/rl/train.py index ca3c2996..b7e292e3 100644 --- a/embodichain/agents/rl/train.py +++ b/embodichain/agents/rl/train.py @@ -39,12 +39,20 @@ from embodichain.lab.gym.envs.managers.cfg import EventCfg -def main(): +def parse_args(): + """Parse command line arguments.""" parser = argparse.ArgumentParser() parser.add_argument("--config", type=str, required=True, help="Path to JSON config") - args = parser.parse_args() + return parser.parse_args() + + +def train_from_config(config_path: str): + """Run training from a config file path. - with open(args.config, "r") as f: + Args: + config_path: Path to the JSON config file + """ + with open(config_path, "r") as f: cfg_json = json.load(f) trainer_cfg = cfg_json["trainer"] @@ -274,8 +282,28 @@ def main(): wandb.finish() except Exception: pass + + # Clean up environments to prevent resource leaks + try: + if env is not None: + env.close() + except Exception as e: + logger.log_warning(f"Failed to close training environment: {e}") + + try: + if eval_env is not None: + eval_env.close() + except Exception as e: + logger.log_warning(f"Failed to close evaluation environment: {e}") + logger.log_info("Training finished") +def main(): + """Main entry point for command-line training.""" + args = parse_args() + train_from_config(args.config) + + if __name__ == "__main__": main() diff --git a/embodichain/agents/rl/utils/trainer.py b/embodichain/agents/rl/utils/trainer.py index 34dc1919..333a0bcd 100644 --- a/embodichain/agents/rl/utils/trainer.py +++ b/embodichain/agents/rl/utils/trainer.py @@ -239,7 +239,14 @@ def _eval_once(self, num_episodes: int = 5): ep_ret = torch.zeros(num_envs_eval, dtype=torch.float32, device=self.device) while not done_any.any(): actions, _, _ = self.policy.get_action(obs, deterministic=True) - obs, reward, terminated, truncated, info = self.eval_env.step(actions) + + # Wrap action as dict for env processing + action_type = getattr(self.eval_env, "action_type", "delta_qpos") + action_dict = {action_type: actions} + + obs, reward, terminated, truncated, info = self.eval_env.step( + action_dict + ) # Flatten dict observation from step if isinstance(obs, dict): diff --git a/embodichain/lab/gym/envs/__init__.py b/embodichain/lab/gym/envs/__init__.py index 88257690..4bd4321f 100644 --- a/embodichain/lab/gym/envs/__init__.py +++ b/embodichain/lab/gym/envs/__init__.py @@ -16,10 +16,12 @@ from .base_env import * from .embodied_env import * +from .rl_env import * from .tasks import * from .wrapper import * from embodichain.lab.gym.envs.embodied_env import EmbodiedEnv +from embodichain.lab.gym.envs.rl_env import RLEnv # Specific task environments from embodichain.lab.gym.envs.tasks.tableware.pour_water.pour_water import ( diff --git a/embodichain/lab/gym/envs/base_env.py b/embodichain/lab/gym/envs/base_env.py index 99743607..fd0f873c 100644 --- a/embodichain/lab/gym/envs/base_env.py 
+++ b/embodichain/lab/gym/envs/base_env.py @@ -482,6 +482,22 @@ def get_reward( return rewards + def _preprocess_action(self, action: EnvAction) -> EnvAction: + """Preprocess action before sending to robot. + + Override this method to add custom preprocessing like: + - Action scaling + - Coordinate transformation (e.g., EEF pose to joint positions) + - Action space conversion + + Args: + action: Raw action from policy + + Returns: + Preprocessed action + """ + return action + def _step_action(self, action: EnvAction) -> EnvAction: """Set action control command into simulation. @@ -536,7 +552,7 @@ def step( """ self._elapsed_steps += 1 - # TODO: may be add hook for action preprocessing. + action = self._preprocess_action(action=action) action = self._step_action(action=action) self.sim.update(self.sim_cfg.physics_dt, self.cfg.sim_steps_per_control) self._update_sim_state(**kwargs) diff --git a/embodichain/lab/gym/envs/embodied_env.py b/embodichain/lab/gym/envs/embodied_env.py index 4e6b1b98..af101935 100644 --- a/embodichain/lab/gym/envs/embodied_env.py +++ b/embodichain/lab/gym/envs/embodied_env.py @@ -110,10 +110,9 @@ class EnvLightCfg: This field can be used to pass additional parameters that are specific to certain environments or tasks without modifying the base configuration class. For example: - - obs_mode: Observation mode (e.g., "state", "image") - episode_length: Maximum episode length - - joint_limits: Joint limit constraints - action_scale: Action scaling factor + - action_type: Action type (e.g., "delta_qpos", "qpos", "qvel") - vr_joint_mapping: VR joint mapping for teleoperation - control_frequency: Control frequency for VR teleoperation """ @@ -171,6 +170,13 @@ def __init__(self, cfg: EmbodiedEnvCfg, **kwargs): super().__init__(cfg, **kwargs) + self.episode_obs_buffer: Dict[int, List[EnvObs]] = { + i: [] for i in range(self.num_envs) + } + self.episode_action_buffer: Dict[int, List[EnvAction]] = { + i: [] for i in range(self.num_envs) + } + def _init_sim_state(self, **kwargs): """Initialize the simulation state at the beginning of scene creation.""" @@ -272,6 +278,29 @@ def get_affordance(self, key: str, default: Any = None): """ return self.affordance_datas.get(key, default) + def _extract_single_env_data(self, data: Any, env_id: int) -> Any: + """Extract single environment data from batched data. 
+ + Args: + data: Batched data (dict, tensor, list, or primitive) + env_id: Environment index + + Returns: + Data for the specified environment + """ + if isinstance(data, dict): + return { + k: self._extract_single_env_data(v, env_id) for k, v in data.items() + } + elif isinstance(data, torch.Tensor): + return data[env_id] if data.ndim > 0 else data + elif isinstance(data, (list, tuple)): + return type(data)( + self._extract_single_env_data(item, env_id) for item in data + ) + else: + return data + def _hook_after_sim_step( self, obs: EnvObs, @@ -281,6 +310,13 @@ def _hook_after_sim_step( info: Dict, **kwargs, ): + # Extract and append data for each environment + for env_id in range(self.num_envs): + single_obs = self._extract_single_env_data(obs, env_id) + single_action = self._extract_single_env_data(action, env_id) + self.episode_obs_buffer[env_id].append(single_obs) + self.episode_action_buffer[env_id].append(single_action) + # Call dataset manager with mode="save": it will record and auto-save if dones=True if self.cfg.dataset: if "save" in self.dataset_manager.available_modes: @@ -332,6 +368,15 @@ def _update_sim_state(self, **kwargs) -> None: def _initialize_episode( self, env_ids: Sequence[int] | None = None, **kwargs ) -> None: + # Clear episode buffers for environments that are being reset + if env_ids is None: + env_ids = range(self.num_envs) + for env_id in env_ids: + # Convert to int if it's a tensor + env_id = int(env_id) if isinstance(env_id, torch.Tensor) else env_id + self.episode_obs_buffer[env_id].clear() + self.episode_action_buffer[env_id].clear() + # apply events such as randomization for environments that need a reset if self.cfg.events: if "reset" in self.event_manager.available_modes: @@ -344,16 +389,31 @@ def _initialize_episode( def _step_action(self, action: EnvAction) -> EnvAction: """Set action control command into simulation. + Supports multiple action formats: + 1. torch.Tensor: Interpreted as qpos (joint positions) + 2. Dict with keys: + - "qpos": Joint positions + - "qvel": Joint velocities + - "qf": Joint forces/torques + Args: action: The action applied to the robot agent. Returns: The action return. """ - # TODO: Support data structure action input such as struct. 
- qpos = action - - self.robot.set_qpos(qpos=qpos) + if isinstance(action, dict): + # Support multiple control modes simultaneously + if "qpos" in action: + self.robot.set_qpos(qpos=action["qpos"]) + if "qvel" in action: + self.robot.set_qvel(qvel=action["qvel"]) + if "qf" in action: + self.robot.set_qf(qf=action["qf"]) + elif isinstance(action, torch.Tensor): + self.robot.set_qpos(qpos=action) + else: + logger.error(f"Unsupported action type: {type(action)}") return action diff --git a/embodichain/lab/gym/envs/managers/datasets.py b/embodichain/lab/gym/envs/managers/datasets.py index 625ce8e7..1533d97b 100644 --- a/embodichain/lab/gym/envs/managers/datasets.py +++ b/embodichain/lab/gym/envs/managers/datasets.py @@ -92,10 +92,6 @@ def __init__(self, cfg: DatasetFunctorCfg, env: EmbodiedEnv): self.use_videos = params.get("use_videos", False) self.export_success_only = params.get("export_success_only", False) - # Episode data buffers - self.episode_obs_list: List[Dict] = [] - self.episode_action_list: List[Any] = [] - # LeRobot dataset instance self.dataset: Optional[LeRobotDataset] = None self.dataset_full_path: Optional[Path] = None @@ -114,14 +110,6 @@ def dataset_path(self) -> str: str(self.dataset_full_path) if self.dataset_full_path else "Not initialized" ) - def reset(self, env_ids: Optional[torch.Tensor] = None) -> None: - """Reset the recorder buffers. - - Args: - env_ids: Environment IDs to reset (currently clears all data). - """ - self._reset_buffer() - def __call__( self, env: EmbodiedEnv, @@ -160,8 +148,6 @@ def __call__( use_videos: Whether to save videos (already set in __init__). export_success_only: Whether to export only successful episodes (already set in __init__). """ - # Always record the step - self._record_step(obs, action) # Check if any episodes are done and save them done_env_ids = dones.nonzero(as_tuple=False).squeeze(-1) @@ -169,43 +155,38 @@ def __call__( # Save completed episodes self._save_episodes(done_env_ids, terminateds, info) - def _record_step(self, obs: EnvObs, action: EnvAction) -> None: - """Record a single step.""" - self.episode_obs_list.append(obs) - self.episode_action_list.append(action) - def _save_episodes( self, env_ids: torch.Tensor, terminateds: Optional[torch.Tensor] = None, info: Optional[Dict[str, Any]] = None, ) -> None: - """Save completed episodes.""" - if len(self.episode_obs_list) == 0: - logger.log_warning("No episode data to save") - return - - obs_list = self.episode_obs_list - action_list = self.episode_action_list + """Save completed episodes for specified environments.""" + task = self.instruction.get("lang", "unknown_task") - # Align obs and action - if len(obs_list) > len(action_list): - obs_list = obs_list[:-1] + # Process each environment + for env_id in env_ids.cpu().tolist(): + # Get buffer for this environment (already contains single-env data) + obs_list = self._env.episode_obs_buffer[env_id] + action_list = self._env.episode_action_buffer[env_id] - task = self.instruction.get("lang", "unknown_task") + if len(obs_list) == 0: + logger.log_warning(f"No episode data to save for env {env_id}") + continue - # Update metadata - extra_info = self.extra.copy() if self.extra else {} - fps = self.dataset.meta.info.get("fps", 30) - current_episode_time = (len(obs_list) * len(env_ids)) / fps if fps > 0 else 0 + # Align obs and action + if len(obs_list) > len(action_list): + obs_list = obs_list[:-1] - episode_extra_info = extra_info.copy() - self.total_time += current_episode_time - episode_extra_info["total_time"] = 
self.total_time - self._update_dataset_info({"extra": episode_extra_info}) + # Update metadata + extra_info = self.extra.copy() if self.extra else {} + fps = self.dataset.meta.info.get("fps", 30) + current_episode_time = len(obs_list) / fps if fps > 0 else 0 - # Process each environment - for env_id in env_ids.cpu().tolist(): + episode_extra_info = extra_info.copy() + self.total_time += current_episode_time + episode_extra_info["total_time"] = self.total_time + self._update_dataset_info({"extra": episode_extra_info}) is_success = False if info is not None and "success" in info: success_tensor = info["success"] @@ -222,42 +203,48 @@ def _save_episodes( try: for obs, action in zip(obs_list, action_list): - frame = self._convert_frame_to_lerobot(obs, action, task, env_id) + frame = self._convert_frame_to_lerobot(obs, action, task) self.dataset.add_frame(frame) self.dataset.save_episode() + logger.log_info( - f"Auto-saved {'successful' if is_success else 'failed'} " - f"episode {self.curr_episode} for env {env_id} with {len(obs_list)} frames" + f"[LeRobotRecorder] Saved dataset to: {self.dataset_path}\n" + f" Episode {self.curr_episode} (env {env_id}): " + f"{'successful' if is_success else 'failed'}, {len(obs_list)} frames" ) + self.curr_episode += 1 except Exception as e: logger.log_error(f"Failed to save episode {env_id}: {e}") - self._reset_buffer() - def finalize(self) -> Optional[str]: """Finalize the dataset.""" - if len(self.episode_obs_list) > 0: - active_env_ids = torch.arange(self.num_envs, device=self.device) + # Save any remaining episodes + env_ids_with_data = [] + for env_id in range(self.num_envs): + if len(self._env.episode_obs_buffer[env_id]) > 0: + env_ids_with_data.append(env_id) + + if env_ids_with_data: + active_env_ids = torch.tensor(env_ids_with_data, device=self.device) self._save_episodes(active_env_ids) try: if self.dataset is not None: self.dataset.finalize() - logger.log_info(f"Dataset finalized at: {self.dataset_path}") + logger.log_info( + f"[LeRobotRecorder] Dataset finalized successfully\n" + f" Path: {self.dataset_path}\n" + f" Total episodes: {self.curr_episode}\n" + f" Total time: {self.total_time:.2f}s" + ) return self.dataset_path except Exception as e: - logger.log_error(f"Failed to finalize dataset: {e}") + logger.log_error(f"[LeRobotRecorder] Failed to finalize dataset: {e}") return None - def _reset_buffer(self) -> None: - """Reset episode buffers.""" - self.episode_obs_list.clear() - self.episode_action_list.clear() - logger.log_info("Reset buffers (cleared all batched data)") - def _initialize_dataset(self) -> None: """Initialize the LeRobot dataset.""" robot_type = self.robot_meta.get("robot_type", "robot") @@ -348,9 +335,18 @@ def _build_features(self) -> Dict: return features def _convert_frame_to_lerobot( - self, obs: Dict[str, Any], action: Any, task: str, env_id: int + self, obs: Dict[str, Any], action: Any, task: str ) -> Dict: - """Convert a single frame to LeRobot format.""" + """Convert a single frame to LeRobot format. 
+ + Args: + obs: Single environment observation (already extracted from batch) + action: Single environment action (already extracted from batch) + task: Task name + + Returns: + Frame dict in LeRobot format with numpy arrays + """ frame = {"task": task} extra_vision_config = self.robot_meta.get("observation", {}).get("vision", {}) arm_dofs = self.robot_meta.get("arm_dofs", 7) @@ -363,9 +359,9 @@ def _convert_frame_to_lerobot( color_data = obs["sensor"][camera_name]["color"] if isinstance(color_data, torch.Tensor): - color_img = color_data[env_id][:, :, :3].cpu().numpy() + color_img = color_data[:, :, :3].cpu().numpy() else: - color_img = np.array(color_data)[env_id][:, :, :3] + color_img = np.array(color_data)[:, :, :3] if color_img.dtype in [np.float32, np.float64]: color_img = (color_img * 255).astype(np.uint8) @@ -375,11 +371,9 @@ def _convert_frame_to_lerobot( if is_stereo: color_right_data = obs["sensor"][camera_name]["color_right"] if isinstance(color_right_data, torch.Tensor): - color_right_img = ( - color_right_data[env_id][:, :, :3].cpu().numpy() - ) + color_right_img = color_right_data[:, :, :3].cpu().numpy() else: - color_right_img = np.array(color_right_data)[env_id][:, :, :3] + color_right_img = np.array(color_right_data)[:, :, :3] if color_right_img.dtype in [np.float32, np.float64]: color_right_img = (color_right_img * 255).astype(np.uint8) @@ -389,25 +383,37 @@ def _convert_frame_to_lerobot( # Add state qpos = obs["robot"][JointType.QPOS.value] if isinstance(qpos, torch.Tensor): - state_data = qpos[env_id].cpu().numpy().astype(np.float32) + state_data = qpos.cpu().numpy().astype(np.float32) else: - state_data = np.array(qpos)[env_id].astype(np.float32) + state_data = np.array(qpos).astype(np.float32) frame["observation.state"] = state_data # Add action if isinstance(action, torch.Tensor): - action_data = action[env_id, :arm_dofs].cpu().numpy() + action_data = action[:arm_dofs].cpu().numpy() elif isinstance(action, np.ndarray): - action_data = action[env_id, :arm_dofs] + action_data = action[:arm_dofs] elif isinstance(action, dict): - action_data = action.get("action", action.get("arm_action", action)) - if isinstance(action_data, torch.Tensor): - action_data = action_data[env_id, :arm_dofs].cpu().numpy() - elif isinstance(action_data, np.ndarray): - action_data = action_data[env_id, :arm_dofs] + # Extract qpos from action dict + action_tensor = action.get( + "qpos", action.get("delta_qpos", action.get("action", None)) + ) + if action_tensor is None: + # Fallback to first tensor value + for v in action.values(): + if isinstance(v, (torch.Tensor, np.ndarray)): + action_tensor = v + break + + if isinstance(action_tensor, torch.Tensor): + action_data = action_tensor[:arm_dofs].cpu().numpy() + elif isinstance(action_tensor, np.ndarray): + action_data = action_tensor[:arm_dofs] + else: + action_data = np.array(action_tensor)[:arm_dofs] else: - action_data = np.array(action)[env_id, :arm_dofs] + action_data = np.array(action)[:arm_dofs] frame["action"] = action_data diff --git a/embodichain/lab/gym/envs/managers/rewards.py b/embodichain/lab/gym/envs/managers/rewards.py index 5cfbef55..dad89b7f 100644 --- a/embodichain/lab/gym/envs/managers/rewards.py +++ b/embodichain/lab/gym/envs/managers/rewards.py @@ -30,7 +30,7 @@ def distance_between_objects( env: EmbodiedEnv, obs: dict, - action: torch.Tensor, + action: torch.Tensor | dict, info: dict, source_entity_cfg: SceneEntityCfg = None, target_entity_cfg: SceneEntityCfg = None, @@ -92,7 +92,7 @@ def distance_between_objects( def 
joint_velocity_penalty( env: EmbodiedEnv, obs: dict, - action: torch.Tensor, + action: torch.Tensor | dict, info: dict, robot_uid: str = "robot", joint_ids: slice | list[int] | None = None, @@ -146,21 +146,21 @@ def joint_velocity_penalty( def action_smoothness_penalty( env: EmbodiedEnv, obs: dict, - action: torch.Tensor, + action: torch.Tensor | dict, info: dict, ) -> torch.Tensor: """Penalize large action changes between consecutive timesteps. Encourages smooth control commands by penalizing sudden changes in actions. - Stores previous action in env._reward_states for comparison. + Gets previous action from env.episode_action_buffer. Returns: - Penalty tensor of shape (num_envs,). Zero on first call (no previous action), - negative on subsequent calls (larger change = more negative). + Penalty tensor of shape (num_envs,). Zero on first step (no previous action), + negative on subsequent steps (larger change = more negative). Note: - This function maintains state across calls using env._reward_states['prev_actions']. - State is automatically reset when the environment resets. + This function reads from env.episode_action_buffer, which is automatically + cleared when the environment resets. Example: ```json @@ -171,20 +171,48 @@ def action_smoothness_penalty( } ``` """ - # Use dictionary-based state management - if not hasattr(env, "_reward_states"): - env._reward_states = {} - - # compute difference between current and previous action - if "prev_actions" in env._reward_states: - action_diff = action - env._reward_states["prev_actions"] - penalty = -torch.norm(action_diff, dim=-1) + # Extract current action tensor + if isinstance(action, torch.Tensor): + current_action_tensor = action + elif isinstance(action, dict): + # Extract from dict + current_action_tensor = None + for key in ["qpos", "qvel", "qf"]: + if key in action: + current_action_tensor = action[key] + break + if current_action_tensor is None: + return torch.zeros(env.num_envs, device=env.device) else: - # no previous action, no penalty - penalty = torch.zeros(env.num_envs, device=env.device) + return torch.zeros(env.num_envs, device=env.device) - # store current action for next step - env._reward_states["prev_actions"] = action.clone() + # Get previous action from buffer for each environment + penalty = torch.zeros(env.num_envs, device=env.device) + + for env_id in range(env.num_envs): + action_buffer = env.episode_action_buffer[env_id] + + if len(action_buffer) > 0: + # Get previous action (last in buffer) + prev_action = action_buffer[-1] + + # Extract tensor from previous action + if isinstance(prev_action, torch.Tensor): + prev_action_tensor = prev_action + elif isinstance(prev_action, dict): + prev_action_tensor = None + for key in ["qpos", "qvel", "qf"]: + if key in prev_action: + prev_action_tensor = prev_action[key] + break + else: + prev_action_tensor = None + + if prev_action_tensor is not None: + # Compute difference and penalty + action_diff = current_action_tensor[env_id] - prev_action_tensor + penalty[env_id] = -torch.norm(action_diff) + # else: first step, no penalty (already zero) return penalty @@ -192,7 +220,7 @@ def action_smoothness_penalty( def joint_limit_penalty( env: EmbodiedEnv, obs: dict, - action: torch.Tensor, + action: torch.Tensor | dict, info: dict, robot_uid: str = "robot", joint_ids: slice | list[int] = slice(None), @@ -258,7 +286,7 @@ def joint_limit_penalty( def orientation_alignment( env: EmbodiedEnv, obs: dict, - action: torch.Tensor, + action: torch.Tensor | dict, info: dict, 
source_entity_cfg: SceneEntityCfg = None, target_entity_cfg: SceneEntityCfg = None, @@ -313,7 +341,7 @@ def orientation_alignment( def success_reward( env: EmbodiedEnv, obs: dict, - action: torch.Tensor, + action: torch.Tensor | dict, info: dict, ) -> torch.Tensor: """Sparse bonus reward when task succeeds. @@ -361,7 +389,7 @@ def success_reward( def reaching_behind_object( env: EmbodiedEnv, obs: dict, - action: torch.Tensor, + action: torch.Tensor | dict, info: dict, object_cfg: SceneEntityCfg = None, target_pose_key: str = "goal_pose", @@ -452,7 +480,7 @@ def reaching_behind_object( def distance_to_target( env: "EmbodiedEnv", obs: dict, - action: torch.Tensor, + action: torch.Tensor | dict, info: dict, source_entity_cfg: SceneEntityCfg = None, target_pose_key: str = "target_pose", @@ -529,7 +557,7 @@ def distance_to_target( def incremental_distance_to_target( env: "EmbodiedEnv", obs: dict, - action: torch.Tensor, + action: torch.Tensor | dict, info: dict, source_entity_cfg: SceneEntityCfg = None, target_pose_key: str = "target_pose", diff --git a/embodichain/lab/gym/envs/rl_env.py b/embodichain/lab/gym/envs/rl_env.py new file mode 100644 index 00000000..9ce6b5f1 --- /dev/null +++ b/embodichain/lab/gym/envs/rl_env.py @@ -0,0 +1,277 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2025 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +"""Base environment for reinforcement learning tasks.""" + +import torch +from typing import Dict, Any, Sequence, Optional, Tuple + +from embodichain.lab.gym.envs import EmbodiedEnv, EmbodiedEnvCfg +from embodichain.lab.sim.cfg import MarkerCfg +from embodichain.lab.sim.types import EnvObs, EnvAction +from embodichain.utils.math import matrix_from_quat, matrix_from_euler + + +__all__ = ["RLEnv"] + + +class RLEnv(EmbodiedEnv): + """Base class for reinforcement learning tasks. + + Provides common utilities for RL tasks: + - Goal pose management and visualization + - Flexible action preprocessing (scaling, IK, normalization) + - Standardized info dictionary structure + """ + + def __init__(self, cfg: EmbodiedEnvCfg = None, **kwargs): + if cfg is None: + cfg = EmbodiedEnvCfg() + super().__init__(cfg, **kwargs) + + # Set default values for common RL parameters + if not hasattr(self, "action_scale"): + self.action_scale = 1.0 + if not hasattr(self, "episode_length"): + self.episode_length = 1000 + + @property + def goal_pose(self) -> Optional[torch.Tensor]: + """Get current goal poses (4x4 matrices) for all environments.""" + return self._goal_pose + + def draw_goal_marker(self, env_ids: Sequence[int] | None = None): + """Draw axis markers at goal positions for visualization. 
+ + Args: + env_ids: Environment IDs to draw markers for (None = all envs) + """ + if self.goal_pose is None: + return + + if env_ids is None: + env_ids = range(self.num_envs) + + for arena_idx in env_ids: + marker_name = f"goal_marker_{arena_idx}" + self.sim.remove_marker(marker_name) + goal_pose = self.goal_pose[arena_idx].detach().cpu().numpy() + marker_cfg = MarkerCfg( + name=marker_name, + marker_type="axis", + axis_xpos=[goal_pose], + axis_size=0.003, + axis_len=0.02, + arena_index=arena_idx, + ) + self.sim.draw_marker(cfg=marker_cfg) + + def _preprocess_action(self, action: EnvAction) -> EnvAction: + """Preprocess action for RL tasks with flexible transformation. + + Supports multiple action formats: + 1. Dict input (keys specify action type): + - {"delta_qpos": tensor}: Delta joint positions (scaled and added to current) + - {"qpos": tensor}: Absolute joint positions (scaled) + - {"qpos_normalized": tensor}: Normalized qpos in [-1, 1] + - {"eef_pose": tensor}: End-effector pose (6D or 7D) converted via IK + - {"qvel": tensor}: Joint velocities (scaled) + - {"qf": tensor}: Joint forces/torques (scaled) + + 2. Tensor input: Interpreted based on self.action_type attribute + (default: "qpos") + + Args: + action: Raw action from policy (tensor or dict) + + Returns: + Dict action ready for robot control + """ + # Convert tensor input to dict based on action_type + if not isinstance(action, dict): + action_type = getattr(self, "action_type", "delta_qpos") + action = {action_type: action} + + # Step 1: Scale all action values by action_scale + scaled_action = {} + for key, value in action.items(): + if isinstance(value, torch.Tensor): + scaled_action[key] = value * self.action_scale + else: + scaled_action[key] = value + + # Step 2: Process based on dict keys + result = {} + + if "qpos" in scaled_action: + result["qpos"] = scaled_action["qpos"] + elif "delta_qpos" in scaled_action: + result["qpos"] = self._process_delta_qpos(scaled_action["delta_qpos"]) + elif "qpos_normalized" in scaled_action: + result["qpos"] = self._denormalize_action(scaled_action["qpos_normalized"]) + elif "eef_pose" in scaled_action: + result["qpos"] = self._process_eef_pose(scaled_action["eef_pose"]) + + # Velocity and force controls + if "qvel" in scaled_action: + result["qvel"] = scaled_action["qvel"] + + if "qf" in scaled_action: + result["qf"] = scaled_action["qf"] + + return result + + def _denormalize_action(self, action: torch.Tensor) -> torch.Tensor: + """Denormalize action from [-1, 1] to actual range. + + Args: + action: Normalized action in [-1, 1] + + Returns: + Denormalized action + """ + qpos_limits = self.robot.body_data.qpos_limits[0] + low = qpos_limits[:, 0] + high = qpos_limits[:, 1] + + # Map [-1, 1] to [low, high] + return low + (action + 1.0) * 0.5 * (high - low) + + def _process_delta_qpos(self, action: torch.Tensor) -> torch.Tensor: + """Process delta joint position action. + + Args: + action: Delta joint positions + + Returns: + Absolute joint positions + """ + current_qpos = self.robot.get_qpos() + return current_qpos + action + + def _process_eef_pose(self, action: torch.Tensor) -> torch.Tensor: + """Process end-effector pose action via inverse kinematics. + + TODO: Currently only supports single-arm robots (6-axis or 7-axis). + For multi-arm or complex robots, please use qpos/delta_qpos actions instead. 
+ + Args: + action: End-effector pose (position + orientation) + Shape: (num_envs, 6) for pos+euler or (num_envs, 7) for pos+quat + + Returns: + Joint positions from IK + """ + # Get current joint positions as IK seed + current_qpos = self.robot.get_qpos() + + # Convert action to target pose matrix (4x4) + batch_size = action.shape[0] + target_pose = ( + torch.eye(4, device=self.device).unsqueeze(0).repeat(batch_size, 1, 1) + ) + + if action.shape[-1] == 6: + # pos (3) + euler angles (3) + target_pose[:, :3, 3] = action[:, :3] + target_pose[:, :3, :3] = matrix_from_euler(action[:, 3:6]) + elif action.shape[-1] == 7: + # pos (3) + quaternion (4) + target_pose[:, :3, 3] = action[:, :3] + target_pose[:, :3, :3] = matrix_from_quat(action[:, 3:7]) + else: + raise ValueError( + f"EEF pose action must be 6D or 7D, got {action.shape[-1]}D" + ) + + # Solve IK for each environment + ik_solutions = [] + for env_idx in range(self.num_envs): + qpos_ik = self.robot.compute_ik( + pose=target_pose[env_idx], + joint_seed=current_qpos[env_idx], + ) + ik_solutions.append(qpos_ik) + + # Stack IK solutions + result_qpos = torch.stack(ik_solutions, dim=0) + + return result_qpos + + def compute_task_state( + self, **kwargs + ) -> Tuple[torch.Tensor, torch.Tensor, Dict[str, Any]]: + """Compute task-specific state: success, fail, and metrics. + + Override this method in subclass to define task-specific logic. + + Returns: + Tuple of (success, fail, metrics): + - success: Boolean tensor of shape (num_envs,) + - fail: Boolean tensor of shape (num_envs,) + - metrics: Dict of metric tensors + """ + success = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool) + fail = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool) + metrics = {} + return success, fail, metrics + + def get_info(self, **kwargs) -> Dict[str, Any]: + """Get environment info dictionary. + + Calls compute_task_state() to get task-specific success/fail/metrics. + Subclasses should override compute_task_state() instead of this method. + + Returns: + Info dictionary with success, fail, elapsed_steps, goal_pose, metrics + """ + success, fail, metrics = self.compute_task_state(**kwargs) + + info = { + "success": success, + "fail": fail, + "elapsed_steps": self._elapsed_steps, + "goal_pose": self.goal_pose, + } + info["metrics"] = metrics + return info + + def check_truncated(self, obs: EnvObs, info: Dict[str, Any]) -> torch.Tensor: + """Check if episode should be truncated (timeout). + + Args: + obs: Current observation + info: Info dictionary + + Returns: + Boolean tensor of shape (num_envs,) + """ + return self._elapsed_steps >= self.episode_length + + def evaluate(self, **kwargs) -> Dict[str, Any]: + """Evaluate the environment state. 
+ + Returns: + Evaluation dictionary with success and metrics + """ + info = self.get_info(**kwargs) + eval_dict = { + "success": info["success"][0].item(), + } + if "metrics" in info: + for key, value in info["metrics"].items(): + eval_dict[key] = value + return eval_dict diff --git a/embodichain/lab/gym/envs/tasks/rl/push_cube.py b/embodichain/lab/gym/envs/tasks/rl/push_cube.py index 072574fc..276a433a 100644 --- a/embodichain/lab/gym/envs/tasks/rl/push_cube.py +++ b/embodichain/lab/gym/envs/tasks/rl/push_cube.py @@ -15,19 +15,16 @@ # ---------------------------------------------------------------------------- import torch -import numpy as np -from typing import Dict, Any, Sequence -from gymnasium import spaces +from typing import Dict, Any, Tuple from embodichain.lab.gym.utils.registration import register_env -from embodichain.lab.gym.envs import EmbodiedEnv, EmbodiedEnvCfg -from embodichain.lab.sim.cfg import MarkerCfg -from embodichain.lab.sim.types import EnvObs, EnvAction -from embodichain.utils import logger +from embodichain.lab.gym.envs.rl_env import RLEnv +from embodichain.lab.gym.envs import EmbodiedEnvCfg +from embodichain.lab.sim.types import EnvObs @register_env("PushCubeRL", max_episode_steps=50, override=True) -class PushCubeEnv(EmbodiedEnv): +class PushCubeEnv(RLEnv): """Push cube task for reinforcement learning. The task involves pushing a cube to a target goal position using a robotic arm. @@ -37,100 +34,32 @@ class PushCubeEnv(EmbodiedEnv): def __init__(self, cfg=None, **kwargs): if cfg is None: cfg = EmbodiedEnvCfg() - super().__init__(cfg, **kwargs) - @property - def goal_pose(self) -> torch.Tensor: - """Get current goal poses (4x4 matrices) for all environments.""" - return self._goal_pose - - def _draw_goal_marker(self): - """Draw axis marker at goal position for visualization.""" - goal_sphere = self.sim.get_rigid_object("goal_sphere") - if goal_sphere is None: - return - - num_envs = self.cfg.num_envs - - # Get actual goal positions from each arena - goal_poses = goal_sphere.get_local_pose(to_matrix=True) # (num_envs, 4, 4) - - # Draw marker for each arena separately - for arena_idx in range(num_envs): - marker_name = f"goal_marker_{arena_idx}" - - self.sim.remove_marker(marker_name) - - goal_pose = goal_poses[arena_idx].detach().cpu().numpy() - marker_cfg = MarkerCfg( - name=marker_name, - marker_type="axis", - axis_xpos=[goal_pose], - axis_size=0.003, - axis_len=0.02, - arena_index=arena_idx, - ) - self.sim.draw_marker(cfg=marker_cfg) - - def _initialize_episode( - self, env_ids: Sequence[int] | None = None, **kwargs - ) -> None: - super()._initialize_episode(env_ids=env_ids, **kwargs) - - # Draw marker at goal position - # self._draw_goal_marker() - - def _step_action(self, action: EnvAction) -> EnvAction: - scaled_action = action * self.action_scale - scaled_action = torch.clamp( - scaled_action, -self.joint_limits, self.joint_limits - ) - current_qpos = self.robot.body_data.qpos - target_qpos = current_qpos.clone() - target_qpos[:, :6] += scaled_action[:, :6] - self.robot.set_qpos(qpos=target_qpos) - return scaled_action - - def get_info(self, **kwargs) -> Dict[str, Any]: + def compute_task_state( + self, **kwargs + ) -> Tuple[torch.Tensor, torch.Tensor, Dict[str, Any]]: cube = self.sim.get_rigid_object("cube") - cube_pos = cube.body_data.pose[:, :3] + cube_pos = cube.get_local_pose(to_matrix=True)[:, :3, 3] - # Get goal position from event-managed goal pose if self.goal_pose is not None: goal_pos = self.goal_pose[:, :3, 3] xy_distance = 
torch.norm(cube_pos[:, :2] - goal_pos[:, :2], dim=1) is_success = xy_distance < self.success_threshold else: - # Goal not yet set by randomize_target_pose event (e.g., before first reset) - xy_distance = torch.zeros(self.cfg.num_envs, device=self.device) + xy_distance = torch.zeros(self.num_envs, device=self.device) is_success = torch.zeros( - self.cfg.num_envs, device=self.device, dtype=torch.bool + self.num_envs, device=self.device, dtype=torch.bool ) - info = { - "success": is_success, - "fail": torch.zeros( - self.cfg.num_envs, device=self.device, dtype=torch.bool - ), - "elapsed_steps": self._elapsed_steps, - "goal_pose": self.goal_pose, - } - info["metrics"] = { - "distance_to_goal": xy_distance, - } - return info + is_fail = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool) + metrics = {"distance_to_goal": xy_distance} + + return is_success, is_fail, metrics def check_truncated(self, obs: EnvObs, info: Dict[str, Any]) -> torch.Tensor: is_timeout = self._elapsed_steps >= self.episode_length cube = self.sim.get_rigid_object("cube") - cube_pos = cube.body_data.pose[:, :3] + cube_pos = cube.get_local_pose(to_matrix=True)[:, :3, 3] is_fallen = cube_pos[:, 2] < -0.1 return is_timeout | is_fallen - - def evaluate(self, **kwargs) -> Dict[str, Any]: - info = self.get_info(**kwargs) - return { - "success": info["success"][0].item(), - "distance_to_goal": info["distance_to_goal"], - } diff --git a/pyproject.toml b/pyproject.toml index 50fef254..d834132e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,8 +39,8 @@ dependencies = [ "polars==1.31.0", "PyYAML>=6.0", "accelerate>=1.10.0", - "wandb==0.20.1", - "tensorboard", + "wandb>=0.21.0", + "tensorboard>=2.20.0", "transformers>=4.53.0", "diffusers>=0.32.1", "deepspeed>=0.16.2", diff --git a/tests/agents/test_rl.py b/tests/agents/test_rl.py new file mode 100644 index 00000000..475ffa1d --- /dev/null +++ b/tests/agents/test_rl.py @@ -0,0 +1,131 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2025 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ---------------------------------------------------------------------------- + +import os +import json +import pytest +import tempfile +from pathlib import Path + + +class TestRLTraining: + """Test suite for RL training pipeline.""" + + temp_gym_config_path = None + temp_train_config_path = None + + def setup_method(self): + """Set up test configuration before each test method.""" + # Load the existing push_cube config + train_config_path = "configs/agents/rl/push_cube/train_config.json" + gym_config_path = "configs/agents/rl/push_cube/gym_config.json" + + with open(train_config_path, "r") as f: + train_config = json.load(f) + + with open(gym_config_path, "r") as f: + gym_config = json.load(f) + + # Add dataset configuration dynamically to gym_config + gym_config["env"]["dataset"] = { + "lerobot": { + "func": "LeRobotRecorder", + "mode": "save", + "params": { + "robot_meta": { + "robot_type": "UR10_DH_Gripper", + "control_freq": 25, + "arm_dofs": 6, + "observation": {"vision": {}, "states": ["qpos"]}, + }, + "instruction": {"lang": "push_cube_to_target"}, + "extra": { + "scene_type": "tabletop", + "task_description": "push_cube_rl_test", + }, + "use_videos": False, + "export_success_only": False, + }, + } + } + + # Create temporary gym config with dataset + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(gym_config, f) + self.temp_gym_config_path = f.name + + # Create temporary train config with reduced iterations for testing + test_train_config = train_config.copy() + test_train_config["trainer"]["gym_config"] = self.temp_gym_config_path + test_train_config["trainer"]["iterations"] = 2 + test_train_config["trainer"]["rollout_steps"] = 32 + test_train_config["trainer"]["eval_freq"] = 1000000 # Disable eval + test_train_config["trainer"]["save_freq"] = 1000000 # Disable save + test_train_config["trainer"]["headless"] = True + test_train_config["trainer"]["use_wandb"] = False + test_train_config["trainer"]["num_envs"] = 2 + + # Save temporary train config + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(test_train_config, f) + self.temp_train_config_path = f.name + + def teardown_method(self): + """Clean up resources after each test method.""" + # Clean up temporary files + if self.temp_train_config_path and os.path.exists(self.temp_train_config_path): + os.remove(self.temp_train_config_path) + self.temp_train_config_path = None + + if self.temp_gym_config_path and os.path.exists(self.temp_gym_config_path): + os.remove(self.temp_gym_config_path) + self.temp_gym_config_path = None + + def test_training_pipeline(self): + """Test basic RL training pipeline with minimal iterations.""" + from embodichain.agents.rl.train import train_from_config + + # This should run without errors + train_from_config(self.temp_train_config_path) + + @pytest.mark.parametrize("num_envs", [1, 2, 4]) + def test_multi_env_training(self, num_envs: int): + """Test training with different numbers of parallel environments.""" + # Reload and modify config for this specific test + with open(self.temp_train_config_path, "r") as f: + config = json.load(f) + + config["trainer"]["num_envs"] = num_envs + config["trainer"][ + "iterations" + ] = 1 # Even fewer iterations for parameterized test + + # Save modified config + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(config, f) + temp_config = f.name + + try: + from embodichain.agents.rl.train import train_from_config + + 
train_from_config(temp_config) + finally: + if os.path.exists(temp_config): + os.remove(temp_config) + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"])
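+
+
+# Outside of pytest, the same training entry point can be exercised directly, e.g.
+# programmatically:
+#
+#   from embodichain.agents.rl.train import train_from_config
+#   train_from_config("configs/agents/rl/push_cube/train_config.json")
+#
+# or from the command line via the argparse entry point in train.py:
+#
+#   python -m embodichain.agents.rl.train --config configs/agents/rl/push_cube/train_config.json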