Source code for concepts.simulator.pybullet.components.panda.panda_robot

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : panda_robot.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 08/23/2022
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import itertools
import numpy as np
import pybullet as p
from typing import Any, Optional, Union, Iterable, Sequence, Tuple, Dict

from jacinle.logging import get_logger
from jacinle.utils.defaults import ARGDEF, default_args

from concepts.math.interpolation_utils import gen_linear_spline, get_next_target_linear_spline
from concepts.math.rotationlib_xyzw import quat_mul, quat_conjugate, rotate_vector, quat_diff_in_axis_angle
from concepts.utils.typing_utils import VecNf

from concepts.simulator.pybullet.client import BulletClient
from concepts.simulator.pybullet.world import ConstraintInfo, BulletSaver, BulletWorld
from concepts.simulator.pybullet.components.robot_base import BulletArmRobotBase, GripperObjectIndices, BulletRobotActionPrimitive
from concepts.simulator.pybullet.control_utils import get_os_imp_control_command, get_default_os_imp_control_parameters

logger = get_logger(__file__)

np.set_printoptions(precision=5, suppress=True)

__all__ = [
    'PandaRobot', 'PandaRobotMagicGripperStateSaver',
    'PandaReachTwoStage', 'PandaPlanarPush', 'PandaPushTwoStage', 'PandaPickAndPlace'
]


[docs] class PandaRobot(BulletArmRobotBase): # PANDA_FILE = 'assets://franka_panda/panda.urdf' # PANDA_JOINT_HOMES = np.array([0.98, 0.458, 0.31, -2.24, -0.30, 2.66, 2.32]) PANDA_FILE = 'assets://robots/franka_description/robots/panda_arm_hand_with_inertial.urdf' PANDA_JOINT_HOMES = np.array([-0.45105, -0.38886, 0.45533, -2.19163, 0.13169, 1.81720, 0.51563]) PANDA_GRIPPER_HOME = 0.04 PANDA_GRIPPER_OPEN = 0.04 PANDA_GRIPPER_CLOSE = 0.00
[docs] def __init__( self, client: BulletClient, body_id: Optional[int] = None, body_name: str = 'panda', gripper_objects: Optional[GripperObjectIndices] = None, use_magic_gripper: bool = True, pos: Optional[np.ndarray] = None, quat: Optional[np.ndarray] = None, ik_fast: bool = False, ): super().__init__(client, body_name, gripper_objects) if body_id is not None: self.panda = body_id else: self.panda = client.load_urdf(type(self).PANDA_FILE, pos=pos, quat=quat, static=True, body_name=body_name, group=None) all_joints = client.w.get_joint_info_by_body(self.panda) self.panda_joints = [joint.joint_index for joint in all_joints if joint.joint_type == p.JOINT_REVOLUTE] self.gripper_joints = [joint.joint_index for joint in all_joints if joint.joint_type == p.JOINT_PRISMATIC] self.full_joints = self.panda_joints + self.gripper_joints assert set(self.gripper_joints) == {9, 10} self.panda_ee_tip = 11 self.ik_fast = ik_fast self.ik_fast_wrapper = None # for joint_index in self.panda_joints: # self.client.p.changeDynamics(self.panda, joint_index, linearDamping=0.0, angularDamping=0.0) # Create a constraint to keep the fingers centered # https://github.com/bulletphysics/bullet3/blob/daadfacfff365852ffc96f373c834216a25b11e5/examples/pybullet/gym/pybullet_robots/panda/panda_sim_grasp.py#L46-L54 c = self.client.p.createConstraint( self.panda, 9, self.panda, 10, jointType=p.JOINT_GEAR, jointAxis=[1, 0, 0], parentFramePosition=[0, 0, 0], childFramePosition=[0, 0, 0] ) self.client.p.changeConstraint(c, gearRatio=-1, erp=0.1, maxForce=50) joint_info = [self.client.w.get_joint_info_by_id(self.panda, i) for i in self.panda_joints] self.panda_joints_lower = np.array([joint.joint_lower_limit for joint in joint_info]) self.panda_joints_upper = np.array([joint.joint_upper_limit for joint in joint_info]) self.panda_leftfinger = self.w.link_names[f'{self.body_name}/panda_leftfinger'].link_id self.panda_rightfinger = self.w.link_names[f'{self.body_name}/panda_rightfinger'].link_id # State variable: gripper_activated. True if the gripper is activated. # For all control actions, make sure to call self._set_gripper_control before steping the simulation. self.gripper_activated = False self.use_magic_gripper = use_magic_gripper self.gripper_constraint = None if body_id is None: self.reset_home_qpos() self.ee_home_pos, self.ee_home_quat = self.get_ee_pose() self.reach_two_stage = PandaReachTwoStage(self) self.planar_push = PandaPlanarPush(self) self.push_two_stage = PandaPushTwoStage(self) self.pick_and_place = PandaPickAndPlace(self) self._cspace = None self._cfree_default_pspace = None self.register_action('move_pose', self.move_pose) self.register_action('move_home', self.move_home) self.register_action('move_home_cfree', self.move_home_cfree) self.register_action('grasp', self.grasp) self.register_action('open_gripper_free', self.open_gripper_free) self.register_action('close_gripper_free', self.close_gripper_free) self.register_action('reach_two_stage', self.reach_two_stage) self.register_action('planar_push', self.planar_push) self.register_action('push_two_stage', self.push_two_stage) self.register_action('pick_and_place', self.pick_and_place) self.register_action('reach_two_stage', self.reach_two_stage, interface='franka') self.register_action('planar_push', self.planar_push, interface='franka') self.register_action('pick_and_place', self.pick_and_place, interface='franka') self.world.register_managed_interface('PandaRobot@' + str(self.panda), self) self.world.register_additional_state_saver( 'PandaRobot@' + str(self.panda), lambda: PandaRobotMagicGripperStateSaver(self.client.client_id, self.world, 'PandaRobot@' + str(self.panda)) )
[docs] def reset_home_qpos(self): self.client.w.set_batched_qpos_by_id(self.panda, self.panda_joints, type(self).PANDA_JOINT_HOMES) self.client.w.set_batched_qpos_by_id(self.panda, self.gripper_joints, [type(self).PANDA_GRIPPER_HOME] * 2) self.gripper_activated = None
[docs] def get_body_id(self) -> int: return self.panda
[docs] def get_joint_ids(self) -> Sequence[int]: return self.panda_joints
[docs] def get_home_qpos(self) -> np.ndarray: return np.asarray(type(self).PANDA_JOINT_HOMES)
[docs] def get_full_joint_ids(self) -> Sequence[int]: return self.full_joints
[docs] def get_full_home_qpos(self) -> np.ndarray: return np.concatenate([self.get_home_qpos(), [type(self).PANDA_GRIPPER_HOME] * 2])
[docs] def get_ee_home_pos(self) -> np.ndarray: return self.ee_home_pos
[docs] def get_ee_home_quat(self) -> np.ndarray: return self.ee_home_quat
[docs] def get_ee_default_quat(self) -> np.ndarray: return np.array([0.0, 1.0, 0.0, 0.0])
[docs] def get_gripper_body_id(self) -> int: return self.panda
[docs] def get_gripper_joint_ids(self) -> Sequence[int]: return self.gripper_joints
[docs] def get_gripper_home_qpos(self) -> np.ndarray: return np.array([type(self).PANDA_GRIPPER_HOME] * 2)
[docs] def get_gripper_state(self) -> Optional[bool]: return self.gripper_activated
[docs] def set_qpos_with_holding(self, qpos: np.ndarray) -> None: self.set_qpos(qpos) if self.gripper_constraint is not None: # TODO(Jiayuan Mao @ 2023/03/02): should consider the actual link that the gripper is attached to, and self-collision, etc. constraint = self.p.getConstraintInfo(self.gripper_constraint) other_body_id = constraint[2] parent_frame_pos = constraint[6] parent_frame_orn = constraint[8] ee_link = self.panda_ee_tip body_pose = self.world.get_link_state_by_id(self.panda, ee_link, fk=True) # body_pose = self.p.getLinkState(self.panda, self.w.link_names[f'{self.body_name}/panda_leftfinger'].link_id) obj_to_body = (parent_frame_pos, parent_frame_orn) obj_pose = p.multiplyTransforms(body_pose[0], body_pose[1], *obj_to_body) # print('other_body_id:', other_body_id, 'pose:', self.world.get_body_state_by_id(other_body_id).get_transformation()) self.w.set_body_state2_by_id(other_body_id, obj_pose[0], obj_pose[1])
[docs] def attach_object(self, object_id: int, ee_to_object: Tuple[np.ndarray, np.ndarray]) -> None: assert self.use_magic_gripper, 'The magic gripper is not enabled.' assert self.gripper_constraint is None, 'The gripper is already attached to an object.' obj_to_body = p.invertTransform(ee_to_object[0], ee_to_object[1]) body_pose = self.world.get_link_state_by_id(self.panda, self.panda_ee_tip, fk=True) obj_pose = p.multiplyTransforms(body_pose[0], body_pose[1], *obj_to_body) self.w.set_body_state2_by_id(object_id, obj_pose[0], obj_pose[1]) # NB(Jiayuan Mao @ 2024/08/2): We will first create a constraint between the gripper and the object --- this ensures that the object will stay the same when the gripper moves. self.create_gripper_constraint(object_id) # Next, we call the grasp() function to close the gripper. When the gripper hits the object, the object will be attached to the gripper. # In this step, it will automatically clear up the previous constraint we created and attach the object to the gripper with a new constraint. self.grasp()
def _init_ikfast(self): if self.ik_fast_wrapper is None: from concepts.simulator.pybullet.pybullet_ikfast_utils import IKFastPyBulletWrapper import concepts.simulator.ikfast.franka_panda.ikfast_panda_arm as ikfast_module self.ik_fast_wrapper = IKFastPyBulletWrapper( self.world, ikfast_module, body_id=self.panda, joint_ids=self.panda_joints, free_joint_ids=[self.panda_joints[-1]], use_xyzw=True )
[docs] def ikfast(self, pos: np.ndarray, quat: np.ndarray, last_qpos: Optional[np.ndarray] = None, max_attempts: int = 1000, max_distance: float = float('inf'), error_on_fail: bool = True, verbose: bool = False) -> Optional[np.ndarray]: self._init_ikfast() if verbose: print('Solving IK for pos:', pos, 'quat:', quat) # NB(Jiayuan Mao @ 2023/08/09): relative transformation between the base and the end-effector. pos_delta = [0, 0, 0.1] # quat_delta = (0, 0, 1, 0) # quat_delta = (0, 0, 1, 0) quat_delta = (0.0, 0.0, 0.9238795325108381, 0.38268343236617297) inner_quat = quat_mul(quat, quat_conjugate(quat_delta)) inner_pos = np.array(pos) - rotate_vector(pos_delta, quat) body_state = self.world.get_body_state_by_id(self.panda) try: ik_solution = list(itertools.islice(self.ik_fast_wrapper.gen_ik( inner_pos, inner_quat, last_qpos=last_qpos, max_attempts=max_attempts, max_distance=max_distance, body_pos=body_state.position, body_quat=body_state.orientation, verbose=False ), 1))[0] except IndexError: if error_on_fail: raise return None if verbose: print('IK (solution, lower, upper):\n', np.stack([ik_solution, self.panda_joints_lower, self.panda_joints_upper], axis=0), sep='') print('FK:', self.fk(ik_solution)) return ik_solution
[docs] def is_colliding(self, q: Optional[np.ndarray] = None, return_contacts: bool = False, ignore_collision_objects: Optional[Sequence[int]] = None): """Check if the robot is colliding with other objects. When the joint configuration (q) is provided, we will set the robot to that configuration before checking the collision. Note that this function will not restore the robot to the original configuration after the check. If you want to restore the robot to the original configuration, you should use :meth:`is_colliding_with_saved_state` instead. """ if q is not None: self.set_qpos_with_holding(q) contacts = self.world.get_contact(self.panda, update=True) ignore_bodies = [self.panda] if self.gripper_constraint is not None: constraint = self.p.getConstraintInfo(self.gripper_constraint) other_body_id = constraint[2] contacts += self.world.get_contact(other_body_id, update=True) ignore_bodies.append(other_body_id) # print('other_body_id:', other_body_id, 'pose:', self.world.get_body_state_by_id(other_body_id).get_transformation()) if ignore_collision_objects is not None: ignore_bodies.extend(ignore_collision_objects) filtered_contacts = list() for c in contacts: if c.body_b not in ignore_bodies: filtered_contacts.append(c) # jacinle.log_function.print('Detected collision between', c.body_a_name, 'and', c.body_b_name) if not return_contacts: return True return False if not return_contacts else filtered_contacts
[docs] @default_args def set_arm_joint_position_control(self, target_qpos: np.ndarray, control_mode: int = p.POSITION_CONTROL, gains: float = 0.3, set_gripper_control: bool = True): vector_gains = np.ones(len(self.panda_joints)) * gains self.client.p.setJointMotorControlArray( bodyIndex=self.panda, jointIndices=self.panda_joints, controlMode=control_mode, targetPositions=target_qpos, positionGains=vector_gains, ) if set_gripper_control: self.set_gripper_control()
[docs] @default_args def set_ee_impedance_control( self, target_pos: np.ndarray, target_quat: np.ndarray, kp_pos: Union[float, VecNf] = 200, kp_ori: Union[float, VecNf] = 1, kd_pos: Optional[Union[float, VecNf]] = None, kd_ori: Optional[Union[float, VecNf]] = 0.01, max_torque: float = 100, damping_scale: float = 2.0, simulate_with_position_pd: bool = False, tau_to_qpos_ratio: float = 0.0005, set_gripper_control: bool = True, verbose: bool = True, ): """Set the end-effector impedance control command. Args: target_pos: the target position of the end-effector. target_quat: the target orientation of the end-effector. kp_pos: the proportional gains for the position control. kp_ori: the proportional gains for the orientation control. kd_pos: the derivative gains for the position control. kd_ori: the derivative gains for the orientation control. It is recommended to manual set this to a small value to avoid oscillation (instead of using the critical damping). max_torque: the maximum torque allowed for the robot. damping_scale: the scale for the damping term (as a multiplicative term to the critical damping). simulate_with_position_pd: if True, we will simulate the torque control using the position control. This is useful for debugging and it can be more stable. tau_to_qpos_ratio: the ratio between the torque and the joint position. This is only used when `simulate_with_position_pd` is True. We will use `qpos += tau * tau_to_qpos_ratio` to simulate the position control. set_gripper_control: if True, the gripper control will be set. verbose: if True, the debug information will be printed. """ config = get_default_os_imp_control_parameters(kp_pos, kp_ori, kd_pos=kd_pos, kd_ori=kd_ori, damping_scale=damping_scale) curr_pos, curr_quat = self.get_ee_pose() curr_vel, curr_omg = self.get_ee_velocity() J = self.get_jacobian() tau = get_os_imp_control_command(curr_pos, curr_quat, target_pos, target_quat, curr_vel, curr_omg, J, config) # config['P_ori'][:] = 1.0 # config['D_ori'][:] = 0.01 # F_p = np.concatenate([config['P_pos'] * delta_pos, config['P_ori'] * delta_ori]) # F_d = np.concatenate([config['D_pos'] * curr_vel, config['D_ori'] * curr_omg]) # F = F_p - F_d # Equivalent to F = Kp * (x_d - x) + Kd * (0 - x_dot) # tau = np.dot(J.T, F) if verbose: delta_pos = (target_pos - curr_pos) delta_ori = quat_diff_in_axis_angle(target_quat, curr_quat) print('EE pos:', tuple(curr_pos), 'quat:', tuple(curr_quat)) print('EE vel:', tuple(curr_vel), 'omg: ', tuple(curr_omg)) print('Delta pos:', tuple(delta_pos), 'norm', np.linalg.norm(delta_pos), 'delta ori:', tuple(delta_ori), 'norm', np.linalg.norm(delta_ori)) print('Tau norm:', np.linalg.norm(tau, ord=2)) # input('Press Enter to continue...') if simulate_with_position_pd: tau = np.clip(tau, -max_torque, max_torque) qpos = self.get_qpos() qpos += tau * tau_to_qpos_ratio self.client.p.setJointMotorControlArray( bodyIndex=self.panda, jointIndices=self.panda_joints, controlMode=p.POSITION_CONTROL, targetPositions=qpos, positionGains=np.ones(len(self.panda_joints)) * 1.0, ) else: tau += self.get_coriolis_torque() tau = np.clip(tau, -max_torque, max_torque) self.client.p.setJointMotorControlArray( bodyIndex=self.panda, jointIndices=self.panda_joints, controlMode=p.VELOCITY_CONTROL, forces=np.zeros_like(tau), ) self.client.p.setJointMotorControlArray( bodyIndex=self.panda, jointIndices=self.panda_joints, controlMode=p.TORQUE_CONTROL, forces=tau, ) if set_gripper_control: self.set_gripper_control()
[docs] def set_gripper_control(self, target: Optional[float] = None): if target is None: target = type(self).PANDA_GRIPPER_CLOSE if self.gripper_activated else type(self).PANDA_GRIPPER_OPEN for joint_index in self.gripper_joints: self.client.p.setJointMotorControl2( self.panda, joint_index, controlMode=p.POSITION_CONTROL, targetPosition=target, force=40 )
[docs] @default_args def move_qpos_set_control(self, target_qpos, speed=0.01, timeout=10.0, gains=ARGDEF, local_smoothing: bool = True): """Move the robot to the target joint configuration. Args: target_qpos: the target joint configuration. speed: the maximum speed of the robot. timeout: the maximum time allowed for the robot to reach the target joint configuration. gains: the gains for the joint position control. local_smoothing: if True, the robot will move to the target joint configuration smoothly. This is implemented by setting intermediate joint configurations between the current joint configuration and the target joint configuration. If False, the PD controller will directly move the robot to the target joint configuration. """ last_qpos = None for _ in self.client.timeout(timeout): current_qpos = self.client.w.get_batched_qpos_by_id(self.panda, self.panda_joints) diff = target_qpos - current_qpos norm = np.linalg.norm(diff, ord=2) rel_diff, rel_norm = None, 1e9 if last_qpos is not None: rel_diff = last_qpos - current_qpos rel_norm = np.linalg.norm(rel_diff, ord=1) else: last_qpos = current_qpos rel_norm = 1e9 # pos, quat = self.get_ee_pose() # print(' current pos', pos, 'quat', quat) # print(' current/target joint state\n', np.stack([current_qpos, target_qpos], axis=0), sep='') # print(' diff', diff, 'norm', norm) # if last_qpos is not None: # print(' rel_diff', rel_diff, 'rel_norm', rel_norm) if norm < 0.01: return True # if rel_norm < 0.001: # return False if local_smoothing: # If speed is bigger than norm, we clip it to norm. step_qpos = current_qpos + diff / norm * min(speed, norm) else: step_qpos = target_qpos self.set_arm_joint_position_control(step_qpos, gains=gains, set_gripper_control=True) yield if not self.warnings_suppressed: logger.warning(f'{self.body_name}: Moving timeout ({timeout}s).') return False
[docs] def move_qpos(self, target_qpos, speed=ARGDEF, timeout=ARGDEF, gains=ARGDEF, local_smoothing: bool = True) -> bool: try: for _ in self.move_qpos_set_control(target_qpos, speed=speed, timeout=timeout, gains=gains, local_smoothing=local_smoothing): self.client.step() return True except StopIteration as e: return e.value
[docs] @default_args def move_qpos_path_v2_set_control( self, qpos_path: Iterable[np.ndarray], step_size: float = 1, gains: float = 0.3, atol: float = 0.03, timeout: float = 20, verbose: bool = False, return_world_states: bool = False, ): qpos_path = dedup_qpos_path(qpos_path) spl = gen_linear_spline(qpos_path) prev_qpos = None prev_qpos_not_moving = 0 next_id = None world_states = [] for _ in self.client.timeout(timeout): current_qpos = self.client.w.get_batched_qpos_by_id(self.panda, self.panda_joints) # next_target = get_next_target_cubic_spline(spl, current_qpos, step_size, qpos_trajectory) next_id, next_target = get_next_target_linear_spline( spl, current_qpos, step_size, minimum_x=next_id - step_size + 0.2 if next_id is not None else None ) if verbose: print('this step size', np.linalg.norm(next_target - current_qpos, ord=1)) print('next_id', next_id) print('this_target (lower, current, next, upper)\n', np.stack([ self.panda_joints_lower, current_qpos, next_target, self.panda_joints_upper, ]), sep='') last_norm = np.linalg.norm(qpos_path[-1] - current_qpos, ord=1) if verbose: print('last_norm', last_norm) if prev_qpos is not None: last_moving_dist = np.linalg.norm(prev_qpos - current_qpos, ord=1) if last_moving_dist < 0.001: prev_qpos_not_moving += 1 else: prev_qpos_not_moving = 0 if prev_qpos_not_moving > 10: if last_norm < atol * 10: return True else: if not self.warnings_suppressed: logger.warning(f'{self.body_name}: No progress for 10 steps.') return False prev_qpos = current_qpos if last_norm < atol: if return_world_states: return world_states return True self.set_arm_joint_position_control(next_target, gains=gains, set_gripper_control=True) yield if return_world_states: world_states.append(self.world.save_world()) if not self.warnings_suppressed: logger.warning(f'{self.body_name}: Moving timeout ({timeout}s).') return False
[docs] def move_qpos_path_v2( self, qpos_path: Iterable[np.ndarray], step_size: float = ARGDEF, gains: float = ARGDEF, atol: float = ARGDEF, timeout: float = ARGDEF, verbose: bool = False, return_world_states: bool = False, ): try: for _ in self.move_qpos_path_v2_set_control( qpos_path, step_size=step_size, gains=gains, atol=atol, timeout=timeout, verbose=verbose, return_world_states=return_world_states ): self.client.step() return True except StopIteration as e: return e.value
[docs] def move_cartesian_trajectory_set_control( self, pose_trajectory: Iterable[Tuple[np.ndarray, np.ndarray]], kp_pos: Union[float, VecNf] = ARGDEF, kp_ori: Union[float, VecNf] = ARGDEF, kd_pos: Optional[Union[float, VecNf]] = ARGDEF, kd_ori: Optional[Union[float, VecNf]] = ARGDEF, max_torque: float = ARGDEF, tau_to_qpos_ratio=ARGDEF ): for target_pos, target_quat in pose_trajectory: self.set_ee_impedance_control( target_pos, target_quat, kp_pos, kp_ori, kd_pos, kd_ori, max_torque=max_torque, tau_to_qpos_ratio=tau_to_qpos_ratio, set_gripper_control=True ) yield
[docs] def move_cartesian_trajectory( self, pose_trajectory: Iterable[Tuple[np.ndarray, np.ndarray]], kp_pos: Union[float, VecNf] = ARGDEF, kp_ori: Union[float, VecNf] = ARGDEF, kd_pos: Optional[Union[float, VecNf]] = ARGDEF, kd_ori: Optional[Union[float, VecNf]] = ARGDEF, max_torque: float = ARGDEF, tau_to_qpos_ratio: float = ARGDEF, ): try: for _ in self.move_cartesian_trajectory_set_control( pose_trajectory, kp_pos=kp_pos, kp_ori=kp_ori, kd_pos=kd_pos, kd_ori=kd_ori, max_torque=max_torque, tau_to_qpos_ratio=tau_to_qpos_ratio ): self.client.step() return True except StopIteration as e: return e.value
[docs] def internal_set_gripper_state(self, activate: bool, constraint_info: Optional[ConstraintInfo] = None) -> None: if not activate: # Turn gripper off. if self.use_magic_gripper: if self.gripper_constraint is not None: self.client.p.removeConstraint(self.gripper_constraint) self.gripper_constraint = None else: # Turn gripper on. if self.use_magic_gripper: if constraint_info is not None: self.create_gripper_constraint(constraint_info.child_body) self.gripper_activated = activate
[docs] def open_gripper_free(self, timeout: float = ARGDEF, force: bool = False) -> bool: return self._change_gripper_state_free(False, timeout=timeout, force=force)
[docs] def close_gripper_free(self, timeout: float = ARGDEF, force: bool = False) -> bool: return self._change_gripper_state_free(True, timeout=timeout, force=force)
[docs] def open_gripper_free_set_control(self, timeout: float = ARGDEF, force: bool = False): return self._change_gripper_state_free_set_control(False, timeout=timeout, force=force)
[docs] def close_gripper_free_set_control(self, timeout: float = ARGDEF, force: bool = False): return self._change_gripper_state_free_set_control(True, timeout=timeout, force=force)
@default_args def _change_gripper_state_free_set_control(self, activate: bool, timeout: float = 2.0, atol: float = 0.002, force: bool = False, verbose: bool = False): """A helper function that changes the gripper state assuming no contact with other objects. Args: activate: True to activate the gripper, False to release it. timeout: the timeout for the gripper to reach the target state. atol: the tolerance for the gripper to reach the target state. force: if True, the gripper will be forced to move to the target state. even if it is already in the target state. verbose: if True, print verbose information. """ if self.gripper_activated is not None and activate == self.gripper_activated and not force: return True if verbose: logger.info(f'{self.body_name}: Change gripper state to {activate}, use_magic_gripper={self.use_magic_gripper}, gripper_constraint={self.gripper_constraint}') if not activate: if self.use_magic_gripper: if self.gripper_constraint is not None: self.client.p.removeConstraint(self.gripper_constraint) self.gripper_constraint = None self.gripper_activated = activate target = type(self).PANDA_GRIPPER_CLOSE if activate else type(self).PANDA_GRIPPER_OPEN target_qpos = np.array([target, target]) arm_qpos = self.get_qpos() for _ in self.client.timeout(timeout): current_qpos = self.client.w.get_batched_qpos_by_id(self.panda, self.gripper_joints) diff = target_qpos - current_qpos # print('current_qpos', current_qpos, 'target_qpos', target_qpos, 'diff', diff, 'atol', atol) if all(np.abs(diff) < atol): return True self.set_arm_joint_position_control(arm_qpos, set_gripper_control=True) yield if not self.warnings_suppressed: logger.warning(f'{self.body_name}: Moving gripper timeout ({timeout}s).') return True @default_args def _change_gripper_state_free(self, activate: bool, timeout: float = ARGDEF, atol: float = ARGDEF, force: bool = False, verbose: bool = False) -> bool: """A helper function that changes the gripper state assuming no contact with other objects. Args: activate: True to activate the gripper, False to release it. timeout: the timeout for the gripper to reach the target state. atol: the tolerance for the gripper to reach the target state. force: if True, the gripper will be forced to move to the target state. even if it is already in the target state. verbose: if True, print verbose information. Returns: bool: True if the gripper state changed, False otherwise. """ try: for _ in self._change_gripper_state_free_set_control(activate, timeout=timeout, atol=atol, force=force, verbose=verbose): self.client.step() return True except StopIteration as e: return e.value
[docs] @default_args def grasp_set_control(self, timeout: float = 2, target_object: Optional[int] = None, force_constraint: bool = False, verbose: bool = False): target_qpos = np.array([type(self).PANDA_GRIPPER_CLOSE] * 2) self.gripper_activated = True for _ in self.client.timeout(timeout): current_qpos = self.w.get_batched_qpos_by_id(self.panda, self.gripper_joints) diff = target_qpos - current_qpos if self.detect_gripper_contact(verbose=verbose): return True if all(np.abs(diff) < 1e-3): return False for joint_index, target in zip(self.gripper_joints, target_qpos): self.p.setJointMotorControl2( self.panda, joint_index, controlMode=p.POSITION_CONTROL, targetPosition=target, force=40 ) yield if force_constraint: if target_object is not None and self.use_magic_gripper and self.gripper_constraint is None: self.create_gripper_constraint(target_object) if not self.warnings_suppressed: logger.warning(f'{self.body_name}: Grasping timeout ({timeout}s).') return False
[docs] def grasp(self, timeout: float = ARGDEF, target_object: Optional[int] = None, force_constraint: bool = False, verbose: bool = False) -> bool: try: for _ in self.grasp_set_control(timeout=timeout, target_object=target_object, force_constraint=force_constraint, verbose=verbose): self.client.step() return True except StopIteration as e: return e.value
[docs] def detect_gripper_contact(self, verbose: bool = False): points1 = self.client.p.getContactPoints(bodyA=self.panda, linkIndexA=self.panda_leftfinger) points2 = self.client.p.getContactPoints(bodyA=self.panda, linkIndexA=self.panda_rightfinger) objects1 = {point[2] for point in points1 if point[2] != self.panda} objects2 = {point[2] for point in points2 if point[2] != self.panda} objects_inter = list(set.intersection(objects1, objects2)) if len(objects_inter) > 0: if verbose: for obj_id in objects_inter: logger.info(f'{self.body_name}: Contact detected: {obj_id}, name = {self.w.body_names[obj_id]}.') if self.use_magic_gripper: obj_id = objects_inter[0] self.create_gripper_constraint(obj_id) return True return False
[docs] def create_gripper_constraint(self, object_id, verbose: bool = False): if self.gripper_constraint is not None: self.client.p.removeConstraint(self.gripper_constraint) self.gripper_constraint = None if verbose: logger.info(f'{self.body_name}: Creating constraint between {self.panda} and {object_id}.') ee_link = self.panda_ee_tip body_pose = self.world.get_link_state_by_id(self.panda, ee_link, fk=True) obj_pose = self.p.getBasePositionAndOrientation(object_id) world_to_body = p.invertTransform(body_pose[0], body_pose[1]) obj_to_body = p.multiplyTransforms(world_to_body[0], world_to_body[1], obj_pose[0], obj_pose[1]) self.gripper_constraint = self.p.createConstraint( parentBodyUniqueId=self.panda, parentLinkIndex=ee_link, childBodyUniqueId=object_id, childLinkIndex=-1, # should be the link index of the contact link. jointType=p.JOINT_FIXED, jointAxis=(0, 0, 0), parentFramePosition=obj_to_body[0], parentFrameOrientation=obj_to_body[1], childFramePosition=(0, 0, 0), childFrameOrientation=(0, 0, 0) )
[docs] class PandaRobotMagicGripperStateSaver(BulletSaver):
[docs] def __init__(self, client_id: int, world: BulletWorld, managed_interface: str): super().__init__(client_id, world) self.managed_interface = managed_interface self.gripper_activated = None self.gripper_constraint_info = None
@property def robot(self) -> PandaRobot: return self.world.managed_interfaces[self.managed_interface]
[docs] def save(self): self.gripper_activated = self.robot.gripper_activated if self.robot.gripper_constraint is not None: contact_info = self.world.get_constraint(self.robot.gripper_constraint) self.gripper_constraint_info = contact_info
[docs] def restore(self): self.robot.internal_set_gripper_state(self.gripper_activated, self.gripper_constraint_info)
[docs] class PandaReachTwoStage(BulletRobotActionPrimitive): robot: PandaRobot
[docs] def __call__(self, pos: np.ndarray, quat: np.ndarray, height: float = 0.2, speed: float = 0.01) -> bool: logger.info(f'{self.robot.body_name}: Moving to pose (two-stage): {pos}, {quat}.') self.robot.move_pose(pos + rotate_vector([0, 0, -height], quat), quat, speed=speed) for i in reversed(range(10)): self.robot.move_pose(pos + rotate_vector([0, 0, -height / 10 * i], quat), quat, speed=speed)
[docs] class PandaPushTwoStage(BulletRobotActionPrimitive): robot: PandaRobot
[docs] def __call__(self, start_pos: np.ndarray, normal: np.ndarray, distance: float, timeout: float = 10, prepush_height: float = 0.1, speed: float = 0.01) -> bool: self.robot.close_gripper_free() pos, quat = self.robot.get_ee_pose() self.robot.move_pose(start_pos - prepush_height * normal / np.linalg.norm(normal), quat, speed=speed) pos, quat = self.robot.get_ee_pose() end_pos = start_pos + normal * distance for _ in self.client.timeout(timeout): diff = end_pos - pos if np.linalg.norm(diff) < 0.01: return True self.robot.move_pose_smooth(pos + diff / np.linalg.norm(diff) * speed, quat, speed=speed) pos, _ = self.robot.get_ee_pose() if not self.warnings_suppressed: logger.info(f'{self.robot.body_name}: Pushing timeout ({timeout}s).') return False
[docs] class PandaPlanarPush(BulletRobotActionPrimitive): robot: PandaRobot
[docs] def __call__(self, target_pos: np.ndarray, timeout: float = 10, speed: float = 0.01, tol: float = 0.01) -> bool: target_pos = np.array(target_pos) pos, quat = self.robot.get_ee_pose() for _ in self.client.timeout(timeout): diff = target_pos - pos if np.linalg.norm(diff) < tol: return True self.robot.move_pose_smooth(pos + diff / np.linalg.norm(diff) * speed, quat, speed=speed) pos, _ = self.robot.get_ee_pose() if not self.warnings_suppressed: logger.warning(f'{self.robot.body_name}: Planar push timeout ({timeout}s).') return False
[docs] class PandaPickAndPlace(BulletRobotActionPrimitive): """A helpful pick-and-place primitive.""" robot: PandaRobot
[docs] def __call__( self, pick_pos: np.ndarray, pick_quat: np.ndarray, place_pos: np.ndarray, place_quat: np.ndarray, reach_kwargs: Optional[Dict[str, Any]] = None, grasp_kwargs: Optional[Dict[str, Any]] = None ) -> bool: pick_pos = np.array(pick_pos) pick_quat = np.array(pick_quat) pick_after_pos = pick_pos + np.array([0, 0, 0.2]) place_pos = np.array(place_pos) place_quat = np.array(place_quat) place_after_pos = place_pos + np.array([0, 0, 0.2]) if reach_kwargs is None: reach_kwargs = {} if grasp_kwargs is None: grasp_kwargs = {} self.robot.do('open_gripper_free') self.robot.do('reach_two_stage', pick_pos, pick_quat, **reach_kwargs) self.robot.do('grasp', **grasp_kwargs) self.robot.do('move_pose', pick_after_pos, pick_quat) self.robot.do('move_pose', place_pos, place_quat) self.robot.do('open_gripper_free') self.robot.do('move_pose', place_after_pos, place_quat)
[docs] def dedup_qpos_path(qpos_trajectory: Iterable[np.ndarray]) -> np.ndarray: """Canonicalize a qpos trajectory by removing duplicates.""" qpos_trajectory = np.asarray(qpos_trajectory) qpos_trajectory_dedup = list() last_qpos = None for qpos in qpos_trajectory: if qpos is None: continue if last_qpos is None or np.linalg.norm(qpos - last_qpos, ord=2) > 0.01: qpos_trajectory_dedup.append(qpos) last_qpos = qpos qpos_trajectory = np.array(qpos_trajectory_dedup) return qpos_trajectory