#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : regression_planning.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 03/17/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import os
from dataclasses import dataclass
from typing import Any, Optional, Union, Iterator, Tuple, NamedTuple, List, Dict, Type
from jacinle.utils.meta import UNSET
from concepts.dsl.dsl_types import ObjectConstant
from concepts.dsl.value import ListValue
from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.expression import NullExpression, ValueOutputExpression, is_and_expr, ObjectOrValueOutputExpression, VariableAssignmentExpression
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.tensor_state import StateObjectReference, StateObjectList
from concepts.dm.crow.crow_domain import CrowDomain, CrowProblem, CrowState
from concepts.dm.crow.controller import CrowControllerApplier, CrowControllerApplicationExpression
from concepts.dm.crow.behavior import CrowBehavior, CrowBehaviorCommit, CrowBehaviorEffectApplicationExpression
from concepts.dm.crow.behavior import CrowAssertExpression, CrowBindExpression, CrowAchieveExpression, CrowBehaviorApplicationExpression, CrowUntrackExpression
from concepts.dm.crow.behavior import CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression
from concepts.dm.crow.behavior import CrowBehaviorOrderingSuite, CrowBehaviorCommit, CrowBehaviorConditionSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorForeachLoopSuite
from concepts.dm.crow.behavior_utils import format_behavior_statement
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.executors.generator_executor import CrowGeneratorExecutor
from concepts.dm.crow.interfaces.controller_interface import CrowSimulationControllerInterface
from concepts.dm.crow.planners.regression_dependency import RegressionTraceStatement
__all__ = [
'SupportedCrowExpressionType', 'ScopedCrowExpression', 'CrowPlanningResult', 'CrowPlanningResult3',
'CrowRegressionPlanner', 'crow_regression', 'get_crow_regression_algorithm', 'set_crow_regression_algorithm'
]
SupportedCrowExpressionType = Union[
CrowBehaviorOrderingSuite, CrowBehaviorForeachLoopSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorConditionSuite,
CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowRuntimeAssignmentExpression,
CrowControllerApplicationExpression, CrowAchieveExpression, CrowUntrackExpression,
CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression,
CrowBehaviorCommit
]
[docs]
class ScopedCrowExpression(NamedTuple):
"""A statement in the right stack of the planning state. This class is a named tuple so that it can be iterated as `for (stmt, scope_id) in ...`."""
statement: SupportedCrowExpressionType
"""The statement."""
scope_id: int
"""The scope id of the statement."""
def __str__(self):
return format_behavior_statement(self.statement, scope_id=self.scope_id)
def __repr__(self):
return self.__str__()
[docs]
@dataclass
class CrowPlanningResult(object):
state: CrowState
csp: Optional[ConstraintSatisfactionProblem]
controller_actions: Tuple[CrowControllerApplier, ...]
scopes: Dict[int, Any]
latest_scope: int
[docs]
@classmethod
def make_empty(cls, state: CrowState) -> 'CrowPlanningResult':
return cls(state, ConstraintSatisfactionProblem(), tuple(), dict(), 0)
[docs]
def clone(self, state=UNSET, csp=UNSET, controller_actions=UNSET, scopes=UNSET, latest_scope=UNSET) -> 'CrowPlanningResult':
return CrowPlanningResult(
state=state if state is not UNSET else self.state,
csp=csp if csp is not UNSET else self.csp,
controller_actions=controller_actions if controller_actions is not UNSET else self.controller_actions,
scopes=scopes if scopes is not UNSET else self.scopes,
latest_scope=latest_scope if latest_scope is not UNSET else self.latest_scope
)
[docs]
@dataclass
class CrowPlanningResult3(CrowPlanningResult):
scope_constraints: Dict[int, List[ValueOutputExpression]]
scope_constraint_evaluations: Dict[int, List[bool]]
dependency_trace: Tuple['RegressionTraceStatement', ...]
cost: float = 0.0
[docs]
@classmethod
def make_empty(cls, state: CrowState) -> 'CrowPlanningResult3':
return CrowPlanningResult3(
state, ConstraintSatisfactionProblem(), tuple(),
scopes={0: dict()}, latest_scope=0,
scope_constraints=dict(), scope_constraint_evaluations=dict(),
dependency_trace=tuple()
)
[docs]
def clone(
self, state=UNSET, csp=UNSET, controller_actions=UNSET,
scopes=UNSET, latest_scope=UNSET,
scope_constraints=UNSET, scope_constraint_evaluations=UNSET,
dependency_trace=UNSET
) -> 'CrowPlanningResult3':
return CrowPlanningResult3(
state=state if state is not UNSET else self.state,
csp=csp if csp is not UNSET else self.csp,
controller_actions=controller_actions if controller_actions is not UNSET else self.controller_actions,
scopes=scopes if scopes is not UNSET else self.scopes,
latest_scope=latest_scope if latest_scope is not UNSET else self.latest_scope,
scope_constraints=scope_constraints if scope_constraints is not UNSET else self.scope_constraints,
scope_constraint_evaluations=scope_constraint_evaluations if scope_constraint_evaluations is not UNSET else self.scope_constraint_evaluations,
dependency_trace=dependency_trace if dependency_trace is not UNSET else self.dependency_trace
)
[docs]
def clone_with_new_constraint(self, scope_id: int, constraint: ValueOutputExpression, evaluation: bool, do: bool) -> 'CrowPlanningResult3':
if not do:
return self
scope_constraints = self.scope_constraints.copy()
scope_constraint_evaluations = self.scope_constraint_evaluations.copy()
if scope_id not in self.scope_constraints:
scope_constraints[scope_id] = list()
scope_constraint_evaluations[scope_id] = list()
else:
scope_constraints[scope_id] = self.scope_constraints[scope_id].copy()
scope_constraint_evaluations[scope_id] = self.scope_constraint_evaluations[scope_id].copy()
scope_constraints[scope_id].append(constraint)
scope_constraint_evaluations[scope_id].append(evaluation)
return self.clone(scope_constraints=scope_constraints, scope_constraint_evaluations=scope_constraint_evaluations)
[docs]
def clone_with_removed_constraint(self, scope_id: int, constraint: Union[ValueOutputExpression, NullExpression]) -> 'CrowPlanningResult3':
if scope_id not in self.scope_constraints:
return self
if isinstance(constraint, NullExpression):
# Remove all constraints at the scope.
scope_constraints = self.scope_constraints.copy()
scope_constraint_evaluations = self.scope_constraint_evaluations.copy()
del scope_constraints[scope_id]
del scope_constraint_evaluations[scope_id]
return self.clone(scope_constraints=scope_constraints, scope_constraint_evaluations=scope_constraint_evaluations)
found = None
for i, c in enumerate(self.scope_constraints[scope_id]):
if str(c) == str(constraint):
found = i
break
if found is None:
raise RuntimeError(f'Constraint not found: {constraint} in {self.scope_constraints[scope_id]}')
scope_constraints = self.scope_constraints.copy()
scope_constraint_evaluations = self.scope_constraint_evaluations.copy()
scope_constraints[scope_id] = scope_constraints[scope_id][:found] + scope_constraints[scope_id][found + 1:]
scope_constraint_evaluations[scope_id] = scope_constraint_evaluations[scope_id][:found] + scope_constraint_evaluations[scope_id][found + 1:]
return self.clone(scope_constraints=scope_constraints, scope_constraint_evaluations=scope_constraint_evaluations)
[docs]
def iter_not_satisfied_constraints(self) -> Iterator[Tuple[int, int, ValueOutputExpression]]:
for scope_id, constraints in self.scope_constraints.items():
constraint_evaluations = self.scope_constraint_evaluations[scope_id]
for constraint_id, constraint in enumerate(constraints):
if len(constraint_evaluations) <= constraint_id or not constraint_evaluations[constraint_id]:
yield scope_id, constraint_id, constraint
[docs]
def all_scope_constraints(self) -> List[str]:
from concepts.dm.crow.planners.regression_utils import replace_variable_with_value
all_constraints = list()
for scope_id, constraints in self.scope_constraints.items():
for constraint in constraints:
all_constraints.append(str(replace_variable_with_value(constraint, self.scopes[scope_id])) + f'@{scope_id}')
return all_constraints
[docs]
class CrowRegressionPlanner(object):
[docs]
def __init__(
self, executor: CrowExecutor, state: CrowState, goal_expr: Union[str, ValueOutputExpression, None], *,
simulation_interface: Optional[CrowSimulationControllerInterface] = None,
# Group 1: goal serialization and refinements.
is_goal_ordered: bool = True,
is_goal_serializable: bool = True,
is_goal_refinement_compressible: bool = True,
# Group 2: CSP solver.
enable_csp: bool = True,
max_csp_trials: int = 1,
max_global_csp_trials: int = 100,
max_csp_branching_factor: int = 5,
use_generator_manager: bool = True,
store_generator_manager_history: bool = False,
# Group 3: output format.
include_effect_appliers: bool = False,
include_dependency_trace: bool = False,
verbose: bool = False,
**kwargs
):
"""Initialize the planner.
Args:
executor: the executor.
state: the initial state.
goal_expr: the goal expression.
simulation_interface: the simulation interface.
enable_reordering: whether to enable reordering.
is_goal_ordered: whether the goal is ordered.
is_goal_serializable: whether the goal is serializable.
is_goal_refinement_compressible: whether the goal refinement is compressible.
enable_csp: whether to enable the CSP solver.
max_csp_trials: the maximum number of CSP trials.
max_global_csp_trials: the maximum number of global CSP trials.
max_csp_branching_factor: the maximum CSP branching factor.
use_generator_manager: whether to use the generator manager.
store_generator_manager_history: whether to store the generator manager history.
include_effect_appliers: whether to include the effect appliers in the search result.
The effect appliers are of type :class:`~concepts.dm.crow.behavior.CrowEffectApplier`.
include_dependency_trace: whether to include the dependency graph in the search result.
verbose: whether to output verbose information.
"""
self.executor = executor
self.state = state
self.goal_expr = goal_expr
if isinstance(self.goal_expr, str):
self.goal_expr = executor.parse(self.goal_expr, state=state)
self.simulation_interface = simulation_interface
self.is_goal_ordered = is_goal_ordered
self.is_goal_serializable = is_goal_serializable
self.is_goal_refinement_compressible = is_goal_refinement_compressible
self.enable_csp = enable_csp
self.max_csp_trials = max_csp_trials
self.max_global_csp_trials = max_global_csp_trials
self.max_csp_branching_factor = max_csp_branching_factor
self.use_generator_manager = use_generator_manager
self.store_generator_manager_history = store_generator_manager_history
self.include_effect_appliers = include_effect_appliers
self.include_dependency_trace = include_dependency_trace
self.verbose = verbose
self.generator_manager = None
if self.use_generator_manager:
self.generator_manager = CrowGeneratorExecutor(executor, store_history=self.store_generator_manager_history)
self._search_cache = dict()
self._search_stat = {'nr_expanded_nodes': 0}
self._results = list()
self._post_init(**kwargs)
@property
def domain(self) -> CrowDomain:
return self.executor.domain
executor: CrowExecutor
"""The executor."""
state: CrowState
"""The initial state."""
goal_expr: Optional[ValueOutputExpression]
"""The goal expression. If this is None, the goal will be extracted from the behavior named '__goal__'."""
simulation_interface: Optional[CrowSimulationControllerInterface]
"""The simulation interface."""
is_goal_ordered: bool
"""Whether the goal is ordered. This only takes effect when the goal is a set of AND-connected expressions."""
is_goal_serializable: bool
"""Whether the goal is serializable. This only takes effect when the goal is a set of AND-connected expressions."""
is_goal_refinement_compressible: bool
"""Whether the refinement for each component of the goal is compressible. This only takes effect when the goal is a set of AND-connected expressions."""
enable_csp: bool
"""Whether to enable the CSP solver."""
max_csp_trials: int
"""The maximum number of CSP trials (for solving a single CSP)."""
max_global_csp_trials: int
"""The maximum number of global CSP trials."""
max_csp_branching_factor: int
"""The maximum CSP branching factor."""
use_generator_manager: bool
"""Whether to use the generator manager."""
store_generator_manager_history: bool
"""Whether to store the generator manager history."""
include_effect_appliers: bool
"""Whether to include the effect appliers in the search result."""
include_dependency_trace: bool
"""Whether to include the dependency graph in the search result."""
verbose: bool
"""Whether to output verbose information."""
def _post_init(self, **kwargs) -> None:
pass
def _make_goal_program(self) -> Tuple[CrowBehaviorOrderingSuite, Optional[ValueOutputExpression]]:
if self.domain.has_behavior('__goal__'):
return self.domain.get_behavior('__goal__').body, self.domain.get_behavior('__goal__').minimize
if is_and_expr(self.goal_expr):
if len(self.goal_expr.arguments) == 1 and self.goal_expr.arguments[0].return_type.is_list_type:
goal_set = [self.goal_expr]
else:
goal_set = list(self.goal_expr.arguments)
else:
goal_set = [self.goal_expr]
goal_set = [CrowAchieveExpression(x) for x in goal_set]
assert_expr = CrowAssertExpression(self.goal_expr)
if self.is_goal_ordered:
goal_set = CrowBehaviorOrderingSuite.make_sequential(goal_set, variable_scope_identifier=0)
else:
goal_set = CrowBehaviorOrderingSuite.make_unordered(goal_set, variable_scope_identifier=0)
if self.is_goal_serializable:
program = CrowBehaviorOrderingSuite.make_sequential(
goal_set,
assert_expr,
CrowBehaviorCommit(csp=True, sketch=True, behavior=True),
variable_scope_identifier=0
)
else:
program = CrowBehaviorOrderingSuite.make_sequential(
CrowBehaviorOrderingSuite.make_promotable(goal_set),
assert_expr,
CrowBehaviorCommit(csp=True, sketch=True, behavior=True),
variable_scope_identifier=0
)
return program, None
@property
def search_stat(self) -> dict:
return self._search_stat
@property
def results(self) -> List[CrowPlanningResult]:
return self._results
[docs]
def set_results(self, results: List[CrowPlanningResult]) -> None:
self._results = results
[docs]
def main(self) -> Tuple[List[Tuple[CrowControllerApplier, ...]], dict]:
program, minimize = self._make_goal_program()
behavior_application = CrowBehaviorApplicationExpression(CrowBehavior('__goal__', [], None, program), [])
program = CrowBehaviorOrderingSuite.make_sequential(behavior_application, variable_scope_identifier=0)
candidate_plans = self.main_entry(program, minimize)
if self.generator_manager is not None:
for k, v in self.generator_manager.generator_calls_count.items():
self._search_stat[f'gen_call/{k}'] = v
return candidate_plans, self._search_stat
[docs]
def evaluate(
self, expression: Union[ObjectOrValueOutputExpression, VariableAssignmentExpression], state: CrowState, csp: Optional[ConstraintSatisfactionProblem] = None,
bounded_variables: Optional[Dict[str, Union[TensorValue, ObjectConstant]]] = None,
clone_csp: bool = True,
force_tensor_value: bool = False
) -> Tuple[Union[None, StateObjectReference, StateObjectList, TensorValue, OptimisticValue], Optional[ConstraintSatisfactionProblem]]:
"""Evaluate an expression and return the result.
Args:
expression: the expression to evaluate.
state: the current state.
csp: the current CSP.
bounded_variables: the bounded variables.
clone_csp: whether to clone the CSP.
force_tensor_value: whether to force the result to be a tensor value.
Returns:
the evaluation result and the updated CSP.
"""
if clone_csp:
csp = csp.clone() if csp is not None else None
if bounded_variables is not None:
bounded_variables = {k: v for k, v in bounded_variables.items() if not (k.startswith('__') and k.endswith('__'))}
if isinstance(expression, VariableAssignmentExpression):
self.executor.execute(expression, state=state, csp=csp, bounded_variables=bounded_variables)
return None, csp
rv = self.executor.execute(expression, state=state, csp=csp, bounded_variables=bounded_variables)
if isinstance(rv, TensorValue):
if force_tensor_value:
return rv, csp
if rv.is_scalar:
return rv.item(), csp
return rv, csp
if isinstance(rv, ListValue) and len(rv.values) > 0:
if isinstance(rv.values[0], StateObjectReference):
rv = StateObjectList(rv.dtype, rv.values)
assert isinstance(rv, StateObjectReference) or isinstance(rv, StateObjectList) or isinstance(rv, ListValue)
return rv, csp
[docs]
def main_entry(self, state: CrowBehaviorOrderingSuite, minimize: Optional[ValueOutputExpression]) -> List[Tuple[CrowControllerApplier, ...]]:
raise NotImplementedError()
g_crow_regression_algorithm = None
[docs]
def get_crow_regression_algorithm_mappings() -> Type[CrowRegressionPlanner]:
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_dfs_v1 import CrowRegressionPlannerDFSv1
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_bfs_v1 import CrowRegressionPlannerBFSv1
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_dfs_v2 import CrowRegressionPlannerDFSv2
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_astar_v1 import CrowRegressionPlannerAStarv1
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_iddfs_v1 import CrowRegressionPlannerIDDFSv1
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_priority_tree_v1 import CrowRegressionPlannerPriorityTreev1
return {
'dfs_v1': CrowRegressionPlannerDFSv1,
'bfs_v1': CrowRegressionPlannerBFSv1,
'dfs_v2': CrowRegressionPlannerDFSv2,
'astar_v1': CrowRegressionPlannerAStarv1,
'iddfs_v1': CrowRegressionPlannerIDDFSv1,
'priority_tree_v1': CrowRegressionPlannerPriorityTreev1
}
[docs]
def get_crow_regression_algorithm(algorithm: Optional[str]) -> Type[CrowRegressionPlanner]:
mappings = get_crow_regression_algorithm_mappings()
if algorithm is None:
global g_crow_regression_algorithm
if g_crow_regression_algorithm is None:
g_crow_regression_algorithm = os.environ.get('CROW_REGRESSION_ALGO', 'iddfs_v1')
print('Using default regression algorithm: {}'.format(g_crow_regression_algorithm))
algorithm = g_crow_regression_algorithm
if algorithm in mappings:
return mappings[algorithm]
raise ValueError(f'Unknown regression algorithm: {algorithm}. Available algorithms: {list(mappings.keys())}')
[docs]
def set_crow_regression_algorithm(algorithm: str) -> None:
if algorithm not in get_crow_regression_algorithm_mappings():
raise ValueError(f'Unknown regression algorithm: {algorithm}. Available algorithms: {list(get_crow_regression_algorithm_mappings().keys())}')
global g_crow_regression_algorithm
g_crow_regression_algorithm = algorithm
[docs]
def crow_regression(
domain_or_executor_or_problem: Union[CrowExecutor, CrowDomain, CrowProblem], problem: Optional[CrowProblem] = None,
goal: Optional[Union[str, ValueOutputExpression]] = None,
return_planner: bool = False, return_results: bool = False,
**kwargs
) -> Union[Tuple[list, dict], CrowRegressionPlanner, List[CrowPlanningResult]]:
if isinstance(domain_or_executor_or_problem, CrowExecutor):
executor = domain_or_executor_or_problem
elif isinstance(domain_or_executor_or_problem, CrowDomain):
executor = domain_or_executor_or_problem.make_executor()
elif isinstance(domain_or_executor_or_problem, CrowProblem):
problem = domain_or_executor_or_problem
executor = problem.domain.make_executor()
else:
raise ValueError(f'Unknown domain or executor: {domain_or_executor_or_problem}')
merged_kwargs = problem.planner_options.copy()
merged_kwargs.update(kwargs)
kwargs = merged_kwargs
algo = get_crow_regression_algorithm(kwargs.pop('algo', None))
planner = algo(executor, problem.get_state_or_init(), goal if goal is not None else problem.goal, **kwargs)
# planner.set_human_control_interface(True)
if return_planner:
return planner
if return_results:
planner.main()
return planner.results
return planner.main()