#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : behavior_utils.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 03/17/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import itertools
from typing import Any, Optional, Union, Iterator, Sequence, Tuple, List, Dict, NamedTuple
from jacinle.utils.printing import indent_text
from concepts.dsl.dsl_types import AutoType, QINDEX, Variable, ObjectConstant
from concepts.dsl.value import ListValue
from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.tensor_state import StateObjectReference, StateObjectList
from concepts.dsl.expression import Expression, ExpressionDefinitionContext, ValueOutputExpression, ObjectOrValueOutputExpression, FunctionApplicationExpression, VariableExpression, VariableAssignmentExpression, ConstantExpression, ObjectConstantExpression, is_null_expression
from concepts.dsl.expression import AssignExpression, BoolExpression, BoolOpType
from concepts.dsl.expression_utils import surface_fol_downcast, find_free_variables
from concepts.dm.crow.controller import CrowController, CrowControllerApplicationExpression
from concepts.dm.crow.crow_expression_utils import crow_replace_expression_variables
from concepts.dm.crow.behavior import CrowBehavior, CrowAchieveExpression, CrowUntrackExpression, CrowBindExpression, CrowAssertExpression
from concepts.dm.crow.behavior import CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression
from concepts.dm.crow.behavior import CrowBehaviorOrderingSuite, CrowBehaviorCommit
from concepts.dm.crow.behavior import CrowBehaviorForeachLoopSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorConditionSuite, CrowEffectApplier
from concepts.dm.crow.crow_domain import CrowDomain, CrowState
from concepts.dm.crow.executors.crow_executor import CrowExecutor
# Names exported by ``from <this module> import *``.
# NOTE(review): 'format_behavior_statement' and 'format_behavior_program' are listed here but are
# not defined in the portion of the file visible to this review -- confirm they exist elsewhere
# in the module. CrowAdditiveHeuristicProgramExecutor is defined in this module but not exported.
__all__ = [
'ApplicableBehaviorItem', 'match_applicable_behaviors', 'match_policy_applicable_behaviors',
'crow_replace_expression_variables_ext', 'format_behavior_statement', 'format_behavior_program',
'execute_effect_statements', 'execute_behavior_effect_body', 'execute_effect_applier',
'execute_object_bind', 'execute_additive_heuristic_program'
]
[docs]
class ApplicableBehaviorItem(NamedTuple):
    """A behavior matched against a goal, bundled with the bindings used to instantiate it."""

    behavior: CrowBehavior
    """The matched behavior."""

    bounded_variables: Union[
        Dict[str, Union[Variable, ObjectConstant, TensorValue]],
        Dict[str, ObjectOrValueOutputExpression],
    ]
    """Bindings for the behavior's variables, keyed by variable name.

    When ``defered_execution`` is True the values are ObjectOrValueOutputExpression instances;
    otherwise they are Variable or ObjectConstant instances.
    """

    defered_execution: bool = False
    """Whether the behavior should be executed in deferred mode. (The field name's spelling is part of the public interface.)"""
[docs]
def match_applicable_behaviors(
    domain: CrowDomain, state: CrowState, goal: ValueOutputExpression, goal_scope: Dict[str, Any],
    return_all_candidates: bool = True
) -> List[ApplicableBehaviorItem]:
    """Collect every behavior in the domain whose declared goal matches ``goal``.

    A behavior is a candidate when its goal is not a null expression and
    ``surface_fol_downcast(behavior.goal, goal)`` yields a binding (i.e. does not return None).
    Bound values that are variables whose name appears in ``goal_scope`` are replaced by the
    corresponding scope entry.

    Args:
        domain: the domain whose ``behaviors`` are scanned.
        state: the current state (not used by the current implementation).
        goal: the goal expression to match.
        goal_scope: the scope in which ``goal`` is interpreted, keyed by variable name.
        return_all_candidates: not implemented yet; all candidates are always returned.

    Returns:
        one :class:`ApplicableBehaviorItem` per matching behavior, in domain iteration order.
    """
    matches = []
    for behavior in domain.behaviors.values():
        pattern = behavior.goal
        if is_null_expression(pattern):
            continue

        binding = surface_fol_downcast(pattern, goal)
        if binding is None:
            continue

        # TODO(Jiayuan Mao @ 2024/07/18): think about if this "promoted" variable binding is correct.
        # My current intuition is that, if a variable got changed to a different value later, this procedure will break...
        # The binding is updated in place; only existing keys are reassigned.
        for key, bound in list(binding.items()):
            if isinstance(bound, Variable) and bound.name in goal_scope:
                binding[key] = goal_scope[bound.name]

        matches.append(ApplicableBehaviorItem(behavior, binding))

    # TODO: implement return_all_candidates
    return matches
[docs]
def match_policy_applicable_behaviors(
    domain: CrowDomain, state: CrowState, goal: ValueOutputExpression, goal_scope: Dict[str, Any],
    return_all_candidates: bool = True,
    pachieve_kwargs: Optional[Dict[str, Any]] = None,
) -> List[ApplicableBehaviorItem]:
    """Match behaviors for a policy-achieve goal, decomposing AND/OR goals when nothing matches directly.

    The function first delegates to :func:`match_applicable_behaviors`. If that yields at least one
    candidate, the candidates are returned unchanged. Otherwise the goal must be a conjunction or a
    disjunction, and a synthetic behavior named ``'__pachive__'`` is built from its arguments:

    - AND: a single behavior whose body achieves every sub-goal, sequentially when the ``'ordered'``
      flag is truthy and unordered otherwise.
    - OR: one behavior per sub-goal.

    In both cases the program is wrapped with ``make_sequential`` directly when the
    ``'serializable'`` flag is truthy, and with ``make_promotable`` + ``make_sequential(...,
    _skip_simplify=True)`` otherwise.

    Args:
        domain: the domain whose behaviors are matched.
        state: the current state.
        goal: the goal expression.
        goal_scope: the scope in which ``goal`` is interpreted.
        return_all_candidates: forwarded to :func:`match_applicable_behaviors`.
        pachieve_kwargs: extra keyword arguments forwarded to every generated
            :class:`CrowAchieveExpression`. The ``'ordered'`` and ``'serializable'`` entries also
            control the shape of the generated program. ``None`` is treated as an empty dict, and a
            missing flag is treated as False.

    Returns:
        the list of applicable behavior items.

    Raises:
        ValueError: if no behavior matches and the goal is neither an AND nor an OR expression.
    """
    candidate_behaviors = match_applicable_behaviors(domain, state, goal, goal_scope, return_all_candidates)
    if len(candidate_behaviors) > 0:
        return candidate_behaviors

    # The declared default is None; previously that crashed on `**pachieve_kwargs` / subscripting.
    # NOTE(review): falling back to False for missing flags is assumed to agree with the defaults of
    # CrowAchieveExpression -- confirm against its constructor.
    if pachieve_kwargs is None:
        pachieve_kwargs = {}
    ordered = pachieve_kwargs.get('ordered', False)
    serializable = pachieve_kwargs.get('serializable', False)

    def _as_program(body):
        """Wrap a sub-goal (or a suite of sub-goals) according to the `serializable` flag."""
        if serializable:
            return CrowBehaviorOrderingSuite.make_sequential(body)
        return CrowBehaviorOrderingSuite.make_sequential(CrowBehaviorOrderingSuite.make_promotable(body), _skip_simplify=True)

    free_variables = find_free_variables(goal)
    bounded_variables = {var.name: var for var in free_variables}

    if isinstance(goal, BoolExpression) and goal.bool_op is BoolOpType.AND:
        subgoals = [CrowAchieveExpression(subgoal, once=False, is_policy_achieve=True, **pachieve_kwargs) for subgoal in goal.arguments]
        if ordered:
            goal_set = CrowBehaviorOrderingSuite.make_sequential(subgoals)
        else:
            goal_set = CrowBehaviorOrderingSuite.make_unordered(subgoals)
        program = _as_program(goal_set)
        return [ApplicableBehaviorItem(CrowBehavior('__pachive__', free_variables, None, program), bounded_variables)]
    elif isinstance(goal, BoolExpression) and goal.bool_op is BoolOpType.OR:
        subgoals = [CrowAchieveExpression(subgoal, once=False, is_policy_achieve=True, **pachieve_kwargs) for subgoal in goal.arguments]
        programs = [_as_program(subgoal) for subgoal in subgoals]
        return [ApplicableBehaviorItem(CrowBehavior('__pachive__', free_variables, None, program), bounded_variables) for program in programs]
    else:
        raise ValueError(f"Goal type {goal} is not supported.")
[docs]
def crow_replace_expression_variables_ext(
    expr: Union[Expression, CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression, CrowControllerApplicationExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression],
    mappings: Optional[Dict[Union[FunctionApplicationExpression, VariableExpression], Union[Variable, ValueOutputExpression]]] = None,
    ctx: Optional[ExpressionDefinitionContext] = None,
) -> Union[ObjectOrValueOutputExpression, VariableAssignmentExpression, CrowBindExpression, CrowAssertExpression, CrowRuntimeAssignmentExpression, CrowFeatureAssignmentExpression, CrowControllerApplicationExpression, CrowAchieveExpression, CrowUntrackExpression, CrowBehaviorApplicationExpression, CrowBehaviorEffectApplicationExpression]:
    """Apply :func:`crow_replace_expression_variables` to a plain expression or a behavior statement.

    Plain :class:`Expression` objects are rewritten directly. For the supported statement types, a
    new statement of the same type is built whose inner expression(s) have been rewritten.

    Args:
        expr: the expression or statement to rewrite.
        mappings: the replacement mapping forwarded to :func:`crow_replace_expression_variables`.
        ctx: the definition context forwarded to :func:`crow_replace_expression_variables`.

    Returns:
        the rewritten expression or a newly constructed statement.

    Raises:
        ValueError: if ``expr`` is of none of the supported types.
    """

    def _rewrite(inner):
        return crow_replace_expression_variables(inner, mappings, ctx)

    def _rewrite_all(items):
        return [_rewrite(item) for item in items]

    # The Expression check must stay first, as in the original dispatch order.
    if isinstance(expr, Expression):
        return _rewrite(expr)

    if isinstance(expr, CrowBindExpression):
        return CrowBindExpression(expr.variables, _rewrite(expr.goal))

    if isinstance(expr, CrowAssertExpression):
        return CrowAssertExpression(_rewrite(expr.bool_expr), once=expr.once)

    if isinstance(expr, CrowRuntimeAssignmentExpression):
        return CrowRuntimeAssignmentExpression(expr.variable, _rewrite(expr.value))

    if isinstance(expr, CrowFeatureAssignmentExpression):
        new_feature = _rewrite(expr.feature)
        new_value = _rewrite(expr.value)
        return CrowFeatureAssignmentExpression(new_feature, new_value)

    if isinstance(expr, CrowControllerApplicationExpression):
        return CrowControllerApplicationExpression(expr.controller, _rewrite_all(expr.arguments))

    if isinstance(expr, CrowAchieveExpression):
        # NOTE(review): only `once` is carried over; other constructor options (e.g. the
        # `is_policy_achieve` keyword used by match_policy_applicable_behaviors) are not -- confirm
        # this is intended.
        return CrowAchieveExpression(_rewrite(expr.goal), once=expr.once)

    if isinstance(expr, CrowUntrackExpression):
        return CrowUntrackExpression(_rewrite(expr.goal))

    if isinstance(expr, CrowBehaviorApplicationExpression):
        return CrowBehaviorApplicationExpression(expr.behavior, _rewrite_all(expr.arguments))

    if isinstance(expr, CrowBehaviorEffectApplicationExpression):
        return CrowBehaviorEffectApplicationExpression(expr.behavior, _rewrite_all(expr.arguments))

    raise ValueError(f'Invalid expression ({type(expr)}): {expr}')
[docs]
def execute_effect_statements(
    executor: CrowExecutor, statements: Sequence[Union[CrowFeatureAssignmentExpression, CrowBehaviorForeachLoopSuite, CrowBehaviorConditionSuite]],
    state: CrowState, csp: Optional[ConstraintSatisfactionProblem] = None,
    scope: Optional[dict] = None, action_index: Optional[int] = None
) -> None:
    """Execute a sequence of effect statements against ``state``.

    Supported statements are feature assignments, foreach loops (either over the values of an
    expression or over all objects of the loop variable's type), and conditionals. Nested suites
    are handled recursively with the same ``csp`` and ``action_index``.

    Args:
        executor: the executor used to evaluate expressions and perform the assignments.
        statements: the effect statements to execute, in order.
        state: the state passed to the executor for every evaluation/assignment.
        csp: the optional constraint satisfaction problem passed to the executor.
        scope: variable bindings. Entries whose key both starts and ends with ``'__'`` are dropped
            before execution; the caller's dict is not modified.
        action_index: forwarded to ``executor.update_effect_mode`` for every feature assignment,
            including those nested inside loops and conditionals.

    Raises:
        NotImplementedError: if a condition evaluates to an :class:`OptimisticValue`.
        ValueError: if a statement is of an unsupported type.
    """
    if scope is None:
        scope = dict()
    else:
        scope = {k: v for k, v in scope.items() if not (k.startswith('__') and k.endswith('__'))}

    for stmt in statements:
        if isinstance(stmt, CrowFeatureAssignmentExpression):
            with executor.update_effect_mode(stmt.evaluation_mode, action_index=action_index):
                executor.execute(AssignExpression(stmt.feature, stmt.value), state=state, csp=csp, bounded_variables=scope)
        elif isinstance(stmt, CrowBehaviorForeachLoopSuite) and stmt.is_foreach_in_expression:
            # foreach over the values produced by an expression.
            var = stmt.variable
            values = executor.execute(stmt.loop_in_expression, state=state, csp=csp, bounded_variables=scope).values
            for value in values:
                new_scope = scope.copy()
                new_scope[var.name] = value
                # Forward action_index so nested assignments run in the same effect mode as
                # top-level ones (it was previously dropped on recursion).
                execute_effect_statements(executor, stmt.statements, state, csp=csp, scope=new_scope, action_index=action_index)
        elif isinstance(stmt, CrowBehaviorForeachLoopSuite):
            # foreach over every object of the loop variable's type.
            var = stmt.variable
            new_scope = scope.copy()
            for name in state.object_type2name[var.typename]:
                new_scope[var.name] = ObjectConstant(name, var.dtype)
                execute_effect_statements(executor, stmt.statements, state, csp=csp, scope=new_scope, action_index=action_index)
        elif isinstance(stmt, CrowBehaviorConditionSuite):
            # NOTE(review): the condition is evaluated without `csp`, unlike the other branches.
            rv = executor.execute(stmt.condition, state=state, bounded_variables=scope).item()
            if isinstance(rv, OptimisticValue):
                raise NotImplementedError('OptimisticValue is not supported in the current implementation.')
            if bool(rv):
                execute_effect_statements(executor, stmt.statements, state, csp=csp, scope=scope, action_index=action_index)
            else:
                if stmt.else_statements is not None:
                    execute_effect_statements(executor, stmt.else_statements, state, csp=csp, scope=scope, action_index=action_index)
        else:
            raise ValueError(f'Unsupported statement type: {type(stmt)}')
[docs]
def execute_behavior_effect_body(executor: CrowExecutor, behavior: Union[CrowBehavior, CrowController], state: CrowState, scope: dict, csp: Optional[ConstraintSatisfactionProblem] = None, action_index: Optional[int] = None) -> CrowState:
    """Run the effect body of a behavior or controller on a copy of ``state``.

    The effect body may contain for-loops and conditional statements; see
    :func:`execute_effect_statements`.

    Args:
        executor: the executor used to run the statements.
        behavior: the behavior or controller whose ``effect_body.statements`` are executed.
        state: the input state; it is cloned first and the statements run on the clone.
        scope: the variable bindings for the effect body.
        csp: the optional constraint satisfaction problem.
        action_index: the optional action index forwarded to the statement executor.

    Returns:
        the cloned state after the effect statements have been executed on it.
    """
    successor = state.clone()
    effect_statements = behavior.effect_body.statements
    execute_effect_statements(
        executor, effect_statements, successor,
        csp=csp, scope=scope, action_index=action_index
    )
    return successor
[docs]
def execute_effect_applier(executor: CrowExecutor, applier: CrowEffectApplier, state: CrowState, csp: Optional[ConstraintSatisfactionProblem] = None, action_index: Optional[int] = None) -> CrowState:
    """Execute the statements of an effect applier against ``state``.

    The statements are run with the applier's own ``bounded_variables`` as scope. Unlike
    :func:`execute_behavior_effect_body`, the state is not cloned here.

    Args:
        executor: the executor used to run the statements.
        applier: the effect applier providing ``statements`` and ``bounded_variables``.
        state: the state the statements are executed against.
        csp: the optional constraint satisfaction problem.
        action_index: the optional action index forwarded to the statement executor.

    Returns:
        the ``state`` object that was passed in.
    """
    execute_effect_statements(executor, applier.statements, state, csp=csp, scope=applier.bounded_variables, action_index=action_index)
    # execute_effect_statements has no return value; return the state so that the declared
    # `-> CrowState` contract holds (this function previously returned None).
    return state
[docs]
def execute_object_bind(executor: CrowExecutor, stmt: CrowBindExpression, state: CrowState, scope: dict) -> Iterator[Dict[str, Union[ObjectConstant, Variable]]]:
    """Enumerate the scopes that result from executing a bind statement.

    The bind goal is evaluated with every bind variable quantified (``QINDEX``); each non-zero
    entry of the result gives one assignment for the variables that appear in the result's batch
    dimensions. Bind variables that do not appear there are enumerated over all objects of their
    type. A null goal is treated as ``TensorValue.TRUE``.

    Args:
        executor: the executor to execute the statement.
        stmt: the bind statement.
        state: the current state.
        scope: the current scope. It is never modified; every yielded scope is a copy.

    Returns:
        an iterator of possible scopes with the bind statement applied.
    """
    if stmt.goal.is_null_expression:
        rv = TensorValue.TRUE
    else:
        eval_scope = {
            key: bound for key, bound in scope.items()
            if not key.startswith('__') or not key.endswith('__')
        }
        # The quantified entries are keyed by the variable object itself (not its name), exactly
        # as before; presumably the executor accepts such keys -- confirm against CrowExecutor.
        eval_scope.update({variable: QINDEX for variable in stmt.variables})
        rv = executor.execute(stmt.goal, state=state, bounded_variables=eval_scope)

    # Variables that are not batch dimensions of the result are constrained by type only.
    unconstrained_names = []
    unconstrained_ranges = []
    for variable in stmt.variables:
        if variable.name in rv.batch_variables:
            continue
        unconstrained_names.append(variable.name)
        unconstrained_ranges.append(range(len(state.object_type2name[variable.dtype.typename])))

    for indices in rv.tensor.nonzero():
        for free_choice in itertools.product(*unconstrained_ranges):
            bound_scope = scope.copy()
            for variable in stmt.variables:
                candidates = state.object_type2name[variable.dtype.typename]
                if variable.name in rv.batch_variables:
                    picked = indices[rv.batch_variables.index(variable.name)]
                else:
                    picked = free_choice[unconstrained_names.index(variable.name)]
                bound_scope[variable.name] = ObjectConstant(candidates[picked], variable.dtype)
            yield bound_scope
[docs]
class CrowAdditiveHeuristicProgramExecutor(object):
    """Estimate the cost of a statement by simulating it on a private copy of the state.

    The executor keeps its own clone of the state and updates it while running statements. When a
    ``minimize`` expression is given, costs are measured as the change of that expression's value;
    otherwise controller applications are counted with unit cost.
    """

    def __init__(self, executor: CrowExecutor, state: CrowState, minimize: Optional[ValueOutputExpression] = None, is_unit_cost: bool = False):
        """Initialize the heuristic executor.

        Args:
            executor: the executor used to evaluate expressions and apply effects.
            state: the initial state; it is cloned, so the caller's state is not modified.
            minimize: the optional expression whose value defines the cost.
            is_unit_cost: whether unit costs are used. When False, ``minimize`` must be provided
                (checked with an assertion).
        """
        self.executor = executor
        self.state = state.clone()
        self.minimize = minimize
        self.initial_cost = self.get_cost()
        self.is_unit_cost = is_unit_cost

        if not self.is_unit_cost:
            assert self.minimize is not None, 'minimize should be provided when is_unit_cost is False.'

    def get_cost(self) -> float:
        """Return the value of ``minimize`` on the current internal state, or 0 when it is None."""
        if self.minimize is None:
            return 0
        return self.executor.execute(self.minimize, state=self.state).item()

    def run(self, stmt: Union[CrowAchieveExpression, CrowBehaviorApplicationExpression, CrowControllerApplicationExpression], scope: dict) -> float:
        """Estimate the cost of a statement and update the internal state accordingly.

        Args:
            stmt: the statement to estimate: a controller application, an achieve statement, or a
                behavior application.
            scope: the variable bindings; entries whose key both starts and ends with ``'__'`` are
                ignored.

        Returns:
            the estimated cost. 0.0 is returned when the statement cannot be grounded, when an
            achieve goal already holds (or evaluates to an optimistic value), or when no behavior
            matches. When several behaviors match, the smallest estimate is returned and the
            internal state is set to the state reached by that behavior.

        Raises:
            ValueError: if ``stmt`` (or a statement in a heuristic program) has an unsupported type.
        """
        scope = {k: v for k, v in scope.items() if not (k.startswith('__') and k.endswith('__'))}

        if isinstance(stmt, CrowControllerApplicationExpression):
            if self.minimize is None:
                return 1.0
            if stmt.controller.effect_body is None:
                return 0.0
            current_cost = self.get_cost()
            bounded_variables = {
                k.name: self.evaluate(v, state=self.state, csp=None, bounded_variables=scope, clone_csp=False, force_tensor_value=True)[0]
                for k, v in zip(stmt.controller.arguments, stmt.arguments)
            }
            if any(isinstance(v, Variable) for v in bounded_variables.values()):
                return 0.0
            execute_effect_statements(self.executor, stmt.controller.effect_body.statements, self.state, scope=bounded_variables)
            return self.get_cost() - current_cost

        if isinstance(stmt, CrowAchieveExpression):
            # Case 1: The variable is not fully grounded.
            for var in find_free_variables(stmt.goal):
                if var.name not in scope:
                    return 0.0
            # Case 2: The goal is already achieved.
            rv, _ = self.evaluate(stmt.goal, self.state, csp=None, bounded_variables=scope, clone_csp=False, force_tensor_value=False)
            if isinstance(rv, OptimisticValue) or bool(rv):
                return 0.0
            # Case 3: Now we want to find a way to achieve the goal.
            behavior_matches = match_applicable_behaviors(self.executor.domain, self.state, stmt.goal, scope)
        elif isinstance(stmt, CrowBehaviorApplicationExpression):
            # TODO(Jiayuan Mao @ 2024/08/11): handle CSP correctly.
            behavior_matches = [ApplicableBehaviorItem(stmt.behavior, {
                k.name: self.evaluate(v, state=self.state, csp=None, bounded_variables=scope, clone_csp=False, force_tensor_value=True)[0]
                for k, v in zip(stmt.behavior.arguments, stmt.arguments)
            })]
        else:
            raise ValueError(f'Unsupported statement type: {type(stmt)}')

        if len(behavior_matches) == 0:
            return 0.0

        # With several candidates, each one is simulated from the same starting state.
        need_backtrack = (len(behavior_matches) > 1)
        best_behavior_cost = None
        best_behavior_cost_state = None
        current_cost = self.get_cost()
        state_backup = None

        for behavior_match in behavior_matches:
            if need_backtrack:
                state_backup = self.state.clone()

            # Check if the behavior can be grounded.
            if any(isinstance(v, Variable) for v in behavior_match.bounded_variables.values()):
                best_behavior_cost = 0.0
                if need_backtrack:
                    # Nothing was simulated for this candidate, so the state associated with this
                    # zero-cost estimate is the untouched snapshot. Without recording it, the
                    # state restored after the loop could be None or belong to another candidate.
                    best_behavior_cost_state = state_backup
                continue

            sub_scope = behavior_match.bounded_variables
            if behavior_match.behavior.heuristic is None:
                # If there is a behavior that can achieve the goal but it does not have an associated heuristic value, we do a "auto" evaluation.
                if self.minimize is not None:
                    # Simulate the body; the cost is the resulting change of the minimized value.
                    for sub_stmt in behavior_match.behavior.body.statements:
                        if isinstance(sub_stmt, (CrowAchieveExpression, CrowBehaviorApplicationExpression, CrowControllerApplicationExpression)):
                            self.run(sub_stmt, sub_scope)
                    behavior_cost = self.get_cost() - current_cost
                else:
                    # Unit cost: every controller application counts as 1.
                    behavior_cost = 0
                    for sub_stmt in behavior_match.behavior.body.statements:
                        if isinstance(sub_stmt, (CrowAchieveExpression, CrowBehaviorApplicationExpression)):
                            behavior_cost += self.run(sub_stmt, sub_scope)
                        elif isinstance(sub_stmt, CrowControllerApplicationExpression):
                            behavior_cost += 1
            else:
                # Run the heuristic program.
                for sub_stmt in behavior_match.behavior.heuristic.statements:
                    if isinstance(sub_stmt, (CrowAchieveExpression, CrowBehaviorApplicationExpression)):
                        self.run(sub_stmt, sub_scope)
                    elif isinstance(sub_stmt, CrowFeatureAssignmentExpression):
                        execute_effect_statements(self.executor, [sub_stmt], self.state, scope=sub_scope)
                    else:
                        raise ValueError(f'Unsupported statement type: {type(sub_stmt)}')
                behavior_cost = self.get_cost() - current_cost

            if best_behavior_cost is None or behavior_cost < best_behavior_cost:
                best_behavior_cost = behavior_cost
                if need_backtrack:
                    best_behavior_cost_state = self.state.clone()

            if need_backtrack:
                # Rewind so that the next candidate starts from the same state.
                self.state = state_backup

        if need_backtrack:
            self.state = best_behavior_cost_state
        return best_behavior_cost

    def evaluate(
        self, expression: Union[ObjectOrValueOutputExpression, VariableAssignmentExpression], state: CrowState, csp: Optional[ConstraintSatisfactionProblem] = None,
        bounded_variables: Optional[Dict[str, Union[TensorValue, ObjectConstant]]] = None,
        clone_csp: bool = True,
        force_tensor_value: bool = False
    ) -> Tuple[Union[None, StateObjectReference, StateObjectList, TensorValue, OptimisticValue], Optional[ConstraintSatisfactionProblem]]:
        """Evaluate an expression and return the result.

        Args:
            expression: the expression to evaluate.
            state: the current state.
            csp: the current CSP.
            bounded_variables: the bounded variables. Entries whose key both starts and ends with
                ``'__'`` are ignored.
            clone_csp: whether to clone the CSP.
            force_tensor_value: whether to force the result to be a tensor value. When False,
                scalar tensor values are unwrapped with ``.item()``.

        Returns:
            the evaluation result and the updated CSP. The result is None for variable assignment
            expressions.
        """
        if clone_csp:
            csp = csp.clone() if csp is not None else None

        if bounded_variables is not None:
            bounded_variables = {k: v for k, v in bounded_variables.items() if not (k.startswith('__') and k.endswith('__'))}

        if isinstance(expression, VariableAssignmentExpression):
            self.executor.execute(expression, state=state, csp=csp, bounded_variables=bounded_variables)
            return None, csp

        rv = self.executor.execute(expression, state=state, csp=csp, bounded_variables=bounded_variables)
        if isinstance(rv, TensorValue):
            if force_tensor_value:
                return rv, csp
            if rv.is_scalar:
                return rv.item(), csp
            return rv, csp

        # A non-empty list of object references is repackaged as a StateObjectList.
        if isinstance(rv, ListValue) and len(rv.values) > 0:
            if isinstance(rv.values[0], StateObjectReference):
                rv = StateObjectList(rv.dtype, rv.values)

        assert isinstance(rv, StateObjectReference) or isinstance(rv, StateObjectList) or isinstance(rv, ListValue)
        return rv, csp
[docs]
def execute_additive_heuristic_program(
    executor: CrowExecutor, stmt: Union[CrowAchieveExpression, CrowBehaviorApplicationExpression, CrowControllerApplicationExpression], state: CrowState, scope: dict, minimize: Optional[ValueOutputExpression] = None,
    is_unit_cost: bool = False
) -> float:
    """Estimate the additive heuristic cost of a single statement.

    This is a convenience wrapper that builds a :class:`CrowAdditiveHeuristicProgramExecutor` for
    ``state`` and runs ``stmt`` on it once.

    Args:
        executor: the executor used to evaluate expressions and apply effects.
        stmt: the statement whose cost is estimated.
        state: the state from which the estimate starts.
        scope: the variable bindings for ``stmt``.
        minimize: the optional expression whose value defines the cost.
        is_unit_cost: whether unit costs are used.

    Returns:
        the estimated cost returned by the program executor.
    """
    return CrowAdditiveHeuristicProgramExecutor(
        executor, state, minimize=minimize, is_unit_cost=is_unit_cost
    ).run(stmt, scope)