#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : pybullet_sim_interfaces.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 03/29/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
"""Interface with Pybullet simulation. It contains:
- Controller interface for Pybullet simulation.
- Perception interface for Pybullet simulation.
"""
import os
import tempfile
import shutil
import atexit
import queue
from typing import Any, Optional, Callable, Tuple
from jacinle.utils.printing import colored
from jacinle.utils.network import get_free_port
from jacinle.comm.service import Service, SocketServer, SocketClient
from concepts.dm.crow.controller import CrowControllerApplier
from concepts.dm.crow.interfaces.perception_interface import CrowPerceptionInterface
from concepts.dm.crow.interfaces.controller_interface import CrowSimulationControllerInterface, CrowPhysicalControllerInterface
from concepts.dm.crow.crow_domain import CrowState
from concepts.dm.crowhat.robots.robot_interfaces import RobotControllerExecutionFailed, PrimitiveRobotControllerInterface, ContactPrimitiveRobotControlInterface
from concepts.simulator.pybullet.client import BulletClient
from concepts.simulator.pybullet.world import BulletSaver
__all__ = [
'PyBulletSimulationControllerInterface', 'PyBulletPhysicalControllerInterface', 'PyBulletRemoteService',
'PyBulletRemotePerceptionInterface', 'PybulletRemoteControllerInterface',
'make_pybullet_remote_interfaces', 'make_pybullet_simulator_tcp_ports', 'make_pybullet_simulator_ipc_ports',
]
[docs]
class PyBulletSimulationControllerInterface(CrowSimulationControllerInterface):
[docs]
def __init__(self, bullet_client: BulletClient):
super().__init__()
self._bullet_client = bullet_client
self._state_getter = None
self._saved_states = dict()
self._saved_states_counter = 0
@property
def bullet_client(self):
return self._bullet_client
@property
def saved_states(self):
return self._saved_states
[docs]
def reset(self):
self._saved_states.clear()
self._saved_states_counter = 0
[docs]
def save_state(self, **kwargs) -> int:
self._saved_states_counter += 1
indent = self._saved_states_counter
saver = self._bullet_client.world.save_world_builtin()
saver.save()
self._saved_states[indent] = saver
return indent
[docs]
def restore_state(self, state_identifier: int, **kwargs):
if state_identifier not in self._saved_states:
raise ValueError(f"State {state_identifier} not found. Note that the state can only be restored once.")
saver = self._saved_states[state_identifier]
if isinstance(saver, BulletSaver):
saver.restore()
else:
saver()
del self._saved_states[state_identifier]
[docs]
def register_controller(self, name: str, function: Callable):
super().register_controller(name, function)
[docs]
def register_state_getter(self, state_getter: Callable[['PyBulletSimulationControllerInterface'], CrowState]):
self._state_getter = state_getter
[docs]
def get_crow_state(self) -> CrowState:
if self._state_getter is not None:
return self._state_getter(self)
raise NotImplementedError()
[docs]
class PyBulletPhysicalControllerInterface(CrowPhysicalControllerInterface):
[docs]
def __init__(self, bullet_client: BulletClient, dry_run: bool = False):
super().__init__()
self._bullet_client = bullet_client
self._dry_run = dry_run
@property
def bullet_client(self):
return self._bullet_client
[docs]
def serve(
self, *, tcp_ports: Optional[Tuple[int, int]] = None, ipc_ports: Optional[Tuple[str, str]] = None,
redirect_ios: bool = False,
redirect_stdout: Optional[str] = '/dev/null', redirect_stderr: Optional[str] = '/dev/null'
) -> None:
service = PyBulletRemoteService(self, dry_run=self._dry_run)
server = SocketServer(service, 'pybullet-physical-controller-interface', tcp_port=tcp_ports, ipc_port=ipc_ports)
with server.activate():
if redirect_ios:
return service.mainloop(redirect_stdout=redirect_stdout, redirect_stderr=redirect_stderr)
else:
return service.mainloop()
[docs]
class PyBulletRemoteService(Service):
[docs]
def __init__(self, controller: PyBulletPhysicalControllerInterface, dry_run: bool = False):
super().__init__(spec={'controllers': list(controller.controllers.keys())})
self._controller = controller
self._dry_run = dry_run
self.queue = queue.Queue()
[docs]
def call(self, action_name, *args, **kwargs):
# Create a Future to store the result of the action.
q = queue.Queue(maxsize=1)
self.queue.put((action_name, args, kwargs, q))
return q.get()
def _redirect_output(self, redirect_stdout: Optional[str], redirect_stderr: Optional[str]):
if redirect_stdout is not None:
# If file not exists, create it.
if not os.path.exists(redirect_stdout):
with open(redirect_stdout, 'w') as f:
pass
os.dup2(os.open(redirect_stdout, os.O_WRONLY), 1)
if redirect_stderr is not None:
if not os.path.exists(redirect_stderr):
with open(redirect_stderr, 'w') as f:
pass
os.dup2(os.open(redirect_stderr, os.O_WRONLY), 2)
[docs]
def mainloop(self, *, redirect_stdout: str = '/dev/null', redirect_stderr: str = '/dev/null'):
# self._redirect_output(redirect_stdout, redirect_stderr)
while True:
while True:
try:
action_name, args, kwargs, result_q = self.queue.get(block=False)
break
except queue.Empty:
self._controller.bullet_client.update_viewer()
except KeyboardInterrupt:
return
try:
if action_name == '__get_scene__':
result_q.put({'status': 'done', 'scene': self._controller.bullet_client.world.save_world()})
else:
if self._dry_run:
print('Dry run:', action_name, args, kwargs)
result_q.put({'status': 'warning', 'message': f'Controller {action_name} is in dry run mode.'})
else:
self._controller.controllers[action_name](*args, **kwargs)
result_q.put({'status': 'done'})
except RobotControllerExecutionFailed as e:
result_q.put({'status': 'failed', 'message': e.args})
except KeyError:
result_q.put({'status': 'error', 'message': f'Controller {action_name} not found.'})
except KeyboardInterrupt:
return
[docs]
class PyBulletRemotePerceptionInterface(CrowPerceptionInterface):
[docs]
def __init__(self, client):
super().__init__()
self._client = client
self._bullet_client = None
self._state_getter = None
[docs]
def get_scene(self):
rv = self._client.call('__get_scene__')
if rv['status'] == 'done':
return rv['scene']
else:
raise RuntimeError(f"Failed to get scene: {rv}")
[docs]
def register_bullet_client(self, bullet_client: BulletClient):
self._bullet_client = bullet_client
[docs]
def register_state_getter(self, state_getter: Callable[['PyBulletRemotePerceptionInterface'], CrowState]):
self._state_getter = state_getter
[docs]
def step(self, action: Optional[Any] = None) -> None:
pass
[docs]
def update_simulator(self) -> None:
if self._bullet_client is None:
raise ValueError("Bullet client not registered.")
saver = self.get_scene()
saver.reset_client_id(self._bullet_client.client_id, self._bullet_client.world)
saver.restore()
[docs]
def get_crow_state(self) -> CrowState:
if self._state_getter is not None:
return self._state_getter(self)
raise NotImplementedError()
[docs]
class PybulletRemoteControllerInterface(CrowPhysicalControllerInterface):
[docs]
def __init__(self, client: SocketClient):
super().__init__()
self._client = client
self._spec = self._client.get_spec()
for name in self._spec['controllers']:
self.register_controller(name, self._make_controller(name))
def _make_controller(self, name):
def controller(*args, **kwargs):
rv = self._client.call(name, *args, **kwargs)
print(rv)
if rv['status'] == 'done':
return
elif rv['status'] == 'failed':
raise RobotControllerExecutionFailed(f"Controller {name} failed: {rv['message']}")
elif rv['status'] == 'warning':
print(colored(f"Controller {name} warning: {rv['message']}", 'yellow'))
elif rv['status'] == 'error':
raise RobotControllerExecutionFailed(f"Controller {name} failed with system error {rv['message']}")
else:
raise RobotControllerExecutionFailed(f"Controller {name} failed with unknown status: {rv}")
return controller
[docs]
def make_pybullet_remote_interfaces(*, ipc_ports: Optional[Tuple[str, str]] = None, tcp_ports: Optional[Tuple[int, int]] = None, host: str = '127.0.0.1') -> Tuple[PyBulletRemotePerceptionInterface, PybulletRemoteControllerInterface]:
if ipc_ports is not None:
client = SocketClient('pybullet-physical-controller-interface', [f'ipc://{ipc_ports[0]}', f'ipc://{ipc_ports[1]}'], echo=False)
elif tcp_ports is not None:
client = SocketClient('pybullet-physical-controller-interface', [f'tcp://{host}:{tcp_ports[0]}', f'tcp://{host}:{tcp_ports[1]}'], echo=False)
else:
raise ValueError("Either ipc_ports or tcp_ports should be provided.")
client.initialize()
def _atexit():
print(colored(f"Finalizing the client. {client.name}", 'yellow'))
client.finalize()
atexit.register(_atexit)
return PyBulletRemotePerceptionInterface(client), PybulletRemoteControllerInterface(client)
[docs]
def make_pybullet_simulator_ipc_ports() -> Tuple[str, str]:
temp_dir = tempfile.mkdtemp()
ipc1 = temp_dir + '/ipc1'
ipc2 = temp_dir + '/ipc2'
def _atexit():
print(colored(f"Removing the temporary directory: {temp_dir}", 'yellow'))
shutil.rmtree(temp_dir)
atexit.register(_atexit)
return ipc1, ipc2
[docs]
def make_pybullet_simulator_tcp_ports() -> Tuple[int, int]:
return get_free_port(), get_free_port()