Source code for concepts.dm.crow.planners.regression_planning

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : regression_planning.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/17/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

from dataclasses import dataclass
from typing import Any, Optional, Union, Tuple, List, Dict, NamedTuple, Type
from concepts.dsl.dsl_types import ObjectConstant
from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.expression import ValueOutputExpression, is_and_expr, ObjectOrValueOutputExpression, VariableAssignmentExpression
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.tensor_state import StateObjectReference, StateObjectList

from concepts.dm.crow.crow_domain import CrowDomain, CrowProblem, CrowState
from concepts.dm.crow.controller import CrowControllerApplier, CrowControllerApplicationExpression
from concepts.dm.crow.behavior import CrowBehavior
from concepts.dm.crow.behavior import CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression
from concepts.dm.crow.behavior import CrowBehaviorForeachLoopSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorConditionSuite
from concepts.dm.crow.behavior import CrowBehaviorOrderingSuite, CrowBehaviorCommit
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.interfaces.controller_interface import CrowSimulationControllerInterface

__all__ = [
    'SupportedCrowExpressionType', 'ScopedCrowExpression', 'CrowPlanningResult',
    'CrowRegressionPlanner',
    'crow_regression', 'get_crow_regression_algorithm', 'set_crow_regression_algorithm',
]


# Union of the statement types that can be wrapped in a ScopedCrowExpression.
# NOTE(review): CrowRuntimeAssignmentExpression used to be listed twice here. typing.Union
# collapses duplicates, so dropping the second entry does not change the resulting type.
# The duplicate may have been a placeholder for another assignment expression type -- confirm.
SupportedCrowExpressionType = Union[
    CrowBehaviorOrderingSuite, CrowBehaviorForeachLoopSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorConditionSuite,
    CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression,
    CrowControllerApplicationExpression, CrowAchieveExpression, CrowUntrackExpression,
    CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression,
    CrowBehaviorCommit
]


@dataclass(frozen=True)
class ScopedCrowExpression:
    """A statement in the right stack of the planning state.

    Attributes:
        statement: the statement.
        scope_id: the scope id of the statement.
    """

    statement: SupportedCrowExpressionType
    scope_id: int
# Sentinel meaning "argument not supplied". A dedicated object is used instead of None
# because None is itself a legal value for some fields (e.g., ``csp`` is Optional).
_UNSET = object()


@dataclass(frozen=True)
class CrowPlanningResult:
    """A frozen record bundling the pieces of a planning result.

    Attributes:
        state: a :class:`CrowState`.
        csp: a constraint satisfaction problem, or None.
        controller_actions: a tuple of controller appliers.
        scopes: a dictionary keyed by integer scope id.
        latest_scope: an integer scope id (initialized to 0 by :meth:`make_empty`).
    """

    state: CrowState
    csp: Optional[ConstraintSatisfactionProblem]
    controller_actions: Tuple[CrowControllerApplier, ...]
    scopes: Dict[int, Any]
    latest_scope: int

    @classmethod
    def make_empty(cls, state: CrowState) -> 'CrowPlanningResult':
        """Create a result for ``state`` with a fresh CSP, no controller actions, no scopes, and ``latest_scope`` 0."""
        return cls(
            state=state,
            csp=ConstraintSatisfactionProblem(),
            controller_actions=(),
            scopes={},
            latest_scope=0
        )

    def clone(self, state=_UNSET, csp=_UNSET, controller_actions=_UNSET, scopes=_UNSET, latest_scope=_UNSET) -> 'CrowPlanningResult':
        """Return a new result in which every supplied field replaces the current one.

        Fields that are not supplied are carried over from ``self`` by reference (no copy is made).
        """

        def _pick(override, current):
            # Only the sentinel means "keep the current value"; None is a real override.
            return current if override is _UNSET else override

        return CrowPlanningResult(
            state=_pick(state, self.state),
            csp=_pick(csp, self.csp),
            controller_actions=_pick(controller_actions, self.controller_actions),
            scopes=_pick(scopes, self.scopes),
            latest_scope=_pick(latest_scope, self.latest_scope)
        )
class CrowRegressionPlanner(object):
    """Base class for regression planners.

    This class stores the planner configuration, builds the goal program, and provides an
    expression-evaluation helper. The search itself lives in :meth:`main_entry`, which
    raises ``NotImplementedError`` here.
    """

    def __init__(
        self, executor: CrowExecutor, state: CrowState, goal_expr: Union[str, ValueOutputExpression], *,
        simulation_interface: Optional[CrowSimulationControllerInterface] = None,
        enable_reordering: bool = True,
        max_search_depth: int = 20,
        max_beam_size: int = 20,
        # Group 1: goal serialization and refinements.
        is_goal_ordered: bool = True,
        is_goal_serializable: bool = True,
        is_goal_refinement_compressible: bool = True,
        # Group 2: CSP solver.
        enable_csp: bool = True,
        max_csp_trials: int = 10,
        max_global_csp_trials: int = 100,
        max_csp_branching_factor: int = 5,
        use_generator_manager: bool = False,
        store_generator_manager_history: bool = False,
        # Group 3: output format.
        include_effect_appliers: bool = False,
        include_dependency_trace: bool = False,
        verbose: bool = True
    ):
        """Initialize the planner.

        Args:
            executor: the executor.
            state: the initial state.
            goal_expr: the goal expression. A string is parsed with ``executor.parse`` against ``state``.
            simulation_interface: the simulation interface.
            enable_reordering: whether to enable reordering.
            max_search_depth: the maximum search depth.
            max_beam_size: the maximum beam size.
            is_goal_ordered: whether the goal is ordered.
            is_goal_serializable: whether the goal is serializable.
            is_goal_refinement_compressible: whether the goal refinement is compressible.
            enable_csp: whether to enable the CSP solver.
            max_csp_trials: the maximum number of CSP trials.
            max_global_csp_trials: the maximum number of global CSP trials.
            max_csp_branching_factor: the maximum CSP branching factor.
            use_generator_manager: whether to use the generator manager.
            store_generator_manager_history: whether to store the generator manager history.
            include_effect_appliers: whether to include the effect appliers in the search result.
                The effect appliers are of type :class:`~concepts.dm.crow.behavior.CrowEffectApplier`.
            include_dependency_trace: whether to include the dependency graph in the search result.
            verbose: whether to output verbose information.
        """
        self.executor = executor
        self.state = state
        self.goal_expr = executor.parse(goal_expr, state=state) if isinstance(goal_expr, str) else goal_expr
        self.simulation_interface = simulation_interface

        # Search limits.
        self.max_search_depth = max_search_depth
        self.max_beam_size = max_beam_size
        self.enable_reordering = enable_reordering

        # Group 1: goal serialization and refinements.
        self.is_goal_ordered = is_goal_ordered
        self.is_goal_serializable = is_goal_serializable
        self.is_goal_refinement_compressible = is_goal_refinement_compressible

        # Group 2: CSP solver.
        self.enable_csp = enable_csp
        self.max_csp_trials = max_csp_trials
        self.max_global_csp_trials = max_global_csp_trials
        self.max_csp_branching_factor = max_csp_branching_factor
        self.use_generator_manager = use_generator_manager
        self.store_generator_manager_history = store_generator_manager_history

        # Group 3: output format.
        self.include_effect_appliers = include_effect_appliers
        self.include_dependency_trace = include_dependency_trace
        self.verbose = verbose

        # Internal bookkeeping.
        self._search_cache = {}
        self._search_stat = {'nr_expanded_nodes': 0}
        self._results = []

        # Runs last so that subclass hooks see a fully configured planner.
        self._post_init()

    @property
    def domain(self) -> CrowDomain:
        """The domain of the underlying executor."""
        return self.executor.domain

    def _post_init(self) -> None:
        """Hook invoked at the end of :meth:`__init__`; does nothing in the base class."""

    def _make_goal_program(self):
        """Build the program to be planned for.

        If the domain defines a ``__goal__`` behavior, its body is returned as-is. Otherwise the
        program is: achieve each sub-goal, assert the full goal expression, then commit.
        """
        domain = self.domain
        if domain.has_behavior('__goal__'):
            return domain.get_behavior('__goal__').body

        goal = self.goal_expr

        # By default the goal is a single item. An AND expression is split into its arguments,
        # unless it has exactly one argument whose return type is a list type.
        subgoals = [goal]
        if is_and_expr(goal):
            arguments = goal.arguments
            if not (len(arguments) == 1 and arguments[0].return_type.is_list_type):
                subgoals = list(arguments)

        achieve_statements = [CrowAchieveExpression(subgoal) for subgoal in subgoals]
        assert_statement = CrowAssertExpression(goal)

        if self.is_goal_ordered:
            achieve_suite = CrowBehaviorOrderingSuite.make_sequential(achieve_statements, variable_scope_identifier=0)
        else:
            achieve_suite = CrowBehaviorOrderingSuite.make_unordered(achieve_statements, variable_scope_identifier=0)

        # A non-serializable goal has its achieve suite wrapped as promotable.
        if not self.is_goal_serializable:
            achieve_suite = CrowBehaviorOrderingSuite.make_promotable(achieve_suite)

        return CrowBehaviorOrderingSuite.make_sequential(
            achieve_suite, assert_statement, CrowBehaviorCommit(csp=True, sketch=True, behavior=True),
            variable_scope_identifier=0
        )

    @property
    def search_stat(self) -> dict:
        """The search statistics dictionary (initialized with ``nr_expanded_nodes`` set to 0)."""
        return self._search_stat

    @property
    def results(self) -> List[CrowPlanningResult]:
        """The stored planning results."""
        return self._results

    def set_results(self, results: List[CrowPlanningResult]) -> None:
        """Replace the stored planning results."""
        self._results = results

    def main(self) -> Tuple[List[Tuple[CrowControllerApplier, ...]], dict]:
        """Run the planner.

        The goal program is wrapped into an application of a behavior named ``__goal__`` and
        handed to :meth:`main_entry`.

        Returns:
            the candidate plans produced by :meth:`main_entry`, and the search statistics.
        """
        goal_body = self._make_goal_program()
        goal_behavior = CrowBehavior('__goal__', [], None, goal_body, [], CrowBehaviorOrderingSuite.make_sequential())
        root_statement = CrowBehaviorApplicationExpression(goal_behavior, [])
        root_program = CrowBehaviorOrderingSuite.make_sequential(root_statement, variable_scope_identifier=0)
        return self.main_entry(root_program), self._search_stat

    def evaluate(
        self, expression: Union[ObjectOrValueOutputExpression, VariableAssignmentExpression], state: CrowState,
        csp: Optional[ConstraintSatisfactionProblem] = None,
        bounded_variables: Optional[Dict[str, Union[TensorValue, ObjectConstant]]] = None,
        clone_csp: bool = True,
        force_tensor_value: bool = False
    ) -> Tuple[Union[None, StateObjectReference, StateObjectList, TensorValue, OptimisticValue], Optional[ConstraintSatisfactionProblem]]:
        """Evaluate an expression and return the result.

        Args:
            expression: the expression to evaluate.
            state: the current state.
            csp: the current CSP.
            bounded_variables: the bounded variables.
            clone_csp: whether to clone the CSP.
            force_tensor_value: whether to force the result to be a tensor value.

        Returns:
            the evaluation result and the updated CSP. The result is None for variable
            assignment expressions. A scalar :class:`TensorValue` is unwrapped with ``item()``
            unless ``force_tensor_value`` is set.
        """
        if clone_csp and csp is not None:
            csp = csp.clone()

        result = self.executor.execute(expression, state=state, csp=csp, bounded_variables=bounded_variables)

        # Assignment expressions are executed for their effect; they have no value to return.
        if isinstance(expression, VariableAssignmentExpression):
            return None, csp

        if not isinstance(result, TensorValue):
            assert isinstance(result, (StateObjectReference, StateObjectList))
            return result, csp

        if not force_tensor_value and result.is_scalar:
            return result.item(), csp
        return result, csp

    def main_entry(self, state: CrowBehaviorOrderingSuite) -> List[Tuple[CrowControllerApplier, ...]]:
        """Run the search on the given program; this base implementation always raises ``NotImplementedError``."""
        raise NotImplementedError()
# Name of the currently selected regression algorithm; read by get_crow_regression_algorithm().
g_crow_regression_algorithm = 'iddfs_v1'


def get_crow_regression_algorithm() -> Type[CrowRegressionPlanner]:
    """Return the planner class for the currently selected regression algorithm.

    The implementation modules are imported inside the branches, so only the module of the
    selected algorithm is imported by a call.

    Raises:
        ValueError: if the selected algorithm name is not one of the known names.
    """
    if g_crow_regression_algorithm == 'dfs_v1':
        from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_v1 import CrowRegressionPlannerDFSv1 as planner_cls
    elif g_crow_regression_algorithm == 'bfs_v1':
        from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_v1 import CrowRegressionPlannerBFSv1 as planner_cls
    elif g_crow_regression_algorithm == 'dfs_v2':
        from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_dfs_v2 import CrowRegressionPlannerDFSv2 as planner_cls
    elif g_crow_regression_algorithm == 'astar_v2':
        from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_astar_v2 import CrowRegressionPlannerAStarv2 as planner_cls
    elif g_crow_regression_algorithm == 'iddfs_v1':
        from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_iddfs_v1 import CrowRegressionPlannerIDDFSv1 as planner_cls
    else:
        raise ValueError(f'Unknown regression algorithm: {g_crow_regression_algorithm}')
    return planner_cls
def set_crow_regression_algorithm(algorithm: str) -> None:
    """Select the regression algorithm returned by :func:`get_crow_regression_algorithm`.

    Args:
        algorithm: the algorithm name. One of ``'dfs_v1'``, ``'bfs_v1'``, ``'dfs_v2'``,
            ``'astar_v2'``, or ``'iddfs_v1'``.

    Raises:
        ValueError: if the algorithm name is not recognized.
    """
    global g_crow_regression_algorithm

    # Must list every name handled by get_crow_regression_algorithm(). 'iddfs_v1' (the module
    # default) was previously missing, so the default could not be restored once changed.
    supported_algorithms = {'dfs_v1', 'bfs_v1', 'dfs_v2', 'astar_v2', 'iddfs_v1'}
    if algorithm not in supported_algorithms:
        raise ValueError(f'Unknown regression algorithm: {algorithm}')
    g_crow_regression_algorithm = algorithm
def crow_regression(
    domain_or_executor: Union[CrowExecutor, CrowDomain], problem: CrowProblem,
    goal: Optional[Union[str, ValueOutputExpression]] = None,
    return_planner: bool = False,
    **kwargs
) -> Union[Tuple[list, dict], CrowRegressionPlanner]:
    """Build a regression planner for a problem and, by default, run it.

    Args:
        domain_or_executor: an executor, or a domain from which an executor is created via ``make_executor()``.
        problem: the problem; its ``state`` is the planner's initial state.
        goal: the goal to plan for. When None, ``problem.goal`` is used.
        return_planner: if True, return the constructed planner without running it.
        **kwargs: forwarded to the planner constructor.

    Returns:
        the planner itself if ``return_planner`` is True; otherwise the result of ``planner.main()``.

    Raises:
        ValueError: if ``domain_or_executor`` is neither a :class:`CrowExecutor` nor a :class:`CrowDomain`.
    """
    executor = domain_or_executor
    if not isinstance(executor, CrowExecutor):
        if not isinstance(executor, CrowDomain):
            raise ValueError(f'Unknown domain or executor: {domain_or_executor}')
        executor = executor.make_executor()

    planner_cls = get_crow_regression_algorithm()
    planner = planner_cls(executor, problem.state, problem.goal if goal is None else goal, **kwargs)
    return planner if return_planner else planner.main()