update: 增加随机指令并添加可视化箭头
当前可实现对 x 轴线速度和 z 轴角速度的跟踪,但转向跟踪总体仍较差、反应不灵敏,可能与模型有一定关系(此前测试中,在环境里进行差速转向比较困难)
This commit is contained in:
@@ -119,6 +119,12 @@ class FlexrV0Env(DirectRLEnv):
|
||||
self.joint_pos = self.robot.data.joint_pos
|
||||
self.joint_vel = self.robot.data.joint_vel
|
||||
|
||||
# 指令生成参数
|
||||
self._command_interval = 2.0 # 指令更新间隔(秒)
|
||||
self._command_time = torch.zeros(self.num_envs, device=self.device) # 指令计时器
|
||||
self._max_lin_vel = 1.0 # 最大线速度(m/s)
|
||||
self._max_ang_vel = 1.0 # 最大角速度(rad/s)
|
||||
|
||||
def _get_wheel_joint_indices(self, prefix: str, joint_names: list[str]) -> list[int]:
|
||||
"""获取指定腿部的所有轮子关节索引(返回整数列表)"""
|
||||
indices = []
|
||||
@@ -140,7 +146,7 @@ class FlexrV0Env(DirectRLEnv):
|
||||
light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
|
||||
light_cfg.func("/World/Light", light_cfg)
|
||||
|
||||
# self.visualization_markers = define_markers()
|
||||
self.visualization_markers = define_markers()
|
||||
|
||||
# add height_scaner
|
||||
self.height_sensor = define_height_sensor()
|
||||
@@ -148,8 +154,33 @@ class FlexrV0Env(DirectRLEnv):
|
||||
|
||||
|
||||
def _pre_physics_step(self, actions: torch.Tensor) -> None:
    """Advance command timers, resample expired commands and cache the actions.

    Each call adds ``self.dt`` to every environment's command timer. Any
    environment whose timer has reached ``self._command_interval`` receives a
    freshly sampled command and its timer is reset to zero.

    Args:
        actions: Action tensor for this step; a clone is stored in
            ``self.actions``.
    """
    # Advance the per-environment command timers.
    self._command_time += self.dt

    # Environments whose current command has expired.
    expired = self._command_time >= self._command_interval
    if torch.any(expired):
        if torch.all(expired):
            # Everything is due: rebuild the full command buffers at once.
            self._generate_commands()
        else:
            env_ids = torch.where(expired)[0]
            count = len(env_ids)

            # Sample only the x linear velocity and the yaw rate, uniformly in
            # [-max, max]. The lateral (y) command is forced to zero so that a
            # partial refresh produces the same kind of command as
            # _generate_commands (which keeps y at zero).
            new_lin_x = torch.rand((count, 1), device=self.device) * 2 * self._max_lin_vel - self._max_lin_vel
            new_ang_z = torch.rand((count, 1), device=self.device) * 2 * self._max_ang_vel - self._max_ang_vel

            self.cmd_lin_vel[env_ids, 0:1] = new_lin_x
            self.cmd_lin_vel[env_ids, 1] = 0.0
            self.cmd_ang_vel[env_ids, 2:3] = new_ang_z
            self._command_time[env_ids] = 0

    # Keep a private copy so later in-place edits by the caller cannot leak in.
    self.actions = actions.clone()

    # Refresh the debug arrows only when markers were created for this env.
    if hasattr(self, "visualization_markers"):
        self._update_markers()
|
||||
|
||||
def _apply_action(self) -> None:
|
||||
|
||||
# self._debug_print_idx([0])
|
||||
@@ -283,16 +314,6 @@ class FlexrV0Env(DirectRLEnv):
|
||||
|
||||
return total_reward.reshape(-1)
|
||||
|
||||
# def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
|
||||
# self.joint_pos = self.robot.data.joint_pos
|
||||
# self.joint_vel = self.robot.data.joint_vel
|
||||
|
||||
# # time_out = self.episode_length_buf >= self.max_episode_length - 1
|
||||
# # out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
|
||||
# # out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
|
||||
# # return out_of_bounds, time_out
|
||||
# return torch.zeros_like(self.reset_terminated), torch.zeros_like(self.reset_terminated)
|
||||
|
||||
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
|
||||
|
||||
# 初始化终止标志
|
||||
@@ -325,6 +346,18 @@ class FlexrV0Env(DirectRLEnv):
|
||||
env_ids = self.robot._ALL_INDICES # type: ignore
|
||||
super()._reset_idx(env_ids) # type: ignore
|
||||
|
||||
# 重置时生成新指令
|
||||
if len(env_ids) == self.num_envs: # type: ignore
|
||||
self._generate_commands()
|
||||
else:
|
||||
# 部分重置
|
||||
cmd_lin_vel_xy = torch.rand((len(env_ids), 2), device=self.device) * 2 * self._max_lin_vel - self._max_lin_vel # type: ignore
|
||||
cmd_ang_vel_z = torch.rand((len(env_ids), 1), device=self.device) * 2 * self._max_ang_vel - self._max_ang_vel # type: ignore
|
||||
|
||||
self.cmd_lin_vel[env_ids, :2] = cmd_lin_vel_xy
|
||||
self.cmd_ang_vel[env_ids, 2:3] = cmd_ang_vel_z
|
||||
self._command_time[env_ids] = 0
|
||||
|
||||
joint_pos = self.robot.data.default_joint_pos[env_ids]
|
||||
joint_vel = self.robot.data.default_joint_vel[env_ids]
|
||||
|
||||
@@ -374,6 +407,114 @@ class FlexrV0Env(DirectRLEnv):
|
||||
# logging.debug(f"orientations: {self.orientations[env_ids]}")
|
||||
logging.debug(f"euler_angles: {euler_xyz_from_quat(self.orientations[env_ids])}")
|
||||
|
||||
def _generate_commands(self, env_ids: "torch.Tensor | None" = None):
    """Sample random velocity commands and reset the matching command timers.

    The commanded x linear velocity is drawn uniformly from
    ``[-self._max_lin_vel, self._max_lin_vel]`` and the commanded z angular
    velocity from ``[-self._max_ang_vel, self._max_ang_vel]``. All other
    command components (linear y/z, angular x/y) are zero.

    Args:
        env_ids: Optional indices of the environments to resample. When
            omitted, commands for all ``self.num_envs`` environments are
            regenerated and ``self.cmd_lin_vel`` / ``self.cmd_ang_vel`` are
            rebound to new ``(num_envs, 3)`` tensors. When given, only those
            rows of the existing buffers are overwritten in place.
    """
    count = self.num_envs if env_ids is None else len(env_ids)

    # Uniform samples in [-max, max] for the two tracked components.
    lin_x = torch.rand((count, 1), device=self.device) * 2 * self._max_lin_vel - self._max_lin_vel
    ang_z = torch.rand((count, 1), device=self.device) * 2 * self._max_ang_vel - self._max_ang_vel

    # Assemble full 3-component commands; untracked components stay at zero.
    lin_cmd = torch.cat([lin_x, torch.zeros((count, 2), device=self.device)], dim=1)
    ang_cmd = torch.cat([torch.zeros((count, 2), device=self.device), ang_z], dim=1)

    if env_ids is None:
        self._command_time[:] = 0
        self.cmd_lin_vel = lin_cmd
        self.cmd_ang_vel = ang_cmd
    else:
        self._command_time[env_ids] = 0
        self.cmd_lin_vel[env_ids] = lin_cmd
        self.cmd_ang_vel[env_ids] = ang_cmd
|
||||
|
||||
def _update_markers(self):
    """Refresh the markers showing the commanded and the measured velocity.

    For every environment two markers are submitted in a single
    ``visualize`` call: marker index 0 for the command, placed 0.5 above the
    robot root position along z, and marker index 1 for the measured base
    velocity, placed 0.3 above it.
    """
    # NOTE(review): orientations are derived from velocity components only; the
    # robot's own heading is not applied here - confirm this is intended.

    # Planar direction of the commanded motion (x velocity, yaw rate).
    cmd_dir = self._compute_direction_vector(
        self.cmd_lin_vel[:, 0:1], self.cmd_ang_vel[:, 2:3]
    )
    # Planar direction of the measured motion.
    meas_dir = self._compute_direction_vector(
        self.base_lin_vel[:, 0:1], self.base_ang_vel[:, 2:3]
    )

    # Stack the two markers at different heights so they do not overlap.
    cmd_pos = self.robot.data.root_pos_w + torch.tensor([0, 0, 0.5], device=self.device)
    meas_pos = self.robot.data.root_pos_w + torch.tensor([0, 0, 0.3], device=self.device)

    cmd_quat = self._compute_arrow_orientation(cmd_dir)
    meas_quat = self._compute_arrow_orientation(meas_dir)

    # Marker prototype ids: first num_envs entries are 0 (command), the
    # remaining num_envs entries are 1 (measured).
    prototype_ids = torch.arange(2, dtype=torch.long, device=self.device).repeat_interleave(self.num_envs)

    self.visualization_markers.visualize(
        translations=torch.cat([cmd_pos, meas_pos], dim=0),
        orientations=torch.cat([cmd_quat, meas_quat], dim=0),
        marker_indices=prototype_ids,
    )
|
||||
|
||||
def _compute_arrow_orientation(self, directions: torch.Tensor) -> torch.Tensor:
    """Return quaternions rotating the +x axis onto planar direction vectors.

    The directions lie in the xy-plane, so the rotation is a pure yaw about
    the z axis and can be written in closed form from ``atan2``. This also
    covers the degenerate inputs of an axis-angle construction based on a
    cross product: a zero vector maps to the identity quaternion, and a
    vector pointing along -x maps to a proper 180 degree yaw.

    Args:
        directions: Tensor of shape ``[N, 2]`` holding (x, y) components.
            ``N`` may be any batch size. The vectors need not be normalized.

    Returns:
        Tensor of shape ``[N, 4]`` with unit quaternions in scalar-first
        ``(w, x, y, z)`` order.
    """
    # Heading angle of each direction; atan2(0, 0) is 0, i.e. no rotation.
    yaw = torch.atan2(directions[:, 1], directions[:, 0])
    half_yaw = 0.5 * yaw

    # Pure rotation about z: q = (cos(yaw / 2), 0, 0, sin(yaw / 2)).
    quats = torch.zeros((directions.shape[0], 4), dtype=yaw.dtype, device=directions.device)
    quats[:, 0] = torch.cos(half_yaw)
    quats[:, 3] = torch.sin(half_yaw)

    return quats
|
||||
|
||||
def _compute_direction_vector(self, lin_vel_x: torch.Tensor, ang_vel_z: torch.Tensor) -> torch.Tensor:
    """Combine forward speed and yaw rate into one planar direction vector.

    The x component is the forward linear velocity as given. The y component
    is the yaw rate limited to ``[-1, 1]``, so that turning shows up as a
    sideways tilt of the resulting vector.

    Args:
        lin_vel_x: Tensor of shape ``[N, 1]`` with the x linear velocity.
        ang_vel_z: Tensor of shape ``[N, 1]`` with the z angular velocity.

    Returns:
        Tensor of shape ``[N, 2]`` holding ``(vx, clamp(wz, -1, 1))``.
    """
    # Limit the influence of the yaw rate on the displayed direction.
    turn_component = torch.clamp(ang_vel_z, min=-1.0, max=1.0)

    return torch.cat([lin_vel_x, turn_component], dim=1)
|
||||
|
||||
@torch.jit.script
|
||||
def compute_rewards(
|
||||
# 输入参数
|
||||
@@ -398,14 +539,13 @@ def compute_rewards(
|
||||
# 线速度/角速度跟踪(计算两个向量之间的欧几里得距离)
|
||||
sigma_squared = 0.25
|
||||
# 线速度部分
|
||||
# 提取 xy 方向的速度
|
||||
v_target_xy = cmd_lin_vel[:, :2] # [num_envs, 2]
|
||||
v_actual_xy = base_lin_vel[:, :2] # [num_envs, 2]
|
||||
# 计算偏差的范数平方
|
||||
v_diff_xy = v_target_xy - v_actual_xy
|
||||
v_diff_norm_squared = torch.sum(v_diff_xy ** 2, dim=1, keepdim=True)
|
||||
# 计算线速度跟踪的奖励
|
||||
linear_error = torch.exp(-v_diff_norm_squared / sigma_squared) # [num_envs, 1]
|
||||
# # 提取 x 方向的速度
|
||||
v_target_x = cmd_lin_vel[:, 0:1] # 只取x方向
|
||||
v_actual_x = base_lin_vel[:, 0:1]
|
||||
v_diff_x = v_target_x - v_actual_x
|
||||
v_diff_squared = v_diff_x.pow(2)
|
||||
linear_error = torch.exp(-v_diff_squared / sigma_squared)
|
||||
|
||||
# 角速度部分
|
||||
omega_target_z = cmd_ang_vel[:, 2].unsqueeze(1) # [num_envs, 1]
|
||||
omega_actual_z = base_ang_vel[:, 2].unsqueeze(1) # [num_envs, 1]
|
||||
|
||||
@@ -63,7 +63,7 @@ class FlexrV0EnvCfg(DirectRLEnvCfg):
|
||||
|
||||
# 奖励权重参数 # TODO 写入外部配置
|
||||
rew_scale_lin_vel = 1.0 # 线速度跟踪
|
||||
rew_scale_ang_vel = 0.5 # 角速度跟踪
|
||||
rew_scale_ang_vel = 0.8 # 角速度跟踪
|
||||
rew_scale_z = 0.1 # z 轴稳定性
|
||||
rew_scale_orientation = 0.2 # 姿态稳定性
|
||||
rew_scale_joint_motion = 0.001 # 关节运动
|
||||
|
||||
Reference in New Issue
Block a user