Source code for concepts.hw_interface.franka.deoxys_server

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : deoxys_server.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 09/10/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

"""Control interface for the Panda robot using the Deoxys library."""

import numpy as np
from dataclasses import dataclass
from typing import Optional, Tuple, List, Dict
from threading import Barrier, Thread

from jacinle.comm.service import Service, SocketClient

deoxys_available = True
try:
    from deoxys.franka_interface import FrankaInterface
    from deoxys.experimental.motion_utils import reset_joints_to_v2, follow_joint_traj, follow_ee_traj, open_gripper, close_gripper
    from deoxys.utils.config_utils import verify_controller_config
    from deoxys.utils.yaml_config import YamlConfig
except ImportError:
    deoxys_available = False
    FrankaInterface = object

from concepts.vision.franka_system_calibration.calibration import get_mounted_camera_pose_from_qpos
from concepts.hw_interface.realsense.device_f import CaptureRS

__all__ = ['DeoxysService', 'MoveQposTask', 'MoveQposTrajectoryTask', 'MultiRobotTaskBuilder']


[docs] class DeoxysService(Service): DEFAULT_PORTS = (12347, 12348)
[docs] def __init__(self, robots: Dict[int, FrankaInterface], camera_configs: Optional[Dict[str, Dict[str, np.ndarray]]] = None, cameras: Optional[Dict[str, CaptureRS]] = None, mock: bool = False): if not deoxys_available: raise ImportError('Deoxys is not available. Please install it first.') self.robots = robots self.camera_configs = camera_configs if camera_configs is not None else {} self.cameras = cameras if cameras is not None else {} self.mock = mock super().__init__(configs=dict(), spec={'available_robots': list(self.robots.keys()), 'available_cameras': list(self.cameras.keys())})
[docs] @classmethod def from_setup_name(cls, name: str, camera_configs: Optional[Dict[str, Dict[str, np.ndarray]]] = None, mock: bool = False): from concepts.hw_interface.franka.deoxys_interfaces import get_franka_interface_dict, get_realsense_capture_dict, get_setup_config setup_config = get_setup_config(name) robots = get_franka_interface_dict(setup_config['robots'], wait_for_state=True, auto_close=True) cameras = get_realsense_capture_dict(setup_config['cameras'], auto_close=True, skip_frames=30) return cls(robots, camera_configs, cameras, mock)
[docs] def get_captures(self, robot_move_to: Optional[Dict[int, np.ndarray]] = None): current_qpos = {robot_index: robot.last_q.copy() for robot_index, robot in self.robots.items()} if robot_move_to is not None: for robot_index, qpos in robot_move_to.items(): self.single_move_qpos(robot_index, qpos) rv = dict() for k, cam in self.cameras.items(): if k == 'robot1_hand': ee_to_camera_pos = [0.036499, -0.034889, 0.0594] ee_to_camera_quat = [0.00252743, 0.0065769, 0.70345566, 0.71070423] extrinsics = get_mounted_camera_pose_from_qpos(self.robots[1].last_q, ee_to_camera_pos, ee_to_camera_quat) elif k in self.camera_configs: extrinsics = self.camera_configs[k]['extrinsics'] else: extrinsics = np.eye(4) rgb, depth = cam.capture() rv[k] = { 'color': rgb, 'depth': depth, 'intrinsics': cam.intrinsics[0], 'extrinsics': extrinsics } if robot_move_to is not None: for robot_index, qpos in current_qpos.items(): self.single_move_qpos(robot_index, qpos) return rv
[docs] def get_all_qpos(self): return {robot_index: robot.last_q.copy() for robot_index, robot in self.robots.items()}
[docs] def get_all_qvel(self): return {robot_index: robot.last_dq.copy() for robot_index, robot in self.robots.items()}
[docs] def get_full_observation(self): return { 'qpos': self.get_all_qpos(), 'qvel': self.get_all_qvel(), 'captures': self.get_captures() }
[docs] def single_open_gripper(self, robot_index, steps: int = 100): if self.mock: return print(f'DeoxysService::open_gripper(robot_index={robot_index!r}, steps={steps!r})') open_gripper(self.robots[robot_index], steps)
[docs] def single_close_gripper(self, robot_index, steps: int = 100): if self.mock: return print(f'DeoxysService::close_gripper(robot_index={robot_index!r}, steps={steps!r})') close_gripper(self.robots[robot_index], steps)
[docs] def single_move_home(self, robot_index: int, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None): reset_joints_to_v2(self.robots[robot_index], self.robots[robot_index].init_q, gripper_open=gripper_open, gripper_close=gripper_close, gripper_default='open')
[docs] def single_move_qpos(self, robot_index: int, q: np.ndarray, cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None): q = np.asarray(q) if q.shape != (7,): raise ValueError('Invalid joint position.') if self.mock: return print(f'DeoxysService::reset_joints_to_v2(robot_index={robot_index!r}, q={q!r}, controller_cfg={cfg!r})') reset_joints_to_v2(self.robots[robot_index], q, controller_cfg=cfg, gripper_open=gripper_open, gripper_close=gripper_close)
[docs] def single_move_qpos_trajectory(self, robot_index: int, q: List[np.ndarray], cfg: Optional[Dict] = None, num_addition_steps: int = 0, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, residual_tau_translation: Optional[np.ndarray] = None): if self.mock: return print(f'DeoxysService::follow_joint_traj(robot_index={robot_index!r}, q={q!r}, num_addition_steps={num_addition_steps}, controller_cfg={cfg!r})') cfg = verify_controller_config(cfg, use_default=True) if residual_tau_translation is not None: cfg.enable_residual_tau = True cfg.residual_tau_translation_vec = residual_tau_translation follow_joint_traj(self.robots[robot_index], q, num_addition_steps=num_addition_steps, controller_cfg=cfg, gripper_open=gripper_open, gripper_close=gripper_close)
[docs] def single_move_ee_trajectory(self, robot_index, ee_traj: List[Tuple[np.ndarray, np.ndarray]], compliance_traj: Optional[List[np.ndarray]] = None, cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None): if self.mock: return print(f'DeoxysService::follow_ee_traj(robot_index={robot_index!r}, ee_traj={ee_traj!r}, controller_cfg={cfg!r})') follow_ee_traj(self.robots[robot_index], ee_traj, compliance_traj=compliance_traj, controller_cfg=cfg, gripper_open=gripper_open, gripper_close=gripper_close)
[docs] def run_multi_robot(self, task: List[Tuple[str, object]], activated_robots: Optional[List[int]] = None): builder = MultiRobotTaskBuilder(task) builder.run(self, activated_robots)
[docs] def call(self, name, *args, **kwargs): return getattr(self, name)(*args, **kwargs)
[docs] def serve_socket(self, name=None, tcp_port=None): if name is None: name = 'concepts::deoxys_interface' if tcp_port is None: tcp_port = DeoxysService.DEFAULT_PORTS super().serve_socket(name, tcp_port)
[docs] class DeoxysClient(object):
[docs] def __init__(self, server: str, ports: Optional[Tuple[int, int]] = None): if ports is None: ports = DeoxysService.DEFAULT_PORTS connection = (f'tcp://{server}:{ports[0]}', f'tcp://{server}:{ports[1]}') self.client = SocketClient('concepts::deoxys_interface', connection) self.client.initialize(auto_close=True)
[docs] def get_captures(self, robot_move_to: Optional[Dict[int, np.ndarray]] = None): return self.client.call('get_captures', robot_move_to)
[docs] def get_all_qpos(self): return self.client.call('get_all_qpos')
[docs] def get_all_qvel(self): return self.client.call('get_all_qvel')
DEFAULT_ROBOT_INDEX = 1
[docs] def single_open_gripper(self, robot_index, steps: int = 100): self.client.call('single_open_gripper', robot_index, steps)
[docs] def single_close_gripper(self, robot_index, steps: int = 100): self.client.call('single_close_gripper', robot_index, steps)
[docs] def single_move_home(self, robot_index: int, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None): self.client.call('single_move_home', robot_index, gripper_open=gripper_open, gripper_close=gripper_close)
[docs] def single_move_qpos(self, robot_index: int, q: np.ndarray, cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None): self.client.call('single_move_qpos', robot_index, q, cfg, gripper_open=gripper_open, gripper_close=gripper_close)
[docs] def single_move_qpos_trajectory(self, robot_index: int, q: List[np.ndarray], cfg: Optional[Dict] = None, num_addition_steps: int = 0, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, residual_tau_translation: Optional[np.ndarray] = None): self.client.call('single_move_qpos_trajectory', robot_index, q, cfg, num_addition_steps=num_addition_steps, gripper_open=gripper_open, gripper_close=gripper_close, residual_tau_translation=residual_tau_translation)
[docs] def single_move_ee_trajectory(self, robot_index, ee_traj: List[Tuple[np.ndarray, np.ndarray]], cfg: Optional[Dict] = None, compliance_traj: Optional[List[np.ndarray]] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None): self.client.call('single_move_ee_trajectory', robot_index, ee_traj, cfg, compliance_traj=compliance_traj, gripper_open=gripper_open, gripper_close=gripper_close)
[docs] def get_qpos(self): return self.client.call('get_all_qpos')[type(self).DEFAULT_ROBOT_INDEX]
[docs] def get_qvel(self): return self.client.call('get_all_qvel')[type(self).DEFAULT_ROBOT_INDEX]
[docs] def open_gripper(self, steps: int = 100): self.client.call('single_open_gripper', type(self).DEFAULT_ROBOT_INDEX, steps)
[docs] def close_gripper(self, steps: int = 100): self.client.call('single_close_gripper', type(self).DEFAULT_ROBOT_INDEX, steps)
[docs] def move_home(self, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None): self.client.call('single_move_home', type(self).DEFAULT_ROBOT_INDEX, gripper_open=gripper_open, gripper_close=gripper_close)
[docs] def move_qpos(self, q: np.ndarray, cfg: Optional[Dict] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None): self.client.call('single_move_qpos', type(self).DEFAULT_ROBOT_INDEX, q, cfg, gripper_open=gripper_open, gripper_close=gripper_close)
[docs] def move_qpos_trajectory(self, q: List[np.ndarray], cfg: Optional[Dict] = None, num_addition_steps: int = 0, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None, residual_tau_translation: Optional[np.ndarray] = None): self.client.call('single_move_qpos_trajectory', type(self).DEFAULT_ROBOT_INDEX, q, cfg, num_addition_steps=num_addition_steps, gripper_open=gripper_open, gripper_close=gripper_close, residual_tau_translation=residual_tau_translation)
[docs] def move_ee_trajectory(self, ee_traj: List[Tuple[np.ndarray, np.ndarray]], cfg: Optional[Dict] = None, num_additional_steps: int = 0, compliance_traj: Optional[List[np.ndarray]] = None, gripper_open: Optional[bool] = None, gripper_close: Optional[bool] = None): if num_additional_steps > 0: raise NotImplementedError('additional_steps is not supported in DeoxysClient::move_ee_trajectory.') self.client.call('single_move_ee_trajectory', type(self).DEFAULT_ROBOT_INDEX, ee_traj, cfg, compliance_traj=compliance_traj, gripper_open=gripper_open, gripper_close=gripper_close)
[docs] def run_multi_robot(self, task: List[Tuple[str, object]], activated_robots: Optional[List[int]] = None): self.client.call('run_multi_robot', task, activated_robots)
[docs] @dataclass class MoveQposTask(object): qpos_dict: Dict[int, np.ndarray] cfg_dict: Dict[int, Dict]
[docs] @dataclass class MoveQposTrajectoryTask(object): qpos_traj_dict: Dict[int, List[np.ndarray]] cfg_dict: Dict[int, Dict]
[docs] @dataclass class MoveGripperTask(object): open: bool steps: int
[docs] class MultiRobotTaskBuilder(object):
[docs] def __init__(self, tasks: Optional[List[Tuple[str, object]]] = None): if tasks is None: tasks = list() self.tasks = tasks
[docs] def get_tasks(self): # Note: not deep copy return self.tasks.copy()
[docs] def move_qpos(self, tasks: Dict[int, np.ndarray], configs: Optional[Dict[int, Dict]] = None): if configs is None: configs = dict() self.tasks.append(('move_qpos', MoveQposTask(tasks, configs)))
[docs] def move_qpos_trajectorry(self, tasks: Dict[int, List[np.ndarray]], configs: Optional[Dict[int, Dict]] = None): if configs is None: configs = dict() self.tasks.append(('move_qpos_trajectory', MoveQposTrajectoryTask(tasks, configs)))
[docs] def open_gripper(self, steps: int = 100): self.tasks.append(('open_gripper', MoveGripperTask(True, steps)))
[docs] def close_gripper(self, steps: int = 100): self.tasks.append(('close_gripper', MoveGripperTask(False, steps)))
[docs] def thread_run(self, interface: DeoxysService, index: int, barriers: List): for i in range(len(self.tasks)): task, payload = self.tasks[i] if task == 'move_qpos': assert isinstance(payload, MoveQposTask) if index in payload.qpos_dict: interface.single_move_qpos(index, payload.qpos_dict[index], payload.cfg_dict.get(index, None)) elif task == 'move_qpos_trajectory': assert isinstance(payload, MoveQposTrajectoryTask) if index in payload.qpos_traj_dict: interface.single_move_qpos_trajectory(index, payload.qpos_traj_dict[index], payload.cfg_dict.get(index, None)) elif task == 'move_gripper': assert isinstance(payload, MoveGripperTask) if payload.open: interface.single_open_gripper(index, payload.steps) else: interface.single_close_gripper(index, payload.steps) else: raise ValueError(f'Invalid task: {task!r}') barriers[i].wait()
[docs] def run(self, interface: DeoxysService, activated_robots: Optional[List[int]] = None): if activated_robots is None: activated_robots = list(interface.robots.keys()) threads = list() barriers = [Barrier(len(activated_robots)) for _ in range(len(self.tasks))] for robot_index in activated_robots: thread = Thread(target=self.thread_run, args=(interface, robot_index, barriers)) threads.append(thread) for thread in threads: thread.start() for thread in threads: thread.join()
[docs] def run_remote(self, client: DeoxysClient, activated_robots: Optional[List[int]] = None): client.run_multi_robot(self.get_tasks(), activated_robots)