"""xArm7 Agent Interface
Check user manual at https://www.ufactory.cc/download/
"""
from __future__ import annotations
import math
from collections import OrderedDict
import numpy as np
import pyrealsense2 as rs
from gymnasium import spaces
from sapien import Pose
from scipy.spatial.transform import Rotation
from transforms3d.euler import euler2quat, quat2euler
from transforms3d.quaternions import axangle2quat
from urchin import URDF
from xarm.wrapper import XArmAPI
from .. import ASSET_DIR
from ..sensors.camera import CameraConfig
from ..utils.common import clip_and_scale_action, vectorize_pose
from ..utils.logger import get_logger
from ..utils.multiprocessing import SharedObject, signal_process_ready
# TODO: remove return code from all functions, add return code checks
[docs]
class XArm7:
"""
xArm7 agent class
Mimics mani_skill2.agents.base_agent.BaseAgent interface
"""
SUPPORTED_CONTROL_MODES = (
"ee_pos",
"ee_delta_pos",
"ee_pose_axangle",
"ee_delta_pose_axangle",
"ee_pose_quat",
"ee_delta_pose_quat",
)
SUPPORTED_MOTION_MODES = (
"position",
"servo",
"joint_teaching",
"cartesian_teaching (invalid)",
"joint_vel",
"cartesian_vel",
"joint_online",
"cartesian_online",
)
def __init__(
self,
ip: str = "192.168.1.229",
control_mode: str = "ee_delta_pos",
motion_mode: str = "position",
*,
safety_boundary_mm: list[int] = [999, -999, 999, -999, 999, 0],
boundary_clip_mm: int = 10,
with_hand_camera: bool = True,
run_as_process: bool = False,
):
"""
:param ip: xArm7 ip address, see controller box
:param control_mode: xArm control mode (determines set_action type)
:param motion_mode: xArm motion mode (determines xArm motion mode)
:param safety_boundary_mm: [x_max, x_min, y_max, y_min, z_max, z_min] (mm)
:param boundary_clip_mm: clip action when TCP position to boundary is
within boundary_clip_eps (mm). No clipping if None.
:param with_hand_camera: whether to include hand camera mount in TCP offset.
:param run_as_process: whether to run XArm7 as a separate process for
streaming robot states and saving trajectory (not done)
If True, XArm7 needs to be created as a `mp.Process`.
Several SharedObject are created to control XArm7 and stream data:
* "join_xarm7_real": If triggered, the XArm7 process is joined.
* "sync_xarm7_real": If triggered, all processes should fetch data.
Used for synchronizing robot state capture.
* "start_xarm7_real": If True, starts the XArm7 streams; else, stops it.
* "xarm7_real_urdf_path": xArm7 URDF path, str
* "xarm7_real_qpos": xArm7 joint angles, [8,] np.float32 np.ndarray
* "xarm7_real_qvel": xArm7 joint velocities, [8,] np.float32 np.ndarray
* "xarm7_real_qf": xArm7 joint torques, [8,] np.float32 np.ndarray
* "xarm7_real_tcp_pose": tcp pose in world frame (unit: m), sapien.Pose
"""
self.logger = get_logger("XArm7")
self.ip = ip
self.arm = XArmAPI(ip)
if control_mode not in self.SUPPORTED_CONTROL_MODES:
raise NotImplementedError(f"Unsupported {control_mode = }")
if motion_mode not in self.SUPPORTED_MOTION_MODES:
raise NotImplementedError(f"Unsupported {motion_mode = }")
self._control_mode = control_mode
self._motion_mode = motion_mode
# NOTE: currently still using prismatic fingers in simulation
# TODO: When gear joint is properly implemented, this is not needed
self.joint_limits_ms2 = URDF.load(
f"{ASSET_DIR}/descriptions/xarm7_pris_finger_d435.urdf",
lazy_load_meshes=True,
).joint_limits.astype(np.float32)
# set joint_limits to correspond to [-10, 850]
self.joint_limits_ms2[-2:, 0] = self.joint_limits_ms2[-2:, 1] / 850 * -10
self.gripper_limits = np.asarray([-10, 850], dtype=np.float32)
self.init_qpos = np.asarray(
[
0,
0,
0,
np.pi / 3,
0,
np.pi / 3,
-np.pi / 2,
0.0453556139430441,
0.0453556139430441,
],
dtype=np.float32,
)
self.pose = Pose() # base pose in world frame
self.safety_boundary = np.asarray(safety_boundary_mm)
self.boundary_clip = boundary_clip_mm
if self.boundary_clip is not None:
self.safety_boundary_clip = self.safety_boundary.copy()
self.safety_boundary_clip[0::2] -= boundary_clip_mm
self.safety_boundary_clip[1::2] += boundary_clip_mm
self.with_hand_camera = with_hand_camera
if run_as_process:
self.run_as_process()
else:
self.reset()
def __del__(self):
self.arm.disconnect()
[docs]
def get_err_warn_code(self, show=False):
code, (error_code, warn_code) = self.arm.get_err_warn_code(show=True)
assert code == 0, "Failed to get_err_warn_code"
return error_code, warn_code
[docs]
def clean_warning_error(self, mode=None):
error_code, warn_code = self.get_err_warn_code(show=True)
if warn_code != 0:
self.arm.clean_warn()
if error_code != 0:
self.arm.clean_error()
self.arm.motion_enable(enable=True)
self.arm.set_mode(
self.SUPPORTED_MOTION_MODES.index(self._motion_mode)
if mode is None
else mode
)
self.arm.set_state(state=0)
[docs]
def reset(self, wait=True):
self.clean_warning_error(mode=0)
# NOTE: Remove satefy boundary during reset
# boundary: [x_max, x_min, y_max, y_min, z_max, z_min]
self.arm.set_reduced_tcp_boundary([999, -999, 999, -999, 999, -999])
self.set_qpos(self.init_qpos, wait=wait)
# boundary: [x_max, x_min, y_max, y_min, z_max, z_min]
self.arm.set_reduced_tcp_boundary(self.safety_boundary)
self.arm.set_tcp_load(0.82, [0.0, 0.0, 48.0])
if self.with_hand_camera:
self.arm.set_tcp_offset([0.0, 0.0, 177.0, 0.0, 0.0, 0.0])
else:
self.arm.set_tcp_offset([0.0, 0.0, 172.0, 0.0, 0.0, 0.0])
self.arm.set_self_collision_detection(True)
self.arm.set_collision_tool_model(1) # xArm Gripper
self.arm.set_collision_rebound(True)
self.arm.motion_enable(enable=True)
self.arm.set_mode(self.SUPPORTED_MOTION_MODES.index(self._motion_mode))
self.arm.set_state(state=0)
# Enable gripper and set to maximum speed
self.arm.set_gripper_enable(enable=True)
self.arm.set_gripper_speed(speed=5000)
# ---------------------------------------------------------------------- #
# Control robot
# ---------------------------------------------------------------------- #
def _preprocess_action(
self, action: np.ndarray, translation_scale: float, axangle_scale: float
):
"""Preprocess action:
* Keep current TCP orientation when action only has position
* apply translation_scale and axangle_scale for delta control_mode
* clip tgt_tcp_pose to avoid going out of safety_boundary (optionally)
* clip and rescale gripper action (action[-1])
:param action: control action (unit for translation is meters)
If in delta control_mode, action needs to apply scale
:return tgt_tcp_pose: target TCP pose in robot base frame (unit in mm)
:return gripper_pos: gripper action after rescaling [-10, 850]
"""
cur_tcp_pose = self.get_tcp_pose(unit_in_mm=True)
if self._control_mode == "ee_pos":
tgt_tcp_pose = Pose(p=action[:3] * 1000.0, q=cur_tcp_pose.q) # m => mm
elif self._control_mode == "ee_delta_pos":
delta_tcp_pos = action[:3] * translation_scale # in milimeters
tgt_tcp_pose = cur_tcp_pose * Pose(p=delta_tcp_pos)
# elif self._control_mode == "ee_pose":
# tgt_tcp_pose = Pose(action[:3] * 1000.0,
# euler2quat(*action[3:6], axes='sxyz'))
elif self._control_mode == "ee_pose_axangle":
tgt_tcp_pose = Pose(
p=action[:3] * 1000.0, # m => mm
q=Rotation.from_rotvec(action[3:6]).as_quat()[[3, 0, 1, 2]],
)
elif self._control_mode == "ee_delta_pose_axangle":
axangle = action[3:6]
if (theta := math.sqrt(axangle @ axangle)) < 1e-9:
q = [1, 0, 0, 0]
else:
q = axangle2quat(
axangle / theta,
axangle_scale if theta > 1 else theta * axangle_scale,
is_normalized=True,
)
delta_tcp_pose = Pose(
p=action[:3] * translation_scale, q=q
) # in milimeters
tgt_tcp_pose = cur_tcp_pose * delta_tcp_pose
elif self._control_mode == "ee_pose_quat":
tgt_tcp_pose = Pose(p=action[:3] * 1000.0, q=action[3:7]) # m => mm
elif self._control_mode == "ee_delta_pose_quat":
# TODO: Apply axangle_scale?
delta_tcp_pose = Pose(
p=action[:3] * translation_scale,
q=action[3:7], # in milimeters
)
tgt_tcp_pose = cur_tcp_pose * delta_tcp_pose
else:
raise NotImplementedError(f"{self._control_mode=} not implemented")
# Clip tgt_tcp_pose.p to safety_boundary_clip
if self.boundary_clip is not None:
tgt_tcp_pose.set_p(
np.clip(
tgt_tcp_pose.p,
self.safety_boundary_clip[1::2],
self.safety_boundary_clip[0::2],
)
)
# [-1, 1] => [-10, 850]
gripper_pos = clip_and_scale_action(action[-1], self.gripper_limits)
self.logger.info(f"Setting {tgt_tcp_pose = }, {gripper_pos = }")
return tgt_tcp_pose, gripper_pos
[docs]
def set_action(
self,
action: np.ndarray,
translation_scale=100.0,
axangle_scale=0.1,
speed=None,
mvacc=None,
gripper_speed=None,
skip_gripper=False,
wait=False,
):
"""
:param action: action corresponding to self.control_mode, np.floating np.ndarray
action[-1] is gripper action (always has range [-1, 1])
:param translation_scale: action [-1, 1] maps to [-100mm, 100mm],
Used for delta control_mode only.
:param axangle_scale: axangle action norm (rotation angle) is clipped by 0.1 rad
[-1, 0, 0] => rotate around [1, 0, 0] by -0.1 rad
Used for delta control_mode only.
:param speed: move speed.
For TCP motion: range is [0.1, 1000.0] mm/s (default=100)
For joint motion: range is [0.05, 180.0] deg/s (default=20)
:param mvacc: move acceleration.
For TCP motion: range [1.0, 50000.0] mm/s^2 (default=2000)
For joint motion: range [0.5, 1145.0] deg/s^2 (default=500)
:param gripper_speed: gripper speed, range [1, 5000] r/min (default=5000)
:param skip_gripper: whether to skip gripper action
:param wait: whether to wait for the arm to complete, default is False.
Has no effect in "joint_online" and "cartesian_online" motion mode
"""
# Clean existing warnings / errors
while self.arm.has_err_warn:
error_code, warn_code = self.arm.get_err_warn_code()
# if error_code in [35]: # 35: Safety Boundary Limit
# self.clean_warning_error()
self.logger.error(f"ErrorCode: {error_code}, need to manually clean it")
self.arm.get_err_warn_code(show=True)
_ = input("Press enter after cleaning error")
# Checks action shape and range
assert action in self.action_space, f"Wrong {action = }"
action = np.asarray(action)
# Preprocess action (apply scaling, clip to safety boundary, rescale gripper)
tgt_tcp_pose, gripper_pos = self._preprocess_action(
action, translation_scale, axangle_scale
)
# Control gripper position
ret_gripper = 0
if not skip_gripper:
# NOTE: wait=False for gripper position (it should never block)
ret_gripper = self.arm.set_gripper_position(
gripper_pos, speed=gripper_speed, wait=False, wait_motion=False
)
# Control xArm based on motion mode
if self._motion_mode == "position":
# NOTE: when wait=False, the second set_position() call will be blocked
# until previous motion is completed.
ret_arm = self.arm.set_position(
*tgt_tcp_pose.p,
*quat2euler(tgt_tcp_pose.q, axes="sxyz"),
speed=speed,
mvacc=mvacc,
relative=False,
is_radian=True,
wait=wait,
)
return ret_arm, ret_gripper
elif self._motion_mode == "servo":
raise NotImplementedError("Do not use servo mode! Need fine waypoints")
ret_arm = self.arm.set_servo_cartesian(
np.hstack([tgt_tcp_pose.p, quat2euler(tgt_tcp_pose.q, axes="sxyz")]),
is_radian=True,
is_tool_coord=False,
)
return ret_arm, ret_gripper
elif self._motion_mode == "joint_teaching":
raise ValueError("Joint teaching mode enabled (no action needed)")
elif self._motion_mode == "joint_vel":
raise NotImplementedError("Unverified")
# clip delta pos to prevent undesired rotation due to ik solutions
cur_tcp_pose = self.get_tcp_pose(unit_in_mm=True)
tgt_tcp_pose.set_p(
np.clip(tgt_tcp_pose.p, cur_tcp_pose.p - 30, cur_tcp_pose.p + 30)
)
_, tgt_qpos = self.arm.get_inverse_kinematics(
np.hstack([tgt_tcp_pose.p, quat2euler(tgt_tcp_pose.q, axes="sxyz")]),
input_is_radian=True,
return_is_radian=True,
)
_, (cur_qpos, _, _) = self.arm.get_joint_states(is_radian=True)
delta_qpos = np.asarray(tgt_qpos) - np.asarray(cur_qpos)
timestep = 0.5
qvel = delta_qpos / timestep
qvel = np.clip(qvel, -0.3, 0.3) # clip qvel for safety
ret_arm = self.arm.vc_set_joint_velocity(
qvel, is_radian=True, is_sync=True, duration=timestep
)
# time.sleep(timestep - 0.2)
return ret_arm, ret_gripper
elif self._motion_mode == "cartesian_vel":
raise NotImplementedError(f"{self._motion_mode = } is not yet implemented")
# [spd_x, spd_y, spd_z, spd_rx, spd_ry, spd_rz]
speeds_xyz_rpy = np.zeros(6)
ret_arm = self.arm.vc_set_cartesian_velocity(
speeds_xyz_rpy, is_radian=True, is_tool_coord=True, duration=timestep
)
# time.sleep(timestep - 0.2)
return ret_arm, ret_gripper
elif self._motion_mode == "joint_online":
_, tgt_qpos = self.arm.get_inverse_kinematics(
np.hstack([tgt_tcp_pose.p, quat2euler(tgt_tcp_pose.q, axes="sxyz")]),
input_is_radian=True,
return_is_radian=True,
)
ret_arm = self.arm.set_servo_angle(
angle=tgt_qpos,
speed=speed,
mvacc=mvacc,
relative=False,
is_radian=True,
wait=wait,
)
return ret_arm, ret_gripper
elif self._motion_mode == "cartesian_online":
ret_arm = self.arm.set_position(
*tgt_tcp_pose.p,
*quat2euler(tgt_tcp_pose.q, axes="sxyz"),
speed=speed,
mvacc=mvacc,
relative=False,
is_radian=True,
wait=wait,
)
return ret_arm, ret_gripper
else:
raise NotImplementedError()
[docs]
def set_qpos(
self,
qpos,
speed=None,
mvacc=None,
gripper_speed=None,
skip_gripper=False,
wait=False,
):
"""Set xarm qpos using maniskill2 qpos
:param qpos: joint qpos (angles for arm joints, gripper_qpos for gripper)
See self.joint_limits_ms2
:param speed: move speed.
For TCP motion: range is [0.1, 1000.0] mm/s (default=100)
For joint motion: range is [0.05, 180.0] deg/s (default=20)
:param mvacc: move acceleration.
For TCP motion: range [1.0, 50000.0] mm/s^2 (default=2000)
For joint motion: range [0.5, 1145.0] deg/s^2 (default=500)
:param gripper_speed: gripper speed, range [1, 5000] r/min
:param skip_gripper: whether to skip gripper action
:param wait: whether to wait for the arm to complete, default is False.
"""
assert len(qpos) == 9, f"Wrong qpos shape: {len(qpos)}"
arm_qpos, gripper_qpos = qpos[:7], qpos[-2:]
# Control gripper position
ret_gripper = 0
if not skip_gripper:
gripper_qpos = gripper_qpos[0] # NOTE: mimic action
gripper_pos = clip_and_scale_action(
gripper_qpos, self.gripper_limits, self.joint_limits_ms2[-1, :]
)
ret_gripper = self.arm.set_gripper_position(
gripper_pos, speed=gripper_speed, wait=False, wait_motion=False
)
ret_arm = self.arm.set_servo_angle(
angle=arm_qpos,
speed=speed,
mvacc=mvacc,
relative=False,
is_radian=True,
wait=wait,
)
return ret_arm, ret_gripper
[docs]
def set_gripper_position(
self, gripper_pos, unit_in_mm=False, speed=None, wait=False
):
"""Set gripper opening width
:param gripper_pos: gripper position (default unit is in meters)
:param unit_in_mm: whether gripper_pos has unit mm or m
:param speed: gripper speed, range [1, 5000] r/min
:param wait: whether to wait for the action to complete, default is False.
"""
ret_gripper = self.arm.set_gripper_position(
gripper_pos * 10.0 if unit_in_mm else gripper_pos * 10000.0,
speed=speed,
wait=wait,
wait_motion=wait,
)
return ret_gripper
[docs]
def close_gripper(self, speed=None, wait=False):
"""Close gripper
:param speed: gripper speed, range [1, 5000] r/min
:param wait: whether to wait for the action to complete, default is False.
"""
ret_gripper = self.arm.set_gripper_position(
self.gripper_limits[0], speed=speed, wait=wait, wait_motion=wait
)
return ret_gripper
[docs]
def open_gripper(self, speed=None, wait=False):
"""Open gripper
:param speed: gripper speed, range [1, 5000] r/min
:param wait: whether to wait for the action to complete, default is False.
"""
ret_gripper = self.arm.set_gripper_position(
self.gripper_limits[1], speed=speed, wait=wait, wait_motion=wait
)
return ret_gripper
[docs]
@staticmethod
def build_grasp_pose(
center, approaching=[0.0, 0.0, -1.0], closing=[1.0, 0.0, 0.0]
) -> Pose:
center = np.asarray(center)
approaching, closing = np.asarray(approaching), np.asarray(closing)
assert np.abs(1 - np.linalg.norm(approaching)) < 1e-3
assert np.abs(1 - np.linalg.norm(closing)) < 1e-3
assert np.abs(approaching @ closing) <= 1e-3
ortho = np.cross(closing, approaching)
T = np.eye(4)
T[:3, :3] = np.stack([ortho, closing, approaching], axis=1)
T[:3, 3] = center
return Pose(T)
# ---------------------------------------------------------------------- #
# Get robot information
# ---------------------------------------------------------------------- #
[docs]
def get_qpos(self) -> np.ndarray:
"""Get xarm qpos in maniskill2 format
:return qpos: xArm7 joint angles, [9,] np.float32 np.ndarray
"""
_, (qpos, qvel, effort) = self.arm.get_joint_states(is_radian=True)
_, gripper_pos = self.arm.get_gripper_position() # [-10, 850]
gripper_qpos = clip_and_scale_action(
gripper_pos, self.joint_limits_ms2[-1, :], self.gripper_limits
)
return np.asarray(qpos + [gripper_qpos, gripper_qpos], dtype=np.float32)
[docs]
def get_qvel(self) -> np.ndarray:
"""Get xarm qvel in maniskill2 format
:return qvel: xArm7 joint velocities, [9,] np.float32 np.ndarray
"""
_, (qpos, qvel, effort) = self.arm.get_joint_states(is_radian=True)
return np.asarray(qvel + [0.0, 0.0], dtype=np.float32) # No gripper qvel
[docs]
def get_qf(self) -> np.ndarray:
"""Get xarm qf (joint torques) in maniskill2 format
:return qf: xArm7 joint torques, [9,] np.float32 np.ndarray
"""
_, (qpos, qvel, effort) = self.arm.get_joint_states(is_radian=True)
return np.asarray(effort + [0.0, 0.0], dtype=np.float32) # No gripper qf
[docs]
def get_tcp_pose(self, unit_in_mm=False) -> Pose:
"""Get TCP pose in world frame
:return pose: If unit_in_mm, position unit is mm. Else, unit is m.
"""
# arm.get_position() rounds the values to 6 decimal places
# call arm_cmd.get_tcp_pose() instead
ret = self.arm._arm.arm_cmd.get_tcp_pose()
ret[0] = self.arm._arm._check_code(ret[0])
for v in ret[1:]:
if not math.isfinite(v):
self.logger.critical(f"Return value not finite: {ret[1:]=}")
xyzrpy = np.asarray(ret[1:], dtype=np.float32)
pose_base_tcp = Pose(
p=xyzrpy[:3] if unit_in_mm else xyzrpy[:3] / 1000,
q=euler2quat(*xyzrpy[3:], axes="sxyz"),
)
return self.pose * pose_base_tcp
[docs]
def get_gripper_position(self, unit_in_mm=False) -> float:
"""Get gripper opening width
:return pos: if unit_in_mm, position unit is mm. Else unit is m.
"""
_, gripper_pos = self.arm.get_gripper_position()
return gripper_pos / 10.0 if unit_in_mm else gripper_pos / 10000.0
# ---------------------------------------------------------------------- #
# Observations
# ---------------------------------------------------------------------- #
[docs]
def get_proprioception(self):
obs = OrderedDict(qpos=self.get_qpos(), qvel=self.get_qvel())
# controller_state = self.controller.get_state()
# if len(controller_state) > 0:
# obs.update(controller=controller_state)
return obs
[docs]
def get_state(self) -> dict:
"""Get current state, including robot state and controller state"""
state = OrderedDict()
# robot state
state["robot_root_pose"] = vectorize_pose(self.pose)
state["robot_qpos"] = self.get_qpos()
state["robot_qvel"] = self.get_qvel()
state["robot_qf"] = self.get_qf()
# controller state
# state["controller"] = self.controller.get_state()
return state
[docs]
def set_state(self, state: dict, ignore_controller=False):
# robot state
pose_array = state["robot_root_pose"]
self.pose = Pose(p=pose_array[:3], q=pose_array[3:])
self.set_qpos(state["robot_qpos"])
# self.robot.set_qvel(state["robot_qvel"])
# self.robot.set_qacc(state["robot_qacc"])
# if not ignore_controller and "controller" in state:
# self.controller.set_state(state["controller"])
[docs]
def run_as_process(self):
"""Run XArm7 as a separate process"""
self.logger.info(f"Running {self!r} as a separate process")
# XArm7 control
so_joined = SharedObject("join_xarm7_real")
so_sync = SharedObject("sync_xarm7_real")
so_start = SharedObject("start_xarm7_real", data=False)
# data
urdf_path = (
f"{ASSET_DIR}/descriptions/"
f"{'xarm7_d435.urdf' if self.with_hand_camera else 'xarm7.urdf'}"
)
so_urdf_path = SharedObject("xarm7_real_urdf_path", data=urdf_path)
so_qpos = SharedObject("xarm7_real_qpos", data=np.zeros(8, dtype=np.float32))
so_qvel = SharedObject("xarm7_real_qvel", data=np.zeros(8, dtype=np.float32))
so_qf = SharedObject("xarm7_real_qf", data=np.zeros(8, dtype=np.float32))
so_tcp_pose = SharedObject("xarm7_real_tcp_pose", data=Pose())
signal_process_ready() # current process is ready
while not so_joined.triggered:
if so_start.fetch():
_, (qpos, qvel, qf) = self.arm.get_joint_states(is_radian=True)
_, gripper_pos = self.arm.get_gripper_position() # [-10, 850]
pose_world_tcp = self.get_tcp_pose()
so_qpos.assign(
np.asarray(
qpos
+ [
gripper_pos / 1000.0,
],
dtype=np.float32,
)
)
so_qvel.assign(
np.asarray(
qvel
+ [
0.0,
],
dtype=np.float32,
)
)
so_qf.assign(
np.asarray(
qf
+ [
0.0,
],
dtype=np.float32,
)
)
so_tcp_pose.assign(pose_world_tcp)
self.logger.info(f"Process running {self!r} is joined")
# Unlink created SharedObject
so_joined.unlink()
so_sync.unlink()
so_start.unlink()
so_urdf_path.unlink()
so_qpos.unlink()
so_qvel.unlink()
so_qf.unlink()
so_tcp_pose.unlink()
@property
def robot(self):
"""An alias for compatibility."""
return self
@property
def control_mode(self):
"""Get the currently activated controller uid."""
return self._control_mode
@property
def motion_mode(self):
"""Get the currently activated controller uid."""
return self._motion_mode
@property
def action_space(self):
if self._control_mode == "ee_pos":
return spaces.Box(low=-np.inf, high=np.inf, shape=(4,), dtype=np.float32)
elif self._control_mode == "ee_delta_pos":
return spaces.Box(low=-1, high=1, shape=(4,), dtype=np.float32)
# elif self._control_mode == "ee_pose":
# # [x, y, z, r, p, y, gripper], xyz in meters, rpy in radian
# return spaces.Box(low=np.array([-np.inf]*3 + [-np.pi]*3 + [-1]),
# high=np.array([np.inf]*3 + [np.pi]*3 + [1]),
# shape=(7,), dtype=np.float32)
elif self._control_mode == "ee_pose_axangle":
# [x, y, z, *rotvec, gripper], xyz in meters
# rotvec is in axis of rotation and its norm gives rotation angle
return spaces.Box(
low=np.array([-np.inf] * 6 + [-1]),
high=np.array([np.inf] * 6 + [1]),
shape=(7,),
dtype=np.float32,
)
elif self._control_mode == "ee_delta_pose_axangle":
# [x, y, z, *rotvec, gripper], xyz in meters
# rotvec is in axis of rotation and its norm gives rotation angle
return spaces.Box(low=-1, high=1, shape=(7,), dtype=np.float32)
elif self._control_mode == "ee_pose_quat":
# [x, y, z, w, x, y, z, gripper], xyz in meters, wxyz is unit quaternion
return spaces.Box(
low=np.array([-np.inf] * 3 + [-1] * 4 + [-1]),
high=np.array([np.inf] * 3 + [1] * 4 + [1]),
shape=(8,),
dtype=np.float32,
)
elif self._control_mode == "ee_delta_pose_quat":
# [x, y, z, w, x, y, z, gripper], xyz in meters, wxyz is unit quaternion
return spaces.Box(low=-1, high=1, shape=(8,), dtype=np.float32)
else:
raise NotImplementedError(f"Unsupported {self._control_mode = }")
@property
def cameras(self) -> CameraConfig:
"""CameraConfig of cameras attached to agent"""
pose_tcp_cam = (
Pose(p=[0, 0, 0.177]).inv()
* Pose(
p=[-0.06042734, 0.0175, 0.02915237],
q=euler2quat(np.pi, -np.pi / 2 - np.pi / 12, np.pi),
)
* Pose(p=[0, 0.015, 0])
) # camera_color_frame
return CameraConfig(
uid="hand_camera",
device_sn="146322076186",
pose=pose_tcp_cam,
config=(848, 480, 30),
preset="High Accuracy",
# depth_option_kwargs={rs.option.exposure: 1500},
parent_pose_so_name="xarm7_real_tcp_pose",
)
def __repr__(self):
return (
f"<{self.__class__.__name__}: ip={self.ip}, "
f"control_mode={self._control_mode}, motion_mode={self._motion_mode}, "
f"with_hand_camera={self.with_hand_camera}>"
)