Source code for concepts.dm.crow.interfaces.execution_manager
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : execution_manager.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 04/13/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
from typing import Any, Optional
from jacinle.logging import get_logger
from concepts.dm.crow.crow_domain import CrowProblem
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.interfaces.perception_interface import CrowPerceptionInterface
from concepts.dm.crow.interfaces.controller_interface import CrowSimulationControllerInterface, CrowPhysicalControllerInterface
logger = get_logger(__file__)
[docs]
class GoalAchieved(Exception):
pass
[docs]
class CrowExecutionManager(object):
[docs]
def __init__(
self, executor: CrowExecutor,
perception_interface: CrowPerceptionInterface,
simulator_controller_interface: Optional[CrowSimulationControllerInterface],
physical_controller_interface: CrowPhysicalControllerInterface,
):
self._executor = executor
self._perception_interface = perception_interface
self._simulation_interface = simulator_controller_interface
self._physical_interface = physical_controller_interface
self._current_goal = None
self._current_state = None
[docs]
def update_perception(self, action: Optional[Any] = None):
self._perception_interface.step(action)
if self._simulation_interface is not None:
self._perception_interface.update_simulator()
self._current_state = self._perception_interface.get_crow_state()
[docs]
def get_current_state(self):
return self._current_state
[docs]
def run(self, goal, max_steps: int = 100):
self.update_perception()
self._init_planner(goal)
for _ in range(max_steps):
try:
action = self._plan_next_action()
except GoalAchieved:
logger.critical('Goal achieved.')
break
self._physical_interface.step(action)
self.update_perception(action)
else:
raise RuntimeError('Execution exceeds the maximum steps.')
def _init_planner(self, goal):
self._current_goal = goal
def _plan_next_action(self):
raise NotImplementedError()
[docs]
class CrowDefaultOpenLoopExecutionManager(CrowExecutionManager):
[docs]
def __init__(
self, executor: CrowExecutor,
perception_interface: CrowPerceptionInterface,
simulator_controller_interface: Optional[CrowSimulationControllerInterface],
physical_controller_interface: CrowPhysicalControllerInterface,
planner_options: Optional[dict] = None,
planner_verbose: bool = False,
):
super().__init__(executor, perception_interface, simulator_controller_interface, physical_controller_interface)
self._plan = None
self._current_action_idx = 0
self._planner_options = planner_options if planner_options is not None else {}
self._planner_verbose = planner_verbose
[docs]
def update_planner_options(self, **kwargs):
self._planner_options.update(kwargs)
[docs]
def set_planner_verbose(self, verbose: bool = True):
self._planner_verbose = verbose
[docs]
def run(self, goal, max_steps: int = 100, confirm_plan: bool = True):
self.update_perception()
self._init_planner(goal)
if confirm_plan:
input('Press Enter to execute the plan.')
super().run(goal, max_steps)
def _init_planner(self, goal):
super()._init_planner(goal)
if self._plan is not None:
return
from concepts.dm.crow.planners.regression_planning import crow_regression
plans, _ = crow_regression(
self._executor, CrowProblem.from_state_and_goal(self._executor.domain, self._current_state, self._current_goal),
simulation_interface=self._simulation_interface,
**self._planner_options,
verbose=self._planner_verbose
)
if len(plans) == 0:
raise GoalAchieved()
self._plan = plans[0]
self._current_action_idx = 0
logger.critical('Plan: {}'.format([str(a) for a in self._plan]))
def _plan_next_action(self):
if self._current_action_idx >= len(self._plan):
raise GoalAchieved()
action = self._plan[self._current_action_idx]
self._current_action_idx += 1
return action