Source code for concepts.dm.crowhat.world.manipulator_interface

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : manipulator_interface.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 07/23/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import numpy as np
from typing import Any, Optional, Union, Iterator, Tuple, List, Dict

from concepts.algorithm.configuration_space import BoxConfigurationSpace, CollisionFreeProblemSpace
from concepts.algorithm.rrt.rrt import birrt
from concepts.dm.crow.interfaces.controller_interface import CrowControllerExecutionError
from concepts.dm.crowhat.manipulation_utils.pose_utils import canonicalize_pose, pose_difference
from concepts.dm.crowhat.world.planning_world_interface import AttachmentInfo, PlanningWorldInterface
from concepts.math.range import Range
from concepts.math.frame_utils_xyzw import get_transform_a_to_b, solve_ee_from_tool, calc_ee_quat_from_directions
from concepts.utils.typing_utils import VecNf, Vec3f, Vec4f

__all__ = [
    'SingleArmMotionPlanningInterface', 'MotionPlanningResult',
    'RobotControllerExecutionFailed', 'RobotControllerExecutionContext', 'RobotArmJointTrajectory',
    'SingleArmControllerInterface'
]


[docs] class MotionPlanningResult(object):
[docs] def __init__(self, success: bool, result: Any, error: str = ''): self.succeeded = success self.result = result self.error = error
def __bool__(self): return self.succeeded def __str__(self): if self.succeeded: return f'MotionPlanningResult(SUCC: {self.result})' return f'MotionPlanningResult(FAIL: error="{self.error}")' def __repr__(self): return str(self)
[docs] @classmethod def success(cls, result: Any): return cls(True, result)
[docs] @classmethod def fail(cls, error: str): return cls(False, None, error)
[docs] class SingleArmMotionPlanningInterface(object): """General interface for robot arms. It specifies a set of basic operations for motion planning: - ``ik``: inverse kinematics. - ``fk``: forward kinematics. - ``jac``: jacobian matrix. - ``coriolis``: coriolis torque. - ``mass``: mass matrix. """
[docs] def __init__(self, planning_world: Optional[PlanningWorldInterface] = None): self._planning_world = planning_world
@property def planning_world(self) -> Optional[PlanningWorldInterface]: return self._planning_world
[docs] def set_planning_world(self, planning_world: Optional[PlanningWorldInterface]): self._planning_world = planning_world
[docs] def get_default_configuration_space_max_stepdiff(self) -> float: """Get the default maximum step difference for the configuration space. Default is 0.02 rad.""" # return np.pi / 180 * 5 return 0.02
[docs] def get_configuration_space(self, max_stepdiff: Optional[float] = None) -> BoxConfigurationSpace: if max_stepdiff is None: max_stepdiff = self.get_default_configuration_space_max_stepdiff() lower, upper = self.joint_limits return BoxConfigurationSpace( [Range(l, u) for l, u in zip(lower, upper)], cspace_max_stepdiff=max_stepdiff )
[docs] def get_collision_free_problem_space(self, ignored_collision_bodies: Optional[List[Union[str, int]]] = None, max_stepdiff: Optional[float] = None) -> CollisionFreeProblemSpace: if ignored_collision_bodies is not None: def collide_fn(qpos: VecNf): return self.check_collision(qpos, ignored_collision_bodies=ignored_collision_bodies) else: collide_fn = self.check_collision return CollisionFreeProblemSpace(self.get_configuration_space(max_stepdiff=max_stepdiff), collide_fn)
[docs] def check_collision( self, qpos: Optional[VecNf] = None, ignore_self_collision: bool = True, ignored_collision_bodies: Optional[List[Union[str, int]]] = None, checkpoint_world_state: Optional[bool] = None, return_list: bool = False ) -> Union[bool, List[Union[str, int]]]: """Check if the current configuration is in collision. Args: qpos: the configuration to check. If None, it will use the current configuration. ignore_self_collision: whether to ignore the collision between the robot and itself (including the attached objects). ignored_collision_bodies: a list of identifiers of the bodies to ignore. checkpoint_world_state: whether to checkpoint the world state before checking collision. If None, we will checkpoint the world state if qpos is not None. return_list: whether to return the list of collision bodies. Returns: True if the configuration is in collision, False otherwise. """ if checkpoint_world_state is None: checkpoint_world_state = qpos is not None if checkpoint_world_state: with self.planning_world.checkpoint_world(): return self.check_collision(qpos, ignore_self_collision=ignore_self_collision, ignored_collision_bodies=ignored_collision_bodies, checkpoint_world_state=False, return_list=return_list) if qpos is not None: self.set_qpos(qpos) if ignore_self_collision: if ignored_collision_bodies is None: ignored_collision_bodies = list() if (attached := self.get_attached_objects()) is not None: for attachment in attached: ignored_collision_bodies.append(attachment.body_b) return self.planning_world.check_collision_with_other_objects(self.body_id, ignore_self_collision=ignore_self_collision, ignored_collision_bodies=ignored_collision_bodies, return_list=return_list)
[docs] def rrt_collision_free(self, qpos1: np.ndarray, qpos2: Optional[np.ndarray] = None, ignored_collision_bodies: Optional[List[Union[str, int]]] = None, smooth_fine_path: bool = False, **kwargs) -> Tuple[bool, Optional[List[np.ndarray]]]: """RRT-based collision-free path planning. Args: qpos1: Start position. qpos2: End position. If None, the current position is used. ignored_collision_bodies: IDs of the objects to ignore in collision checking. Defaults to None. smooth_fine_path: Whether to smooth the path. Defaults to False. kwargs: Additional arguments. Returns: bool: True if the path is collision-free. List[np.ndarray]: Joint configuration trajectory. """ if qpos2 is None: qpos2 = qpos1 qpos1 = self.get_qpos() cfree_pspace = self.get_collision_free_problem_space(ignored_collision_bodies=ignored_collision_bodies) with self.planning_world.checkpoint_world(): path = birrt(cfree_pspace, qpos1, qpos2, smooth_fine_path=smooth_fine_path, **kwargs) if path[0] is not None: return True, path[0] return False, None
@property def nr_joints(self) -> int: return self.get_nr_joints()
[docs] def get_nr_joints(self) -> int: raise NotImplementedError()
@property def joint_limits(self) -> Tuple[np.ndarray, np.ndarray]: lower, upper = self.get_joint_limits() return np.asarray(lower), np.asarray(upper)
[docs] def get_joint_limits(self) -> Tuple[np.ndarray, np.ndarray]: raise NotImplementedError()
@property def body_id(self) -> int: return self.get_body_id()
[docs] def get_body_id(self) -> int: raise NotImplementedError()
@property def ee_link_id(self) -> int: return self.get_ee_link_id() @property def ee_default_quat(self) -> Vec4f: return self.get_ee_default_quat()
[docs] def get_ee_default_quat(self) -> Vec4f: """Get the default end-effector quaternion. This is defined by the quaternion of the EE when 1. the robot base is at the origin, 2. the "down" direction is (0, 0, -1), and 3. the "forward" direction is (1, 0, 0). Returns: Vec4f: The default end-effector quaternion. """ raise NotImplementedError()
[docs] def get_qpos(self) -> np.ndarray: return np.asarray(self._get_qpos())
def _get_qpos(self) -> VecNf: raise NotImplementedError()
[docs] def set_qpos(self, qpos: VecNf): self._set_qpos(np.asarray(qpos))
def _set_qpos(self, qpos: np.ndarray): raise NotImplementedError()
[docs] def get_ee_pose(self) -> Tuple[Vec3f, Vec4f]: return self._planning_world.get_link_pose(self.body_id, self.ee_link_id)
[docs] def get_attached_objects(self) -> List[AttachmentInfo]: return self._get_attached_objects()
def _get_attached_objects(self) -> List[AttachmentInfo]: raise NotImplementedError()
[docs] def add_attachment(self, body: Union[str, int], link: Optional[int] = None, self_link: Optional[int] = None, ee_to_object: Optional[Tuple[Vec3f, Vec4f]] = None) -> Any: return self._add_attachment(body, link, self_link, ee_to_object)
def _add_attachment(self, body: Union[str, int], link: Optional[int] = None, self_link: Optional[int] = None, ee_to_object: Optional[Tuple[Vec3f, Vec4f]] = None) -> Any: raise NotImplementedError()
[docs] def remove_attachment(self, body: Union[str, int], link: Optional[int] = None, self_link: Optional[int] = None): self._remove_attachment(body, link, self_link)
def _remove_attachment(self, body: Union[str, int], link: Optional[int] = None, self_link: Optional[int] = None): raise NotImplementedError()
[docs] def fk(self, qpos: VecNf) -> Tuple[Vec3f, Vec4f]: return self._fk(np.asarray(qpos))
def _fk(self, qpos: np.ndarray) -> Tuple[Vec3f, Vec4f]: raise NotImplementedError()
[docs] def ik(self, pos: Union[Vec3f, Tuple[Vec3f, Vec4f]], quat: Optional[Vec4f] = None, qpos: Optional[VecNf] = None, max_distance: Optional[float] = None) -> Optional[np.ndarray]: pos, quat = canonicalize_pose(pos, quat) return self._ik(pos, quat, qpos, max_distance=max_distance)
def _ik(self, pos: np.ndarray, quat: np.ndarray, qpos: Optional[np.ndarray] = None, max_distance: Optional[float] = None) -> Optional[np.ndarray]: raise NotImplementedError()
[docs] def gen_ik(self, pos: Union[Vec3f, Tuple[Vec3f, Vec4f]], quat: Optional[Vec4f] = None, qpos: Optional[VecNf] = None, max_distance: Optional[float] = None, max_returns: int = 5) -> Iterator[np.ndarray]: pos, quat = canonicalize_pose(pos, quat) for i in range(max_returns): rv = self._ik(pos, quat, qpos, max_distance=max_distance) if rv is not None: yield rv
[docs] def jacobian(self, qpos: VecNf) -> np.ndarray: return self._jacobian(np.asarray(qpos))
def _jacobian(self, qpos: np.ndarray) -> np.ndarray: raise NotImplementedError()
[docs] def coriolis(self, qpos: VecNf, qvel: VecNf) -> np.ndarray: return self._coriolis(np.asarray(qpos), np.asarray(qvel))
def _coriolis(self, qpos: np.ndarray, qvel: np.ndarray) -> np.ndarray: raise NotImplementedError()
[docs] def mass(self, qpos: VecNf) -> np.ndarray: return self._mass(np.asarray(qpos))
def _mass(self, qpos: np.ndarray) -> np.ndarray: raise NotImplementedError()
[docs] def calc_differential_ik_qpos_diff(self, current_qpos: VecNf, current_pose: Tuple[Vec3f, Vec4f], next_pose: Tuple[Vec3f, Vec4f]) -> np.ndarray: """Use the differential IK to compute the joint difference for the given pose difference.""" current_pose = canonicalize_pose(current_pose) next_pose = canonicalize_pose(next_pose) J = self.jacobian(current_qpos) # 6 x N solution = np.linalg.lstsq(J, pose_difference(current_pose, next_pose), rcond=None)[0] return solution
[docs] def calc_ee_pose_from_single_attached_object_pose(self, pos: Vec3f, quat: Vec4f) -> Tuple[Vec3f, Vec4f]: """Get the end-effector pose given the desired pose of the attached object.""" attached_object = self.get_attached_objects() if len(attached_object) != 1: raise ValueError('The end-effector should have exactly one attached object.') attached_object = attached_object[0].body_b world_to_ee = self.get_ee_pose() world_to_obj = self._planning_world.get_object_pose(attached_object) ee_to_tool = get_transform_a_to_b(world_to_ee[0], world_to_ee[1], world_to_obj[0], world_to_obj[1]) return solve_ee_from_tool(pos, quat, ee_to_tool)
[docs] def calc_ee_quat_from_vectors(self, u: Vec3f = (-1., 0., 0.), v: Vec3f = (1., 0., 0.)) -> Vec4f: """Compute the quaternion from two directions (the "down" direction for the end effector and the "forward" direction for the end effector). Args: u: the "down" direction for the end effector. v: the "forward" direction for the end effector. """ return calc_ee_quat_from_directions(u, v, self.get_ee_default_quat())
[docs] class RobotControllerExecutionFailed(CrowControllerExecutionError): pass
[docs] class RobotControllerExecutionContext(object):
[docs] def __init__(self, controller_name, *args, **kwargs): self.controller_name = controller_name self.args = args self.kwargs = kwargs
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): return False
[docs] def monitor(self, rv: bool): if not rv: raise RobotControllerExecutionFailed(f"Execution of {self.controller_name} failed. Args: {self.args}, kwargs: {self.kwargs}")
[docs] class RobotArmJointTrajectory(list): pass
[docs] class RobotControllerInterfaceBase(object):
[docs] def __init__(self): self._default_parameters = dict()
@property def default_parameters(self) -> Dict[str, Dict[str, Any]]: """Get the default parameters for each primitive.""" return self._default_parameters
[docs] def set_default_parameters(self, primitive: str, **kwargs): """Set the default parameters for a specific primitive.""" if primitive not in self._default_parameters: self._default_parameters[primitive] = dict() self._default_parameters[primitive].update(kwargs)
[docs] def get_update_default_parameters(self, primitive: str, kwargs_: Dict[str, Any]) -> None: """Update the default parameters for a specific primitive.""" if primitive not in self._default_parameters: return kwargs_.update(self._default_parameters[primitive])
[docs] class SingleArmControllerInterface(RobotControllerInterfaceBase):
[docs] def get_qpos(self) -> np.ndarray: raise NotImplementedError
[docs] def get_qvel(self) -> np.ndarray: raise NotImplementedError
[docs] def get_ee_pose(self) -> Tuple[np.ndarray, np.ndarray]: raise NotImplementedError
[docs] def move_home(self) -> bool: raise NotImplementedError
[docs] def move_qpos(self, qpos: np.ndarray) -> None: raise NotImplementedError
[docs] def move_qpos_trajectory(self, trajectory: RobotArmJointTrajectory) -> None: raise NotImplementedError
[docs] def open_gripper(self) -> None: raise NotImplementedError
[docs] def close_gripper(self) -> None: raise NotImplementedError
[docs] def grasp(self, width: float = 0.05, force: float = 40) -> None: raise NotImplementedError
[docs] def move_grasp(self, approaching_trajectory: RobotArmJointTrajectory, width: float = 0.05, force: float = 40) -> None: self.move_qpos_trajectory(approaching_trajectory) self.grasp(width, force)
[docs] def move_place(self, placing_trajectory: RobotArmJointTrajectory) -> None: self.move_qpos_trajectory(placing_trajectory) self.open_gripper()