Notes on the ALOHA open-source project
Goal: understand how the robot's (the two manipulator arms') motion time-series data is collected, and how that data is then used for imitation learning.
Collecting the arms' motion time-series data
The overall flow: a human operates the two master arms, the master arms' state is read and mirrored onto the two puppet arms, and the observations and camera images produced by the puppet arms are recorded.
According to the README: run one_side_teleop.py for a teleoperation test without data collection; to actually collect data, run:
~~~shell
python3 record_episodes.py --dataset_dir <data save dir>
~~~
Following that, look at capture_one_episode() in record_episodes.py, which does the actual data collection:
~~~python
def capture_one_episode(dt, max_timesteps, camera_names, dataset_dir, dataset_name, overwrite):
print(f'Dataset name: {dataset_name}')
### Get handles to the two master arms here
# source of data
master_bot_left = InterbotixManipulatorXS(robot_model="wx250s", group_name="arm", gripper_name="gripper",
robot_name=f'master_left', init_node=True)
master_bot_right = InterbotixManipulatorXS(robot_model="wx250s", group_name="arm", gripper_name="gripper",
robot_name=f'master_right', init_node=False)
env = make_real_env(init_node=False, setup_robots=False)
# saving dataset
if not os.path.isdir(dataset_dir):
os.makedirs(dataset_dir)
dataset_path = os.path.join(dataset_dir, dataset_name)
if os.path.isfile(dataset_path) and not overwrite:
print(f'Dataset already exist at \n{dataset_path}\nHint: set overwrite to True.')
exit()
# move all 4 robots to a starting pose where it is easy to start teleoperation, then wait till both gripper closed
opening_ceremony(master_bot_left, master_bot_right, env.puppet_bot_left, env.puppet_bot_right)
### Collect the master arms' joint data at the configured frame rate
# Data collection
ts = env.reset(fake=True)
timesteps = [ts]
actions = []
actual_dt_history = []
time0 = time.time()
DT = 1 / FPS
for t in tqdm(range(max_timesteps)):
t0 = time.time() #
action = get_action(master_bot_left, master_bot_right) ### per timestep, read the six .dxl.joint_states.position joint values plus the gripper position from each master arm; this function also wraps the gripper values with a normalization
t1 = time.time() #
ts = env.step(action) ### drive the puppet arms' joints and grippers from the master-arm action, then read back the puppet-side observations: images, qpos, qvel, effort, base_vel, tracer_vel
t2 = time.time() #
timesteps.append(ts)
actions.append(action)
actual_dt_history.append([t0, t1, t2])
time.sleep(max(0, DT - (time.time() - t0)))
print(f'Avg fps: {max_timesteps / (time.time() - time0)}')
# Torque on both master bots
torque_on(master_bot_left)
torque_on(master_bot_right)
# Open puppet grippers
env.puppet_bot_left.dxl.robot_set_operating_modes("single", "gripper", "position")
env.puppet_bot_right.dxl.robot_set_operating_modes("single", "gripper", "position")
move_grippers([env.puppet_bot_left, env.puppet_bot_right], [PUPPET_GRIPPER_JOINT_OPEN] * 2, move_time=0.5)
freq_mean = print_dt_diagnosis(actual_dt_history)
if freq_mean < 30:
print(f'\n\nfreq_mean is {freq_mean}, lower than 30, re-collecting... \n\n\n\n')
return False
"""
For each timestep:
observations
- images
- cam_high (480, 640, 3) 'uint8'
- cam_low (480, 640, 3) 'uint8'
- cam_left_wrist (480, 640, 3) 'uint8'
- cam_right_wrist (480, 640, 3) 'uint8'
- qpos (14,) 'float64'
- qvel (14,) 'float64'
action (14,) 'float64'
base_action (2,) 'float64'
"""
data_dict = {
'/observations/qpos': [],
'/observations/qvel': [],
'/observations/effort': [],
'/action': [],
'/base_action': [],
# '/base_action_t265': [],
}
for cam_name in camera_names:
data_dict[f'/observations/images/{cam_name}'] = []
### Actually write the recorded data (master actions plus puppet observations) into data_dict
# len(action): max_timesteps, len(time_steps): max_timesteps + 1
while actions:
action = actions.pop(0)
ts = timesteps.pop(0)
data_dict['/observations/qpos'].append(ts.observation['qpos'])
data_dict['/observations/qvel'].append(ts.observation['qvel'])
data_dict['/observations/effort'].append(ts.observation['effort'])
data_dict['/action'].append(action)
data_dict['/base_action'].append(ts.observation['base_vel'])
# data_dict['/base_action_t265'].append(ts.observation['base_vel_t265'])
for cam_name in camera_names:
data_dict[f'/observations/images/{cam_name}'].append(ts.observation['images'][cam_name])
# plot /base_action vs /base_action_t265
# import matplotlib.pyplot as plt
# plt.plot(np.array(data_dict['/base_action'])[:, 0], label='base_action_linear')
# plt.plot(np.array(data_dict['/base_action'])[:, 1], label='base_action_angular')
# plt.plot(np.array(data_dict['/base_action_t265'])[:, 0], '--', label='base_action_t265_linear')
# plt.plot(np.array(data_dict['/base_action_t265'])[:, 1], '--', label='base_action_t265_angular')
# plt.legend()
# plt.savefig('record_episodes_vel_debug.png', dpi=300)
COMPRESS = True
if COMPRESS:
# JPEG compression
t0 = time.time()
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50] # tried as low as 20, seems fine
compressed_len = []
for cam_name in camera_names:
image_list = data_dict[f'/observations/images/{cam_name}']
compressed_list = []
compressed_len.append([])
for image in image_list:
result, encoded_image = cv2.imencode('.jpg', image, encode_param) # 0.02 sec # cv2.imdecode(encoded_image, 1)
compressed_list.append(encoded_image)
compressed_len[-1].append(len(encoded_image))
data_dict[f'/observations/images/{cam_name}'] = compressed_list
print(f'compression: {time.time() - t0:.2f}s')
# pad so it has same length
t0 = time.time()
compressed_len = np.array(compressed_len)
padded_size = compressed_len.max()
for cam_name in camera_names:
compressed_image_list = data_dict[f'/observations/images/{cam_name}']
padded_compressed_image_list = []
for compressed_image in compressed_image_list:
padded_compressed_image = np.zeros(padded_size, dtype='uint8')
image_len = len(compressed_image)
padded_compressed_image[:image_len] = compressed_image
padded_compressed_image_list.append(padded_compressed_image)
data_dict[f'/observations/images/{cam_name}'] = padded_compressed_image_list
print(f'padding: {time.time() - t0:.2f}s')
# HDF5
t0 = time.time()
with h5py.File(dataset_path + '.hdf5', 'w', rdcc_nbytes=1024**2*2) as root:
root.attrs['sim'] = False
root.attrs['compress'] = COMPRESS
obs = root.create_group('observations')
image = obs.create_group('images')
for cam_name in camera_names:
if COMPRESS:
_ = image.create_dataset(cam_name, (max_timesteps, padded_size), dtype='uint8',
chunks=(1, padded_size), )
else:
_ = image.create_dataset(cam_name, (max_timesteps, 480, 640, 3), dtype='uint8',
chunks=(1, 480, 640, 3), )
_ = obs.create_dataset('qpos', (max_timesteps, 14))
_ = obs.create_dataset('qvel', (max_timesteps, 14))
_ = obs.create_dataset('effort', (max_timesteps, 14))
_ = root.create_dataset('action', (max_timesteps, 14))
_ = root.create_dataset('base_action', (max_timesteps, 2))
# _ = root.create_dataset('base_action_t265', (max_timesteps, 2))
for name, array in data_dict.items():
root[name][...] = array
if COMPRESS:
_ = root.create_dataset('compress_len', (len(camera_names), max_timesteps))
root['/compress_len'][...] = compressed_len
print(f'Saving: {time.time() - t0:.1f} secs')
return True
~~~
So at the given frame rate (though a step that runs slower than real time does not appear to be compensated for), the code reads the master arms and the puppet-side observations, collecting per-timestep data that is stored as an HDF5 file with the following layout:
~~~
root
├── observations
│   ├── images                        [current camera frames]
│   │   ├── cam_name1  (max_timesteps, padded_size) or (max_timesteps, 480, 640, 3)
│   │   ├── cam_name2  (max_timesteps, padded_size) or (max_timesteps, 480, 640, 3)
│   │   └── ...        (more camera names)
│   ├── qpos    (max_timesteps, 14)   [arm joint positions]
│   ├── qvel    (max_timesteps, 14)   [arm joint velocities]
│   └── effort  (max_timesteps, 14)   [arm joint efforts/torques]
├── action      (max_timesteps, 14)   [arm joint commands plus end-effector gripper commands]
└── base_action (max_timesteps, 2)    [the base_vel variable]
~~~
(HDF5, Hierarchical Data Format Version 5, is a file format for storing and managing heterogeneous, large-scale scientific data. An HDF5 file is organized as a hierarchy of nodes: a dataset is the basic node type and holds data such as a multi-dimensional array or table; a group is a logical container for datasets and sub-groups; attributes are metadata attached to a dataset or group, describing its characteristics; links are symbolic or hard references to datasets or groups. HDF5 files can hold many kinds of data, including numbers, strings, images, audio, and video, and also support compression and encryption. http://www.bimant.com/docs/3dformat/HDF5/)
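Given this layout, reading an episode back is straightforward. Below is a minimal sketch; the file name is hypothetical, and it assumes the compressed image layout above, with compress_len holding each frame's true JPEG byte length and its rows ordered like camera_names:
~~~python
import cv2
import h5py
import numpy as np

# Hypothetical episode file; substitute your own dataset_dir / dataset_name.
with h5py.File('episode_0.hdf5', 'r') as root:
    qpos = root['/observations/qpos'][()]                     # (max_timesteps, 14)
    action = root['/action'][()]                              # (max_timesteps, 14)
    padded_jpegs = root['/observations/images/cam_high'][()]  # (max_timesteps, padded_size)
    compress_len = root['/compress_len'][()]                  # (num_cameras, max_timesteps)

# Decode the padded JPEG buffers back into (480, 640, 3) uint8 frames.
# cam_high is assumed here to be index 0 of camera_names, matching compress_len's row order.
frames = []
for t, buf in enumerate(padded_jpegs):
    true_len = int(compress_len[0, t])
    frames.append(cv2.imdecode(buf[:true_len], 1))  # 1 == cv2.IMREAD_COLOR
frames = np.stack(frames)
print(qpos.shape, action.shape, frames.shape)
~~~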
The data interfaces provided by the arm manufacturer are wrapped inside the env instance created in real_env.py; within env, the Recorder objects are what actually communicate with the ROS nodes to obtain the images and the arms' state.
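The Recorder class itself lives in the project's robot_utils.py; roughly speaking, it subscribes to a puppet arm's joint-state topic and caches the latest message. A minimal sketch of the idea follows; the node and topic names are assumptions for illustration, not necessarily the project's actual ones:
~~~python
import rospy
from sensor_msgs.msg import JointState

class MinimalRecorder:
    """Caches the latest joint state of one puppet arm (illustration only)."""
    def __init__(self, side, init_node=True):
        self.qpos = None
        self.qvel = None
        self.effort = None
        if init_node:
            # Node name is arbitrary here; the real project initializes the node elsewhere.
            rospy.init_node('minimal_recorder', anonymous=True)
        # Topic name is an assumption; the real Recorder subscribes under the puppet arm's namespace.
        rospy.Subscriber(f'/puppet_{side}/joint_states', JointState, self._joint_state_cb)

    def _joint_state_cb(self, msg):
        self.qpos = list(msg.position)
        self.qvel = list(msg.velocity)
        self.effort = list(msg.effort)
~~~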
The env class is defined as:
~~~python
class RealEnv:
"""
Environment for real robot bi-manual manipulation
Action space: [left_arm_qpos (6), # absolute joint position
left_gripper_positions (1), # normalized gripper position (0: close, 1: open)
right_arm_qpos (6), # absolute joint position
right_gripper_positions (1),] # normalized gripper position (0: close, 1: open)
Observation space: {"qpos": Concat[ left_arm_qpos (6), # absolute joint position
left_gripper_position (1), # normalized gripper position (0: close, 1: open)
right_arm_qpos (6), # absolute joint position
right_gripper_qpos (1)] # normalized gripper position (0: close, 1: open)
"qvel": Concat[ left_arm_qvel (6), # absolute joint velocity (rad)
left_gripper_velocity (1), # normalized gripper velocity (pos: opening, neg: closing)
right_arm_qvel (6), # absolute joint velocity (rad)
right_gripper_qvel (1)] # normalized gripper velocity (pos: opening, neg: closing)
"images": {"cam_high": (480x640x3), # h, w, c, dtype='uint8'
"cam_low": (480x640x3), # h, w, c, dtype='uint8'
"cam_left_wrist": (480x640x3), # h, w, c, dtype='uint8'
"cam_right_wrist": (480x640x3)} # h, w, c, dtype='uint8'
"""
def __init__(self, init_node, setup_robots=True, setup_base=False):
self.puppet_bot_left = InterbotixManipulatorXS(robot_model="vx300s", group_name="arm", gripper_name="gripper",
robot_name=f'puppet_left', init_node=init_node)
self.puppet_bot_right = InterbotixManipulatorXS(robot_model="vx300s", group_name="arm", gripper_name="gripper",
robot_name=f'puppet_right', init_node=False)
if setup_robots:
self.setup_robots()
if setup_base:
self.setup_base()
# self.setup_t265()
self.setup_dxl()
### The three recorders below are the ones that actually talk to the ROS nodes
self.recorder_left = Recorder('left', init_node=False)
self.recorder_right = Recorder('right', init_node=False)
self.image_recorder = ImageRecorder(init_node=False)
### This class is not part of the project's own code; it comes from the arm's ROS message package (interbotix_xs_msgs)
self.gripper_command = JointSingleCommand(name="gripper")
def setup_t265(self):
self.pipeline = rs.pipeline()
cfg = rs.config()
# if only pose stream is enabled, fps is higher (202 vs 30)
cfg.enable_stream(rs.stream.pose)
self.pipeline.start(cfg)
def setup_dxl(self):
self.dxl_client = DynamixelClient([1, 2], port='/dev/ttyDXL_wheels', lazy_connect=True)
self.wheel_r = 0.101 / 2 # 101 mm is the diameter
self.base_r = 0.622 # 622 mm is the distance between the two wheels
def setup_base(self):
self.tracer = pyagxrobots.pysdkugv.TracerBase()
self.tracer.EnableCAN()
def setup_robots(self):
setup_puppet_bot(self.puppet_bot_left)
setup_puppet_bot(self.puppet_bot_right)
def get_qpos(self):
left_qpos_raw = self.recorder_left.qpos
right_qpos_raw = self.recorder_right.qpos
left_arm_qpos = left_qpos_raw[:6]
right_arm_qpos = right_qpos_raw[:6]
left_gripper_qpos = [PUPPET_GRIPPER_POSITION_NORMALIZE_FN(left_qpos_raw[7])] # this is position not joint
right_gripper_qpos = [PUPPET_GRIPPER_POSITION_NORMALIZE_FN(right_qpos_raw[7])] # this is position not joint
return np.concatenate([left_arm_qpos, left_gripper_qpos, right_arm_qpos, right_gripper_qpos])
def get_qvel(self):
left_qvel_raw = self.recorder_left.qvel
right_qvel_raw = self.recorder_right.qvel
left_arm_qvel = left_qvel_raw[:6]
right_arm_qvel = right_qvel_raw[:6]
left_gripper_qvel = [PUPPET_GRIPPER_VELOCITY_NORMALIZE_FN(left_qvel_raw[7])]
right_gripper_qvel = [PUPPET_GRIPPER_VELOCITY_NORMALIZE_FN(right_qvel_raw[7])]
return np.concatenate([left_arm_qvel, left_gripper_qvel, right_arm_qvel, right_gripper_qvel])
def get_effort(self):
left_effort_raw = self.recorder_left.effort
right_effort_raw = self.recorder_right.effort
left_robot_effort = left_effort_raw[:7]
right_robot_effort = right_effort_raw[:7]
return np.concatenate([left_robot_effort, right_robot_effort])
def get_images(self):
return self.image_recorder.get_images()
def get_base_vel_t265(self):
raise NotImplementedError
frames = self.pipeline.wait_for_frames()
pose_frame = frames.get_pose_frame()
pose = pose_frame.get_pose_data()
q1 = Quaternion(w=pose.rotation.w, x=pose.rotation.x, y=pose.rotation.y, z=pose.rotation.z)
rotation = -np.array(q1.yaw_pitch_roll)[0]
rotation_vec = np.array([np.cos(rotation), np.sin(rotation)])
linear_vel_vec = np.array([pose.velocity.z, pose.velocity.x])
is_forward = rotation_vec.dot(linear_vel_vec) > 0
base_linear_vel = np.sqrt(pose.velocity.z ** 2 + pose.velocity.x ** 2) * (1 if is_forward else -1)
base_angular_vel = pose.angular_velocity.y
return np.array([base_linear_vel, base_angular_vel])
def get_base_vel(self):
left_vel, right_vel = self.dxl_client.read_pos_vel_cur()[1]
right_vel = -right_vel # right wheel is inverted
base_linear_vel = (left_vel + right_vel) * self.wheel_r / 2
base_angular_vel = (right_vel - left_vel) * self.wheel_r / self.base_r
return np.array([base_linear_vel, base_angular_vel])
def get_tracer_vel(self):
linear_vel, angular_vel = self.tracer.GetLinearVelocity(), self.tracer.GetAngularVelocity()
return np.array([linear_vel, angular_vel])
def set_gripper_pose(self, left_gripper_desired_pos_normalized, right_gripper_desired_pos_normalized):
left_gripper_desired_joint = PUPPET_GRIPPER_JOINT_UNNORMALIZE_FN(left_gripper_desired_pos_normalized)
self.gripper_command.cmd = left_gripper_desired_joint
self.puppet_bot_left.gripper.core.pub_single.publish(self.gripper_command)
right_gripper_desired_joint = PUPPET_GRIPPER_JOINT_UNNORMALIZE_FN(right_gripper_desired_pos_normalized)
self.gripper_command.cmd = right_gripper_desired_joint
self.puppet_bot_right.gripper.core.pub_single.publish(self.gripper_command)
def _reset_joints(self):
reset_position = START_ARM_POSE[:6]
move_arms([self.puppet_bot_left, self.puppet_bot_right], [reset_position, reset_position], move_time=1)
def _reset_gripper(self):
"""Set to position mode and do position resets: first open then close. Then change back to PWM mode"""
move_grippers([self.puppet_bot_left, self.puppet_bot_right], [PUPPET_GRIPPER_JOINT_OPEN] * 2, move_time=0.5)
move_grippers([self.puppet_bot_left, self.puppet_bot_right], [PUPPET_GRIPPER_JOINT_CLOSE] * 2, move_time=1)
def get_observation(self, get_tracer_vel=False):
obs = collections.OrderedDict()
obs['qpos'] = self.get_qpos()
obs['qvel'] = self.get_qvel()
obs['effort'] = self.get_effort()
obs['images'] = self.get_images()
# obs['base_vel_t265'] = self.get_base_vel_t265()
obs['base_vel'] = self.get_base_vel()
if get_tracer_vel:
obs['tracer_vel'] = self.get_tracer_vel()
return obs
def get_reward(self):
return 0
def reset(self, fake=False):
if not fake:
# Reboot puppet robot gripper motors
self.puppet_bot_left.dxl.robot_reboot_motors("single", "gripper", True)
self.puppet_bot_right.dxl.robot_reboot_motors("single", "gripper", True)
self._reset_joints()
self._reset_gripper()
return dm_env.TimeStep(
step_type=dm_env.StepType.FIRST,
reward=self.get_reward(),
discount=None,
observation=self.get_observation())
def step(self, action, base_action=None, get_tracer_vel=False, get_obs=True):
state_len = int(len(action) / 2)
left_action = action[:state_len]
right_action = action[state_len:]
self.puppet_bot_left.arm.set_joint_positions(left_action[:6], blocking=False)
self.puppet_bot_right.arm.set_joint_positions(right_action[:6], blocking=False)
self.set_gripper_pose(left_action[-1], right_action[-1])
if base_action is not None:
# linear_vel_limit = 1.5
# angular_vel_limit = 1.5
# base_action_linear = np.clip(base_action[0], -linear_vel_limit, linear_vel_limit)
# base_action_angular = np.clip(base_action[1], -angular_vel_limit, angular_vel_limit)
base_action_linear, base_action_angular = base_action
self.tracer.SetMotionCommand(linear_vel=base_action_linear, angular_vel=base_action_angular)
# time.sleep(DT)
if get_obs:
obs = self.get_observation(get_tracer_vel)
else:
obs = None
return dm_env.TimeStep(
step_type=dm_env.StepType.MID,
reward=self.get_reward(),
discount=None,
observation=obs)
def get_action(master_bot_left, master_bot_right):
action = np.zeros(14) # 6 joint + 1 gripper, for two arms
# Arm actions
action[:6] = master_bot_left.dxl.joint_states.position[:6]
action[7:7+6] = master_bot_right.dxl.joint_states.position[:6]
# Gripper actions
action[6] = MASTER_GRIPPER_JOINT_NORMALIZE_FN(master_bot_left.dxl.joint_states.position[6])
action[7+6] = MASTER_GRIPPER_JOINT_NORMALIZE_FN(master_bot_right.dxl.joint_states.position[6])
return action
~~~
The raw calls down to the manufacturer-provided arm API:
Position:
~~~python
action = np.zeros(14) # 6 joint + 1 gripper, for two arms
# Arm actions
action[:6] = master_bot_left.dxl.joint_states.position[:6]
action[7:7+6] = master_bot_right.dxl.joint_states.position[:6]
# Gripper actions
action[6] = MASTER_GRIPPER_JOINT_NORMALIZE_FN(master_bot_left.dxl.joint_states.position[6])
action[7+6] = MASTER_GRIPPER_JOINT_NORMALIZE_FN(master_bot_right.dxl.joint_states.position[6])
~~~
The dxl.joint_states.position attribute is an array holding the current position of every arm joint plus the gripper position.
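The gripper entry in that array is a raw joint angle, which is why get_action passes it through MASTER_GRIPPER_JOINT_NORMALIZE_FN before writing it into the action vector. Conceptually this is just a min-max linear map onto [0, 1]; the sketch below uses placeholder joint limits, the real values live in the project's constants.py:
~~~python
# Placeholder joint limits for illustration only; see constants.py for the real values.
MASTER_GRIPPER_JOINT_CLOSE = -0.6
MASTER_GRIPPER_JOINT_OPEN = 0.8

def master_gripper_joint_normalize(x):
    """Map a raw master gripper joint angle to [0, 1] (0: closed, 1: open)."""
    return (x - MASTER_GRIPPER_JOINT_CLOSE) / (MASTER_GRIPPER_JOINT_OPEN - MASTER_GRIPPER_JOINT_CLOSE)

def puppet_gripper_joint_unnormalize(x, joint_close=0.0, joint_open=1.5):
    """Map a normalized [0, 1] gripper command back into the puppet gripper's joint range."""
    return x * (joint_open - joint_close) + joint_close
~~~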