Source code for concepts.benchmark.namo.namo_polygon.namo_polygon_env

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : namo_polygon_env.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 05/26/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import pygame
from pymunk import Body
from typing import Optional, Any, Sequence, Tuple, NamedTuple, Dict

import numpy as np
import jacinle.io as io

import concepts.simulator.pymunk.body_utils as body_utils
from concepts.simulator.pymunk.default_env import PymunkDefaultEnv
from concepts.simulator.pymunk.collision import SpacePositionRestorer
from concepts.simulator.pymunk.collision import collision_test_current


[docs] class NamoPolygonExecutionResult(NamedTuple): success: bool episode_end: bool episode_success: bool
[docs] class NamoPolygonEnvState(NamedTuple): simulation_state: Dict[str, Any] current_attached_body: Optional[Body] current_attached_body_relative_pos: Optional[Tuple[float, float]]
[docs] @classmethod def make(cls, env: 'NamoPolygonEnv') -> 'NamoPolygonEnvState': saved = SpacePositionRestorer(env.world).positions return cls( simulation_state={k.label: v for k, v in saved.items()}, current_attached_body=env.current_attached_body, current_attached_body_relative_pos=env.current_attached_body_relative_pos )
[docs] def restore(self, env: 'NamoPolygonEnv'): for k, v in self.simulation_state.items(): env.world.get_body_by_label(k).position = v env.set_current_attached_body(self.current_attached_body) env.current_attached_body_relative_pos = self.current_attached_body_relative_pos env.world.step(1e-6)
[docs] class NamoPolygonEnv(PymunkDefaultEnv):
[docs] def __init__(self, scene_json, **kwargs): self.scene_json = scene_json self.scene = io.load_json(scene_json) kwargs.setdefault('damping', 0.0) kwargs.setdefault('world_width', self.scene['width']) kwargs.setdefault('world_height', self.scene['height']) kwargs.setdefault('action_velocity', 1000) super().__init__(**kwargs) self._agent = None self._obstacles = list() self.current_attached_body = None self.current_attached_body_relative_pos = None self.set_additional_step_callable(self._handle_attached_body)
[docs] def reset_scene(self): if self.scene['start'] is not None: self._agent = body_utils.add_ball(self.world, label='agent', pos=self.scene['start'], radius=8, selectable=True) self.set_current_selection(self._agent) if self.scene['goal'] is not None: self.add_additional_drawing_region_circle(self.scene['goal'], 15, (0, 255, 0)) # Walls for i, obs in enumerate(self.scene['raw_walls']): center, vertices = center_polygon(obs) body_utils.add_polygon(self.world, vertices, pos=center, movable=False, label=f'wall_{i}', use_convex_decomposition=True, color=(0, 0, 0, 255)) # Obstacles self._obstacles = list() for i, obs in enumerate(self.scene['obstacles']): center, vertices = center_polygon(obs) obstacle = body_utils.add_polygon(self.world, vertices, pos=center, movable=True, color=(128, 128, 255, 255), label=f'obstacle_{i}', use_convex_decomposition=True) self._obstacles.append(obstacle) self.set_current_attached_body(None)
[docs] def set_current_attached_body(self, body: Optional[Body]): self.current_attached_body = body if body is not None: self.current_attached_body_relative_pos = np.array(self.agent_pos) - np.array(body.position)
def _handle_attached_body(self, env): if self.current_attached_body is not None: self.current_attached_body.position = tuple(np.array(self.agent_pos) - self.current_attached_body_relative_pos) @property def agent(self): return self._agent @property def agent_start(self) -> Tuple[float, float]: return self.scene['start'] @property def agent_goal(self) -> Tuple[float, float]: return self.scene['goal'] @property def agent_pos(self): return self._agent.position
[docs] def set_agent_pos(self, pos: Tuple[float, float]): self._agent.position = pos if self.current_attached_body is not None: self.current_attached_body.position = tuple(np.array(pos) - self.current_attached_body_relative_pos)
@property def obstacles(self) -> Sequence[Body]: return self._obstacles
[docs] def move_cfree_trajectory(self, trajectory, render_mode: str = 'human') -> NamoPolygonExecutionResult: for pos in trajectory: if np.linalg.norm(np.array(pos) - self._agent.position) < 2: self.set_agent_pos(pos) self.step(1, render_mode=render_mode) if collision_test_current(self.world, [self._agent]): return NamoPolygonExecutionResult(success=False, episode_end=True, episode_success=False) episode_end, episode_success = self._check_episode_end() return NamoPolygonExecutionResult(success=True, episode_end=episode_end, episode_success=episode_success)
[docs] def attach_object(self, body: Body, render_mode: str = 'human') -> NamoPolygonExecutionResult: if is_point_close_to_body(self.agent_pos, body): self.set_current_attached_body(body) self.step(1, render_mode=render_mode) return NamoPolygonExecutionResult(success=True, episode_end=False, episode_success=False) return NamoPolygonExecutionResult(success=False, episode_end=False, episode_success=False)
[docs] def move_cfree_trajectory_with_attached_object(self, trajectory, render_mode: str = 'human') -> NamoPolygonExecutionResult: if self.current_attached_body is None: return NamoPolygonExecutionResult(success=False, episode_end=True, episode_success=False) relative_pos = np.array(self.agent_pos) - np.array(self.current_attached_body.position) for pos in trajectory: if np.linalg.norm(np.array(pos) - self._agent.position) < 2: self.set_agent_pos(pos) self.current_attached_body.position = tuple(np.array(pos) - relative_pos) self.step(1, render_mode=render_mode) if collision_test_current(self.world, [self._agent, self.current_attached_body]): return NamoPolygonExecutionResult(success=False, episode_end=True, episode_success=False) episode_end, episode_success = self._check_episode_end() return NamoPolygonExecutionResult(success=True, episode_end=episode_end, episode_success=episode_success)
[docs] def detach_object(self, render_mode: str = 'human') -> NamoPolygonExecutionResult: self.set_current_attached_body(None) self.step(1, render_mode=render_mode) return NamoPolygonExecutionResult(success=True, episode_end=False, episode_success=False)
def _check_episode_end(self) -> Tuple[bool, bool]: return False, np.linalg.norm(np.array(self.agent_pos) - np.array(self.scene['goal'])) < 10
[docs] def distance_to_body(self, body: Body) -> float: distances = list() for shape in body.shapes: distances.append(shape.point_query(self.agent_pos).distance) return min(distances)
def _handle_keyboard_event(self, event: pygame.event.Event) -> None: if event.type == pygame.KEYDOWN: if event.key == pygame.K_a: for obs in self._obstacles: if is_point_close_to_body(self.agent_pos, obs): self.set_current_attached_body(obs) print('Attached to obstacle:', obs.label, obs) break elif event.key == pygame.K_d: self.set_current_attached_body(None) print('Detached from obstacles.') else: print('New obstacle pos:', self.current_attached_body.position if self.current_attached_body is not None else None)
[docs] def center_polygon(vertices) -> Tuple[Tuple[float, float], Sequence[Tuple[float, float]]]: vertices = np.array(vertices) center = np.mean(vertices, axis=0) return center.tolist(), (vertices - center).tolist()
[docs] def is_point_close_to_body(point: Tuple[int, int], body: Body, max_distance: float = 20) -> bool: for shape in body.shapes: if shape.point_query(point).distance < max_distance: return True return False