Source code for concepts.dm.crowhat.domains.pybullet_single_robot_rwi_pp_simplified

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : pybullet_single_robot_rwi_pp_simplified.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/27/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import itertools
import os.path as osp
from typing import Union, Tuple, Dict

import jacinle
import torch
import numpy as np

from concepts.math.frame_utils_xyzw import solve_ee_from_tool
from concepts.simulator.pybullet.client import BulletClient
from concepts.simulator.pybullet.world import BulletWorld
from concepts.simulator.pybullet.components.robot_base import BulletArmRobotBase
from concepts.simulator.pybullet.manipulation_utils.path_generation_utils import gen_qpos_path_from_ee_path
from concepts.simulator.pybullet.qddl_interface import QDDLSceneMetainfo

import concepts.dm.crow as crow
from concepts.dm.crowhat.impl.pybullet.pybullet_sim_interfaces import PyBulletRemotePerceptionInterface, PyBulletSimulationControllerInterface
from concepts.dm.crowhat.world.manipulator_interface import RobotArmJointTrajectory
from concepts.dm.crowhat.impl.pybullet.pybullet_planning_world_interface import PyBulletPlanningWorldInterface
from concepts.dm.crowhat.impl.pybullet.pybullet_manipulator_interface import PyBulletSingleArmMotionPlanningInterface
from concepts.dm.crowhat.manipulation_utils.pick_place_sampler import GraspParameter, gen_grasp_parameter, calc_grasp_approach_ee_pose_trajectory
from concepts.dm.crowhat.manipulation_utils.pick_place_sampler import PlacementParameter, gen_placement_parameter

logger = jacinle.get_logger(__name__)


[docs] class PyBulletSingleArmRWIPPSimplifiedDomain(object): domain_filename = osp.join(osp.dirname(__file__), 'RWI-pp-simplified.cdl')
[docs] def __init__(self, robot: BulletArmRobotBase, scene_metainfo: QDDLSceneMetainfo, verbose: bool = False, check_z_in_on_test: bool = False, use_specialized_block_placement_sampler: bool = True): """Initialize the RWI-PP-Simplified domain. Args: robot: the robot in the PyBullet environment. scene_metainfo: the metainfo of the scene. verbose: whether to print verbose information. check_z_in_on_test: whether to check object1.z > object2.z when testing "on(object1, object2)". use_specialized_block_placement_sampler: whether to use a specialized block placement sampler. When the object to be placed onto is a block, we will use a specialized sampler that always return the "center" of the block as the placement position. """ self.robot = robot self.world = self.robot.world self.client = self.robot.client self.metainfo = scene_metainfo self.verbose = verbose self.world_interface = PyBulletPlanningWorldInterface(self.client) self.robot_interface = PyBulletSingleArmMotionPlanningInterface(self.robot, self.world_interface) self.domain = crow.load_domain_file(type(self).domain_filename) self.executor = crow.CrowExecutor(self.domain) self.check_z_in_on_test = check_z_in_on_test self.use_specialized_block_placement_sampler = use_specialized_block_placement_sampler self.bind_executor_functions()
world: BulletWorld """The PyBullet world.""" client: BulletClient """The PyBullet client.""" robot: BulletArmRobotBase """The Panda robot in the PyBullet environment.""" metainfo: QDDLSceneMetainfo """The metainfo of the scene.""" verbose: bool """Whether to print verbose information.""" domain: crow.CrowDomain """The domain of the RWI-PP-Simplified domain.""" executor: crow.CrowExecutor """The executor of the RWI-PP-Simplified domain.""" _initial_object_poses: Dict[str, Tuple[np.ndarray, np.ndarray]]
[docs] def bind_perception_interface(self, perception_interface: PyBulletRemotePerceptionInterface): perception_interface.register_bullet_client(self.client) perception_interface.register_state_getter(self._get_state)
[docs] def bind_simulation_interface(self, simulation_interface: PyBulletSimulationControllerInterface): simulation_interface.register_state_getter(self._get_state)
def _get_state(self, interface: Union[PyBulletRemotePerceptionInterface, PyBulletSimulationControllerInterface]) -> crow.CrowState: objects = dict() robot_name = None for name, info in self.metainfo.objects.items(): if info.id == self.robot.get_body_id(): robot_name = name object_type = 'Hand' else: object_type = 'Object' objects[name] = object_type state = crow.CrowState.make_empty_state(self.domain, objects) for name, info in self.metainfo.objects.items(): index = info.id if index == self.robot.get_body_id(): state.fast_set_value('qpos_of', [name], torch.tensor(self.robot.get_qpos(), dtype=torch.float32)) else: state.fast_set_value('pose_of', [name], torch.tensor(self.world.get_body_state_by_id(index).get_7dpose(), dtype=torch.float32)) if 'support' in self.domain.features: for name, info in self.metainfo.objects.items(): if info.id != self.robot.get_body_id(): for name2 in self.world.get_supporting_objects_by_id(info.id): if self.metainfo.objects[name2].id != self.robot.get_body_id(): state.fast_set_value('support', [name, name2], True) for name, info in self.metainfo.objects.items(): if info.moveable: state.fast_set_value('moveable', [name], True) if hasattr(self.robot, 'gripper_constraint'): if self.robot.gripper_constraint is None: state.fast_set_value('hand_available', [robot_name], True) else: constraint = self.world.get_constraint(self.robot.gripper_constraint) name = self.world.body_names[constraint.child_body] state.fast_set_value('hand_available', [robot_name], False) state.fast_set_value('holding', [robot_name, name], True) else: state.fast_set_value('hand_available', [robot_name], True) return state
[docs] def bind_executor_functions(self): self._init_object_poses_table() @crow.config_function_implementation(support_batch=False) def on_with_pose(x, y, x_pose, y_pose): if self.check_z_in_on_test: rv = (x_pose[0] - y_pose[0]) ** 2 + (x_pose[1] - y_pose[1]) ** 2 < 0.05 and (x_pose[2] - y_pose[2]) > 0.0 else: rv = (x_pose[0] - y_pose[0]) ** 2 + (x_pose[1] - y_pose[1]) ** 2 < 0.05 return rv @crow.config_function_implementation(is_iterator=True) def sample_grasp_inner(robot_name, target_name): target_id = self.metainfo.get_object_identifier(target_name) sampler = gen_grasp_parameter(self.world_interface, self.robot_interface, target_id, 0.08, max_test_points_before_first=200, verbose=False) if 'box' in self.world.get_body_name(target_id): sampler = itertools.islice(sampler, 5) for grasp_param in sampler: yield grasp_param @crow.config_function_implementation(is_iterator=True) def sample_placement_inner(robot_id, target_name, support_name): target_id = self.metainfo.get_object_identifier(target_name) support_id = self.metainfo.get_object_identifier(support_name) if support_id == self.world.get_body_index('table'): # NB(Jiayuan Mao @ 2023/04/13): try two different placements: the original placement, and a random placement. pos, quat = self._initial_object_poses[target_name] x = np.random.rand() * 0.2 + 0.1 y = np.random.rand() * 0.8 - 0.4 yield torch.tensor([x, y, 0.1, quat[0], quat[1], quat[2], quat[3]], dtype=torch.float32) yield torch.tensor([pos[0], pos[1], 0.1, quat[0], quat[1], quat[2], quat[3]], dtype=torch.float32) else: if self.use_specialized_block_placement_sampler and self.world.get_collision_shape_data_by_id(support_id)[0].shape_type == 3: target_pos, target_quat = self.world.get_body_state_by_id(target_id).get_transformation() support_pos, support_quat = self.world.get_body_state_by_id(support_id).get_transformation() yield torch.tensor([support_pos[0], support_pos[1], support_pos[2] + 0.05, target_quat[0], target_quat[1], target_quat[2], target_quat[3]], dtype=torch.float32) else: for param in gen_placement_parameter(self.world_interface, target_id, support_id, retain_target_orientation=True): yield torch.tensor([param.target_pos[0], param.target_pos[1], param.target_pos[2], param.target_quat[0], param.target_quat[1], param.target_quat[2], param.target_quat[3]], dtype=torch.float32) @crow.config_function_implementation(is_iterator=True) def sample_grasp_trajectory_inner(robot_name, target_name, param: GraspParameter): target_id = self.metainfo.get_object_identifier(target_name) trajectory = calc_grasp_approach_ee_pose_trajectory(param, 0.05) succ, qpos_trajectory = gen_qpos_path_from_ee_path(self.robot, trajectory) if succ: # If the trajectory is valid, we then do cfree planning to generate a trajectory from the current qpos to the first qpos. succ, qpos_trajectory_first = self.robot.rrt_collision_free(qpos_trajectory[0], smooth_fine_path=True) if succ: yield RobotArmJointTrajectory(list(qpos_trajectory_first) + qpos_trajectory[1:]) else: if self.verbose: logger.warning('Failed to generate a valid grasp trajectory.') @crow.config_function_implementation(is_iterator=True) def sample_placement_trajectory_inner(robot_name, target_name, support_name, tensor_param: torch.Tensor): target_id = self.metainfo.get_object_identifier(target_name) support_id = self.metainfo.get_object_identifier(support_name) param = PlacementParameter( object_id=target_id, support_id=support_id, target_pos=np.array((tensor_param[0], tensor_param[1], tensor_param[2]), dtype=np.float32), target_quat=np.array((tensor_param[3], tensor_param[4], tensor_param[5], tensor_param[6]), dtype=np.float32), support_normal=np.array([0, 0, 1], dtype=np.float32), ) ee_to_target = self.robot.get_ee_to_tool(target_id) trajectory = list() for placement_height in [0.09, 0.08, 0.07, 0.06, 0.05, 0.03, 0.01]: robot_start_pos, robot_start_quat = solve_ee_from_tool(param.target_pos + param.support_normal * placement_height, param.target_quat, ee_to_target) trajectory.append((robot_start_pos, robot_start_quat)) succ, qpos_trajectory = gen_qpos_path_from_ee_path(self.robot, trajectory, max_pairwise_distance=0.5) if not succ: import ipdb; ipdb.set_trace() if succ: last_qpos = self.robot.get_qpos() cfree_qpos_trajectory = list() for qpos in qpos_trajectory: succ, cfree_qpos = self.robot.rrt_collision_free(last_qpos, qpos, smooth_fine_path=True) if succ: cfree_qpos_trajectory.extend(cfree_qpos) last_qpos = qpos else: succ = False break if succ: yield RobotArmJointTrajectory(cfree_qpos_trajectory) else: if self.verbose: logger.warning('Failed to generate a valid placement trajectory.') @crow.config_function_implementation(support_batch=False) def _is_valid_grasp_param(robot_id, target_id, param: GraspParameter): return True @crow.config_function_implementation(support_batch=False) def _is_valid_place_param(robot_id, target_id, support_id, param: torch.Tensor): return True @crow.config_function_implementation(support_batch=False) def _is_valid_grasp_traj(robot_id, target_id, param: GraspParameter, traj: RobotArmJointTrajectory): return True @crow.config_function_implementation(support_batch=False) def _is_valid_place_traj(robot_id, target_id, support_id, param: torch.Tensor, traj: RobotArmJointTrajectory): return True self.executor.register_function('on_with_pose', on_with_pose) self.executor.register_function('gen_valid_grasp', sample_grasp_inner) self.executor.register_function('gen_valid_placement', sample_placement_inner) self.executor.register_function('gen_valid_grasp_traj', sample_grasp_trajectory_inner) self.executor.register_function('gen_valid_placement_traj', sample_placement_trajectory_inner) self.executor.register_function('valid_grasp', _is_valid_grasp_param) self.executor.register_function('valid_grasp_traj', _is_valid_grasp_traj) self.executor.register_function('valid_placement', _is_valid_place_param) self.executor.register_function('valid_placement_traj', _is_valid_place_traj)
def _init_object_poses_table(self): self._initial_object_poses = dict() for body_id, body_name in self.world.body_names: self._initial_object_poses[body_id] = self.world.get_body_state_by_id(body_id).get_transformation()