#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : deoxys_server.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 09/10/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
"""Control interface for the Panda robot using the Deoxys library."""
import threading
import time
import atexit
import queue
from dataclasses import dataclass
from typing import Optional, Tuple, List, Dict
from threading import Barrier, Thread, Event
from copy import deepcopy
import cv2
import numpy as np
from jacinle.comm.service import Service, SocketClient
from jacinle.logging import get_logger
from concepts.math.rotationlib_xyzw import quat2axisangle_xyzw, mat2pos_quat_xyzw, quat_diff_xyzw
deoxys_available = True
try:
from deoxys.franka_interface import FrankaInterface
from deoxys.experimental.motion_utils import reset_joints_to_v1
from deoxys.experimental.motion_utils import reset_joints_to_v2, follow_joint_traj, follow_ee_traj, open_gripper, close_gripper
from deoxys.utils.config_utils import verify_controller_config, get_default_controller_config
from deoxys.utils.yaml_config import YamlConfig
except ImportError:
deoxys_available = False
FrankaInterface = object
from concepts.vision.franka_system_calibration.calibration import get_mounted_camera_pose_from_qpos
from concepts.hw_interface.realsense.device_f import CaptureRS
__all__ = ['DeoxysService', 'DeoxysServiceVisualizerInterface', 'MoveQposTask', 'MoveQposTrajectoryTask', 'MultiRobotTaskBuilder', 'DeoxysClient']
logger = get_logger(__file__)
[docs]
class DeoxysService(Service):
DEFAULT_PORTS = (12347, 12348)
ee_to_camera_pos = [0.036499, -0.034889, 0.0594]
ee_to_camera_quat = [0.00252743, 0.0065769, 0.70345566, 0.71070423]
[docs]
def __init__(
self,
robots: Dict[int, FrankaInterface], robot_configs: Optional[Dict[int, np.ndarray]] = None,
cameras: Optional[Dict[str, CaptureRS]] = None, camera_configs: Optional[Dict[str, Dict[str, np.ndarray]]] = None,
camera_to_robot_mapping: Optional[Dict[str, int]] = None,
mock: bool = False
):
if not deoxys_available:
raise ImportError('Deoxys is not available. Please install it first.')
self.robots = robots
self.robot_configs = robot_configs if robot_configs is not None else dict()
self.cameras = cameras if cameras is not None else dict()
self.camera_configs = camera_configs if camera_configs is not None else dict()
self.camera_to_robot_mapping = camera_to_robot_mapping if camera_to_robot_mapping is not None else dict()
self.visualizer = None
self.mock = mock
self.async_threads = dict()
if True:
logger.critical('Starting DeoxysService with the following configurations:')
logger.info(f'Available robots: {list(self.robots.keys())}')
logger.info(f'Available cameras: {list(self.cameras.keys())}')
logger.info(f'Camera to robot mapping: {self.camera_to_robot_mapping}')
logger.info(f'Robot configs: {self.robot_configs}')
logger.info(f'Camera configs: {self.camera_configs}')
super().__init__(configs=dict(), spec={'available_robots': list(self.robots.keys()), 'available_cameras': list(self.cameras.keys())})
[docs]
@classmethod
def from_setup_name(cls, name: str, robot_configs: Optional[Dict[str, np.ndarray]], camera_configs: Optional[Dict[str, Dict[str, np.ndarray]]] = None, mock: bool = False, use_camera_subscriber: bool = False):
from concepts.hw_interface.franka.deoxys_interfaces import get_franka_interface_dict, get_realsense_capture_dict, get_setup_config
from concepts.hw_interface.franka.deoxys_interfaces import get_robot_config_content_by_index
setup_config = get_setup_config(name)
robots = get_franka_interface_dict(setup_config['robots'], wait_for_state=True, auto_close=True)
cameras = get_realsense_capture_dict(setup_config['cameras'], auto_close=True, skip_frames=30, subscriber=use_camera_subscriber)
camera_to_robot_mapping = dict()
for robot_index in robots:
robot_config = get_robot_config_content_by_index(robot_index)
if 'GRIPPER_CAMERA_NAME' in robot_config['ROBOT']:
camera_to_robot_mapping[robot_config['ROBOT']['GRIPPER_CAMERA_NAME']] = robot_index
return cls(robots, robot_configs, cameras, camera_configs, camera_to_robot_mapping, mock)
[docs]
def start_visualizer(self):
self.visualizer = DeoxysServiceVisualizerInterface(self)
self.visualizer.start()
import time; time.sleep(1.0)
self.get_captures()
[docs]
def get_first_robot_index(self):
return next(iter(self.robots))
[docs]
def get_robot_base_poses(self):
# Return the configs['base_pose']
return {robot_index: config['base_pose'] for robot_index, config in self.robot_configs.items()}
[docs]
def get_camera_to_robot_mapping(self):
return self.camera_to_robot_mapping
[docs]
def get_camera_configs(self):
camera_configs = dict()
for k, cam in self.cameras.items():
if k in self.camera_to_robot_mapping and self.camera_to_robot_mapping[k] in self.robots:
robot = self.robots[self.camera_to_robot_mapping[k]]
robot_base_pose = self.robot_configs[self.camera_to_robot_mapping[k]]['base_pose']
extrinsics = robot_base_pose @ get_mounted_camera_pose_from_qpos(robot.last_q, self.ee_to_camera_pos, self.ee_to_camera_quat)
elif k in self.camera_configs:
extrinsics = self.camera_configs[k]['extrinsics']
else:
extrinsics = np.eye(4)
camera_configs[k] = {
'intrinsics': cam.intrinsics[0],
'extrinsics': extrinsics
}
return camera_configs
[docs]
def get_captures(self, robot_move_to: Optional[Dict[int, np.ndarray]] = None):
current_qpos = {robot_index: robot.last_q.copy() for robot_index, robot in self.robots.items()}
if robot_move_to is not None:
for robot_index, qpos in robot_move_to.items():
self.single_move_qpos(robot_index, qpos)
camera_configs = self.get_camera_configs()
rv = dict()
for k, cam in self.cameras.items():
rgb, depth = cam.capture()
rv[k] = {
'color': rgb,
'depth': depth,
'intrinsics': camera_configs[k]['intrinsics'],
'extrinsics': camera_configs[k]['extrinsics']
}
if self.visualizer is not None:
self.visualizer.update_captures(rv)
if robot_move_to is not None:
for robot_index, qpos in current_qpos.items():
self.single_move_qpos(robot_index, qpos)
return rv
[docs]
def get_all_qpos(self):
return {robot_index: robot.last_q.copy() for robot_index, robot in self.robots.items()}
[docs]
def get_all_qvel(self):
return {robot_index: robot.last_dq.copy() for robot_index, robot in self.robots.items()}
[docs]
def get_full_observation(self):
return {
'qpos': self.get_all_qpos(),
'qvel': self.get_all_qvel(),
'captures': self.get_captures()
}
[docs]
def single_open_gripper(self, robot_index, steps: int = 100):
if self.mock:
return print(f'DeoxysService::open_gripper(robot_index={robot_index!r}, steps={steps!r})')
open_gripper(self.robots[robot_index], steps)
[docs]
def single_close_gripper(self, robot_index, steps: int = 100):
if self.mock:
return print(f'DeoxysService::close_gripper(robot_index={robot_index!r}, steps={steps!r})')
close_gripper(self.robots[robot_index], steps)
[docs]
def single_move_home(self, robot_index: int, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None):
if self.mock:
return print(f'DeoxysService::reset_joints_to_v2(robot_index={robot_index!r})')
self.wait_async_thread(robot_index)
reset_joints_to_v2(self.robots[robot_index], self.robots[robot_index].init_q, gripper_open=gripper_open, gripper_close=gripper_close, gripper_default='open')
[docs]
def single_move_qpos(self, robot_index: int, q: np.ndarray, cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None):
q = np.asarray(q)
if q.shape != (7,):
raise ValueError('Invalid joint position.')
if self.mock:
return print(f'DeoxysService::reset_joints_to_v2(robot_index={robot_index!r}, q={q!r}, controller_cfg={cfg!r})')
self.wait_async_thread(robot_index)
reset_joints_to_v2(self.robots[robot_index], q, controller_cfg=cfg, gripper_open=gripper_open, gripper_close=gripper_close)
[docs]
def wait_async_thread(self, index):
if index in self.async_threads:
self.single_async_servo_wait(index)
[docs]
def single_move_qpos_trajectory(self, robot_index: int, q: List[np.ndarray], cfg: Optional[Dict] = None, num_addition_steps: int = 0, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, residual_tau_translation: Optional[np.ndarray] = None):
if self.mock:
return print(f'DeoxysService::follow_joint_traj(robot_index={robot_index!r}, q={q!r}, num_addition_steps={num_addition_steps}, controller_cfg={cfg!r})')
if cfg is None:
cfg = get_default_controller_config('JOINT_IMPEDANCE')
cfg = deepcopy(cfg)
cfg = verify_controller_config(cfg, use_default=True)
if residual_tau_translation is not None:
cfg.enable_residual_tau = True
cfg.residual_tau_translation_vec = residual_tau_translation
self.wait_async_thread(robot_index)
follow_joint_traj(self.robots[robot_index], q, num_addition_steps=num_addition_steps, controller_cfg=cfg, gripper_open=gripper_open, gripper_close=gripper_close)
[docs]
def single_async_servo_start(self, robot_index: int):
if robot_index in self.async_threads:
print('Warning: async thread is already running for the robot.')
return
msg_queue = queue.Queue()
exit_event = threading.Event()
started_event = threading.Event()
waiting_event = threading.Event()
waiting_event.set()
async_thread = threading.Thread(target=self.async_servo_thread, args=(msg_queue, self.robots[robot_index], exit_event, started_event, waiting_event))
async_thread.start()
self.async_threads[robot_index] = (async_thread, msg_queue, exit_event, started_event, waiting_event)
[docs]
def async_servo_thread(self, msg_queue, robot_interface: FrankaInterface, exit_event, started_event, waiting_event):
current_task_queue = list()
while True:
if len(current_task_queue) > 0:
try:
task = msg_queue.get_nowait()
if task is None:
break
started_event.set()
print('\nNew task received', task['task'], 'queued=', task['queued'])
self._async_mix_task(current_task_queue, task)
except queue.Empty:
pass
else:
print('Waiting for new task...')
waiting_event.set()
while True:
try:
current_task = msg_queue.get_nowait()
current_task_queue.append(current_task)
started_event.set()
break
except queue.Empty:
pass
if exit_event.is_set():
waiting_event.clear()
return
exit_event.wait(0.1)
waiting_event.clear()
print('\nNew task received', current_task['task'], 'queued=', current_task['queued'])
current_task = current_task_queue[0]
print('\rCurrent task type=', current_task['task'], 'index=', current_task.get('index', -1), end='')
if current_task['task'] == 'stop':
continue
elif current_task['task'] == 'move_qpos':
index = current_task['index']
target_joint_pos = current_task['qpos_trajectory'][index]
gripper_close_i = current_task['gripper_close_trajectory'][index]
cfg = current_task['controller_cfg']
current_task['index'] += 1
if current_task['index'] == len(current_task['qpos_trajectory']):
current_task_queue = current_task_queue[1:]
assert len(target_joint_pos) >= 7
if type(target_joint_pos) is np.ndarray:
action = target_joint_pos.tolist()
else:
action = target_joint_pos
if len(action) == 7:
action += [1.0 if gripper_close_i else -1.0]
robot_interface.control(controller_type="JOINT_IMPEDANCE", action=action, controller_cfg=cfg)
elif current_task['task'] == 'move_ee':
index = current_task['index']
target_ee_pose = current_task['ee_trajectory'][index]
gripper_close_i = current_task['gripper_close_trajectory'][index]
compliance_i = current_task['compliance_trajectory'][index] if current_task['compliance_trajectory'] is not None else None
cfg = current_task['controller_cfg']
current_task['index'] += 1
if current_task['index'] == len(current_task['ee_trajectory']):
current_task_queue = current_task_queue[1:]
target_pos, target_rot = target_ee_pose[:2]
target_axis, target_angle = quat2axisangle_xyzw(target_rot)
target_axis_angle = np.asarray(target_axis) * target_angle
action = np.concatenate([target_pos, target_axis_angle])
if len(target_ee_pose) == 3:
action = np.concatenate([action, [target_ee_pose[2]]])
else:
action = np.concatenate([action, [1.0]]) if gripper_close_i else np.concatenate([action, [-1.0]])
if compliance_i is not None:
cfg = deepcopy(cfg)
cfg["Kp"]['translation'] = compliance_i[:3]
cfg["Kp"]['rotation'] = compliance_i[3:]
current_pos, current_rot = mat2pos_quat_xyzw(robot_interface.last_pose)
pos_diff = np.linalg.norm(target_pos - current_pos)
quat_diff = quat_diff_xyzw(current_rot, target_rot)
# print(f' Current pos:', current_pos, 'Target pos:', target_pos, 'Zdiff', target_pos[2] - current_pos[2])
if pos_diff > 0.1 or quat_diff > np.pi / 4:
import ipdb; ipdb.set_trace()
raise ValueError(f'Target pose is too far away! \n\tCurrent {current_pos}\n\tTarget{target_pos}\n\tExiting...')
for i in range(100):
robot_interface.control(controller_type="OSC_POSE", action=action, controller_cfg=cfg)
current_pos, current_rot = mat2pos_quat_xyzw(robot_interface.last_pose)
pos_diff = np.linalg.norm(target_pos - current_pos)
quat_diff = quat_diff_xyzw(current_rot, target_rot)
if pos_diff > 0.05 or quat_diff > 15 / 180 * np.pi:
continue
else:
break
else:
raise ValueError(f'Failed to reach the target pose after 100 iterations. \n\tCurrent {current_pos}\n\tTarget{target_pos}\n\tExiting...')
else:
raise ValueError(f'Invalid task: {current_task!r}')
def _async_mix_task(self, current_task_queue, new_task):
if new_task['queued']:
current_task_queue.append(new_task)
else:
current_task_queue.clear()
current_task_queue.append(new_task)
[docs]
def single_async_servo_started(self, robot_index: int):
if robot_index not in self.async_threads:
print('Warning: no async thread is running for the robot.')
return
return self.async_threads[robot_index][3].is_set()
[docs]
def single_async_servo_wait_for_start(self, robot_index: int):
if robot_index not in self.async_threads:
print('Warning: no async thread is running for the robot.')
return
self.async_threads[robot_index][3].wait()
[docs]
def single_async_servo_finished(self, robot_index: int):
if robot_index not in self.async_threads:
print('Warning: no async thread is running for the robot.')
return
# started_event.is_set() and waiting_event.is_set()
return self.async_threads[robot_index][3].is_set() and self.async_threads[robot_index][4].is_set()
[docs]
def single_async_servo_cancel(self, robot_index: int, terminate: bool = True):
if robot_index not in self.async_threads:
print('Warning: no async thread is running for the robot.')
return
# Immediately stop the current task and clear the task
self.async_threads[robot_index][1].put({'task': 'stop', 'queued': False})
if terminate:
self.async_threads[robot_index][2].set()
self.async_threads[robot_index][0].join()
del self.async_threads[robot_index]
[docs]
def single_async_servo_wait(self, robot_index: int, terminate: bool = True):
if robot_index not in self.async_threads:
print('Warning: no async thread is running for the robot.')
return
if not self.async_threads[robot_index][0].is_alive():
del self.async_threads[robot_index]
return
# waiting for at least one task to start
self.async_threads[robot_index][3].wait()
# waiting for the current task to finish
self.async_threads[robot_index][4].wait()
if terminate:
self.async_threads[robot_index][2].set()
self.async_threads[robot_index][0].join()
del self.async_threads[robot_index]
[docs]
def single_move_qpos_trajectory_async(self, robot_index: int, q: List[np.ndarray], cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, residual_tau_translation: Optional[np.ndarray] = None, queued: bool = False):
if self.mock:
return print(f'DeoxysService::follow_joint_traj_async(robot_index={robot_index!r}, q={q!r}, controller_cfg={cfg!r})')
assert robot_index in self.async_threads
cfg, q, gripper_close_list = _canonicalize_follow_traj_args(cfg, q, gripper_open, gripper_close, residual_tau_translation=residual_tau_translation)
self.async_threads[robot_index][1].put({
'task': 'move_qpos',
'index': 0,
'qpos_trajectory': q,
'gripper_close_trajectory': gripper_close_list,
'controller_cfg': cfg,
'queued': queued
})
[docs]
def single_move_ee_trajectory_async(self, robot_index, ee_traj: List[Tuple[np.ndarray, np.ndarray]], *, cfg: Optional[Dict] = None, compliance_traj: Optional[List[np.ndarray]] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, queued: bool = False):
if self.mock:
return print(f'DeoxysService::follow_ee_traj_async(robot_index={robot_index!r}, ee_traj={ee_traj!r}, controller_cfg={cfg!r})')
assert robot_index in self.async_threads
cfg, ee_traj, compliance_traj, gripper_close_list = _canonicalize_follow_ee_traj_args(cfg, ee_traj, compliance_traj, gripper_open, gripper_close)
self.async_threads[robot_index][1].put({
'task': 'move_ee',
'index': 0,
'ee_trajectory': ee_traj,
'compliance_trajectory': compliance_traj,
'gripper_close_trajectory': gripper_close_list,
'controller_cfg': cfg,
'queued': queued
})
[docs]
def single_move_ee_trajectory(self, robot_index, ee_traj: List[Tuple[np.ndarray, np.ndarray]], *, cfg: Optional[Dict] = None, compliance_traj: Optional[List[np.ndarray]] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None):
if self.mock:
return print(f'DeoxysService::follow_ee_traj(robot_index={robot_index!r}, ee_traj={ee_traj!r}, controller_cfg={cfg!r})')
follow_ee_traj(self.robots[robot_index], ee_traj, controller_cfg=cfg, compliance_traj=compliance_traj, gripper_open=gripper_open, gripper_close=gripper_close)
[docs]
def run_multi_robot(self, task: List[Tuple[str, object]], activated_robots: Optional[List[int]] = None):
builder = MultiRobotTaskBuilder(task)
builder.run(self, activated_robots)
[docs]
def call(self, name, *args, **kwargs):
return getattr(self, name)(*args, **kwargs)
[docs]
def serve_socket(self, name=None, tcp_port=None, use_simple=True, register_name_server=False, verbose=False):
if name is None:
name = 'concepts/deoxys_interface'
if tcp_port is None:
tcp_port = DeoxysService.DEFAULT_PORTS
super().serve_socket(name, tcp_port, use_simple=use_simple, register_name_server=register_name_server, verbose=False)
def _canonicalize_gripper_open_close(gripper_open, gripper_close, default='close'):
if gripper_open is None and gripper_close is None:
return default == 'open', default == 'close'
if gripper_open is not None and gripper_close is not None:
raise ValueError("Cannot specify both gripper_open and gripper_close")
if gripper_open is None:
gripper_open = not gripper_close if type(gripper_close) is bool else ~gripper_close
if gripper_close is None:
gripper_close = not gripper_open if type(gripper_open) is bool else ~gripper_open
return gripper_open, gripper_close
def _canonicalize_follow_traj_args(cfg, joint_trajectory, gripper_open, gripper_close, residual_tau_translation=None, gripper_default='close'):
if cfg is None:
cfg = get_default_controller_config('JOINT_IMPEDANCE')
cfg = deepcopy(cfg)
cfg = verify_controller_config(cfg, use_default=True)
if residual_tau_translation is not None:
cfg.enable_residual_tau = True
cfg.residual_tau_translation_vec = residual_tau_translation
gripper_open, gripper_close = _canonicalize_gripper_open_close(gripper_open, gripper_close, default=gripper_default)
if type(gripper_close) is not bool:
gripper_close_list = gripper_close
else:
gripper_close_list = [gripper_close for _ in joint_trajectory]
return cfg, joint_trajectory, gripper_close_list
def _canonicalize_follow_ee_traj_args(cfg, ee_trajectory, compliance_trajectory, gripper_open, gripper_close, gripper_default='close'):
if cfg is None:
cfg = get_default_controller_config(controller_type="OSC_POSE")
cfg['is_delta'] = False
cfg = deepcopy(cfg)
cfg = verify_controller_config(cfg, use_default=True)
_, gripper_close = _canonicalize_gripper_open_close(gripper_open, gripper_close, default=gripper_default)
if type(gripper_close) is not bool:
gripper_close_list = gripper_close
else:
gripper_close_list = [gripper_close for _ in ee_trajectory]
if compliance_trajectory is not None:
assert len(compliance_trajectory) == len(ee_trajectory)
return cfg, ee_trajectory, compliance_trajectory, gripper_close_list
[docs]
class DeoxysClient(object):
[docs]
def __init__(self, server: str, ports: Optional[Tuple[int, int]] = None):
if ports is None:
ports = DeoxysService.DEFAULT_PORTS
connection = (f'tcp://{server}:{ports[0]}', f'tcp://{server}:{ports[1]}')
self.client = SocketClient('concepts::deoxys_interface', connection, use_simple=True)
self.client.initialize(auto_close=True)
self.default_robot_index = self.client.call('get_first_robot_index')
[docs]
def get_default_robot_index(self):
return self.default_robot_index
[docs]
def set_default_robot_index(self, index):
self.default_robot_index = index
[docs]
def get_robot_base_poses(self):
return self.client.call('get_robot_base_poses')
[docs]
def get_camera_to_robot_mapping(self):
return self.client.call('get_camera_to_robot_mapping')
[docs]
def get_camera_configs(self):
return self.client.call('get_camera_configs')
[docs]
def get_captures(self, robot_move_to: Optional[Dict[int, np.ndarray]] = None):
return self.client.call('get_captures', robot_move_to)
[docs]
def get_all_qpos(self):
return self.client.call('get_all_qpos')
[docs]
def get_all_qvel(self):
return self.client.call('get_all_qvel')
[docs]
def single_open_gripper(self, robot_index, steps: int = 100):
self.client.call('single_open_gripper', robot_index, steps)
[docs]
def single_close_gripper(self, robot_index, steps: int = 100):
self.client.call('single_close_gripper', robot_index, steps)
[docs]
def single_move_home(self, robot_index: int, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None):
self.client.call('single_move_home', robot_index, gripper_open=gripper_open, gripper_close=gripper_close)
[docs]
def single_move_qpos(self, robot_index: int, q: np.ndarray, cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None):
self.client.call('single_move_qpos', robot_index, q, cfg, gripper_open=gripper_open, gripper_close=gripper_close)
[docs]
def single_move_qpos_trajectory(self, robot_index: int, q: List[np.ndarray], cfg: Optional[Dict] = None, num_addition_steps: int = 0, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, residual_tau_translation: Optional[np.ndarray] = None):
self.client.call('single_move_qpos_trajectory', robot_index, q, cfg, num_addition_steps=num_addition_steps, gripper_open=gripper_open, gripper_close=gripper_close, residual_tau_translation=residual_tau_translation)
[docs]
def single_move_ee_trajectory(self, robot_index, ee_traj: List[Tuple[np.ndarray, np.ndarray]], *, cfg: Optional[Dict] = None, compliance_traj: Optional[List[np.ndarray]] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None):
self.client.call('single_move_ee_trajectory', robot_index, ee_traj, cfg=cfg, compliance_traj=compliance_traj, gripper_open=gripper_open, gripper_close=gripper_close)
[docs]
def single_async_servo_start(self, robot_index: int):
self.client.call('single_async_servo_start', robot_index)
[docs]
def single_async_servo_started(self, robot_index: int):
return self.client.call('single_async_servo_started', robot_index)
[docs]
def single_async_servo_wait_for_start(self, robot_index: int):
self.client.call('single_async_servo_wait_for_start', robot_index)
[docs]
def single_async_servo_finished(self, robot_index: int):
return self.client.call('single_async_servo_finished', robot_index)
[docs]
def single_async_servo_cancel(self, robot_index: int, terminate: bool = True):
self.client.call('single_async_servo_cancel', robot_index, terminate)
[docs]
def single_async_servo_wait(self, robot_index: int, terminate: bool = True):
self.client.call('single_async_servo_wait', robot_index, terminate)
[docs]
def single_move_qpos_trajectory_async(self, robot_index: int, q: List[np.ndarray], cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, residual_tau_translation: Optional[np.ndarray] = None, queued: bool = False):
self.client.call('single_move_qpos_trajectory_async', robot_index, q, cfg, gripper_open=gripper_open, gripper_close=gripper_close, residual_tau_translation=residual_tau_translation, queued=queued)
[docs]
def single_move_ee_trajectory_async(self, robot_index, ee_traj: List[Tuple[np.ndarray, np.ndarray]], *, cfg: Optional[Dict] = None, compliance_traj: Optional[List[np.ndarray]] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, queued: bool = False):
self.client.call('single_move_ee_trajectory_async', robot_index, ee_traj, cfg=cfg, compliance_traj=compliance_traj, gripper_open=gripper_open, gripper_close=gripper_close, queued=queued)
[docs]
def get_qpos(self):
return self.client.call('get_all_qpos')[self.default_robot_index]
[docs]
def get_qvel(self):
return self.client.call('get_all_qvel')[self.default_robot_index]
[docs]
def open_gripper(self, steps: int = 100):
self.client.call('single_open_gripper', self.default_robot_index, steps)
[docs]
def close_gripper(self, steps: int = 100):
self.client.call('single_close_gripper', self.default_robot_index, steps)
[docs]
def move_home(self, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None):
self.client.call('single_move_home', self.default_robot_index, gripper_open=gripper_open, gripper_close=gripper_close)
[docs]
def move_qpos(self, q: np.ndarray, cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None):
self.client.call('single_move_qpos', self.default_robot_index, q, cfg, gripper_open=gripper_open, gripper_close=gripper_close)
[docs]
def move_qpos_trajectory(self, q: List[np.ndarray], cfg: Optional[Dict] = None, num_addition_steps: int = 0, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, residual_tau_translation: Optional[np.ndarray] = None):
self.client.call('single_move_qpos_trajectory', self.default_robot_index, q, cfg, num_addition_steps=num_addition_steps, gripper_open=gripper_open, gripper_close=gripper_close, residual_tau_translation=residual_tau_translation)
[docs]
def move_ee_trajectory(self, ee_traj: List[Tuple[np.ndarray, np.ndarray]], *, cfg: Optional[Dict] = None, compliance_traj: Optional[List[np.ndarray]] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None):
self.client.call('single_move_ee_trajectory', self.default_robot_index, ee_traj, cfg=cfg, compliance_traj=compliance_traj, gripper_open=gripper_open, gripper_close=gripper_close)
[docs]
def async_servo_start(self):
"""Start a async servo thread for controlling the robot."""
self.client.call('single_async_servo_start', self.default_robot_index)
[docs]
def async_servo_started(self) -> bool:
"""Returns whether at least one of the task has been sent and started executing."""
return self.client.call('single_async_servo_started', self.default_robot_index)
[docs]
def async_servo_wait_for_start(self):
"""Wait until the async servo thread has started executing the task."""
self.client.call('single_async_servo_wait_for_start', self.default_robo_index)
[docs]
def async_servo_finished(self) -> bool:
"""Returns whether the async servo thread has finished executing at least one task. Note that this function will first check if at least one task has been started."""
return self.client.call('single_async_servo_finished', self.default_robot_index)
[docs]
def async_servo_cancel(self, terminate: bool = True):
"""Cancel the current async servo thread. If `terminate` is True, the thread will be terminated after the cancel operation."""
self.client.call('single_async_servo_cancel', self.default_robot_index, terminate)
[docs]
def async_servo_wait(self, terminate: bool = True):
self.client.call('single_async_servo_wait', self.default_robot_index, terminate)
[docs]
def move_qpos_trajectory_async(self, q: List[np.ndarray], cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, residual_tau_translation: Optional[np.ndarray] = None, queued: bool = False):
self.client.call('single_move_qpos_trajectory_async', self.default_robot_index, q, cfg, gripper_open=gripper_open, gripper_close=gripper_close, residual_tau_translation=residual_tau_translation, queued=queued)
[docs]
def move_ee_trajectory_async(self, ee_traj: List[Tuple[np.ndarray, np.ndarray]], *, cfg: Optional[Dict] = None, compliance_traj: Optional[List[np.ndarray]] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, queued: bool = False):
self.client.call('single_move_ee_trajectory_async', self.default_robot_index, ee_traj, cfg=cfg, compliance_traj=compliance_traj, gripper_open=gripper_open, gripper_close=gripper_close, queued=queued)
[docs]
def run_multi_robot(self, task: List[Tuple[str, object]], activated_robots: Optional[List[int]] = None):
self.client.call('run_multi_robot', task, activated_robots)
[docs]
@dataclass
class MoveQposTask(object):
qpos_dict: Dict[int, np.ndarray]
cfg_dict: Dict[int, Dict]
[docs]
@dataclass
class MoveQposTrajectoryTask(object):
qpos_traj_dict: Dict[int, List[np.ndarray]]
cfg_dict: Dict[int, Dict]
[docs]
@dataclass
class MoveGripperTask(object):
open: bool
steps: int
[docs]
class MultiRobotTaskBuilder(object):
[docs]
def __init__(self, tasks: Optional[List[Tuple[str, object]]] = None):
if tasks is None:
tasks = list()
self.tasks = tasks
[docs]
def get_tasks(self):
# Note: not deep copy
return self.tasks.copy()
[docs]
def move_qpos(self, tasks: Dict[int, np.ndarray], configs: Optional[Dict[int, Dict]] = None):
if configs is None:
configs = dict()
self.tasks.append(('move_qpos', MoveQposTask(tasks, configs)))
[docs]
def move_qpos_trajectorry(self, tasks: Dict[int, List[np.ndarray]], configs: Optional[Dict[int, Dict]] = None):
if configs is None:
configs = dict()
self.tasks.append(('move_qpos_trajectory', MoveQposTrajectoryTask(tasks, configs)))
[docs]
def open_gripper(self, steps: int = 100):
self.tasks.append(('open_gripper', MoveGripperTask(True, steps)))
[docs]
def close_gripper(self, steps: int = 100):
self.tasks.append(('close_gripper', MoveGripperTask(False, steps)))
[docs]
def thread_run(self, interface: DeoxysService, index: int, barriers: List):
for i in range(len(self.tasks)):
task, payload = self.tasks[i]
if task == 'move_qpos':
assert isinstance(payload, MoveQposTask)
if index in payload.qpos_dict:
interface.single_move_qpos(index, payload.qpos_dict[index], payload.cfg_dict.get(index, None))
elif task == 'move_qpos_trajectory':
assert isinstance(payload, MoveQposTrajectoryTask)
if index in payload.qpos_traj_dict:
interface.single_move_qpos_trajectory(index, payload.qpos_traj_dict[index], payload.cfg_dict.get(index, None))
elif task == 'move_gripper':
assert isinstance(payload, MoveGripperTask)
if payload.open:
interface.single_open_gripper(index, payload.steps)
else:
interface.single_close_gripper(index, payload.steps)
else:
raise ValueError(f'Invalid task: {task!r}')
barriers[i].wait()
[docs]
def run(self, interface: DeoxysService, activated_robots: Optional[List[int]] = None):
if activated_robots is None:
activated_robots = list(interface.robots.keys())
threads = list()
barriers = [Barrier(len(activated_robots)) for _ in range(len(self.tasks))]
for robot_index in activated_robots:
thread = Thread(target=self.thread_run, args=(interface, robot_index, barriers))
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
[docs]
def run_remote(self, client: DeoxysClient, activated_robots: Optional[List[int]] = None):
client.run_multi_robot(self.get_tasks(), activated_robots)
[docs]
class DeoxysServiceVisualizerInterface(object):
"""Create a matplotlib visualizer for joint angles and end-effector poses."""
[docs]
def __init__(self, service: DeoxysService):
self.service = service
self.stop_event = Event()
self.visualizer = None
def atexit_handler():
self.stop_event.set()
atexit.register(atexit_handler)
[docs]
def start(self):
thread = Thread(target=self.mainloop, daemon=True)
thread.start()
[docs]
def update_captures(self, captures):
print('Updating captures...')
for name, data in captures.items():
self.visualizer.update_queue(f'Camera_{name} Color', time.time(), data['color'], tab='Camera')
depth = data['depth']
depth_vis = cv2.applyColorMap(cv2.convertScaleAbs(depth, alpha=0.03), cv2.COLORMAP_JET)
self.visualizer.update_queue(f'Camera_{name} Depth', time.time(), depth_vis[..., ::-1], tab='Camera')
[docs]
def mainloop(self):
# Create a subplot of 7 rows x N columns (N = number of robots)
# For each robot, plot the joint angles
from concepts.hw_interface.robot_state_visualizer.visualizer import RobotStateVisualizer
self.visualizer = visualizer = RobotStateVisualizer('Franka System')
for camera_index in self.service.cameras:
visualizer.register_queue('Camera', f'Camera_{camera_index} Color', 'image', 1, group='Camera', width_in_group=6)
visualizer.register_queue('Camera', f'Camera_{camera_index} Depth', 'image', 1, group='Camera', width_in_group=6)
for robot_index in self.service.robots:
for joint_index in range(1, 8):
group_index = (joint_index - 1) // 4
visualizer.register_queue(f'Robot{robot_index}', f'R{robot_index} Joint{joint_index}', 'float', 500, group=f'R{robot_index}Group{group_index}', width_in_group=3)
for EE_axis in 'XYZ':
visualizer.register_queue(f'Robot{robot_index}', f'R{robot_index} EE_{EE_axis}', 'float', 500, group=f'R{robot_index}EE_Pos', width_in_group=4)
for EE_axis in 'XYZ':
visualizer.register_queue(f'Robot{robot_index}', f'R{robot_index} EEForce_{EE_axis}', 'float', 500, group=f'R{robot_index}EE_Force', width_in_group=4)
visualizer.start()
tau_ext_initial = dict()
for i, (robot_index, robot) in enumerate(self.service.robots.items()):
tau_ext_initial[robot_index] = np.array(robot._state_buffer[-1].tau_J, copy=True) - np.array(robot._state_buffer[-1].generalized_gravity, copy=True)
from concepts.simulator.pybullet.client import BulletClient
from concepts.simulator.pybullet.components.panda.panda_robot import PandaRobot
bc = BulletClient(is_gui=False)
from concepts.benchmark.manip_tabletop.pybullet_tabletop_base.pybullet_tabletop import TableTopEnv
env = TableTopEnv(bc)
env.add_robot('panda')
def compute_jacobian(robot, q):
r: PandaRobot = env.robots[0]
return r.get_jacobian(q, 11)
def compute_ee_pos(robot, q):
r: PandaRobot = env.robots[0]
r.set_qpos(q)
return r.get_ee_pose(fk=True)
def update():
if self.stop_event.is_set():
return False
for i, (robot_index, robot) in enumerate(self.service.robots.items()):
qpos = np.array(robot._state_buffer[-1].q, copy=True)
measured = np.array(robot._state_buffer[-1].tau_J, copy=True)
gravity = np.array(robot._state_buffer[-1].generalized_gravity, copy=True)
tau_ext = measured - gravity - tau_ext_initial[robot_index]
jac_franka = np.array(robot._state_buffer[-1].jacobian_T_EE, copy=True).reshape(7, 6).T
jac = compute_jacobian(robot, robot.last_q)
cartesian_torque = np.linalg.pinv(jac.T) @ tau_ext
cartesian_torque = cartesian_torque
ee_pos, ee_quat = compute_ee_pos(robot, robot.last_q)
time_f = time.time()
for i in range(7):
visualizer.update_queue(f'R{robot_index} Joint{i+1}', time_f, qpos[i])
visualizer.update_queue(f'R{robot_index} EE_X', time_f, ee_pos[0])
visualizer.update_queue(f'R{robot_index} EE_Y', time_f, ee_pos[1])
visualizer.update_queue(f'R{robot_index} EE_Z', time_f, ee_pos[2])
cartesian_force = cartesian_torque[:3]
from concepts.math.rotationlib_xyzw import quat2mat
cartesian_force_in_world = np.linalg.inv(quat2mat(ee_quat)) @ cartesian_force
visualizer.update_queue(f'R{robot_index} EEForce_X', time_f, cartesian_force_in_world[0])
visualizer.update_queue(f'R{robot_index} EEForce_Y', time_f, cartesian_force_in_world[1])
visualizer.update_queue(f'R{robot_index} EEForce_Z', time_f, cartesian_force_in_world[2])
time.sleep(0.1)
return True
while True:
if not update():
break