#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : default_env.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 01/08/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import os.path as osp
from typing import Optional, Any, List, Dict
import numpy as np
import jacinle.io as io
from jacinle.logging import get_logger
from concepts.simulator.pybullet.client import BulletClient
from concepts.simulator.pybullet.components.asset_libraries.shapenet import ShapeNetCoreLoader
from concepts.utils.typing_utils import Vec3f, Vec4f
logger = get_logger(__file__)
[docs]
class BulletEnvBase(object):
[docs]
def __init__(self, client: Optional[BulletClient] = None, np_random: Optional[np.random.RandomState] = None, seed: Optional[int] = None, is_gui: bool = True):
if client is None:
client = BulletClient(is_gui=is_gui)
self._client = client
if np_random is None:
if seed is None:
self._np_random = np.random.RandomState()
else:
self._np_random = np.random.RandomState(seed)
else:
self._np_random = np_random
@property
def client(self) -> BulletClient:
return self._client
@property
def world(self):
return self.client.world
@property
def w(self):
return self.client.world
@property
def p(self):
return self.client.p
@property
def np_random(self) -> np.random.RandomState:
return self._np_random
[docs]
def seed(self, seed: int):
self.np_random.seed(seed)
@property
def ocrtoc_root(self) -> str:
return osp.join(self.client.assets_root, 'objects', 'ocrtoc')
[docs]
def list_ocrtoc_objects(self) -> List[str]:
all_model_names = list()
for dirname in io.lsdir(self.ocrtoc_root, '*'):
if osp.isdir(dirname) and osp.exists(osp.join(dirname, 'model.urdf')):
all_model_names.append(osp.basename(dirname))
return all_model_names
[docs]
def add_ocrtoc_object(
self, identifier: str, location_3d: Vec3f, scale: float = 1.0, name: Optional[str] = None, *,
static: bool = False, quat: Vec4f = (0, 0, 0, 1)
):
if name is None:
name = identifier
return self.client.load_urdf(
osp.join(self.ocrtoc_root, identifier, 'model.urdf'),
location_3d,
quat,
body_name=name,
scale=scale,
static=static,
)
@property
def ycb_simple_root(self) -> str:
return osp.join(self.client.assets_root, 'objects', 'ycb_simplified')
[docs]
def list_ycb_simplified_objects(self) -> List[str]:
all_model_names = list()
for dirname in io.lsdir(self.ycb_simple_root, '*'):
if osp.isdir(dirname) and osp.exists(osp.join(dirname, 'textured_simple.obj.urdf')):
all_model_names.append(osp.basename(dirname))
return all_model_names
[docs]
def add_ycb_simplified_object(
self, identifier: str, location_3d: Vec3f, scale: float = 1.0, name: Optional[str] = None, *,
static: bool = False, quat: Vec4f = (0, 0, 0, 1)
):
if name is None:
name = identifier
return self.client.load_urdf(
osp.join(self.ycb_simple_root, identifier, 'textured_simple.obj.urdf'),
location_3d,
quat,
body_name=name,
scale=scale,
static=static,
)
_shapenet_core_loader: Optional[ShapeNetCoreLoader] = None
@property
def shapenet_core_loader(self) -> ShapeNetCoreLoader:
return self._shapenet_core_loader
[docs]
def initialize_shapenet_core_loader(self, shapenet_dir: str):
if self._shapenet_core_loader is not None:
logger.warning('ShapeNetCoreLoader is already initialized. Ignored.')
return
self._shapenet_core_loader = ShapeNetCoreLoader(self.client, shapenet_dir)
[docs]
def list_shape_net_core_objects(self) -> Dict[str, Dict[str, Any]]:
assert self._shapenet_core_loader is not None, 'ShapeNetCoreLoader is not initialized.'
return self._shapenet_core_loader.available_models
[docs]
def add_shape_net_core_object(
self, synset_identifier: str, model_identifier: str, location_3d: Vec3f, scale: float = 1.0, name: Optional[str] = None, *,
static: bool = False, quat: Vec4f = (0, 0, 0, 1)
):
assert self._shapenet_core_loader is not None, 'ShapeNetCoreLoader is not initialized.'
return self._shapenet_core_loader.load_urdf(synset_identifier, model_identifier, location_3d, scale=scale, name=name, static=static, quat=quat)
@property
def igibson_root(self) -> str:
return osp.join(self.client.assets_root, 'igibson')
[docs]
def find_igibson_object_by_category(self, category: str) -> List[str]:
path = osp.join(self.client.assets_root, 'igibson', 'objects', category)
all_files = io.lsdir(path, '*/*.urdf')
assert len(all_files) > 0, f'No urdf files found in {path}.'
return all_files
[docs]
def add_igibson_object_by_category(
self, category: str, location_3d: Vec3f, scale: float = 1.0, name: Optional[str] = None, *,
urdf_file: Optional[str] = None,
static: bool = False, quat: Vec4f = (0, 0, 0, 1),
):
if name is None:
name = category
if urdf_file is None:
all_files = self.find_igibson_object_by_category(category)
urdf_file = all_files[0]
return self.client.load_urdf(
urdf_file,
location_3d,
quat,
body_name=name,
scale=scale,
static=static,
)