#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : pybullet_sim_interfaces.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 03/29/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
"""Interface with Pybullet simulation. It contains:
- Controller interface for Pybullet simulation.
- Perception interface for Pybullet simulation.
"""
import os
import tempfile
import shutil
import atexit
import queue
from typing import Any, Optional, Callable, Tuple, TYPE_CHECKING
from jacinle.utils.printing import colored
from jacinle.utils.network import get_free_port
from jacinle.comm.service import Service, SocketServer, SocketClient
from concepts.dm.crow.interfaces.perception_interface import CrowPerceptionInterface
from concepts.dm.crow.interfaces.controller_interface import CrowSimulationControllerInterface, CrowPhysicalControllerInterface
from concepts.dm.crow.crow_domain import CrowState
from concepts.dm.crowhat.world.manipulator_interface import RobotControllerExecutionFailed
from concepts.simulator.pybullet.client import BulletClient
from concepts.simulator.pybullet.world import BulletSaver
if TYPE_CHECKING:
from concepts.simulator.mplib.client import MPLibClient
__all__ = [
'PyBulletSimulationControllerInterface', 'PyBulletPhysicalControllerInterface', 'PyBulletRemoteService',
'PyBulletRemotePerceptionInterface', 'PybulletRemoteControllerInterface',
'make_pybullet_remote_interfaces', 'make_pybullet_simulator_tcp_ports', 'make_pybullet_simulator_ipc_ports',
]
[docs]
class PyBulletSimulationControllerInterface(CrowSimulationControllerInterface):
[docs]
def __init__(self, bullet_client: BulletClient, mplib_client: Optional['MPLibClient'] = None):
super().__init__()
self._bullet_client = bullet_client
self._mplib_client = mplib_client
self._state_getter = None
self._saved_states = dict()
self._saved_states_counter = 0
@property
def bullet_client(self):
return self._bullet_client
@property
def mplib_client(self):
return self._mplib_client
@property
def saved_states(self):
return self._saved_states
[docs]
def reset(self):
self._saved_states.clear()
self._saved_states_counter = 0
[docs]
def save_state(self, **kwargs) -> int:
self._saved_states_counter += 1
indent = self._saved_states_counter
saver = self._bullet_client.world.save_world()
saver.save()
self._saved_states[indent] = saver
return indent
[docs]
def restore_state(self, state_identifier: int, **kwargs):
if state_identifier not in self._saved_states:
raise ValueError(f"State {state_identifier} not found. Note that the state can only be restored once.")
saver = self._saved_states[state_identifier]
if isinstance(saver, BulletSaver):
saver.restore()
if self._mplib_client is not None:
self._mplib_client.sync_object_states(self._bullet_client)
else:
saver()
del self._saved_states[state_identifier]
[docs]
def restore_state_keep(self, state_identifier: int, action_counter: Optional[int] = None, **kwargs):
if state_identifier not in self._saved_states:
raise ValueError(f"State {state_identifier} not found. Note that the state can only be restored once.")
saver = self._saved_states[state_identifier]
if isinstance(saver, BulletSaver):
saver.restore()
if self._mplib_client is not None:
self._mplib_client.sync_object_states(self._bullet_client)
else:
saver()
if action_counter is not None:
self._action_counter = action_counter
[docs]
def register_controller(self, name: str, function: Callable):
super().register_controller(name, function)
[docs]
def register_state_getter(self, state_getter: Callable[['PyBulletSimulationControllerInterface'], CrowState]):
self._state_getter = state_getter
[docs]
def get_crow_state(self) -> CrowState:
if self._state_getter is not None:
return self._state_getter(self)
raise NotImplementedError()
[docs]
class PyBulletPhysicalControllerInterface(CrowPhysicalControllerInterface):
[docs]
def __init__(self, bullet_client: BulletClient, dry_run: bool = False):
super().__init__()
self._bullet_client = bullet_client
self._dry_run = dry_run
@property
def bullet_client(self):
return self._bullet_client
[docs]
def serve(
self, *, tcp_ports: Optional[Tuple[int, int]] = None, ipc_ports: Optional[Tuple[str, str]] = None,
redirect_ios: bool = False,
redirect_stdout: Optional[str] = '/dev/null', redirect_stderr: Optional[str] = '/dev/null'
) -> None:
service = PyBulletRemoteService(self, dry_run=self._dry_run)
server = SocketServer(service, 'pybullet-physical-controller-interface', tcp_port=tcp_ports, ipc_port=ipc_ports)
with server.activate():
if redirect_ios:
return service.mainloop(redirect_stdout=redirect_stdout, redirect_stderr=redirect_stderr)
else:
return service.mainloop()
[docs]
class PyBulletRemoteService(Service):
[docs]
def __init__(self, controller: PyBulletPhysicalControllerInterface, dry_run: bool = False):
super().__init__(spec={'controllers': list(controller.controllers.keys())})
self._controller = controller
self._dry_run = dry_run
self.queue = queue.Queue()
[docs]
def call(self, action_name, *args, **kwargs):
# Create a Future to store the result of the action.
q = queue.Queue(maxsize=1)
self.queue.put((action_name, args, kwargs, q))
return q.get()
def _redirect_output(self, redirect_stdout: Optional[str], redirect_stderr: Optional[str]):
if redirect_stdout is not None:
# If file not exists, create it.
if not os.path.exists(redirect_stdout):
with open(redirect_stdout, 'w') as f:
pass
os.dup2(os.open(redirect_stdout, os.O_WRONLY), 1)
if redirect_stderr is not None:
if not os.path.exists(redirect_stderr):
with open(redirect_stderr, 'w') as f:
pass
os.dup2(os.open(redirect_stderr, os.O_WRONLY), 2)
[docs]
def mainloop(self, *, redirect_stdout: str = '/dev/null', redirect_stderr: str = '/dev/null'):
# self._redirect_output(redirect_stdout, redirect_stderr)
while True:
while True:
try:
action_name, args, kwargs, result_q = self.queue.get(block=False)
break
except queue.Empty:
self._controller.bullet_client.update_viewer()
except KeyboardInterrupt:
return
try:
if action_name == '__get_scene__':
result_q.put({'status': 'done', 'scene': self._controller.bullet_client.world.save_world()})
else:
if self._dry_run:
print('Dry run:', action_name, args, kwargs)
result_q.put({'status': 'warning', 'message': f'Controller {action_name} is in dry run mode.'})
else:
self._controller.controllers[action_name](*args, **kwargs)
result_q.put({'status': 'done'})
except RobotControllerExecutionFailed as e:
result_q.put({'status': 'failed', 'message': e.args})
except KeyError:
result_q.put({'status': 'error', 'message': f'Controller {action_name} not found.'})
except KeyboardInterrupt:
return
[docs]
class PyBulletRemotePerceptionInterface(CrowPerceptionInterface):
[docs]
def __init__(self, client):
super().__init__()
self._client = client
self._bullet_client = None
self._state_getter = None
[docs]
def get_scene(self):
rv = self._client.call('__get_scene__')
if rv['status'] == 'done':
return rv['scene']
else:
raise RuntimeError(f"Failed to get scene: {rv}")
[docs]
def register_bullet_client(self, bullet_client: BulletClient):
self._bullet_client = bullet_client
[docs]
def register_state_getter(self, state_getter: Callable[['PyBulletRemotePerceptionInterface'], CrowState]):
self._state_getter = state_getter
[docs]
def step(self, action: Optional[Any] = None) -> None:
pass
[docs]
def update_simulator(self) -> None:
if self._bullet_client is None:
raise ValueError("Bullet client not registered.")
saver = self.get_scene()
saver.reset_client_id(self._bullet_client.client_id, self._bullet_client.world)
saver.restore()
[docs]
def get_crow_state(self) -> CrowState:
if self._state_getter is not None:
return self._state_getter(self)
raise NotImplementedError()
[docs]
class PybulletRemoteControllerInterface(CrowPhysicalControllerInterface):
[docs]
def __init__(self, client: SocketClient):
super().__init__()
self._client = client
self._spec = self._client.get_spec()
for name in self._spec['controllers']:
self.register_controller(name, self._make_controller(name))
def _make_controller(self, name):
def controller(*args, **kwargs):
rv = self._client.call(name, *args, **kwargs)
print(rv)
if rv['status'] == 'done':
return
elif rv['status'] == 'failed':
raise RobotControllerExecutionFailed(f"Controller {name} failed: {rv['message']}")
elif rv['status'] == 'warning':
print(colored(f"Controller {name} warning: {rv['message']}", 'yellow'))
elif rv['status'] == 'error':
raise RobotControllerExecutionFailed(f"Controller {name} failed with system error {rv['message']}")
else:
raise RobotControllerExecutionFailed(f"Controller {name} failed with unknown status: {rv}")
return controller
[docs]
def make_pybullet_remote_interfaces(*, ipc_ports: Optional[Tuple[str, str]] = None, tcp_ports: Optional[Tuple[int, int]] = None, host: str = '127.0.0.1') -> Tuple[PyBulletRemotePerceptionInterface, PybulletRemoteControllerInterface]:
if ipc_ports is not None:
client = SocketClient('pybullet-physical-controller-interface', [f'ipc://{ipc_ports[0]}', f'ipc://{ipc_ports[1]}'], echo=False)
elif tcp_ports is not None:
client = SocketClient('pybullet-physical-controller-interface', [f'tcp://{host}:{tcp_ports[0]}', f'tcp://{host}:{tcp_ports[1]}'], echo=False)
else:
raise ValueError("Either ipc_ports or tcp_ports should be provided.")
client.initialize()
def _atexit():
print(colored(f"Finalizing the client. {client.name}", 'yellow'))
client.finalize()
atexit.register(_atexit)
return PyBulletRemotePerceptionInterface(client), PybulletRemoteControllerInterface(client)
[docs]
def make_pybullet_simulator_ipc_ports() -> Tuple[str, str]:
temp_dir = tempfile.mkdtemp()
ipc1 = temp_dir + '/ipc1'
ipc2 = temp_dir + '/ipc2'
def _atexit():
print(colored(f"Removing the temporary directory: {temp_dir}", 'yellow'))
shutil.rmtree(temp_dir)
atexit.register(_atexit)
return ipc1, ipc2
[docs]
def make_pybullet_simulator_tcp_ports() -> Tuple[int, int]:
return get_free_port(), get_free_port()