#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : pybullet_manipulator_interface.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 08/31/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
from typing import Any, Optional, Union, Tuple, List
import numpy as np
from concepts.dm.crowhat.impl.pybullet.pybullet_planning_world_interface import PyBulletPlanningWorldInterface
from concepts.dm.crowhat.impl.pybullet.pybullet_sim_interfaces import PyBulletPhysicalControllerInterface, PyBulletSimulationControllerInterface
from concepts.dm.crowhat.world.manipulator_interface import (
RobotArmJointTrajectory, RobotControllerExecutionContext, RobotControllerExecutionFailed, SingleArmControllerInterface,
SingleArmMotionPlanningInterface
)
from concepts.dm.crowhat.world.planning_world_interface import AttachmentInfo, PlanningWorldInterface
from concepts.simulator.pybullet.components.panda.panda_robot import PandaRobot
from concepts.simulator.pybullet.components.robot_base import BulletArmRobotBase
from concepts.utils.typing_utils import Vec3f, Vec4f, VecNf
__all__ = ['PyBulletSingleArmMotionPlanningInterface', 'PyBulletSingleArmControllerInterface']
[docs]
class PyBulletSingleArmMotionPlanningInterface(SingleArmMotionPlanningInterface):
[docs]
def __init__(self, robot: BulletArmRobotBase, planning_world: Optional[PlanningWorldInterface]):
super().__init__(planning_world)
self._robot = robot
self._joints = self._robot.get_joint_ids()
self._joint_limits = self._robot.get_joint_ids()
[docs]
def get_nr_joints(self) -> int:
return len(self._joints)
[docs]
def get_joint_limits(self):
return self._robot.get_joint_limits()
[docs]
def get_body_id(self) -> int:
return self._robot.get_body_id()
[docs]
def get_ee_link_id(self) -> int:
return self._robot.get_ee_link_id()
[docs]
def get_ee_default_quat(self) -> Vec4f:
return self._robot.get_ee_default_quat()
def _fk(self, qpos):
return self._robot.fk(qpos)
def _ik(self, pos, quat, qpos=None, max_distance=None) -> Optional[np.ndarray]:
if max_distance is None:
max_distance = float(1e9)
return self._robot.ikfast(pos, quat, qpos, max_distance=max_distance, error_on_fail=False)
def _jacobian(self, qpos):
return self._robot.get_jacobian(qpos)
def _mass(self, qpos: np.ndarray) -> np.ndarray:
return self._robot.get_mass_matrix(qpos)
def _coriolis(self, qpos: np.ndarray, qvel: np.ndarray) -> np.ndarray:
return self._robot.get_coriolis_torque(qpos, qvel)
def _get_qpos(self) -> VecNf:
return self._robot.get_qpos()
def _set_qpos(self, qpos: np.ndarray):
return self._robot.set_qpos_with_holding(qpos)
def _add_attachment(self, body: Union[str, int], link: Optional[int] = None, self_link: Optional[int] = None, ee_to_object: Optional[Tuple[Vec3f, Vec4f]] = None) -> Any:
assert link is None, 'PyBullet does not support attachment to a specific link (other than the base link of the object).'
assert self_link is None, 'PyBullet does not support non-EE attachments.'
assert self._robot.get_attached_object() is None, 'PyBullet does not support multiple attachments.'
self._robot.attach_object(body, ee_to_object)
return None
def _remove_attachment(self, body: Union[str, int], link: Optional[int] = None, self_link: Optional[int] = None):
self._robot.detach_object()
def _get_attached_objects(self) -> List[AttachmentInfo]:
attached_object = self._robot.get_attached_object()
if attached_object is None:
return []
return [AttachmentInfo(
body_a=self.get_body_id(), link_a=self.ee_link_id, body_b=attached_object, link_b=-1,
a_to_b=self._robot.get_attached_object_pose_in_ee_frame()
)]
[docs]
def rrt_collision_free(
self, qpos1: np.ndarray, qpos2: Optional[np.ndarray] = None,
ignored_collision_bodies: Optional[List[Union[str, int]]] = None,
smooth_fine_path: bool = False, ignore_rendering: bool = True, **kwargs
) -> Tuple[bool, Optional[List[np.ndarray]]]:
if ignore_rendering:
assert isinstance(self.planning_world, PyBulletPlanningWorldInterface), 'The planning world must be a PyBulletPlanningWorldInterface.'
client = self.planning_world.client
with client.disable_rendering(suppress_stdout=False):
return super().rrt_collision_free(qpos1, qpos2, ignored_collision_bodies, smooth_fine_path, **kwargs)
return super().rrt_collision_free(qpos1, qpos2, ignored_collision_bodies, smooth_fine_path, **kwargs)
[docs]
class PyBulletSingleArmControllerInterface(SingleArmControllerInterface):
[docs]
def __init__(self, panda_robot, timeout_multiplier: float = 1.0):
super().__init__()
self._panda_robot = panda_robot
self.timeout_multiplier = timeout_multiplier
self.set_default_parameters('move_grasp', move_home_timeout=30, trajectory_timeout=50)
self.set_default_parameters('move_place', trajectory_timeout=50)
@property
def panda_robot(self) -> PandaRobot:
return self._panda_robot
[docs]
def get_qpos(self) -> np.ndarray:
return self.panda_robot.get_qpos()
[docs]
def get_qvel(self) -> np.ndarray:
raise NotImplementedError('qvel for panda robot is not implemented.')
[docs]
def get_ee_pose(self) -> Tuple[np.ndarray, np.ndarray]:
return self.panda_robot.get_ee_pose()
[docs]
def move_home(self, **kwargs) -> None:
self.get_update_default_parameters('move_home', kwargs)
with RobotControllerExecutionContext('move_home', **kwargs) as ctx:
ctx.monitor(self.panda_robot.move_home(**kwargs))
[docs]
def move_qpos(self, qpos: np.ndarray, **kwargs) -> None:
self.get_update_default_parameters('move_qpos', kwargs)
with RobotControllerExecutionContext('move_qpos', qpos, **kwargs) as ctx:
ctx.monitor(self.panda_robot.move_qpos(qpos, **kwargs))
[docs]
def move_pose(self, pos: np.ndarray, quat: np.ndarray, **kwargs) -> None:
self.get_update_default_parameters('move_pose', kwargs)
with RobotControllerExecutionContext('move_pose', pos, quat, **kwargs) as ctx:
ctx.monitor(self.panda_robot.move_pose(pos, quat, **kwargs))
[docs]
def move_qpos_trajectory(self, trajectory: RobotArmJointTrajectory, **kwargs) -> None:
self.get_update_default_parameters('move_qpos_trajectory', kwargs)
with RobotControllerExecutionContext('move_qpos_trajectory', trajectory, **kwargs) as ctx:
ctx.monitor(self.panda_robot.move_qpos_path_v2(trajectory, **kwargs))
[docs]
def open_gripper(self, **kwargs) -> None:
self.get_update_default_parameters('open_gripper', kwargs)
with RobotControllerExecutionContext('open_gripper', **kwargs) as ctx:
ctx.monitor(self.panda_robot.open_gripper_free(**kwargs))
[docs]
def close_gripper(self, **kwargs) -> None:
self.get_update_default_parameters('close_gripper', kwargs)
with RobotControllerExecutionContext('close_gripper', **kwargs) as ctx:
ctx.monitor(self.panda_robot.close_gripper_free(**kwargs))
[docs]
def grasp(self, width: float = 0.05, force: float = 40, **kwargs) -> None:
# NB(Jiayuan Mao @ 2024/03/30): width and force parameters are ignored in the PyBullet-based implementation because it uses magic attachments.
self.get_update_default_parameters('grasp', kwargs)
with RobotControllerExecutionContext('grasp', **kwargs) as ctx:
ctx.monitor(self.panda_robot.grasp(**kwargs))
[docs]
def move_grasp(self, approaching_trajectory: RobotArmJointTrajectory, width: float = 0.05, force: float = 40, **kwargs):
self.get_update_default_parameters('move_grasp', kwargs)
move_home_timeout = kwargs['move_home_timeout']
trajectory_timeout = kwargs['trajectory_timeout']
try:
self.move_home(timeout=move_home_timeout)
self._smooth_move_qpos_trajectory(approaching_trajectory, timeout=trajectory_timeout)
self.grasp(width, force)
self._smooth_move_qpos_trajectory(approaching_trajectory[:-5][::-1], timeout=trajectory_timeout)
except RobotControllerExecutionFailed:
raise
[docs]
def move_place(self, placing_trajectory: RobotArmJointTrajectory, **kwargs):
self.get_update_default_parameters('move_place', kwargs)
trajectory_timeout = kwargs['trajectory_timeout']
try:
self._smooth_move_qpos_trajectory(placing_trajectory, timeout=trajectory_timeout)
self.open_gripper()
self.move_home()
except RobotControllerExecutionFailed:
raise
def _smooth_move_qpos_trajectory(self, qt: RobotArmJointTrajectory, timeout: float = 10):
robot = self.panda_robot
qt = qt.copy()
qt.insert(0, robot.get_qpos())
smooth_qt = [qt[0]]
cspace = robot.get_configuration_space()
for qpos1, qpos2 in zip(qt[:-1], qt[1:]):
smooth_qt.extend(cspace.gen_path(qpos1, qpos2)[1][1:])
with RobotControllerExecutionContext('move_qpos_trajectory', smooth_qt) as ctx:
ctx.monitor(robot.move_qpos_path_v2(smooth_qt, timeout=timeout * self.timeout_multiplier))
[docs]
def attach_physical_interface(self, physical_interface: PyBulletPhysicalControllerInterface):
physical_interface.register_controller('move_home', self.move_home)
physical_interface.register_controller('move_qpos', self.move_qpos)
physical_interface.register_controller('move_pose', self.move_pose)
physical_interface.register_controller('move_qpos_trajectory', self.move_qpos_trajectory)
physical_interface.register_controller('open_gripper', self.open_gripper)
physical_interface.register_controller('close_gripper', self.close_gripper)
physical_interface.register_controller('grasp', self.grasp)
physical_interface.register_controller('ctl_grasp', self.move_grasp)
physical_interface.register_controller('ctl_place', self.move_place)
[docs]
def attach_simulation_interface(self, simulation_interface: PyBulletSimulationControllerInterface):
simulation_interface.register_controller('move_home', self.move_home)
simulation_interface.register_controller('move_qpos', self.move_qpos)
simulation_interface.register_controller('move_pose', self.move_pose)
simulation_interface.register_controller('move_qpos_trajectory', self.move_qpos_trajectory)
simulation_interface.register_controller('open_gripper', self.open_gripper)
simulation_interface.register_controller('close_gripper', self.close_gripper)
simulation_interface.register_controller('grasp', self.grasp)
simulation_interface.register_controller('ctl_grasp', self.move_grasp)
simulation_interface.register_controller('ctl_place', self.move_place)