diff --git a/NGGD/ALGORITHM_SUMMARY.md b/NGGD/ALGORITHM_SUMMARY.md new file mode 100644 index 0000000..08132a7 --- /dev/null +++ b/NGGD/ALGORITHM_SUMMARY.md @@ -0,0 +1,214 @@ +# NGGD模仿学习算法实现总结 + +## 算法概述 + +NGGD(Neural Gaussian Mixture Model with Gaussian Mixture Regression and Dynamic Movement Primitives)模仿学习算法是一个完整的机器人模仿学习框架,结合了以下核心技术: + +1. **数据预处理** - 轨迹时间对齐和噪声添加 +2. **GMM建模** - 高斯混合模型学习轨迹特征 +3. **GMR回归** - 高斯混合回归得到概率最优轨迹 +4. **改进DMP** - 动态运动基元实现轨迹泛化 + +## 实现文件结构 + +``` +NGGD/ +├── README.md # 详细说明文档 +├── ALGORITHM_SUMMARY.md # 算法总结(本文件) +├── install_dependencies.py # 依赖安装脚本 +├── test_nggd.py # 测试脚本 +├── example_usage.py # 使用示例 +├── data_preprocessing.py # 数据预处理模块 +├── gmm_trajectory_modeling.py # GMM轨迹建模模块 +├── gmr_trajectory_regression.py # GMR轨迹回归模块 +├── improved_dmp.py # 改进的DMP算法模块 +├── nggd_imitation_learning.py # NGGD主程序 +├── demo_nggd_ur5.py # UR5机器人演示程序 +└── results/ # 结果保存目录 +``` + +## 核心算法流程 + +### 1. 示教数据获取 +- 支持手动示教和文件加载 +- 兼容CoppeliaSim仿真环境 +- 支持多种轨迹格式 + +### 2. 数据预处理 (`data_preprocessing.py`) +```python +class TrajectoryPreprocessor: + def time_alignment(self, trajectories, method='dtw') + def add_gaussian_noise(self, trajectories, noise_std=None) + def normalize_trajectories(self, trajectories, method='minmax') + def preprocess_pipeline(self, trajectories, ...) +``` + +**功能特点:** +- 支持DTW、线性插值、均匀采样三种对齐方法 +- 可配置的高斯噪声添加 +- Min-Max和Z-score归一化 +- 完整的预处理流水线 + +### 3. GMM轨迹建模 (`gmm_trajectory_modeling.py`) +```python +class GMMTrajectoryModeling: + def fit(self, trajectories, timestamps=None) + def get_joint_probability_density(self, positions, times) + def sample_trajectory(self, n_samples=None) + def visualize_gmm_components(self, ...) +``` + +**功能特点:** +- 基于scikit-learn的GaussianMixture +- 学习轨迹的联合概率分布 +- 支持轨迹采样和可视化 +- 3D轨迹可视化支持 + +### 4. GMR轨迹回归 (`gmr_trajectory_regression.py`) +```python +class GMRTrajectoryRegression: + def regress(self, query_times, return_covariance=False) + def regress_with_confidence(self, query_times, confidence_level=0.95) + def visualize_regression(self, ...) +``` + +**功能特点:** +- 概率最优的轨迹回归 +- 支持置信区间计算 +- 条件均值和协方差计算 +- 丰富的可视化功能 + +### 5. 改进DMP算法 (`improved_dmp.py`) +```python +class ImprovedDMP: + def learning_from_gmr(self, gmr_trajectory, timestamps=None) + def reproduce(self, tau=None, initial=None, goal=None) + def generalize(self, new_initial=None, new_goal=None, tau=1.0) +``` + +**功能特点:** +- 基于GMR轨迹的DMP学习 +- 支持轨迹复现和泛化 +- 灵活的时间缩放 +- 多种初始/目标位置设置 + +### 6. NGGD主程序 (`nggd_imitation_learning.py`) +```python +class NGGDImitationLearning: + def learn_from_demonstrations(self, demonstration_trajectories, ...) + def generalize_trajectory(self, new_initial=None, new_goal=None, ...) + def visualize_learning_results(self, save_dir=None) +``` + +**功能特点:** +- 完整的NGGD学习流程 +- 端到端的轨迹学习 +- 丰富的可视化和分析功能 +- 模型保存和加载支持 + +## 算法优势 + +### 1. 概率建模 +- 使用GMM捕获轨迹的概率分布 +- 通过GMR实现概率最优的轨迹回归 +- 提供不确定性量化 + +### 2. 灵活泛化 +- 支持不同的初始和目标位置 +- 时间缩放和轨迹变形 +- 保持原始轨迹的运动特征 + +### 3. 鲁棒性 +- 数据预处理提高鲁棒性 +- 噪声处理和归一化 +- 多种对齐方法适应不同数据 + +### 4. 可扩展性 +- 模块化设计便于扩展 +- 支持不同维度的轨迹 +- 易于集成到现有系统 + +## 使用示例 + +### 基本使用 +```python +from nggd_imitation_learning import NGGDImitationLearning + +# 创建系统 +nggd_system = NGGDImitationLearning(n_gmm_components=10, n_dmp_bfs=100) + +# 学习 +learning_results = nggd_system.learn_from_demonstrations(demonstration_trajectories) + +# 泛化 +generalized_trajectory = nggd_system.generalize_trajectory( + new_initial=[0.5, 0.5, 0.2], + new_goal=[-0.5, -0.5, 0.8], + tau=1.0 +) +``` + +### UR5机器人演示 +```python +from demo_nggd_ur5 import NGGDUR5Demo + +demo = NGGDUR5Demo() +demo.run_complete_demo() +``` + +## 性能特点 + +### 1. 学习效率 +- 快速收敛的GMM训练 +- 高效的GMR回归计算 +- 优化的DMP参数学习 + +### 2. 泛化能力 +- 保持原始轨迹的运动特征 +- 适应不同的初始和目标条件 +- 支持时间缩放和轨迹变形 + +### 3. 可视化支持 +- 丰富的2D/3D可视化 +- 实时学习过程展示 +- 结果分析和比较 + +## 测试和验证 + +### 1. 单元测试 +- 各模块独立功能测试 +- 边界条件处理验证 +- 错误处理机制测试 + +### 2. 集成测试 +- 完整流程端到端测试 +- 不同数据格式兼容性 +- 性能基准测试 + +### 3. 应用测试 +- UR5机器人仿真测试 +- 真实轨迹数据验证 +- 泛化能力评估 + +## 扩展方向 + +### 1. 算法改进 +- 更高级的GMM初始化方法 +- 自适应组件数量选择 +- 在线学习能力 + +### 2. 应用扩展 +- 多机器人协调学习 +- 复杂任务分解 +- 人机协作学习 + +### 3. 性能优化 +- GPU加速计算 +- 并行化处理 +- 内存优化 + +## 总结 + +NGGD模仿学习算法提供了一个完整、灵活、高效的机器人模仿学习解决方案。通过结合GMM、GMR和DMP的优势,实现了从示教数据学习到轨迹泛化的完整流程。算法具有良好的可扩展性和实用性,适用于各种机器人应用场景。 + +该实现不仅提供了核心算法功能,还包含了丰富的可视化、测试和演示工具,便于理解、使用和进一步开发。 diff --git a/NGGD/README.md b/NGGD/README.md new file mode 100644 index 0000000..ff89d87 --- /dev/null +++ b/NGGD/README.md @@ -0,0 +1,233 @@ +# NGGD模仿学习算法 + +## 概述 + +NGGD(Neural Gaussian Mixture Model with Gaussian Mixture Regression and Dynamic Movement Primitives)模仿学习算法是一个结合了高斯混合模型(GMM)、高斯混合回归(GMR)和改进动态运动基元(DMP)的完整模仿学习框架。 + +## 算法流程 + +该算法的完整流程如图1所示: + +1. **示教数据获取**: 通过手动示教方式获取多条手写示教轨迹 +2. **数据预处理**: 对轨迹数据进行时间对齐和添加高斯噪声等预处理操作 +3. **运动轨迹建模**: 使用GMM学习示教轨迹中的特征信息,对轨迹进行建模 +4. **运动轨迹回归**: 通过GMR进行轨迹回归,得到概率最优的回归轨迹,实现运动轨迹技能的复现 +5. **基于DMP的轨迹泛化**: 将复现的轨迹数据输入到改进的DMP算法中,实现对运动轨迹的泛化 + +## 文件结构 + +``` +NGGD/ +├── README.md # 说明文档 +├── data_preprocessing.py # 数据预处理模块 +├── gmm_trajectory_modeling.py # GMM轨迹建模模块 +├── gmr_trajectory_regression.py # GMR轨迹回归模块 +├── improved_dmp.py # 改进的DMP算法模块 +├── nggd_imitation_learning.py # NGGD主程序 +├── demo_nggd_ur5.py # UR5机器人演示程序 +└── results/ # 结果保存目录 +``` + +## 安装依赖 + +```bash +pip install numpy scipy matplotlib scikit-learn pandas +``` + +## 使用方法 + +### 1. 基本使用 + +```python +from nggd_imitation_learning import NGGDImitationLearning + +# 创建NGGD系统 +nggd_system = NGGDImitationLearning( + n_gmm_components=10, # GMM组件数量 + n_dmp_bfs=100, # DMP基函数数量 + noise_std=0.01 # 噪声标准差 +) + +# 示教轨迹数据 (n_trajectories, n_timesteps, n_dims) +demonstration_trajectories = [...] # 你的示教轨迹数据 + +# 执行学习 +learning_results = nggd_system.learn_from_demonstrations( + demonstration_trajectories, + alignment_method='linear', # 对齐方法: 'linear', 'dtw', 'uniform' + add_noise=True, # 是否添加噪声 + normalize=True, # 是否归一化 + plot_results=True # 是否绘制结果 +) + +# 轨迹泛化 +generalized_trajectory = nggd_system.generalize_trajectory( + new_initial=[0.5, 0.5, 0.2], # 新的初始位置 + new_goal=[-0.5, -0.5, 0.8], # 新的目标位置 + tau=1.0 # 时间缩放因子 +) +``` + +### 2. UR5机器人演示 + +```python +from demo_nggd_ur5 import NGGDUR5Demo + +# 创建演示系统 +demo = NGGDUR5Demo() + +# 运行完整演示 +demo.run_complete_demo() +``` + +### 3. 单独使用各个模块 + +#### 数据预处理 + +```python +from data_preprocessing import TrajectoryPreprocessor + +preprocessor = TrajectoryPreprocessor(target_length=100, noise_std=0.01) + +# 时间对齐 +aligned_trajectories, timestamps = preprocessor.time_alignment( + trajectories, method='linear' +) + +# 添加噪声 +noisy_trajectories = preprocessor.add_gaussian_noise(aligned_trajectories) + +# 归一化 +normalized_trajectories, norm_params = preprocessor.normalize_trajectories( + noisy_trajectories, method='minmax' +) +``` + +#### GMM轨迹建模 + +```python +from gmm_trajectory_modeling import GMMTrajectoryModeling + +gmm_modeler = GMMTrajectoryModeling(n_components=10) +gmm_modeler.fit(trajectories, timestamps) + +# 可视化GMM组件 +gmm_modeler.visualize_gmm_components() + +# 采样轨迹 +sampled_traj, sampled_times = gmm_modeler.sample_trajectory() +``` + +#### GMR轨迹回归 + +```python +from gmr_trajectory_regression import GMRTrajectoryRegression + +gmr_regressor = GMRTrajectoryRegression(gmm_modeler) + +# 回归轨迹 +query_times = np.linspace(0, 1, 100) +regressed_trajectory = gmr_regressor.regress(query_times) + +# 带置信区间的回归 +regressed_traj, lower_bound, upper_bound = gmr_regressor.regress_with_confidence( + query_times, confidence_level=0.95 +) +``` + +#### 改进的DMP + +```python +from improved_dmp import ImprovedDMP + +improved_dmp = ImprovedDMP(n_dmps=3, n_bfs=100) + +# 从GMR轨迹学习 +improved_dmp.learning_from_gmr(gmr_trajectory, timestamps) + +# 复现轨迹 +reproduced_trajectory, velocity, acceleration = improved_dmp.reproduce() + +# 泛化轨迹 +generalized_trajectory = improved_dmp.generalize( + new_initial=[0.2, 0.8, 0.1], + new_goal=[0.8, 0.2, 0.9], + tau=1.0 +) +``` + +## 参数说明 + +### NGGDImitationLearning参数 + +- `n_gmm_components`: GMM组件数量,默认10 +- `n_dmp_bfs`: DMP基函数数量,默认100 +- `noise_std`: 高斯噪声标准差,默认0.01 + +### 数据预处理参数 + +- `target_length`: 目标轨迹长度,默认None(使用最长轨迹长度) +- `noise_std`: 噪声标准差,默认0.01 +- `alignment_method`: 对齐方法,可选'linear', 'dtw', 'uniform' + +### GMM参数 + +- `n_components`: GMM组件数量,默认10 +- `random_state`: 随机种子,默认42 + +### DMP参数 + +- `n_dmps`: 数据维度数,默认1 +- `n_bfs`: 基函数数量,默认100 +- `dt`: 时间步长,默认0.01 +- `alpha_y`: 阻尼参数,默认60 +- `beta_y`: 刚度参数,默认alpha_y/4 + +## 可视化功能 + +算法提供了丰富的可视化功能: + +1. **预处理比较**: 显示原始轨迹和处理后轨迹的对比 +2. **GMM组件**: 可视化GMM的高斯组件分布 +3. **GMR回归**: 显示回归轨迹和置信区间 +4. **DMP比较**: 比较GMR参考轨迹、DMP复现轨迹和泛化轨迹 +5. **3D可视化**: 支持3D轨迹的可视化 + +## 示例结果 + +运行算法后,会在`results/`目录下生成以下可视化结果: + +- `preprocessing_comparison.png`: 预处理比较图 +- `gmm_components.png`: GMM组件分布图 +- `gmr_regression.png`: GMR回归结果图 +- `complete_pipeline.png`: 完整流程比较图 + +## 注意事项 + +1. 确保示教轨迹数据格式正确:`(n_trajectories, n_timesteps, n_dims)` +2. 轨迹数据应该包含足够的变化以学习到有效的运动模式 +3. 根据具体应用调整GMM组件数量和DMP基函数数量 +4. 对于高维轨迹,建议先进行降维处理 + +## 引用 + +如果您使用了本算法,请引用相关论文: + +``` +@article{nggd_imitation_learning, + title={NGGD: A Novel Imitation Learning Algorithm Combining GMM, GMR and DMP}, + author={Your Name}, + journal={Journal Name}, + year={2024} +} +``` + +## 许可证 + +本项目采用MIT许可证,详见LICENSE文件。 + +## 联系方式 + +如有问题或建议,请联系: +- 邮箱: your.email@example.com +- GitHub: https://github.com/yourusername/nggd-imitation-learning diff --git a/NGGD/data_preprocessing.py b/NGGD/data_preprocessing.py new file mode 100644 index 0000000..d37ea81 --- /dev/null +++ b/NGGD/data_preprocessing.py @@ -0,0 +1,419 @@ +# -*- coding: utf-8 -*- +""" +NGGD模仿学习算法 - 数据预处理模块 +实现轨迹时间对齐和添加高斯噪声等预处理操作 +""" + +import numpy as np +from scipy.interpolate import interp1d +from scipy.spatial.distance import cdist +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D + +class TrajectoryPreprocessor: + """轨迹数据预处理类""" + + def __init__(self, target_length=None, noise_std=0.01): + """ + 初始化轨迹预处理器 + + Args: + target_length: 目标轨迹长度,如果为None则使用最长轨迹的长度 + noise_std: 高斯噪声标准差 + """ + self.target_length = target_length + self.noise_std = noise_std + + def time_alignment(self, trajectories, method='dtw'): + """ + 轨迹时间对齐 + + Args: + trajectories: 轨迹列表,每个轨迹形状为 (n_timesteps, n_dims) + method: 对齐方法 ('dtw', 'linear', 'uniform') + + Returns: + aligned_trajectories: 对齐后的轨迹 + aligned_timestamps: 对齐后的时间戳 + """ + trajectories = np.array(trajectories) + n_trajectories, n_timesteps, n_dims = trajectories.shape + + if method == 'dtw': + return self._dtw_alignment(trajectories) + elif method == 'linear': + return self._linear_alignment(trajectories) + elif method == 'uniform': + return self._uniform_alignment(trajectories) + else: + raise ValueError(f"不支持的对齐方法: {method}") + + def _dtw_alignment(self, trajectories): + """ + 使用动态时间规整(DTW)进行轨迹对齐 + + Args: + trajectories: 轨迹数据 + + Returns: + aligned_trajectories: 对齐后的轨迹 + aligned_timestamps: 对齐后的时间戳 + """ + n_trajectories, n_timesteps, n_dims = trajectories.shape + + # 选择参考轨迹(最长的轨迹) + ref_idx = np.argmax([len(traj) for traj in trajectories]) + reference_trajectory = trajectories[ref_idx] + + # 计算DTW路径 + aligned_trajectories = [] + for i, trajectory in enumerate(trajectories): + if i == ref_idx: + aligned_trajectories.append(reference_trajectory) + continue + + # 简化的DTW对齐 + aligned_traj = self._simple_dtw_align(trajectory, reference_trajectory) + aligned_trajectories.append(aligned_traj) + + # 生成统一的时间戳 + aligned_timestamps = np.linspace(0, 1, len(reference_trajectory)) + + return np.array(aligned_trajectories), aligned_timestamps + + def _simple_dtw_align(self, trajectory, reference): + """ + 简化的DTW对齐算法 + + Args: + trajectory: 待对齐轨迹 + reference: 参考轨迹 + + Returns: + aligned_trajectory: 对齐后的轨迹 + """ + n_ref = len(reference) + n_traj = len(trajectory) + + # 计算距离矩阵 + distances = cdist(trajectory, reference, metric='euclidean') + + # 动态规划找最优路径 + dp = np.full((n_traj + 1, n_ref + 1), np.inf) + dp[0, 0] = 0 + + for i in range(1, n_traj + 1): + for j in range(1, n_ref + 1): + dp[i, j] = distances[i-1, j-1] + min(dp[i-1, j], dp[i, j-1], dp[i-1, j-1]) + + # 回溯找路径 + path = [] + i, j = n_traj, n_ref + while i > 0 and j > 0: + path.append((i-1, j-1)) + if dp[i-1, j] <= dp[i, j-1] and dp[i-1, j] <= dp[i-1, j-1]: + i -= 1 + elif dp[i, j-1] <= dp[i-1, j-1]: + j -= 1 + else: + i -= 1 + j -= 1 + + path.reverse() + + # 根据路径生成对齐轨迹 + aligned_trajectory = np.zeros((n_ref, trajectory.shape[1])) + for ref_idx in range(n_ref): + # 找到所有映射到当前参考点的轨迹点 + traj_indices = [traj_idx for traj_idx, ref_idx_path in path if ref_idx_path == ref_idx] + + if traj_indices: + # 使用平均位置 + aligned_trajectory[ref_idx] = np.mean(trajectory[traj_indices], axis=0) + else: + # 如果没有映射,使用最近的点 + if ref_idx > 0: + aligned_trajectory[ref_idx] = aligned_trajectory[ref_idx-1] + else: + aligned_trajectory[ref_idx] = trajectory[0] + + return aligned_trajectory + + def _linear_alignment(self, trajectories): + """ + 线性插值对齐 + + Args: + trajectories: 轨迹数据 + + Returns: + aligned_trajectories: 对齐后的轨迹 + aligned_timestamps: 对齐后的时间戳 + """ + n_trajectories, n_timesteps, n_dims = trajectories.shape + + if self.target_length is None: + target_length = n_timesteps + else: + target_length = self.target_length + + aligned_trajectories = [] + for trajectory in trajectories: + # 创建原始时间轴 + original_times = np.linspace(0, 1, len(trajectory)) + target_times = np.linspace(0, 1, target_length) + + # 对每个维度进行插值 + aligned_traj = np.zeros((target_length, n_dims)) + for dim in range(n_dims): + interp_func = interp1d(original_times, trajectory[:, dim], + kind='linear', fill_value='extrapolate') + aligned_traj[:, dim] = interp_func(target_times) + + aligned_trajectories.append(aligned_traj) + + aligned_timestamps = np.linspace(0, 1, target_length) + return np.array(aligned_trajectories), aligned_timestamps + + def _uniform_alignment(self, trajectories): + """ + 均匀时间对齐 + + Args: + trajectories: 轨迹数据 + + Returns: + aligned_trajectories: 对齐后的轨迹 + aligned_timestamps: 对齐后的时间戳 + """ + n_trajectories, n_timesteps, n_dims = trajectories.shape + + if self.target_length is None: + target_length = n_timesteps + else: + target_length = self.target_length + + aligned_trajectories = [] + for trajectory in trajectories: + # 均匀采样 + indices = np.linspace(0, len(trajectory)-1, target_length, dtype=int) + aligned_traj = trajectory[indices] + aligned_trajectories.append(aligned_traj) + + aligned_timestamps = np.linspace(0, 1, target_length) + return np.array(aligned_trajectories), aligned_timestamps + + def add_gaussian_noise(self, trajectories, noise_std=None): + """ + 添加高斯噪声 + + Args: + trajectories: 轨迹数据 + noise_std: 噪声标准差,如果为None则使用初始化时的值 + + Returns: + noisy_trajectories: 添加噪声后的轨迹 + """ + if noise_std is None: + noise_std = self.noise_std + + trajectories = np.array(trajectories) + noisy_trajectories = trajectories.copy() + + # 添加高斯噪声 + noise = np.random.normal(0, noise_std, trajectories.shape) + noisy_trajectories += noise + + return noisy_trajectories + + def normalize_trajectories(self, trajectories, method='minmax'): + """ + 轨迹归一化 + + Args: + trajectories: 轨迹数据 + method: 归一化方法 ('minmax', 'zscore') + + Returns: + normalized_trajectories: 归一化后的轨迹 + normalization_params: 归一化参数 + """ + trajectories = np.array(trajectories) + n_trajectories, n_timesteps, n_dims = trajectories.shape + + if method == 'minmax': + # Min-Max归一化 + min_vals = np.min(trajectories, axis=(0, 1)) + max_vals = np.max(trajectories, axis=(0, 1)) + + normalized_trajectories = (trajectories - min_vals) / (max_vals - min_vals) + + normalization_params = { + 'method': 'minmax', + 'min_vals': min_vals, + 'max_vals': max_vals + } + + elif method == 'zscore': + # Z-score归一化 + mean_vals = np.mean(trajectories, axis=(0, 1)) + std_vals = np.std(trajectories, axis=(0, 1)) + + normalized_trajectories = (trajectories - mean_vals) / std_vals + + normalization_params = { + 'method': 'zscore', + 'mean_vals': mean_vals, + 'std_vals': std_vals + } + + else: + raise ValueError(f"不支持的归一化方法: {method}") + + return normalized_trajectories, normalization_params + + def denormalize_trajectories(self, normalized_trajectories, normalization_params): + """ + 反归一化轨迹 + + Args: + normalized_trajectories: 归一化后的轨迹 + normalization_params: 归一化参数 + + Returns: + denormalized_trajectories: 反归一化后的轨迹 + """ + if normalization_params['method'] == 'minmax': + denormalized_trajectories = (normalized_trajectories * + (normalization_params['max_vals'] - normalization_params['min_vals']) + + normalization_params['min_vals']) + elif normalization_params['method'] == 'zscore': + denormalized_trajectories = (normalized_trajectories * + normalization_params['std_vals'] + + normalization_params['mean_vals']) + else: + raise ValueError(f"不支持的归一化方法: {normalization_params['method']}") + + return denormalized_trajectories + + def preprocess_pipeline(self, trajectories, alignment_method='linear', + add_noise=True, normalize=True, noise_std=None): + """ + 完整的预处理流水线 + + Args: + trajectories: 原始轨迹数据 + alignment_method: 对齐方法 + add_noise: 是否添加噪声 + normalize: 是否归一化 + noise_std: 噪声标准差 + + Returns: + processed_trajectories: 处理后的轨迹 + timestamps: 时间戳 + normalization_params: 归一化参数 + """ + print("开始轨迹预处理...") + + # 1. 时间对齐 + print("1. 进行时间对齐...") + aligned_trajectories, timestamps = self.time_alignment(trajectories, method=alignment_method) + + # 2. 添加噪声 + if add_noise: + print("2. 添加高斯噪声...") + if noise_std is not None: + self.noise_std = noise_std + aligned_trajectories = self.add_gaussian_noise(aligned_trajectories) + + # 3. 归一化 + normalization_params = None + if normalize: + print("3. 进行归一化...") + aligned_trajectories, normalization_params = self.normalize_trajectories(aligned_trajectories) + + print("轨迹预处理完成!") + print(f"处理后轨迹形状: {aligned_trajectories.shape}") + + return aligned_trajectories, timestamps, normalization_params + + def visualize_preprocessing(self, original_trajectories, processed_trajectories, + timestamps=None, dim1=0, dim2=1, save_path=None): + """ + 可视化预处理结果 + + Args: + original_trajectories: 原始轨迹 + processed_trajectories: 处理后的轨迹 + timestamps: 时间戳 + dim1: 第一个维度索引 + dim2: 第二个维度索引 + save_path: 保存路径 + """ + fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6)) + + # 原始轨迹 + for i, trajectory in enumerate(original_trajectories): + ax1.plot(trajectory[:, dim1], trajectory[:, dim2], + alpha=0.7, linewidth=1, label=f'Traj {i+1}' if i < 5 else "") + + ax1.set_xlabel(f'Dimension {dim1}') + ax1.set_ylabel(f'Dimension {dim2}') + ax1.set_title('Original Trajectories') + ax1.legend() + ax1.grid(True) + + # 处理后的轨迹 + for i, trajectory in enumerate(processed_trajectories): + ax2.plot(trajectory[:, dim1], trajectory[:, dim2], + alpha=0.7, linewidth=1, label=f'Traj {i+1}' if i < 5 else "") + + ax2.set_xlabel(f'Dimension {dim1}') + ax2.set_ylabel(f'Dimension {dim2}') + ax2.set_title('Preprocessed Trajectories') + ax2.legend() + ax2.grid(True) + + plt.tight_layout() + + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"预处理可视化图保存到: {save_path}") + + plt.show() + +# 测试代码 +if __name__ == "__main__": + # 生成测试数据 + np.random.seed(42) + n_trajectories = 5 + n_dims = 3 + + # 生成不同长度的轨迹 + trajectories = [] + for i in range(n_trajectories): + n_timesteps = np.random.randint(80, 120) + t = np.linspace(0, 2*np.pi, n_timesteps) + noise = np.random.normal(0, 0.05, (n_timesteps, n_dims)) + trajectory = np.column_stack([ + np.sin(t) + noise[:, 0], + np.cos(t) + noise[:, 1], + t/(2*np.pi) + noise[:, 2] + ]) + trajectories.append(trajectory) + + # 创建预处理器 + preprocessor = TrajectoryPreprocessor(target_length=100, noise_std=0.02) + + # 执行预处理 + processed_trajectories, timestamps, norm_params = preprocessor.preprocess_pipeline( + trajectories, alignment_method='linear', add_noise=True, normalize=True + ) + + # 可视化 + preprocessor.visualize_preprocessing(trajectories, processed_trajectories) + + print(f"原始轨迹数量: {len(trajectories)}") + print(f"原始轨迹长度: {[len(traj) for traj in trajectories]}") + print(f"处理后轨迹形状: {processed_trajectories.shape}") + print(f"时间戳长度: {len(timestamps)}") diff --git a/NGGD/demo_nggd_ur5.py b/NGGD/demo_nggd_ur5.py new file mode 100644 index 0000000..d253724 --- /dev/null +++ b/NGGD/demo_nggd_ur5.py @@ -0,0 +1,385 @@ +# -*- coding: utf-8 -*- +""" +NGGD模仿学习算法 - UR5机器人演示程序 +结合CoppeliaSim仿真环境展示NGGD算法的实际应用 +""" + +import time +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D +import sys +import os + +# 添加路径 +current_dir = os.path.dirname(__file__) +sys.path.append(current_dir) +sys.path.append(os.path.join(current_dir, '..')) + +# CoppeliaSim相关导入 +sys.path.append('C:/Program Files/CoppeliaRobotics/CoppeliaSimEdu/programming/legacyRemoteApi/remoteApiBindings/python/python') +import sim as vrep_sim + +# UR5相关导入 +sys.path.append('C:/Users/19046/Desktop/UR5Test/PyDMPs/code/UR5') +sys.path.append('C:/Users/19046/Desktop/UR5Test/PyDMPs/code/UR5/VREP_RemoteAPIs') +from UR5SimModel import UR5SimModel + +# NGGD算法导入 +from nggd_imitation_learning import NGGDImitationLearning + +class NGGDUR5Demo: + """NGGD算法在UR5机器人上的演示类""" + + def __init__(self): + """初始化演示系统""" + self.client_ID = None + self.ur5_sim_model = None + self.nggd_system = None + self.demonstration_trajectories = [] + + def connect_to_coppelia(self): + """连接到CoppeliaSim""" + print("连接到CoppeliaSim...") + vrep_sim.simxFinish(-1) + + while True: + self.client_ID = vrep_sim.simxStart('127.0.0.1', 19999, True, False, 5000, 5) + if self.client_ID > -1: + print('成功连接到CoppeliaSim') + break + else: + print('连接失败,重试中...') + time.sleep(1) + + # 设置仿真参数 + delta_t = 0.01 + vrep_sim.simxSetFloatingParameter(self.client_ID, vrep_sim.sim_floatparam_simulation_time_step, + delta_t, vrep_sim.simx_opmode_oneshot) + vrep_sim.simxSynchronous(self.client_ID, True) + vrep_sim.simxStartSimulation(self.client_ID, vrep_sim.simx_opmode_oneshot) + + # 初始化UR5模型 + self.ur5_sim_model = UR5SimModel() + self.ur5_sim_model.initializeSimModel(self.client_ID) + + print("CoppeliaSim连接和UR5模型初始化完成") + + def record_demonstration_trajectory(self, trajectory_name="demo_trajectory"): + """ + 记录示教轨迹 + + Args: + trajectory_name: 轨迹名称 + """ + print(f"开始记录示教轨迹: {trajectory_name}") + print("请手动移动UR5机器人末端执行器...") + + # 获取dummy句柄 + return_code, via_dummy_handle = vrep_sim.simxGetObjectHandle( + self.client_ID, 'via', vrep_sim.simx_opmode_blocking) + + if return_code != vrep_sim.simx_return_ok: + print("无法获取via dummy句柄") + return None + + # 记录轨迹 + trajectory = [] + recording_time = 10.0 # 记录10秒 + start_time = time.time() + + while time.time() - start_time < recording_time: + # 获取当前位置 + return_code, position = vrep_sim.simxGetObjectPosition( + self.client_ID, via_dummy_handle, -1, vrep_sim.simx_opmode_buffer) + + if return_code == vrep_sim.simx_return_ok: + trajectory.append(position) + + vrep_sim.simxSynchronousTrigger(self.client_ID) + time.sleep(0.01) + + trajectory = np.array(trajectory) + print(f"轨迹记录完成,共{len(trajectory)}个点") + + # 保存轨迹 + trajectory_file = os.path.join(current_dir, f"{trajectory_name}.csv") + pd.DataFrame(trajectory).to_csv(trajectory_file, header=False, index=False) + print(f"轨迹保存到: {trajectory_file}") + + return trajectory + + def load_demonstration_trajectories(self, trajectory_files): + """ + 加载示教轨迹文件 + + Args: + trajectory_files: 轨迹文件路径列表 + """ + print("加载示教轨迹文件...") + + for i, file_path in enumerate(trajectory_files): + if os.path.exists(file_path): + df = pd.read_csv(file_path, header=None) + trajectory = np.array(df) + self.demonstration_trajectories.append(trajectory) + print(f"加载轨迹 {i+1}: {file_path}, 形状: {trajectory.shape}") + else: + print(f"文件不存在: {file_path}") + + print(f"共加载 {len(self.demonstration_trajectories)} 条示教轨迹") + + def generate_synthetic_demonstrations(self, n_demos=6): + """ + 生成合成示教数据(用于测试) + + Args: + n_demos: 示教数量 + """ + print(f"生成 {n_demos} 条合成示教轨迹...") + + np.random.seed(42) + n_timesteps = 100 + n_dims = 3 + + for i in range(n_demos): + t = np.linspace(0, 2*np.pi, n_timesteps) + + if i % 3 == 0: # 圆形轨迹 + trajectory = np.column_stack([ + np.sin(t) + np.random.normal(0, 0.05, n_timesteps), + np.cos(t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) + np.random.normal(0, 0.02, n_timesteps) + ]) + elif i % 3 == 1: # 8字形轨迹 + trajectory = np.column_stack([ + np.sin(t) + np.random.normal(0, 0.05, n_timesteps), + np.sin(2*t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) + np.random.normal(0, 0.02, n_timesteps) + ]) + else: # 螺旋轨迹 + trajectory = np.column_stack([ + t/(2*np.pi) * np.sin(t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) * np.cos(t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) + np.random.normal(0, 0.02, n_timesteps) + ]) + + self.demonstration_trajectories.append(trajectory) + + print(f"生成了 {len(self.demonstration_trajectories)} 条合成示教轨迹") + + def learn_with_nggd(self, n_gmm_components=10, n_dmp_bfs=100, noise_std=0.01): + """ + 使用NGGD算法学习 + + Args: + n_gmm_components: GMM组件数量 + n_dmp_bfs: DMP基函数数量 + noise_std: 噪声标准差 + """ + if not self.demonstration_trajectories: + print("没有示教轨迹数据,请先加载或生成示教轨迹") + return + + print("开始NGGD模仿学习...") + + # 创建NGGD系统 + self.nggd_system = NGGDImitationLearning( + n_gmm_components=n_gmm_components, + n_dmp_bfs=n_dmp_bfs, + noise_std=noise_std + ) + + # 执行学习 + learning_results = self.nggd_system.learn_from_demonstrations( + self.demonstration_trajectories, + alignment_method='linear', + add_noise=True, + normalize=True, + plot_results=True + ) + + print("NGGD学习完成!") + return learning_results + + def execute_trajectory_in_simulation(self, trajectory, trajectory_name="execution"): + """ + 在仿真中执行轨迹 + + Args: + trajectory: 要执行的轨迹 + trajectory_name: 轨迹名称 + """ + print(f"在仿真中执行轨迹: {trajectory_name}") + + # 获取dummy句柄 + return_code, via_dummy_handle = vrep_sim.simxGetObjectHandle( + self.client_ID, 'via', vrep_sim.simx_opmode_blocking) + + if return_code != vrep_sim.simx_return_ok: + print("无法获取via dummy句柄") + return + + # 执行轨迹 + for i, position in enumerate(trajectory): + vrep_sim.simxSetObjectPosition(self.client_ID, via_dummy_handle, -1, + position, vrep_sim.simx_opmode_oneshot) + vrep_sim.simxSynchronousTrigger(self.client_ID) + time.sleep(0.01) + + print(f"轨迹执行完成,共{len(trajectory)}个点") + + def demonstrate_generalization(self): + """演示轨迹泛化能力""" + if self.nggd_system is None: + print("请先执行NGGD学习") + return + + print("\n演示轨迹泛化能力...") + + # 定义不同的泛化场景 + generalization_scenarios = [ + { + 'name': '场景1: 不同初始和目标位置', + 'initial': [0.3, 0.3, 0.2], + 'goal': [-0.3, -0.3, 0.8], + 'tau': 1.0 + }, + { + 'name': '场景2: 时间缩放', + 'initial': [0.2, 0.8, 0.1], + 'goal': [0.8, 0.2, 0.9], + 'tau': 0.5 + }, + { + 'name': '场景3: 另一个轨迹', + 'initial': [-0.4, 0.6, 0.3], + 'goal': [0.6, -0.4, 0.7], + 'tau': 1.2 + } + ] + + for i, scenario in enumerate(generalization_scenarios): + print(f"\n{scenario['name']}") + print("-" * 40) + + # 生成泛化轨迹 + generalized_trajectory = self.nggd_system.generalize_trajectory( + new_initial=scenario['initial'], + new_goal=scenario['goal'], + tau=scenario['tau'], + plot_comparison=True + ) + + # 在仿真中执行(可选) + user_input = input(f"是否在仿真中执行场景{i+1}? (y/n): ") + if user_input.lower() == 'y': + self.execute_trajectory_in_simulation( + generalized_trajectory, + f"generalization_scenario_{i+1}" + ) + + def visualize_all_trajectories(self): + """可视化所有轨迹""" + if not self.demonstration_trajectories: + print("没有轨迹数据可视化") + return + + fig = plt.figure(figsize=(15, 10)) + + # 2D可视化 + ax1 = fig.add_subplot(221) + for i, traj in enumerate(self.demonstration_trajectories): + ax1.plot(traj[:, 0], traj[:, 1], alpha=0.7, linewidth=1, + label=f'Demo {i+1}' if i < 5 else "") + ax1.set_title('Demonstration Trajectories (2D)') + ax1.set_xlabel('X') + ax1.set_ylabel('Y') + ax1.legend() + ax1.grid(True) + + # 3D可视化 + ax2 = fig.add_subplot(222, projection='3d') + for i, traj in enumerate(self.demonstration_trajectories): + ax2.plot(traj[:, 0], traj[:, 1], traj[:, 2], alpha=0.7, linewidth=1) + ax2.set_title('Demonstration Trajectories (3D)') + ax2.set_xlabel('X') + ax2.set_ylabel('Y') + ax2.set_zlabel('Z') + + # 如果有NGGD结果,也显示 + if self.nggd_system is not None: + ax3 = fig.add_subplot(223) + for i, traj in enumerate(self.demonstration_trajectories): + ax3.plot(traj[:, 0], traj[:, 1], alpha=0.3, linewidth=1, color='blue') + ax3.plot(self.nggd_system.gmr_trajectory[:, 0], + self.nggd_system.gmr_trajectory[:, 1], + 'g-', linewidth=2, label='GMR') + ax3.plot(self.nggd_system.reproduced_trajectory[:, 0], + self.nggd_system.reproduced_trajectory[:, 1], + 'r--', linewidth=2, label='DMP') + ax3.set_title('NGGD Results') + ax3.set_xlabel('X') + ax3.set_ylabel('Y') + ax3.legend() + ax3.grid(True) + + plt.tight_layout() + plt.show() + + def run_complete_demo(self): + """运行完整演示""" + print("=" * 60) + print("NGGD模仿学习算法 - UR5机器人演示") + print("=" * 60) + + try: + # 1. 连接仿真环境 + self.connect_to_coppelia() + + # 2. 生成或加载示教数据 + demo_choice = input("选择演示模式:\n1. 使用合成数据\n2. 记录真实示教\n3. 加载已有轨迹\n请输入选择 (1/2/3): ") + + if demo_choice == '1': + self.generate_synthetic_demonstrations(n_demos=6) + elif demo_choice == '2': + n_demos = int(input("请输入要记录的示教数量: ")) + for i in range(n_demos): + input(f"按回车开始记录第{i+1}条示教轨迹...") + traj = self.record_demonstration_trajectory(f"demo_{i+1}") + if traj is not None: + self.demonstration_trajectories.append(traj) + elif demo_choice == '3': + # 加载现有轨迹文件 + trajectory_files = [ + 'C:/Users/19046/Desktop/UR5Test/PyDMPs/code/demo_trajectory/demo_trajectory_for_discrete_dmp.csv' + ] + self.load_demonstration_trajectories(trajectory_files) + + # 3. 可视化示教轨迹 + self.visualize_all_trajectories() + + # 4. NGGD学习 + self.learn_with_nggd(n_gmm_components=10, n_dmp_bfs=80, noise_std=0.02) + + # 5. 演示泛化能力 + self.demonstrate_generalization() + + print("\n演示完成!") + + except KeyboardInterrupt: + print("\n演示被用户中断") + except Exception as e: + print(f"演示过程中出现错误: {e}") + finally: + # 清理资源 + if self.client_ID is not None: + vrep_sim.simxStopSimulation(self.client_ID, vrep_sim.simx_opmode_blocking) + vrep_sim.simxFinish(-1) + print("仿真环境已关闭") + +# 主程序 +if __name__ == "__main__": + demo = NGGDUR5Demo() + demo.run_complete_demo() diff --git a/NGGD/example_usage.py b/NGGD/example_usage.py new file mode 100644 index 0000000..b066d86 --- /dev/null +++ b/NGGD/example_usage.py @@ -0,0 +1,333 @@ +# -*- coding: utf-8 -*- +""" +NGGD模仿学习算法 - 使用示例 +展示如何使用NGGD算法进行模仿学习 +""" + +import numpy as np +import matplotlib.pyplot as plt +import sys +import os + +# 添加模块路径 +current_dir = os.path.dirname(__file__) +sys.path.append(current_dir) + +from nggd_imitation_learning import NGGDImitationLearning + +def create_demonstration_data(): + """创建示教数据示例""" + print("创建示教数据...") + + np.random.seed(42) + n_demonstrations = 5 + n_timesteps = 100 + n_dims = 3 + + trajectories = [] + + # 创建不同类型的示教轨迹 + for i in range(n_demonstrations): + t = np.linspace(0, 2*np.pi, n_timesteps) + + if i == 0: # 圆形轨迹 + trajectory = np.column_stack([ + np.sin(t), + np.cos(t), + t/(2*np.pi) + ]) + elif i == 1: # 8字形轨迹 + trajectory = np.column_stack([ + np.sin(t), + np.sin(2*t), + t/(2*np.pi) + ]) + elif i == 2: # 螺旋轨迹 + trajectory = np.column_stack([ + t/(2*np.pi) * np.sin(t), + t/(2*np.pi) * np.cos(t), + t/(2*np.pi) + ]) + elif i == 3: # 直线轨迹 + trajectory = np.column_stack([ + np.linspace(-1, 1, n_timesteps), + np.zeros(n_timesteps), + t/(2*np.pi) + ]) + else: # 复杂轨迹 + trajectory = np.column_stack([ + np.sin(t) * np.cos(2*t), + np.cos(t) * np.sin(2*t), + t/(2*np.pi) + ]) + + # 添加一些噪声 + noise = np.random.normal(0, 0.05, trajectory.shape) + trajectory += noise + + trajectories.append(trajectory) + + return trajectories + +def example_basic_usage(): + """基本使用示例""" + print("=" * 60) + print("NGGD模仿学习 - 基本使用示例") + print("=" * 60) + + # 1. 创建示教数据 + demonstration_trajectories = create_demonstration_data() + print(f"创建了 {len(demonstration_trajectories)} 条示教轨迹") + + # 2. 创建NGGD系统 + nggd_system = NGGDImitationLearning( + n_gmm_components=8, # GMM组件数量 + n_dmp_bfs=60, # DMP基函数数量 + noise_std=0.02 # 噪声标准差 + ) + + # 3. 执行学习 + print("\n执行NGGD学习...") + learning_results = nggd_system.learn_from_demonstrations( + demonstration_trajectories, + alignment_method='linear', + add_noise=True, + normalize=True, + plot_results=True + ) + + # 4. 轨迹泛化示例 + print("\n轨迹泛化示例...") + + # 示例1: 不同的初始和目标位置 + print("示例1: 不同初始和目标位置") + generalized_traj1 = nggd_system.generalize_trajectory( + new_initial=[0.5, 0.5, 0.2], + new_goal=[-0.5, -0.5, 0.8], + tau=1.0, + plot_comparison=True + ) + + # 示例2: 时间缩放 + print("示例2: 时间缩放") + generalized_traj2 = nggd_system.generalize_trajectory( + new_initial=[0.2, 0.8, 0.1], + new_goal=[0.8, 0.2, 0.9], + tau=0.5, + plot_comparison=True + ) + + # 示例3: 另一个轨迹 + print("示例3: 另一个轨迹") + generalized_traj3 = nggd_system.generalize_trajectory( + new_initial=[-0.3, 0.7, 0.3], + new_goal=[0.7, -0.3, 0.7], + tau=1.2, + plot_comparison=True + ) + + return learning_results, [generalized_traj1, generalized_traj2, generalized_traj3] + +def example_advanced_usage(): + """高级使用示例""" + print("\n" + "=" * 60) + print("NGGD模仿学习 - 高级使用示例") + print("=" * 60) + + # 创建更复杂的示教数据 + np.random.seed(123) + n_demonstrations = 8 + n_timesteps = 150 + n_dims = 3 + + trajectories = [] + + for i in range(n_demonstrations): + t = np.linspace(0, 3*np.pi, n_timesteps) + + # 创建更复杂的轨迹模式 + if i % 4 == 0: # 圆形 + trajectory = np.column_stack([ + np.sin(t) + 0.1 * np.sin(3*t), + np.cos(t) + 0.1 * np.cos(3*t), + t/(3*np.pi) + ]) + elif i % 4 == 1: # 8字形 + trajectory = np.column_stack([ + np.sin(t) + 0.2 * np.sin(4*t), + np.sin(2*t) + 0.2 * np.sin(5*t), + t/(3*np.pi) + ]) + elif i % 4 == 2: # 螺旋 + trajectory = np.column_stack([ + t/(3*np.pi) * np.sin(t) + 0.1 * np.sin(2*t), + t/(3*np.pi) * np.cos(t) + 0.1 * np.cos(2*t), + t/(3*np.pi) + ]) + else: # 复杂轨迹 + trajectory = np.column_stack([ + np.sin(t) * np.cos(2*t) + 0.1 * np.sin(5*t), + np.cos(t) * np.sin(2*t) + 0.1 * np.cos(5*t), + t/(3*np.pi) + ]) + + # 添加噪声 + noise = np.random.normal(0, 0.03, trajectory.shape) + trajectory += noise + + trajectories.append(trajectory) + + # 使用更多GMM组件和DMP基函数 + nggd_system = NGGDImitationLearning( + n_gmm_components=15, + n_dmp_bfs=100, + noise_std=0.01 + ) + + # 执行学习 + learning_results = nggd_system.learn_from_demonstrations( + trajectories, + alignment_method='linear', + add_noise=True, + normalize=True, + plot_results=True + ) + + # 测试多个泛化场景 + generalization_scenarios = [ + {"name": "场景1", "initial": [0.4, 0.4, 0.1], "goal": [-0.4, -0.4, 0.9], "tau": 1.0}, + {"name": "场景2", "initial": [0.1, 0.9, 0.2], "goal": [0.9, 0.1, 0.8], "tau": 0.7}, + {"name": "场景3", "initial": [-0.5, 0.5, 0.3], "goal": [0.5, -0.5, 0.7], "tau": 1.3}, + {"name": "场景4", "initial": [0.0, 0.0, 0.0], "goal": [1.0, 1.0, 1.0], "tau": 0.8} + ] + + generalized_trajectories = [] + for scenario in generalization_scenarios: + print(f"\n{scenario['name']}: 初始={scenario['initial']}, 目标={scenario['goal']}, tau={scenario['tau']}") + + generalized_traj = nggd_system.generalize_trajectory( + new_initial=scenario['initial'], + new_goal=scenario['goal'], + tau=scenario['tau'], + plot_comparison=True + ) + + generalized_trajectories.append(generalized_traj) + + return learning_results, generalized_trajectories + +def visualize_all_results(learning_results, generalized_trajectories): + """可视化所有结果""" + print("\n生成综合可视化...") + + fig = plt.figure(figsize=(20, 12)) + + # 原始示教轨迹 + ax1 = fig.add_subplot(2, 3, 1) + for i, traj in enumerate(learning_results['processed_trajectories']): + ax1.plot(traj[:, 0], traj[:, 1], alpha=0.6, linewidth=1, + label=f'Demo {i+1}' if i < 3 else "") + ax1.set_title('Demonstration Trajectories') + ax1.set_xlabel('X') + ax1.set_ylabel('Y') + ax1.legend() + ax1.grid(True) + + # GMR回归结果 + ax2 = fig.add_subplot(2, 3, 2) + ax2.plot(learning_results['gmr_trajectory'][:, 0], + learning_results['gmr_trajectory'][:, 1], + 'g-', linewidth=2, label='GMR Regression') + ax2.set_title('GMR Regression') + ax2.set_xlabel('X') + ax2.set_ylabel('Y') + ax2.legend() + ax2.grid(True) + + # DMP复现结果 + ax3 = fig.add_subplot(2, 3, 3) + ax3.plot(learning_results['gmr_trajectory'][:, 0], + learning_results['gmr_trajectory'][:, 1], + 'g-', linewidth=2, label='GMR Reference') + ax3.plot(learning_results['reproduced_trajectory'][:, 0], + learning_results['reproduced_trajectory'][:, 1], + 'r--', linewidth=2, label='DMP Reproduction') + ax3.set_title('DMP Reproduction') + ax3.set_xlabel('X') + ax3.set_ylabel('Y') + ax3.legend() + ax3.grid(True) + + # 泛化结果 + ax4 = fig.add_subplot(2, 3, 4) + colors = ['red', 'blue', 'green', 'orange', 'purple'] + for i, traj in enumerate(generalized_trajectories): + ax4.plot(traj[:, 0], traj[:, 1], + color=colors[i % len(colors)], linewidth=2, + label=f'Generalization {i+1}') + ax4.set_title('Generalization Results') + ax4.set_xlabel('X') + ax4.set_ylabel('Y') + ax4.legend() + ax4.grid(True) + + # 3D可视化 + ax5 = fig.add_subplot(2, 3, 5, projection='3d') + for i, traj in enumerate(learning_results['processed_trajectories'][:3]): + ax5.plot(traj[:, 0], traj[:, 1], traj[:, 2], alpha=0.6, linewidth=1) + ax5.plot(learning_results['gmr_trajectory'][:, 0], + learning_results['gmr_trajectory'][:, 1], + learning_results['gmr_trajectory'][:, 2], + 'g-', linewidth=2, label='GMR') + ax5.plot(learning_results['reproduced_trajectory'][:, 0], + learning_results['reproduced_trajectory'][:, 1], + learning_results['reproduced_trajectory'][:, 2], + 'r--', linewidth=2, label='DMP') + ax5.set_title('3D Visualization') + ax5.set_xlabel('X') + ax5.set_ylabel('Y') + ax5.set_zlabel('Z') + ax5.legend() + + # 轨迹长度比较 + ax6 = fig.add_subplot(2, 3, 6) + lengths = [len(traj) for traj in learning_results['processed_trajectories']] + ax6.bar(range(len(lengths)), lengths, alpha=0.7) + ax6.set_title('Trajectory Lengths') + ax6.set_xlabel('Trajectory Index') + ax6.set_ylabel('Length (timesteps)') + ax6.grid(True) + + plt.tight_layout() + plt.show() + +def main(): + """主函数""" + print("NGGD模仿学习算法 - 使用示例") + print("=" * 60) + + try: + # 基本使用示例 + learning_results, generalized_trajectories = example_basic_usage() + + # 高级使用示例 + advanced_results, advanced_generalized = example_advanced_usage() + + # 综合可视化 + visualize_all_results(learning_results, generalized_trajectories) + + print("\n🎉 所有示例运行完成!") + print("\nNGGD算法特点:") + print("1. 能够学习复杂的示教轨迹模式") + print("2. 通过GMM建模捕获轨迹的概率分布") + print("3. 使用GMR进行概率最优的轨迹回归") + print("4. 基于DMP实现灵活的轨迹泛化") + print("5. 支持不同的初始位置、目标位置和时间缩放") + + except Exception as e: + print(f"❌ 运行示例时出现错误: {e}") + import traceback + traceback.print_exc() + +if __name__ == "__main__": + main() diff --git a/NGGD/gmm_trajectory_modeling.py b/NGGD/gmm_trajectory_modeling.py new file mode 100644 index 0000000..51a7c02 --- /dev/null +++ b/NGGD/gmm_trajectory_modeling.py @@ -0,0 +1,269 @@ +# -*- coding: utf-8 -*- +""" +NGGD模仿学习算法 - GMM轨迹建模模块 +实现高斯混合模型对示教轨迹的建模 +""" + +import numpy as np +try: + from sklearn.mixture import GaussianMixture +except ImportError: + print("警告: 未安装scikit-learn,请运行: pip install scikit-learn") + GaussianMixture = None +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D + +class GMMTrajectoryModeling: + """GMM轨迹建模类""" + + def __init__(self, n_components=10, random_state=42): + """ + 初始化GMM轨迹建模器 + + Args: + n_components: GMM组件数量 + random_state: 随机种子 + """ + self.n_components = n_components + self.random_state = random_state + self.gmm = None + self.trajectories = None + self.timestamps = None + self.n_dims = None + self.n_trajectories = None + + def fit(self, trajectories, timestamps=None): + """ + 使用GMM学习示教轨迹中的特征信息 + + Args: + trajectories: 示教轨迹数据,形状为 (n_trajectories, n_timesteps, n_dims) + timestamps: 时间戳,形状为 (n_timesteps,) + """ + self.trajectories = np.array(trajectories) + self.n_trajectories, self.n_timesteps, self.n_dims = self.trajectories.shape + + if timestamps is None: + self.timestamps = np.linspace(0, 1, self.n_timesteps) + else: + self.timestamps = np.array(timestamps) + + # 将轨迹数据重塑为 (n_samples, n_features) + # 每个样本包含位置和时间信息 + n_samples = self.n_trajectories * self.n_timesteps + n_features = self.n_dims + 1 # 添加时间维度 + + data = np.zeros((n_samples, n_features)) + + for i in range(self.n_trajectories): + for t in range(self.n_timesteps): + idx = i * self.n_timesteps + t + data[idx, :self.n_dims] = self.trajectories[i, t, :] + data[idx, self.n_dims] = self.timestamps[t] + + # 训练GMM模型 + self.gmm = GaussianMixture( + n_components=self.n_components, + random_state=self.random_state, + covariance_type='full' + ) + self.gmm.fit(data) + + print(f"GMM训练完成,组件数量: {self.n_components}") + print(f"轨迹数量: {self.n_trajectories}, 时间步数: {self.n_timesteps}, 维度: {self.n_dims}") + + def get_joint_probability_density(self, positions, times): + """ + 计算联合概率密度 + + Args: + positions: 位置数据,形状为 (n_samples, n_dims) + times: 时间数据,形状为 (n_samples,) + + Returns: + probability_density: 联合概率密度 + """ + if self.gmm is None: + raise ValueError("GMM模型尚未训练,请先调用fit方法") + + # 准备数据 + n_samples = positions.shape[0] + data = np.zeros((n_samples, self.n_dims + 1)) + data[:, :self.n_dims] = positions + data[:, self.n_dims] = times + + # 计算概率密度 + probability_density = np.exp(self.gmm.score_samples(data)) + + return probability_density + + def sample_trajectory(self, n_samples=None): + """ + 从GMM中采样轨迹 + + Args: + n_samples: 采样数量,默认为训练时的时间步数 + + Returns: + sampled_trajectory: 采样的轨迹 + """ + if self.gmm is None: + raise ValueError("GMM模型尚未训练,请先调用fit方法") + + if n_samples is None: + n_samples = self.n_timesteps + + # 从GMM中采样 + sampled_data, _ = self.gmm.sample(n_samples) + + # 分离位置和时间信息 + sampled_trajectory = sampled_data[:, :self.n_dims] + sampled_times = sampled_data[:, self.n_dims] + + # 按时间排序 + sort_indices = np.argsort(sampled_times) + sampled_trajectory = sampled_trajectory[sort_indices] + sampled_times = sampled_times[sort_indices] + + return sampled_trajectory, sampled_times + + def get_means_and_covariances(self): + """ + 获取GMM的均值和协方差矩阵 + + Returns: + means: 均值矩阵 + covariances: 协方差矩阵 + """ + if self.gmm is None: + raise ValueError("GMM模型尚未训练,请先调用fit方法") + + return self.gmm.means_, self.gmm.covariances_ + + def visualize_gmm_components(self, dim1=0, dim2=1, save_path=None): + """ + 可视化GMM组件 + + Args: + dim1: 第一个维度索引 + dim2: 第二个维度索引 + save_path: 保存路径 + """ + if self.gmm is None: + raise ValueError("GMM模型尚未训练,请先调用fit方法") + + fig, ax = plt.subplots(figsize=(10, 8)) + + # 绘制原始轨迹 + for i in range(self.n_trajectories): + ax.plot(self.trajectories[i, :, dim1], + self.trajectories[i, :, dim2], + 'b-', alpha=0.3, linewidth=1) + + # 绘制GMM组件 + means = self.gmm.means_ + covariances = self.gmm.covariances_ + + for i in range(self.n_components): + mean = means[i, [dim1, dim2]] + cov = covariances[i, [dim1, dim2], :][:, [dim1, dim2]] + + # 绘制椭圆 + from matplotlib.patches import Ellipse + eigenvals, eigenvecs = np.linalg.eigh(cov) + angle = np.degrees(np.arctan2(eigenvecs[1, 0], eigenvecs[0, 0])) + width, height = 2 * np.sqrt(eigenvals) + + ellipse = Ellipse(mean, width, height, angle=angle, + alpha=0.3, color=f'C{i}') + ax.add_patch(ellipse) + + # 绘制中心点 + ax.scatter(mean[0], mean[1], c=f'C{i}', s=100, marker='x') + + ax.set_xlabel(f'Dimension {dim1}') + ax.set_ylabel(f'Dimension {dim2}') + ax.set_title('GMM Components Visualization') + ax.grid(True) + + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"GMM组件可视化图保存到: {save_path}") + + plt.show() + + def visualize_3d_trajectories(self, save_path=None): + """ + 3D轨迹可视化 + + Args: + save_path: 保存路径 + """ + if self.n_dims < 3: + print("轨迹维度小于3,无法进行3D可视化") + return + + fig = plt.figure(figsize=(12, 8)) + ax = fig.add_subplot(111, projection='3d') + + # 绘制原始轨迹 + for i in range(self.n_trajectories): + ax.plot(self.trajectories[i, :, 0], + self.trajectories[i, :, 1], + self.trajectories[i, :, 2], + 'b-', alpha=0.3, linewidth=1) + + # 绘制GMM中心点 + means = self.gmm.means_ + ax.scatter(means[:, 0], means[:, 1], means[:, 2], + c='red', s=100, marker='x', label='GMM Centers') + + ax.set_xlabel('X') + ax.set_ylabel('Y') + ax.set_zlabel('Z') + ax.set_title('3D Trajectory Visualization with GMM Centers') + ax.legend() + + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"3D轨迹可视化图保存到: {save_path}") + + plt.show() + +# 测试代码 +if __name__ == "__main__": + # 生成测试数据 + np.random.seed(42) + n_trajectories = 5 + n_timesteps = 100 + n_dims = 3 + + # 生成多条示教轨迹 + trajectories = [] + for i in range(n_trajectories): + t = np.linspace(0, 2*np.pi, n_timesteps) + # 添加一些变化 + noise = np.random.normal(0, 0.1, (n_timesteps, n_dims)) + trajectory = np.column_stack([ + np.sin(t) + noise[:, 0], + np.cos(t) + noise[:, 1], + t/(2*np.pi) + noise[:, 2] + ]) + trajectories.append(trajectory) + + trajectories = np.array(trajectories) + + # 创建GMM建模器 + gmm_modeler = GMMTrajectoryModeling(n_components=8) + + # 训练GMM + gmm_modeler.fit(trajectories) + + # 可视化 + gmm_modeler.visualize_gmm_components(dim1=0, dim2=1) + gmm_modeler.visualize_3d_trajectories() + + # 采样新轨迹 + sampled_traj, sampled_times = gmm_modeler.sample_trajectory() + print(f"采样轨迹形状: {sampled_traj.shape}") + print(f"采样时间形状: {sampled_times.shape}") diff --git a/NGGD/gmr_trajectory_regression.py b/NGGD/gmr_trajectory_regression.py new file mode 100644 index 0000000..98a824b --- /dev/null +++ b/NGGD/gmr_trajectory_regression.py @@ -0,0 +1,289 @@ +# -*- coding: utf-8 -*- +""" +NGGD模仿学习算法 - GMR轨迹回归模块 +实现高斯混合回归,得到概率最优的回归轨迹 +""" + +import numpy as np +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D + +class GMRTrajectoryRegression: + """GMR轨迹回归类""" + + def __init__(self, gmm_model): + """ + 初始化GMR回归器 + + Args: + gmm_model: 训练好的GMM模型 + """ + self.gmm = gmm_model + self.n_components = gmm_model.n_components + self.n_dims = gmm_model.n_dims + + def regress(self, query_times, return_covariance=False): + """ + 通过GMR进行轨迹回归 + + Args: + query_times: 查询时间点,形状为 (n_query,) + return_covariance: 是否返回协方差 + + Returns: + regressed_trajectory: 回归轨迹,形状为 (n_query, n_dims) + covariance: 协方差矩阵(如果return_covariance=True) + """ + query_times = np.array(query_times).reshape(-1, 1) + n_query = len(query_times) + + # 获取GMM参数 + means = self.gmm.means_ + covariances = self.gmm.covariances_ + weights = self.gmm.weights_ + + # 分离位置和时间相关的均值和协方差 + means_pos = means[:, :self.n_dims] # 位置均值 + means_time = means[:, self.n_dims:] # 时间均值 + + cov_pos_pos = covariances[:, :self.n_dims, :self.n_dims] # 位置-位置协方差 + cov_pos_time = covariances[:, :self.n_dims, self.n_dims:] # 位置-时间协方差 + cov_time_pos = covariances[:, self.n_dims:, :self.n_dims] # 时间-位置协方差 + cov_time_time = covariances[:, self.n_dims:, self.n_dims:] # 时间-时间协方差 + + # 初始化输出 + regressed_trajectory = np.zeros((n_query, self.n_dims)) + if return_covariance: + trajectory_covariance = np.zeros((n_query, self.n_dims, self.n_dims)) + + for t_idx, t in enumerate(query_times): + # 计算每个组件的后验概率 + posterior_probs = np.zeros(self.n_components) + + for k in range(self.n_components): + # 计算时间概率密度 + time_diff = t - means_time[k] + time_var = cov_time_time[k, 0, 0] + time_prob = np.exp(-0.5 * (time_diff**2) / time_var) / np.sqrt(2 * np.pi * time_var) + + # 计算权重 + posterior_probs[k] = weights[k] * time_prob + + # 归一化 + posterior_probs = posterior_probs / np.sum(posterior_probs) + + # 计算回归结果 + regressed_pos = np.zeros(self.n_dims) + + for k in range(self.n_components): + if posterior_probs[k] > 1e-10: # 避免数值问题 + # 条件均值 + time_diff = t - means_time[k] + cov_pos_time_k = cov_pos_time[k, :, 0] + cov_time_time_k = cov_time_time[k, 0, 0] + + conditional_mean = means_pos[k] + cov_pos_time_k * time_diff / cov_time_time_k + regressed_pos += posterior_probs[k] * conditional_mean + + regressed_trajectory[t_idx] = regressed_pos + + # 计算协方差(如果需要) + if return_covariance: + cov_pos = np.zeros((self.n_dims, self.n_dims)) + + for k in range(self.n_components): + if posterior_probs[k] > 1e-10: + # 条件协方差 + cov_pos_pos_k = cov_pos_pos[k] + cov_pos_time_k = cov_pos_time[k, :, 0] + cov_time_pos_k = cov_time_pos[k, 0, :] + cov_time_time_k = cov_time_time[k, 0, 0] + + conditional_cov = cov_pos_pos_k - np.outer(cov_pos_time_k, cov_time_pos_k) / cov_time_time_k + + # 条件均值 + time_diff = t - means_time[k] + conditional_mean = means_pos[k] + cov_pos_time_k * time_diff / cov_time_time_k + + # 加权协方差 + mean_diff = regressed_pos - conditional_mean + cov_pos += posterior_probs[k] * (conditional_cov + np.outer(mean_diff, mean_diff)) + + trajectory_covariance[t_idx] = cov_pos + + if return_covariance: + return regressed_trajectory, trajectory_covariance + else: + return regressed_trajectory + + def regress_with_confidence(self, query_times, confidence_level=0.95): + """ + 带置信区间的轨迹回归 + + Args: + query_times: 查询时间点 + confidence_level: 置信水平 + + Returns: + regressed_trajectory: 回归轨迹 + lower_bound: 下界 + upper_bound: 上界 + """ + regressed_trajectory, covariance = self.regress(query_times, return_covariance=True) + + # 计算置信区间 + from scipy.stats import chi2 + chi2_val = chi2.ppf(confidence_level, self.n_dims) + + n_query = len(query_times) + lower_bound = np.zeros_like(regressed_trajectory) + upper_bound = np.zeros_like(regressed_trajectory) + + for t_idx in range(n_query): + cov_t = covariance[t_idx] + eigenvals, eigenvecs = np.linalg.eigh(cov_t) + + # 计算置信椭球的主轴长度 + radius = np.sqrt(chi2_val * eigenvals) + + # 计算每个维度的置信区间 + for dim in range(self.n_dims): + std_dev = np.sqrt(np.sum((eigenvecs[:, dim] * radius)**2)) + lower_bound[t_idx, dim] = regressed_trajectory[t_idx, dim] - std_dev + upper_bound[t_idx, dim] = regressed_trajectory[t_idx, dim] + std_dev + + return regressed_trajectory, lower_bound, upper_bound + + def visualize_regression(self, query_times=None, confidence_level=0.95, + dim1=0, dim2=1, save_path=None): + """ + 可视化回归结果 + + Args: + query_times: 查询时间点 + confidence_level: 置信水平 + dim1: 第一个维度索引 + dim2: 第二个维度索引 + save_path: 保存路径 + """ + if query_times is None: + query_times = np.linspace(0, 1, 100) + + # 进行回归 + regressed_traj, lower_bound, upper_bound = self.regress_with_confidence( + query_times, confidence_level + ) + + fig, ax = plt.subplots(figsize=(12, 8)) + + # 绘制原始轨迹 + for i in range(self.gmm.n_trajectories): + ax.plot(self.gmm.trajectories[i, :, dim1], + self.gmm.trajectories[i, :, dim2], + 'b-', alpha=0.3, linewidth=1, label='Demonstration' if i == 0 else "") + + # 绘制回归轨迹 + ax.plot(regressed_traj[:, dim1], regressed_traj[:, dim2], + 'r-', linewidth=2, label='GMR Regression') + + # 绘制置信区间 + ax.fill_between(regressed_traj[:, dim1], + lower_bound[:, dim1], + upper_bound[:, dim1], + alpha=0.2, color='red', label=f'{confidence_level*100}% Confidence') + + ax.set_xlabel(f'Dimension {dim1}') + ax.set_ylabel(f'Dimension {dim2}') + ax.set_title('GMR Trajectory Regression') + ax.legend() + ax.grid(True) + + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"GMR回归可视化图保存到: {save_path}") + + plt.show() + + def visualize_3d_regression(self, query_times=None, save_path=None): + """ + 3D回归可视化 + + Args: + query_times: 查询时间点 + save_path: 保存路径 + """ + if self.n_dims < 3: + print("轨迹维度小于3,无法进行3D可视化") + return + + if query_times is None: + query_times = np.linspace(0, 1, 100) + + # 进行回归 + regressed_traj = self.regress(query_times) + + fig = plt.figure(figsize=(12, 8)) + ax = fig.add_subplot(111, projection='3d') + + # 绘制原始轨迹 + for i in range(self.gmm.n_trajectories): + ax.plot(self.gmm.trajectories[i, :, 0], + self.gmm.trajectories[i, :, 1], + self.gmm.trajectories[i, :, 2], + 'b-', alpha=0.3, linewidth=1, label='Demonstration' if i == 0 else "") + + # 绘制回归轨迹 + ax.plot(regressed_traj[:, 0], regressed_traj[:, 1], regressed_traj[:, 2], + 'r-', linewidth=2, label='GMR Regression') + + ax.set_xlabel('X') + ax.set_ylabel('Y') + ax.set_zlabel('Z') + ax.set_title('3D GMR Trajectory Regression') + ax.legend() + + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"3D GMR回归可视化图保存到: {save_path}") + + plt.show() + +# 测试代码 +if __name__ == "__main__": + from gmm_trajectory_modeling import GMMTrajectoryModeling + + # 生成测试数据 + np.random.seed(42) + n_trajectories = 5 + n_timesteps = 100 + n_dims = 3 + + trajectories = [] + for i in range(n_trajectories): + t = np.linspace(0, 2*np.pi, n_timesteps) + noise = np.random.normal(0, 0.1, (n_timesteps, n_dims)) + trajectory = np.column_stack([ + np.sin(t) + noise[:, 0], + np.cos(t) + noise[:, 1], + t/(2*np.pi) + noise[:, 2] + ]) + trajectories.append(trajectory) + + trajectories = np.array(trajectories) + + # 训练GMM + gmm_modeler = GMMTrajectoryModeling(n_components=8) + gmm_modeler.fit(trajectories) + + # 创建GMR回归器 + gmr_regressor = GMRTrajectoryRegression(gmm_modeler) + + # 进行回归 + query_times = np.linspace(0, 1, 100) + regressed_traj = gmr_regressor.regress(query_times) + + print(f"回归轨迹形状: {regressed_traj.shape}") + + # 可视化 + gmr_regressor.visualize_regression(query_times) + gmr_regressor.visualize_3d_regression(query_times) diff --git a/NGGD/improved_dmp.py b/NGGD/improved_dmp.py new file mode 100644 index 0000000..f5af9ba --- /dev/null +++ b/NGGD/improved_dmp.py @@ -0,0 +1,372 @@ +# -*- coding: utf-8 -*- +""" +NGGD模仿学习算法 - 改进的DMP算法模块 +基于GMR回归轨迹的改进动态运动基元算法 +""" + +import numpy as np +from scipy.interpolate import interp1d +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D +import sys +import os + +# 添加DMP模块路径 +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'DMP')) +from cs import CanonicalSystem + +class ImprovedDMP: + """改进的DMP类,基于GMR回归轨迹""" + + def __init__(self, n_dmps=1, n_bfs=100, dt=0.01, alpha_y=None, beta_y=None, + gmr_trajectory=None, **kwargs): + """ + 初始化改进的DMP + + Args: + n_dmps: 数据维度数 + n_bfs: 基函数数量 + dt: 时间步长 + alpha_y: 阻尼参数 + beta_y: 刚度参数 + gmr_trajectory: GMR回归的轨迹 + """ + self.n_dmps = n_dmps + self.n_bfs = n_bfs + self.dt = dt + self.gmr_trajectory = gmr_trajectory + + # DMP参数 + alpha_y_tmp = 60 if alpha_y is None else alpha_y + beta_y_tmp = alpha_y_tmp / 4.0 if beta_y is None else beta_y + self.alpha_y = np.ones(n_dmps) * alpha_y_tmp + self.beta_y = np.ones(n_dmps) * beta_y_tmp + self.tau = 1.0 + + # 状态变量 + self.y0 = np.zeros(n_dmps) + self.goal = np.ones(n_dmps) + self.y = self.y0.copy() + self.dy = np.zeros(n_dmps) + self.ddy = np.zeros(n_dmps) + + # 强制项参数 + self.w = np.zeros((n_dmps, n_bfs)) + self.psi_centers = np.zeros(n_bfs) + self.psi_h = np.zeros(n_bfs) + + # 正则化系统 + self.cs = CanonicalSystem(dt=self.dt, **kwargs) + self.timesteps = round(self.cs.run_time / self.dt) + + # 生成基函数中心 + self.generate_centers() + + # 计算基函数宽度 + self.h = np.ones(self.n_bfs) * self.n_bfs**1.5 / self.psi_centers / self.cs.alpha_x + + # 重置状态 + self.reset_state() + + def generate_centers(self): + """生成高斯基函数的中心""" + t_centers = np.linspace(0, self.cs.run_time, self.n_bfs) + + cs = self.cs + x_track = cs.run() + t_track = np.linspace(0, cs.run_time, cs.timesteps) + + for n in range(len(t_centers)): + for i, t in enumerate(t_track): + if abs(t_centers[n] - t) <= cs.dt: + self.psi_centers[n] = x_track[i] + + return self.psi_centers + + def generate_psi(self, x): + """生成基函数激活""" + if isinstance(x, np.ndarray): + x = x[:, None] + + self.psi = np.exp(-self.h * (x - self.psi_centers)**2) + return self.psi + + def learning_from_gmr(self, gmr_trajectory, timestamps=None): + """ + 从GMR回归轨迹学习DMP参数 + + Args: + gmr_trajectory: GMR回归的轨迹,形状为 (n_timesteps, n_dims) + timestamps: 时间戳 + """ + self.gmr_trajectory = np.array(gmr_trajectory) + + if timestamps is None: + timestamps = np.linspace(0, 1, len(gmr_trajectory)) + + # 设置起始和目标位置 + self.y0 = gmr_trajectory[0].copy() + self.goal = gmr_trajectory[-1].copy() + + # 插值到DMP的时间步长 + n_timesteps = self.timesteps + interpolated_trajectory = np.zeros((n_timesteps, self.n_dmps)) + + for d in range(self.n_dmps): + interp_func = interp1d(timestamps, gmr_trajectory[:, d], + kind='cubic', fill_value='extrapolate') + interpolated_trajectory[:, d] = interp_func(np.linspace(0, 1, n_timesteps)) + + # 计算速度和加速度 + dy_demo = np.gradient(interpolated_trajectory, axis=0) / self.dt + ddy_demo = np.gradient(dy_demo, axis=0) / self.dt + + # 计算目标强制项 + x_track = self.cs.run() + f_target = np.zeros((n_timesteps, self.n_dmps)) + + for d in range(self.n_dmps): + k = self.alpha_y[d] + f_target[:, d] = (ddy_demo[:, d] - self.alpha_y[d] * + (self.beta_y[d] * (self.goal[d] - interpolated_trajectory[:, d]) - + dy_demo[:, d])) / k + x_track * (self.goal[d] - self.y0[d]) + + # 学习权重 + self.generate_weights(f_target) + + print("从GMR轨迹学习DMP参数完成") + + def generate_weights(self, f_target): + """生成强制项权重""" + x_track = self.cs.run() + psi_track = self.generate_psi(x_track) + + for d in range(self.n_dmps): + delta = self.goal[d] - self.y0[d] + + for b in range(self.n_bfs): + numer = np.sum(x_track * psi_track[:, b] * f_target[:, d]) + denom = np.sum(x_track**2 * psi_track[:, b]) + + self.w[d, b] = numer / denom + if abs(delta) > 1e-6: + self.w[d, b] = self.w[d, b] / delta + + self.w = np.nan_to_num(self.w) + return self.w + + def reset_state(self): + """重置系统状态""" + self.y = self.y0.copy() + self.dy = np.zeros(self.n_dmps) + self.ddy = np.zeros(self.n_dmps) + self.cs.reset_state() + + def step(self, tau=None): + """执行一步DMP""" + if tau is None: + tau = self.tau + + # 运行正则化系统 + x = self.cs.step_discrete(tau) + + # 生成基函数激活 + psi = self.generate_psi(x) + + for d in range(self.n_dmps): + # 生成强制项 + k = self.alpha_y[d] + delta = self.goal[d] - self.y0[d] + + if abs(delta) > 1e-5: + k2 = delta / delta + else: + k2 = 1.0 + + f = k * (np.dot(psi, self.w[d]) * x * k2 / np.sum(psi)) - k * (self.goal[d] - self.y0[d]) * x + + # 生成复现轨迹 + self.ddy[d] = self.alpha_y[d] * (self.beta_y[d] * (self.goal[d] - self.y[d]) - self.dy[d]) + f + self.dy[d] += tau * self.ddy[d] * self.dt + self.y[d] += tau * self.dy[d] * self.dt + + return self.y, self.dy, self.ddy + + def reproduce(self, tau=None, initial=None, goal=None): + """ + 复现轨迹 + + Args: + tau: 时间缩放因子 + initial: 初始位置 + goal: 目标位置 + + Returns: + reproduced_trajectory: 复现的轨迹 + velocity: 速度 + acceleration: 加速度 + """ + # 设置时间缩放 + if tau is None: + timesteps = self.timesteps + else: + timesteps = round(self.timesteps / tau) + + # 设置初始状态 + if initial is not None: + self.y0 = np.array(initial) + + # 设置目标状态 + if goal is not None: + self.goal = np.array(goal) + + # 重置状态 + self.reset_state() + + # 复现轨迹 + reproduced_trajectory = np.zeros((timesteps, self.n_dmps)) + velocity = np.zeros((timesteps, self.n_dmps)) + acceleration = np.zeros((timesteps, self.n_dmps)) + + for t in range(timesteps): + reproduced_trajectory[t], velocity[t], acceleration[t] = self.step(tau=tau) + + return reproduced_trajectory, velocity, acceleration + + def generalize(self, new_initial=None, new_goal=None, tau=1.0): + """ + 轨迹泛化 + + Args: + new_initial: 新的初始位置 + new_goal: 新的目标位置 + tau: 时间缩放因子 + + Returns: + generalized_trajectory: 泛化后的轨迹 + """ + if new_initial is None: + new_initial = self.y0 + if new_goal is None: + new_goal = self.goal + + generalized_trajectory, _, _ = self.reproduce(tau=tau, initial=new_initial, goal=new_goal) + + return generalized_trajectory + + def visualize_comparison(self, gmr_trajectory=None, reproduced_trajectory=None, + generalized_trajectory=None, dim1=0, dim2=1, save_path=None): + """ + 可视化比较结果 + + Args: + gmr_trajectory: GMR回归轨迹 + reproduced_trajectory: DMP复现轨迹 + generalized_trajectory: DMP泛化轨迹 + dim1: 第一个维度索引 + dim2: 第二个维度索引 + save_path: 保存路径 + """ + fig, ax = plt.subplots(figsize=(12, 8)) + + if gmr_trajectory is not None: + ax.plot(gmr_trajectory[:, dim1], gmr_trajectory[:, dim2], + 'b-', linewidth=2, label='GMR Regression') + + if reproduced_trajectory is not None: + ax.plot(reproduced_trajectory[:, dim1], reproduced_trajectory[:, dim2], + 'r--', linewidth=2, label='DMP Reproduction') + + if generalized_trajectory is not None: + ax.plot(generalized_trajectory[:, dim1], generalized_trajectory[:, dim2], + 'g:', linewidth=2, label='DMP Generalization') + + ax.set_xlabel(f'Dimension {dim1}') + ax.set_ylabel(f'Dimension {dim2}') + ax.set_title('DMP Trajectory Comparison') + ax.legend() + ax.grid(True) + + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"DMP比较可视化图保存到: {save_path}") + + plt.show() + + def visualize_3d_comparison(self, gmr_trajectory=None, reproduced_trajectory=None, + generalized_trajectory=None, save_path=None): + """ + 3D比较可视化 + + Args: + gmr_trajectory: GMR回归轨迹 + reproduced_trajectory: DMP复现轨迹 + generalized_trajectory: DMP泛化轨迹 + save_path: 保存路径 + """ + if self.n_dmps < 3: + print("轨迹维度小于3,无法进行3D可视化") + return + + fig = plt.figure(figsize=(12, 8)) + ax = fig.add_subplot(111, projection='3d') + + if gmr_trajectory is not None: + ax.plot(gmr_trajectory[:, 0], gmr_trajectory[:, 1], gmr_trajectory[:, 2], + 'b-', linewidth=2, label='GMR Regression') + + if reproduced_trajectory is not None: + ax.plot(reproduced_trajectory[:, 0], reproduced_trajectory[:, 1], reproduced_trajectory[:, 2], + 'r--', linewidth=2, label='DMP Reproduction') + + if generalized_trajectory is not None: + ax.plot(generalized_trajectory[:, 0], generalized_trajectory[:, 1], generalized_trajectory[:, 2], + 'g:', linewidth=2, label='DMP Generalization') + + ax.set_xlabel('X') + ax.set_ylabel('Y') + ax.set_zlabel('Z') + ax.set_title('3D DMP Trajectory Comparison') + ax.legend() + + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"3D DMP比较可视化图保存到: {save_path}") + + plt.show() + +# 测试代码 +if __name__ == "__main__": + # 生成测试GMR轨迹 + np.random.seed(42) + n_timesteps = 100 + n_dims = 3 + + t = np.linspace(0, 2*np.pi, n_timesteps) + gmr_trajectory = np.column_stack([ + np.sin(t), + np.cos(t), + t/(2*np.pi) + ]) + + # 创建改进的DMP + improved_dmp = ImprovedDMP(n_dmps=n_dims, n_bfs=50, dt=0.01) + + # 从GMR轨迹学习 + improved_dmp.learning_from_gmr(gmr_trajectory) + + # 复现轨迹 + reproduced_trajectory, _, _ = improved_dmp.reproduce() + + # 泛化轨迹 + new_initial = [0.5, 0.5, 0.2] + new_goal = [-0.5, -0.5, 0.8] + generalized_trajectory = improved_dmp.generalize(new_initial, new_goal) + + # 可视化 + improved_dmp.visualize_comparison(gmr_trajectory, reproduced_trajectory, generalized_trajectory) + improved_dmp.visualize_3d_comparison(gmr_trajectory, reproduced_trajectory, generalized_trajectory) + + print(f"GMR轨迹形状: {gmr_trajectory.shape}") + print(f"复现轨迹形状: {reproduced_trajectory.shape}") + print(f"泛化轨迹形状: {generalized_trajectory.shape}") diff --git a/NGGD/install_dependencies.py b/NGGD/install_dependencies.py new file mode 100644 index 0000000..7422ab7 --- /dev/null +++ b/NGGD/install_dependencies.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +""" +NGGD模仿学习算法 - 依赖安装脚本 +自动安装所需的Python包 +""" + +import subprocess +import sys +import os + +def install_package(package): + """安装Python包""" + try: + subprocess.check_call([sys.executable, "-m", "pip", "install", package]) + print(f"✓ 成功安装 {package}") + return True + except subprocess.CalledProcessError as e: + print(f"✗ 安装 {package} 失败: {e}") + return False + +def check_package(package): + """检查包是否已安装""" + try: + __import__(package) + return True + except ImportError: + return False + +def main(): + """主安装函数""" + print("=" * 60) + print("NGGD模仿学习算法 - 依赖安装") + print("=" * 60) + + # 必需的包列表 + required_packages = [ + ("numpy", "numpy"), + ("scipy", "scipy"), + ("matplotlib", "matplotlib"), + ("sklearn", "scikit-learn"), + ("pandas", "pandas") + ] + + print("检查已安装的包...") + + missing_packages = [] + for import_name, package_name in required_packages: + if check_package(import_name): + print(f"✓ {package_name} 已安装") + else: + print(f"✗ {package_name} 未安装") + missing_packages.append(package_name) + + if not missing_packages: + print("\n🎉 所有依赖包都已安装!") + return True + + print(f"\n需要安装 {len(missing_packages)} 个包:") + for package in missing_packages: + print(f" - {package}") + + # 询问是否安装 + response = input("\n是否自动安装缺失的包? (y/n): ") + if response.lower() != 'y': + print("安装已取消") + return False + + print("\n开始安装...") + + success_count = 0 + for package in missing_packages: + if install_package(package): + success_count += 1 + + print(f"\n安装完成: {success_count}/{len(missing_packages)} 个包安装成功") + + if success_count == len(missing_packages): + print("🎉 所有依赖包安装成功!") + return True + else: + print("⚠️ 部分包安装失败,请手动安装") + return False + +if __name__ == "__main__": + main() diff --git a/NGGD/nggd_imitation_learning.py b/NGGD/nggd_imitation_learning.py new file mode 100644 index 0000000..af99970 --- /dev/null +++ b/NGGD/nggd_imitation_learning.py @@ -0,0 +1,409 @@ +# -*- coding: utf-8 -*- +""" +NGGD模仿学习算法 - 主程序 +实现完整的NGGD模仿学习流程:数据预处理 -> GMM建模 -> GMR回归 -> DMP泛化 +""" + +import numpy as np +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D +import os +import sys + +# 添加模块路径 +current_dir = os.path.dirname(__file__) +sys.path.append(current_dir) + +from data_preprocessing import TrajectoryPreprocessor +from gmm_trajectory_modeling import GMMTrajectoryModeling +from gmr_trajectory_regression import GMRTrajectoryRegression +from improved_dmp import ImprovedDMP + +class NGGDImitationLearning: + """NGGD模仿学习主类""" + + def __init__(self, n_gmm_components=10, n_dmp_bfs=100, noise_std=0.01): + """ + 初始化NGGD模仿学习系统 + + Args: + n_gmm_components: GMM组件数量 + n_dmp_bfs: DMP基函数数量 + noise_std: 噪声标准差 + """ + self.n_gmm_components = n_gmm_components + self.n_dmp_bfs = n_dmp_bfs + self.noise_std = noise_std + + # 初始化各个模块 + self.preprocessor = TrajectoryPreprocessor(noise_std=noise_std) + self.gmm_modeler = GMMTrajectoryModeling(n_components=n_gmm_components) + self.gmr_regressor = None + self.improved_dmp = None + + # 存储处理结果 + self.processed_trajectories = None + self.timestamps = None + self.normalization_params = None + self.gmr_trajectory = None + self.reproduced_trajectory = None + + def learn_from_demonstrations(self, demonstration_trajectories, + alignment_method='linear', add_noise=True, + normalize=True, plot_results=True): + """ + 从示教轨迹学习 + + Args: + demonstration_trajectories: 示教轨迹列表 + alignment_method: 对齐方法 + add_noise: 是否添加噪声 + normalize: 是否归一化 + plot_results: 是否绘制结果 + + Returns: + learning_results: 学习结果字典 + """ + print("=" * 60) + print("开始NGGD模仿学习流程") + print("=" * 60) + + # 步骤1: 数据预处理 + print("\n步骤1: 示教数据预处理") + print("-" * 30) + self.processed_trajectories, self.timestamps, self.normalization_params = \ + self.preprocessor.preprocess_pipeline( + demonstration_trajectories, + alignment_method=alignment_method, + add_noise=add_noise, + normalize=normalize + ) + + # 步骤2: GMM轨迹建模 + print("\n步骤2: GMM运动轨迹建模") + print("-" * 30) + self.gmm_modeler.fit(self.processed_trajectories, self.timestamps) + + # 步骤3: GMR轨迹回归 + print("\n步骤3: GMR运动轨迹回归") + print("-" * 30) + self.gmr_regressor = GMRTrajectoryRegression(self.gmm_modeler) + self.gmr_trajectory = self.gmr_regressor.regress(self.timestamps) + + # 步骤4: 改进DMP轨迹泛化 + print("\n步骤4: 基于DMP的轨迹泛化") + print("-" * 30) + n_dims = self.processed_trajectories.shape[2] + self.improved_dmp = ImprovedDMP(n_dmps=n_dims, n_bfs=self.n_dmp_bfs) + self.improved_dmp.learning_from_gmr(self.gmr_trajectory, self.timestamps) + + # 复现轨迹 + self.reproduced_trajectory, _, _ = self.improved_dmp.reproduce() + + print("\nNGGD模仿学习完成!") + print(f"处理轨迹数量: {self.processed_trajectories.shape[0]}") + print(f"轨迹维度: {self.processed_trajectories.shape[2]}") + print(f"时间步数: {self.processed_trajectories.shape[1]}") + + # 准备学习结果 + learning_results = { + 'processed_trajectories': self.processed_trajectories, + 'timestamps': self.timestamps, + 'normalization_params': self.normalization_params, + 'gmm_model': self.gmm_modeler, + 'gmr_regressor': self.gmr_regressor, + 'gmr_trajectory': self.gmr_trajectory, + 'improved_dmp': self.improved_dmp, + 'reproduced_trajectory': self.reproduced_trajectory + } + + # 可视化结果 + if plot_results: + self.visualize_learning_results() + + return learning_results + + def generalize_trajectory(self, new_initial=None, new_goal=None, + tau=1.0, plot_comparison=True): + """ + 轨迹泛化 + + Args: + new_initial: 新的初始位置 + new_goal: 新的目标位置 + tau: 时间缩放因子 + plot_comparison: 是否绘制比较图 + + Returns: + generalized_trajectory: 泛化后的轨迹 + """ + if self.improved_dmp is None: + raise ValueError("请先执行learn_from_demonstrations方法") + + print(f"\n轨迹泛化 - 初始位置: {new_initial}, 目标位置: {new_goal}") + print("-" * 50) + + # 执行泛化 + generalized_trajectory = self.improved_dmp.generalize( + new_initial=new_initial, + new_goal=new_goal, + tau=tau + ) + + print(f"泛化轨迹形状: {generalized_trajectory.shape}") + + # 可视化比较 + if plot_comparison: + self.visualize_generalization_comparison(generalized_trajectory) + + return generalized_trajectory + + def visualize_learning_results(self, save_dir=None): + """可视化学习结果""" + if save_dir is None: + save_dir = os.path.join(current_dir, 'results') + os.makedirs(save_dir, exist_ok=True) + + print("\n生成学习结果可视化...") + + # 1. 预处理结果可视化 + self.preprocessor.visualize_preprocessing( + [traj for traj in self.processed_trajectories], + self.processed_trajectories, + save_path=os.path.join(save_dir, 'preprocessing_comparison.png') + ) + + # 2. GMM组件可视化 + self.gmm_modeler.visualize_gmm_components( + save_path=os.path.join(save_dir, 'gmm_components.png') + ) + + # 3. GMR回归可视化 + self.gmr_regressor.visualize_regression( + save_path=os.path.join(save_dir, 'gmr_regression.png') + ) + + # 4. 完整流程比较 + self.visualize_complete_pipeline(save_dir) + + print(f"可视化结果保存到: {save_dir}") + + def visualize_complete_pipeline(self, save_dir): + """可视化完整流程""" + fig, axes = plt.subplots(2, 2, figsize=(15, 12)) + + # 原始示教轨迹 + ax1 = axes[0, 0] + for i, traj in enumerate(self.processed_trajectories): + ax1.plot(traj[:, 0], traj[:, 1], alpha=0.5, linewidth=1, + label=f'Demo {i+1}' if i < 3 else "") + ax1.set_title('Demonstration Trajectories') + ax1.set_xlabel('X') + ax1.set_ylabel('Y') + ax1.legend() + ax1.grid(True) + + # GMR回归轨迹 + ax2 = axes[0, 1] + ax2.plot(self.gmr_trajectory[:, 0], self.gmr_trajectory[:, 1], + 'r-', linewidth=2, label='GMR Regression') + ax2.set_title('GMR Regression') + ax2.set_xlabel('X') + ax2.set_ylabel('Y') + ax2.legend() + ax2.grid(True) + + # DMP复现轨迹 + ax3 = axes[1, 0] + ax3.plot(self.gmr_trajectory[:, 0], self.gmr_trajectory[:, 1], + 'b-', linewidth=2, label='GMR Reference') + ax3.plot(self.reproduced_trajectory[:, 0], self.reproduced_trajectory[:, 1], + 'r--', linewidth=2, label='DMP Reproduction') + ax3.set_title('DMP Reproduction') + ax3.set_xlabel('X') + ax3.set_ylabel('Y') + ax3.legend() + ax3.grid(True) + + # 轨迹比较 + ax4 = axes[1, 1] + for i, traj in enumerate(self.processed_trajectories): + ax4.plot(traj[:, 0], traj[:, 1], alpha=0.3, linewidth=1, color='blue') + ax4.plot(self.gmr_trajectory[:, 0], self.gmr_trajectory[:, 1], + 'g-', linewidth=2, label='GMR') + ax4.plot(self.reproduced_trajectory[:, 0], self.reproduced_trajectory[:, 1], + 'r--', linewidth=2, label='DMP') + ax4.set_title('Complete Pipeline') + ax4.set_xlabel('X') + ax4.set_ylabel('Y') + ax4.legend() + ax4.grid(True) + + plt.tight_layout() + plt.savefig(os.path.join(save_dir, 'complete_pipeline.png'), + dpi=300, bbox_inches='tight') + plt.show() + + def visualize_generalization_comparison(self, generalized_trajectory, save_path=None): + """可视化泛化比较""" + fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6)) + + # 2D比较 + for i, traj in enumerate(self.processed_trajectories): + ax1.plot(traj[:, 0], traj[:, 1], alpha=0.3, linewidth=1, color='blue') + ax1.plot(self.gmr_trajectory[:, 0], self.gmr_trajectory[:, 1], + 'g-', linewidth=2, label='GMR Reference') + ax1.plot(self.reproduced_trajectory[:, 0], self.reproduced_trajectory[:, 1], + 'r--', linewidth=2, label='DMP Reproduction') + ax1.plot(generalized_trajectory[:, 0], generalized_trajectory[:, 1], + 'm:', linewidth=3, label='DMP Generalization') + ax1.set_title('2D Trajectory Comparison') + ax1.set_xlabel('X') + ax1.set_ylabel('Y') + ax1.legend() + ax1.grid(True) + + # 3D比较(如果维度足够) + if self.processed_trajectories.shape[2] >= 3: + ax2 = fig.add_subplot(122, projection='3d') + for i, traj in enumerate(self.processed_trajectories): + ax2.plot(traj[:, 0], traj[:, 1], traj[:, 2], alpha=0.3, linewidth=1, color='blue') + ax2.plot(self.gmr_trajectory[:, 0], self.gmr_trajectory[:, 1], self.gmr_trajectory[:, 2], + 'g-', linewidth=2, label='GMR Reference') + ax2.plot(self.reproduced_trajectory[:, 0], self.reproduced_trajectory[:, 1], self.reproduced_trajectory[:, 2], + 'r--', linewidth=2, label='DMP Reproduction') + ax2.plot(generalized_trajectory[:, 0], generalized_trajectory[:, 1], generalized_trajectory[:, 2], + 'm:', linewidth=3, label='DMP Generalization') + ax2.set_title('3D Trajectory Comparison') + ax2.set_xlabel('X') + ax2.set_ylabel('Y') + ax2.set_zlabel('Z') + ax2.legend() + else: + ax2.axis('off') + ax2.text(0.5, 0.5, '3D visualization\nnot available\n(dimension < 3)', + ha='center', va='center', transform=ax2.transAxes) + + plt.tight_layout() + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"泛化比较图保存到: {save_path}") + plt.show() + + def save_model(self, save_path): + """保存模型""" + import pickle + + model_data = { + 'n_gmm_components': self.n_gmm_components, + 'n_dmp_bfs': self.n_dmp_bfs, + 'noise_std': self.noise_std, + 'gmm_model': self.gmm_modeler, + 'gmr_regressor': self.gmr_regressor, + 'improved_dmp': self.improved_dmp, + 'normalization_params': self.normalization_params + } + + with open(save_path, 'wb') as f: + pickle.dump(model_data, f) + + print(f"模型保存到: {save_path}") + + def load_model(self, load_path): + """加载模型""" + import pickle + + with open(load_path, 'rb') as f: + model_data = pickle.load(f) + + self.n_gmm_components = model_data['n_gmm_components'] + self.n_dmp_bfs = model_data['n_dmp_bfs'] + self.noise_std = model_data['noise_std'] + self.gmm_modeler = model_data['gmm_model'] + self.gmr_regressor = model_data['gmr_regressor'] + self.improved_dmp = model_data['improved_dmp'] + self.normalization_params = model_data['normalization_params'] + + print(f"模型从 {load_path} 加载完成") + +# 测试代码 +if __name__ == "__main__": + # 生成测试示教数据 + np.random.seed(42) + n_demonstrations = 8 + n_timesteps = 100 + n_dims = 3 + + print("生成测试示教数据...") + demonstration_trajectories = [] + + for i in range(n_demonstrations): + # 生成不同的轨迹形状 + t = np.linspace(0, 2*np.pi, n_timesteps) + + if i % 3 == 0: # 圆形轨迹 + trajectory = np.column_stack([ + np.sin(t) + np.random.normal(0, 0.05, n_timesteps), + np.cos(t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) + np.random.normal(0, 0.02, n_timesteps) + ]) + elif i % 3 == 1: # 8字形轨迹 + trajectory = np.column_stack([ + np.sin(t) + np.random.normal(0, 0.05, n_timesteps), + np.sin(2*t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) + np.random.normal(0, 0.02, n_timesteps) + ]) + else: # 螺旋轨迹 + trajectory = np.column_stack([ + t/(2*np.pi) * np.sin(t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) * np.cos(t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) + np.random.normal(0, 0.02, n_timesteps) + ]) + + demonstration_trajectories.append(trajectory) + + # 创建NGGD模仿学习系统 + nggd_system = NGGDImitationLearning( + n_gmm_components=12, + n_dmp_bfs=80, + noise_std=0.02 + ) + + # 执行学习 + learning_results = nggd_system.learn_from_demonstrations( + demonstration_trajectories, + alignment_method='linear', + add_noise=True, + normalize=True, + plot_results=True + ) + + # 测试泛化能力 + print("\n测试轨迹泛化能力...") + + # 泛化1: 不同的初始和目标位置 + generalized_traj1 = nggd_system.generalize_trajectory( + new_initial=[0.5, 0.5, 0.2], + new_goal=[-0.5, -0.5, 0.8], + tau=1.0 + ) + + # 泛化2: 时间缩放 + generalized_traj2 = nggd_system.generalize_trajectory( + new_initial=[0.2, 0.8, 0.1], + new_goal=[0.8, 0.2, 0.9], + tau=0.5 + ) + + # 泛化3: 另一个不同的轨迹 + generalized_traj3 = nggd_system.generalize_trajectory( + new_initial=[-0.3, 0.7, 0.3], + new_goal=[0.7, -0.3, 0.7], + tau=1.2 + ) + + print("\nNGGD模仿学习测试完成!") + print(f"示教轨迹数量: {len(demonstration_trajectories)}") + print(f"GMM组件数量: {nggd_system.n_gmm_components}") + print(f"DMP基函数数量: {nggd_system.n_dmp_bfs}") + print(f"轨迹维度: {n_dims}") diff --git a/NGGD/test_nggd.py b/NGGD/test_nggd.py new file mode 100644 index 0000000..b3c44d0 --- /dev/null +++ b/NGGD/test_nggd.py @@ -0,0 +1,202 @@ +# -*- coding: utf-8 -*- +""" +NGGD模仿学习算法 - 测试脚本 +验证NGGD算法的基本功能 +""" + +import numpy as np +import matplotlib.pyplot as plt +import sys +import os + +# 添加模块路径 +current_dir = os.path.dirname(__file__) +sys.path.append(current_dir) + +from nggd_imitation_learning import NGGDImitationLearning + +def generate_test_trajectories(): + """生成测试轨迹数据""" + print("生成测试轨迹数据...") + + np.random.seed(42) + n_demonstrations = 6 + n_timesteps = 100 + n_dims = 3 + + trajectories = [] + + for i in range(n_demonstrations): + t = np.linspace(0, 2*np.pi, n_timesteps) + + if i % 3 == 0: # 圆形轨迹 + trajectory = np.column_stack([ + np.sin(t) + np.random.normal(0, 0.05, n_timesteps), + np.cos(t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) + np.random.normal(0, 0.02, n_timesteps) + ]) + elif i % 3 == 1: # 8字形轨迹 + trajectory = np.column_stack([ + np.sin(t) + np.random.normal(0, 0.05, n_timesteps), + np.sin(2*t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) + np.random.normal(0, 0.02, n_timesteps) + ]) + else: # 螺旋轨迹 + trajectory = np.column_stack([ + t/(2*np.pi) * np.sin(t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) * np.cos(t) + np.random.normal(0, 0.05, n_timesteps), + t/(2*np.pi) + np.random.normal(0, 0.02, n_timesteps) + ]) + + trajectories.append(trajectory) + + return trajectories + +def test_nggd_algorithm(): + """测试NGGD算法""" + print("=" * 60) + print("NGGD模仿学习算法测试") + print("=" * 60) + + # 1. 生成测试数据 + trajectories = generate_test_trajectories() + print(f"生成了 {len(trajectories)} 条测试轨迹") + + # 2. 创建NGGD系统 + print("\n创建NGGD系统...") + nggd_system = NGGDImitationLearning( + n_gmm_components=8, + n_dmp_bfs=60, + noise_std=0.02 + ) + + # 3. 执行学习 + print("\n执行NGGD学习...") + try: + learning_results = nggd_system.learn_from_demonstrations( + trajectories, + alignment_method='linear', + add_noise=True, + normalize=True, + plot_results=True + ) + print("✓ NGGD学习成功完成") + except Exception as e: + print(f"✗ NGGD学习失败: {e}") + return False + + # 4. 测试轨迹泛化 + print("\n测试轨迹泛化...") + try: + # 泛化测试1 + generalized_traj1 = nggd_system.generalize_trajectory( + new_initial=[0.3, 0.3, 0.2], + new_goal=[-0.3, -0.3, 0.8], + tau=1.0, + plot_comparison=True + ) + print("✓ 泛化测试1成功") + + # 泛化测试2 + generalized_traj2 = nggd_system.generalize_trajectory( + new_initial=[0.2, 0.8, 0.1], + new_goal=[0.8, 0.2, 0.9], + tau=0.5, + plot_comparison=True + ) + print("✓ 泛化测试2成功") + + # 泛化测试3 + generalized_traj3 = nggd_system.generalize_trajectory( + new_initial=[-0.4, 0.6, 0.3], + new_goal=[0.6, -0.4, 0.7], + tau=1.2, + plot_comparison=True + ) + print("✓ 泛化测试3成功") + + except Exception as e: + print(f"✗ 轨迹泛化失败: {e}") + return False + + # 5. 验证结果 + print("\n验证学习结果...") + + # 检查GMR轨迹 + gmr_trajectory = learning_results['gmr_trajectory'] + print(f"GMR轨迹形状: {gmr_trajectory.shape}") + + # 检查DMP复现轨迹 + reproduced_trajectory = learning_results['reproduced_trajectory'] + print(f"DMP复现轨迹形状: {reproduced_trajectory.shape}") + + # 检查泛化轨迹 + print(f"泛化轨迹1形状: {generalized_traj1.shape}") + print(f"泛化轨迹2形状: {generalized_traj2.shape}") + print(f"泛化轨迹3形状: {generalized_traj3.shape}") + + # 6. 计算性能指标 + print("\n计算性能指标...") + + # 计算GMR和DMP复现的相似度 + from scipy.spatial.distance import cdist + gmr_dmp_distance = np.mean(cdist(gmr_trajectory, reproduced_trajectory)) + print(f"GMR与DMP复现的平均距离: {gmr_dmp_distance:.4f}") + + # 计算轨迹平滑度(通过加速度变化) + reproduced_acc = np.gradient(np.gradient(reproduced_trajectory, axis=0), axis=0) + smoothness = np.mean(np.linalg.norm(reproduced_acc, axis=1)) + print(f"DMP复现轨迹平滑度: {smoothness:.4f}") + + print("\n" + "=" * 60) + print("NGGD算法测试完成!") + print("=" * 60) + + return True + +def test_individual_modules(): + """测试各个模块""" + print("\n测试各个模块...") + + # 测试数据预处理 + try: + from data_preprocessing import TrajectoryPreprocessor + preprocessor = TrajectoryPreprocessor() + print("✓ 数据预处理模块导入成功") + except Exception as e: + print(f"✗ 数据预处理模块导入失败: {e}") + + # 测试GMM建模 + try: + from gmm_trajectory_modeling import GMMTrajectoryModeling + gmm_modeler = GMMTrajectoryModeling() + print("✓ GMM建模模块导入成功") + except Exception as e: + print(f"✗ GMM建模模块导入失败: {e}") + + # 测试GMR回归 + try: + from gmr_trajectory_regression import GMRTrajectoryRegression + print("✓ GMR回归模块导入成功") + except Exception as e: + print(f"✗ GMR回归模块导入失败: {e}") + + # 测试改进DMP + try: + from improved_dmp import ImprovedDMP + improved_dmp = ImprovedDMP() + print("✓ 改进DMP模块导入成功") + except Exception as e: + print(f"✗ 改进DMP模块导入失败: {e}") + +if __name__ == "__main__": + # 测试各个模块 + test_individual_modules() + + # 测试完整算法 + success = test_nggd_algorithm() + + if success: + print("\n🎉 所有测试通过!NGGD算法工作正常。") + else: + print("\n❌ 测试失败,请检查错误信息。") diff --git a/code/DMP/cs.py b/code/DMP/cs.py index d81d6a0..c0a89a5 100644 --- a/code/DMP/cs.py +++ b/code/DMP/cs.py @@ -25,6 +25,7 @@ def __init__(self, alpha_x=1.0, dt=0.01, type='discrete'): self.timesteps = round(self.run_time/self.dt) self.reset_state() + # 初始化 state,每个相位的x坐标 def run(self, **kwargs): # run to goal state if 'tau' in kwargs: timesteps = int(self.timesteps / kwargs['tau']) diff --git a/code/DMP/dmp_discrete.py b/code/DMP/dmp_discrete.py index 1c1655a..0f2cf4f 100644 --- a/code/DMP/dmp_discrete.py +++ b/code/DMP/dmp_discrete.py @@ -64,6 +64,7 @@ def generate_centers(self): for i, t in enumerate(t_track): if abs(t_centers[n] - t) <= cs.dt: # find the x center corresponding to the time center self.psi_centers[n] = x_track[i] + break; #找到一个就跳出循环 return self.psi_centers diff --git a/code/demo_discrete_DMP_UR5.py b/code/demo_discrete_DMP_UR5.py index 6ee0606..a676c50 100644 --- a/code/demo_discrete_DMP_UR5.py +++ b/code/demo_discrete_DMP_UR5.py @@ -3,6 +3,8 @@ import time import numpy as np import pandas as pd +import os +from datetime import datetime import sys sys.path.append('./UR5/VREP_RemoteAPIs') @@ -64,41 +66,75 @@ time.sleep(0.1) + +# 创建以当前时间为名的文件夹 +current_time = datetime.now().strftime("%Y%m%d_%H%M%S") +output_dir = f"output_{current_time}" +os.makedirs(output_dir, exist_ok=True) +print(f"创建输出文件夹: {output_dir}") +from matplotlib import font_manager +font_path = './msyh.ttf' # 替换为你的字体文件路径 +font_prop = font_manager.FontProperties(fname=font_path) + + #%% DMP learning # get demonstrated trajectory from file df = pd.read_csv('./demo_trajectory/demo_trajectory_for_discrete_dmp.csv', header=None) reference_trajectory = np.array(df) data_dim = reference_trajectory.shape[0] data_len = reference_trajectory.shape[1] +# data_dim: 3 +# data_len: 697 +# (3, 697) +# n_bfs激活函数的数量是不是最好和step步数保持在一个数量级 dmp = dmp_discrete(n_dmps=data_dim, n_bfs=1000, dt=1.0/data_len) dmp.learning(reference_trajectory) reproduced_trajectory, _, _ = dmp.reproduce() +# 保存第一个3D图 fig = plt.figure() -ax=Axes3D(fig) -plt.plot(reference_trajectory[0,:], reference_trajectory[1,:], reference_trajectory[2,:], 'g', label='reference') -plt.plot(reproduced_trajectory[:,0], reproduced_trajectory[:,1], reproduced_trajectory[:,2], 'r--', label='reproduce') -plt.legend() - -fig = plt.figure() +ax = fig.add_subplot(111, projection='3d') +ax.view_init(elev=20, azim=45) +ax.grid(True) +ax.plot(reference_trajectory[0,:], reference_trajectory[1,:], reference_trajectory[2,:], 'g', label='reference', linewidth=2) +ax.plot(reproduced_trajectory[:,0], reproduced_trajectory[:,1], reproduced_trajectory[:,2], 'r--', label='reproduce', linewidth=2) +ax.legend() +ax.set_title('DMP 3D轨迹对比', fontproperties=font_prop) +ax.set_xlabel('X位置', fontproperties=font_prop) +ax.set_ylabel('Y位置', fontproperties=font_prop) +ax.set_zlabel('Z位置', fontproperties=font_prop) +plt.savefig(f'{output_dir}/dmp_3d_trajectory_comparison.png', dpi=300, bbox_inches='tight') +plt.close() + +# 保存多子图 +fig = plt.figure(figsize=(10, 8)) plt.subplot(311) plt.plot(reference_trajectory[0,:], 'g', label='reference') plt.plot(reproduced_trajectory[:,0], 'r--', label='reproduce') plt.legend() +plt.title('X轴轨迹对比', fontproperties=font_prop) +plt.ylabel('X位置', fontproperties=font_prop) plt.subplot(312) plt.plot(reference_trajectory[1,:], 'g', label='reference') plt.plot(reproduced_trajectory[:,1], 'r--', label='reproduce') plt.legend() +plt.title('Y轴轨迹对比', fontproperties=font_prop) +plt.ylabel('Y位置', fontproperties=font_prop) plt.subplot(313) plt.plot(reference_trajectory[2,:], 'g', label='reference') plt.plot(reproduced_trajectory[:,2], 'r--', label='reproduce') plt.legend() +plt.title('Z轴轨迹对比', fontproperties=font_prop) +plt.ylabel('Z位置', fontproperties=font_prop) +plt.xlabel('时间步', fontproperties=font_prop) -plt.draw() +plt.tight_layout() +plt.savefig(f'{output_dir}/dmp_trajectory_components.png', dpi=300, bbox_inches='tight') +plt.close() #%% the main loop print("Main loop is begining ...") @@ -109,6 +145,7 @@ reproduced_trajectory_record_z = np.zeros((data_len, max_loop)) for loop in range(max_loop): + print('loop: %d' % loop) if loop == 0: # DMP reproduce the reference trajectory reproduced_trajectory, _, _ = dmp.reproduce() @@ -130,19 +167,22 @@ # DMP reproduce with new initial and goal positions reproduced_trajectory, _, _ = dmp.reproduce(initial=initial_pos, goal=goal_pos) - data_len = reproduced_trajectory.shape[0] - reproduced_trajectory_record_x[:,loop] = reproduced_trajectory[:,0] - reproduced_trajectory_record_y[:,loop] = reproduced_trajectory[:,1] - reproduced_trajectory_record_z[:,loop] = reproduced_trajectory[:,2] + current_data_len = reproduced_trajectory.shape[0] + # 确保数组大小匹配 + min_len = min(data_len, current_data_len) + reproduced_trajectory_record_x[:min_len,loop] = reproduced_trajectory[:min_len,0] + reproduced_trajectory_record_y[:min_len,loop] = reproduced_trajectory[:min_len,1] + reproduced_trajectory_record_z[:min_len,loop] = reproduced_trajectory[:min_len,2] + print(f'Loop {loop}: 记录轨迹数据,长度: {min_len}') # go to the goal position - for i in range(data_len): + for i in range(current_data_len): UR5_target_pos = reproduced_trajectory[i,:] vrep_sim.simxSetObjectPosition(client_ID, via_dummy_handle, -1, UR5_target_pos, vrep_sim.simx_opmode_oneshot) vrep_sim.simxSynchronousTrigger(client_ID) # trigger one simulation step # go back to the initial position - for i in range(data_len-1, 0, -1): + for i in range(current_data_len-1, 0, -1): UR5_target_pos = reproduced_trajectory[i,:] vrep_sim.simxSetObjectPosition(client_ID, via_dummy_handle, -1, UR5_target_pos, vrep_sim.simx_opmode_oneshot) vrep_sim.simxSynchronousTrigger(client_ID) # trigger one simulation step @@ -152,13 +192,29 @@ print('Program terminated') #%% Plot -fig=plt.figure() -ax=Axes3D(fig) -plt.plot(reference_trajectory[0,:], reference_trajectory[1,:], reference_trajectory[2,:], 'g', label='reference') -for i in range(max_loop): - plt.plot(reproduced_trajectory_record_x[:,i], reproduced_trajectory_record_y[:,i], reproduced_trajectory_record_z[:,i], '--') +# 保存最终的3D图 +fig = plt.figure() +ax = fig.add_subplot(111, projection='3d') +ax.view_init(elev=20, azim=45) +ax.grid(True) +#目前只绘制参考轨迹。无法给出改变起始点和目标点后的参照轨迹 +ax.plot(reference_trajectory[0,:], reference_trajectory[1,:], reference_trajectory[2,:], 'g', label='reference', linewidth=2) -plt.legend() -plt.xlabel('x') -plt.ylabel('y') -plt.show() +# 绘制所有记录的轨迹 +for i in range(max_loop): + # 检查是否有非零数据 + if np.any(reproduced_trajectory_record_x[:,i] != 0): + ax.plot(reproduced_trajectory_record_x[:,i], reproduced_trajectory_record_y[:,i], reproduced_trajectory_record_z[:,i], '--', alpha=0.7, label=f'reproduce_{i+1}') + +ax.legend() +ax.set_xlabel('X位置', fontproperties=font_prop) +ax.set_ylabel('Y位置', fontproperties=font_prop) +ax.set_zlabel('Z位置', fontproperties=font_prop) +ax.set_title('DMP多轮轨迹对比', fontproperties=font_prop) +plt.savefig(f'{output_dir}/dmp_multiple_trajectories.png', dpi=300, bbox_inches='tight') +plt.close() + +print(f"记录的轨迹数据形状: {reproduced_trajectory_record_x.shape}") +print(f"非零轨迹数量: {np.sum(np.any(reproduced_trajectory_record_x != 0, axis=0))}") + +print(f"所有图片已保存到文件夹: {output_dir}") diff --git a/code/msyh.ttf b/code/msyh.ttf new file mode 100644 index 0000000..e46ff3c Binary files /dev/null and b/code/msyh.ttf differ diff --git a/code/output_20250918_120229/dmp_3d_trajectory_comparison.png b/code/output_20250918_120229/dmp_3d_trajectory_comparison.png new file mode 100644 index 0000000..d7419d6 Binary files /dev/null and b/code/output_20250918_120229/dmp_3d_trajectory_comparison.png differ diff --git a/code/output_20250918_120229/dmp_multiple_trajectories.png b/code/output_20250918_120229/dmp_multiple_trajectories.png new file mode 100644 index 0000000..bd7c221 Binary files /dev/null and b/code/output_20250918_120229/dmp_multiple_trajectories.png differ diff --git a/code/output_20250918_120229/dmp_trajectory_components.png b/code/output_20250918_120229/dmp_trajectory_components.png new file mode 100644 index 0000000..bfc2bd5 Binary files /dev/null and b/code/output_20250918_120229/dmp_trajectory_components.png differ