Source code for concepts.dm.crowhat.impl.pybullet.pybullet_sim_interfaces

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : pybullet_sim_interfaces.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/29/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

"""Interface with Pybullet simulation. It contains:

- Controller interface for Pybullet simulation.
- Perception interface for Pybullet simulation.
"""

import os
import tempfile
import shutil
import atexit
import queue
from typing import Any, Optional, Callable, Tuple

from jacinle.utils.printing import colored
from jacinle.utils.network import get_free_port
from jacinle.comm.service import Service, SocketServer, SocketClient
from concepts.dm.crow.interfaces.perception_interface import CrowPerceptionInterface
from concepts.dm.crow.interfaces.controller_interface import CrowSimulationControllerInterface, CrowPhysicalControllerInterface
from concepts.dm.crow.crow_domain import CrowState
from concepts.dm.crowhat.world.manipulator_interface import RobotControllerExecutionFailed
from concepts.simulator.pybullet.client import BulletClient
from concepts.simulator.pybullet.world import BulletSaver

__all__ = [
    'PyBulletSimulationControllerInterface', 'PyBulletPhysicalControllerInterface', 'PyBulletRemoteService',
    'PyBulletRemotePerceptionInterface', 'PybulletRemoteControllerInterface',
    'make_pybullet_remote_interfaces', 'make_pybullet_simulator_tcp_ports', 'make_pybullet_simulator_ipc_ports',
]


[docs] class PyBulletSimulationControllerInterface(CrowSimulationControllerInterface):
[docs] def __init__(self, bullet_client: BulletClient): super().__init__() self._bullet_client = bullet_client self._state_getter = None self._saved_states = dict() self._saved_states_counter = 0
@property def bullet_client(self): return self._bullet_client @property def saved_states(self): return self._saved_states
[docs] def reset(self): self._saved_states.clear() self._saved_states_counter = 0
[docs] def save_state(self, **kwargs) -> int: self._saved_states_counter += 1 indent = self._saved_states_counter saver = self._bullet_client.world.save_world_builtin() saver.save() self._saved_states[indent] = saver return indent
[docs] def restore_state(self, state_identifier: int, **kwargs): if state_identifier not in self._saved_states: raise ValueError(f"State {state_identifier} not found. Note that the state can only be restored once.") saver = self._saved_states[state_identifier] if isinstance(saver, BulletSaver): saver.restore() else: saver() del self._saved_states[state_identifier]
[docs] def register_controller(self, name: str, function: Callable): super().register_controller(name, function)
[docs] def register_state_getter(self, state_getter: Callable[['PyBulletSimulationControllerInterface'], CrowState]): self._state_getter = state_getter
[docs] def get_crow_state(self) -> CrowState: if self._state_getter is not None: return self._state_getter(self) raise NotImplementedError()
[docs] class PyBulletPhysicalControllerInterface(CrowPhysicalControllerInterface):
[docs] def __init__(self, bullet_client: BulletClient, dry_run: bool = False): super().__init__() self._bullet_client = bullet_client self._dry_run = dry_run
@property def bullet_client(self): return self._bullet_client
[docs] def serve( self, *, tcp_ports: Optional[Tuple[int, int]] = None, ipc_ports: Optional[Tuple[str, str]] = None, redirect_ios: bool = False, redirect_stdout: Optional[str] = '/dev/null', redirect_stderr: Optional[str] = '/dev/null' ) -> None: service = PyBulletRemoteService(self, dry_run=self._dry_run) server = SocketServer(service, 'pybullet-physical-controller-interface', tcp_port=tcp_ports, ipc_port=ipc_ports) with server.activate(): if redirect_ios: return service.mainloop(redirect_stdout=redirect_stdout, redirect_stderr=redirect_stderr) else: return service.mainloop()
[docs] class PyBulletRemoteService(Service):
[docs] def __init__(self, controller: PyBulletPhysicalControllerInterface, dry_run: bool = False): super().__init__(spec={'controllers': list(controller.controllers.keys())}) self._controller = controller self._dry_run = dry_run self.queue = queue.Queue()
[docs] def call(self, action_name, *args, **kwargs): # Create a Future to store the result of the action. q = queue.Queue(maxsize=1) self.queue.put((action_name, args, kwargs, q)) return q.get()
def _redirect_output(self, redirect_stdout: Optional[str], redirect_stderr: Optional[str]): if redirect_stdout is not None: # If file not exists, create it. if not os.path.exists(redirect_stdout): with open(redirect_stdout, 'w') as f: pass os.dup2(os.open(redirect_stdout, os.O_WRONLY), 1) if redirect_stderr is not None: if not os.path.exists(redirect_stderr): with open(redirect_stderr, 'w') as f: pass os.dup2(os.open(redirect_stderr, os.O_WRONLY), 2)
[docs] def mainloop(self, *, redirect_stdout: str = '/dev/null', redirect_stderr: str = '/dev/null'): # self._redirect_output(redirect_stdout, redirect_stderr) while True: while True: try: action_name, args, kwargs, result_q = self.queue.get(block=False) break except queue.Empty: self._controller.bullet_client.update_viewer() except KeyboardInterrupt: return try: if action_name == '__get_scene__': result_q.put({'status': 'done', 'scene': self._controller.bullet_client.world.save_world()}) else: if self._dry_run: print('Dry run:', action_name, args, kwargs) result_q.put({'status': 'warning', 'message': f'Controller {action_name} is in dry run mode.'}) else: self._controller.controllers[action_name](*args, **kwargs) result_q.put({'status': 'done'}) except RobotControllerExecutionFailed as e: result_q.put({'status': 'failed', 'message': e.args}) except KeyError: result_q.put({'status': 'error', 'message': f'Controller {action_name} not found.'}) except KeyboardInterrupt: return
[docs] class PyBulletRemotePerceptionInterface(CrowPerceptionInterface):
[docs] def __init__(self, client): super().__init__() self._client = client self._bullet_client = None self._state_getter = None
[docs] def get_scene(self): rv = self._client.call('__get_scene__') if rv['status'] == 'done': return rv['scene'] else: raise RuntimeError(f"Failed to get scene: {rv}")
[docs] def register_bullet_client(self, bullet_client: BulletClient): self._bullet_client = bullet_client
[docs] def register_state_getter(self, state_getter: Callable[['PyBulletRemotePerceptionInterface'], CrowState]): self._state_getter = state_getter
[docs] def step(self, action: Optional[Any] = None) -> None: pass
[docs] def update_simulator(self) -> None: if self._bullet_client is None: raise ValueError("Bullet client not registered.") saver = self.get_scene() saver.reset_client_id(self._bullet_client.client_id, self._bullet_client.world) saver.restore()
[docs] def get_crow_state(self) -> CrowState: if self._state_getter is not None: return self._state_getter(self) raise NotImplementedError()
[docs] class PybulletRemoteControllerInterface(CrowPhysicalControllerInterface):
[docs] def __init__(self, client: SocketClient): super().__init__() self._client = client self._spec = self._client.get_spec() for name in self._spec['controllers']: self.register_controller(name, self._make_controller(name))
def _make_controller(self, name): def controller(*args, **kwargs): rv = self._client.call(name, *args, **kwargs) print(rv) if rv['status'] == 'done': return elif rv['status'] == 'failed': raise RobotControllerExecutionFailed(f"Controller {name} failed: {rv['message']}") elif rv['status'] == 'warning': print(colored(f"Controller {name} warning: {rv['message']}", 'yellow')) elif rv['status'] == 'error': raise RobotControllerExecutionFailed(f"Controller {name} failed with system error {rv['message']}") else: raise RobotControllerExecutionFailed(f"Controller {name} failed with unknown status: {rv}") return controller
[docs] def make_pybullet_remote_interfaces(*, ipc_ports: Optional[Tuple[str, str]] = None, tcp_ports: Optional[Tuple[int, int]] = None, host: str = '127.0.0.1') -> Tuple[PyBulletRemotePerceptionInterface, PybulletRemoteControllerInterface]: if ipc_ports is not None: client = SocketClient('pybullet-physical-controller-interface', [f'ipc://{ipc_ports[0]}', f'ipc://{ipc_ports[1]}'], echo=False) elif tcp_ports is not None: client = SocketClient('pybullet-physical-controller-interface', [f'tcp://{host}:{tcp_ports[0]}', f'tcp://{host}:{tcp_ports[1]}'], echo=False) else: raise ValueError("Either ipc_ports or tcp_ports should be provided.") client.initialize() def _atexit(): print(colored(f"Finalizing the client. {client.name}", 'yellow')) client.finalize() atexit.register(_atexit) return PyBulletRemotePerceptionInterface(client), PybulletRemoteControllerInterface(client)
[docs] def make_pybullet_simulator_ipc_ports() -> Tuple[str, str]: temp_dir = tempfile.mkdtemp() ipc1 = temp_dir + '/ipc1' ipc2 = temp_dir + '/ipc2' def _atexit(): print(colored(f"Removing the temporary directory: {temp_dir}", 'yellow')) shutil.rmtree(temp_dir) atexit.register(_atexit) return ipc1, ipc2
[docs] def make_pybullet_simulator_tcp_ports() -> Tuple[int, int]: return get_free_port(), get_free_port()