#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : crow_regression_planner_v1.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 03/21/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import queue
import jacinle
from typing import Optional, Union, Iterator, Sequence, Tuple, List, Dict, NamedTuple
from concepts.dsl.constraint import ConstraintSatisfactionProblem
from concepts.dsl.dsl_types import Variable
from concepts.dsl.tensor_state import StateObjectReference
from concepts.dm.crow.controller import CrowControllerApplier, CrowControllerApplicationExpression
from concepts.dm.crow.behavior import CrowBehavior, CrowAchieveExpression, CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression
from concepts.dm.crow.behavior import CrowBehaviorStatementOrdering, CrowBehaviorOrderingSuite, CrowBehaviorApplicationExpression
from concepts.dm.crow.behavior_utils import match_applicable_behaviors
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.planners.regression_planning import ScopedCrowExpression, CrowPlanningResult, CrowRegressionPlanner
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_utils import canonize_bounded_variables, split_simple_sequential, execute_object_bind, execute_behavior_effect_batch, unique_results, execute_behavior_effect
__all__ = ['CrowPlanningState', 'CrowRegressionPlannerBFSv1', 'CrowRegressionPlannerDFSv1']
[docs]
class CrowPlanningState(NamedTuple):
program: Optional[CrowBehaviorOrderingSuite]
scopes: Dict[int, dict]
latest_scope: int
right_statements: List[ScopedCrowExpression]
depth: int = 0
def __lt__(self, other):
return len(self.right_statements) < len(other.right_statements)
LOG_BFS_GRAPH = False
[docs]
class CrowRegressionPlannerBFSv1(CrowRegressionPlanner):
[docs]
def main_entry(self, program: CrowBehaviorOrderingSuite) -> List[Tuple[CrowControllerApplier, ...]]:
state = CrowPlanningState(program, scopes={0: dict()}, right_statements=[], latest_scope=0)
candidate_plans = self.bfs(state)
candidate_plans = [result.controller_actions for result in candidate_plans]
return candidate_plans
[docs]
def bfs(self, state: CrowPlanningState) -> Sequence[CrowPlanningResult]:
q = queue.PriorityQueue()
q.put(state)
graph = None
if LOG_BFS_GRAPH:
graph = dict(nodes=dict(), edges=list())
graph['nodes'][id(state)] = state
while not q.empty():
self._search_stat['nr_expanded_nodes'] += 1
if self._search_stat['nr_expanded_nodes'] > 1000000:
import ipdb; ipdb.set_trace()
raise RuntimeError('Too many nodes expanded.')
if self._search_stat['nr_expanded_nodes'] % 1000 == 0:
jacinle.log_function.print('Expanded nodes:', self._search_stat['nr_expanded_nodes'])
current_state = q.get()
# print('Current state:', current_state.right_statements)
for left, stmt, scope_id in current_state.program.pop_right_statement():
for i, new_state in enumerate(self._bfs_expand(current_state, left, stmt, scope_id)):
if LOG_BFS_GRAPH:
graph['nodes'][id(new_state)] = new_state
graph['edges'].append((id(current_state), id(new_state), f'{stmt}@{i}'))
if new_state.program is None:
if (result := self._bfs_verify(new_state)) is not None:
return [result]
else:
# jacinle.log_function.print(' New state:', new_state.right_statements)
q.put(new_state)
return list()
def _bfs_expand(self, state: CrowPlanningState, left: CrowBehaviorOrderingSuite, stmt: Union[CrowAchieveExpression, CrowControllerApplicationExpression], scope_id: int) -> Iterator[CrowPlanningState]:
if isinstance(stmt, CrowAchieveExpression):
if left is None:
yield CrowPlanningState(None, state.scopes, state.latest_scope, [ScopedCrowExpression(CrowAssertExpression(stmt.goal), scope_id)] + state.right_statements, state.depth + 1)
else:
yield CrowPlanningState(left, state.scopes, state.latest_scope, [ScopedCrowExpression(CrowAssertExpression(stmt.goal), scope_id)] + state.right_statements, state.depth + 1)
all_matching = match_applicable_behaviors(self.executor.domain, self.state, stmt.goal, state.scopes[scope_id])
all_matching = list(all_matching)
for behavior_matching in all_matching:
# print('Behavior matching:', behavior_matching.behavior.name, behavior_matching.bounded_variables)
# if 'hand' in behavior_matching.behavior.name:
# import ipdb; ipdb.set_trace()
# pass
bounded_variables = behavior_matching.bounded_variables
for var, value in bounded_variables.items():
if isinstance(value, Variable):
bounded_variables[var] = value.clone_with_scope(scope_id)
new_scope_id = state.latest_scope + 1
new_scopes = state.scopes.copy()
new_scopes[new_scope_id] = bounded_variables.copy()
program = behavior_matching.behavior.assign_body_program_scope(new_scope_id)
complex_part, simple_part = split_simple_sequential(program.get_flatten_body(), new_scope_id)
if left is None:
if len(complex_part) == 0:
new_program = None
else:
new_program = CrowBehaviorOrderingSuite(CrowBehaviorStatementOrdering.SEQUENTIAL, complex_part, variable_scope_identifier=new_scope_id)
else:
new_program = CrowBehaviorOrderingSuite(CrowBehaviorStatementOrdering.SEQUENTIAL, (left,) + program.get_flatten_body(), variable_scope_identifier=new_scope_id)
yield CrowPlanningState(new_program, new_scopes, new_scope_id, simple_part + [ScopedCrowExpression(behavior_matching.behavior, new_scope_id)] + state.right_statements, state.depth + 1)
if self.enable_reordering:
promotable, sequential_body = program.split_promotable()
complex_seq, simple_seq = split_simple_sequential(sequential_body, new_scope_id)
if left is not None and promotable is not None:
new_program = CrowBehaviorOrderingSuite(CrowBehaviorStatementOrdering.SEQUENTIAL, (
CrowBehaviorOrderingSuite(CrowBehaviorStatementOrdering.UNORDERED, (
left,
CrowBehaviorOrderingSuite(CrowBehaviorStatementOrdering.SEQUENTIAL, promotable, variable_scope_identifier=new_scope_id)
), variable_scope_identifier=new_scope_id),
*complex_seq
), variable_scope_identifier=new_scope_id)
yield CrowPlanningState(new_program, new_scopes, new_scope_id, simple_seq + [ScopedCrowExpression(behavior_matching.behavior, new_scope_id)] + state.right_statements, state.depth + 1)
elif isinstance(stmt, (CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression)):
if left is None:
yield CrowPlanningState(None, state.scopes, state.latest_scope, [ScopedCrowExpression(stmt, scope_id)] + state.right_statements, state.depth + 1)
else:
yield CrowPlanningState(left, state.scopes, state.latest_scope, [ScopedCrowExpression(stmt, scope_id)] + state.right_statements, state.depth + 1)
else:
raise ValueError(f'Unknown statement type: {stmt}')
def _bfs_verify(self, state: CrowPlanningState) -> Optional[CrowPlanningResult]:
init_results = [CrowPlanningResult(self.state, ConstraintSatisfactionProblem() if self.enable_csp else None, tuple(), state.scopes)]
results = execute_statements(self.executor, init_results, state.right_statements)
if len(results) > 0:
return results[0]
else:
return None
[docs]
class CrowRegressionPlannerDFSv1(CrowRegressionPlanner):
[docs]
def main_entry(self, program: CrowBehaviorOrderingSuite) -> List[Tuple[CrowControllerApplier, ...]]:
state = CrowPlanningState(program, scopes={0: dict()}, right_statements=[], latest_scope=0)
candidate_plans = self.bfs(state)
candidate_plans = [result.controller_actions for result in candidate_plans]
return candidate_plans
[docs]
@jacinle.log_function(verbose=False)
def dfs(self, state: CrowPlanningState) -> Sequence[CrowPlanningResult]:
jacinle.log_function.print('Program:', state.program, 'Depth:', state.depth)
if state.depth >= self.max_search_depth:
raise RuntimeError('Maximum search depth reached.')
right_statements_str = ' '.join([f'{str(x)}@{i}' for x, i in state.right_statements])
jacinle.log_function.print(jacinle.colored(right_statements_str, 'yellow'), state.scopes)
new_results = list()
for left, stmt, scope_id in state.program.pop_right_statement():
new_results.extend(self._dfs_expand(state, left, stmt, scope_id))
# jacinle.log_function.print('New results:', [x.controller_appliers for x in new_results])
return unique_results(new_results)
def _dfs_expand(self, state: CrowPlanningState, left: CrowBehaviorOrderingSuite, stmt: Union[CrowAchieveExpression, CrowControllerApplicationExpression], scope_id: int) -> Sequence[CrowPlanningResult]:
"""Expand a statement.
Args:
state: the current search state.
left: the left part of the program.
stmt: the statement to be expanded.
scope_id: the current scope id.
"""
# jacinle.log_function.print('Expanding:', stmt, 'with left:', left, 'and scope', state.scopes[scope_id])
new_results = list()
if isinstance(stmt, CrowAchieveExpression):
jacinle.log_function.print('Expanding:', stmt, 'with left:', left, 'and scope_id:', scope_id)
jacinle.log_function.print('Null goal expansion', stmt.goal)
# Branch 1: the goal is directly satisfied at the after expanding the left part (null production).
if left is None:
last_results = [CrowPlanningResult(self.state, ConstraintSatisfactionProblem() if self.enable_csp else None, tuple(), state.scopes)]
else:
last_results = self.dfs(CrowPlanningState(left, state.scopes, state.latest_scope, [ScopedCrowExpression(CrowAssertExpression(stmt.goal), scope_id)] + state.right_statements, state.depth + 1))
for last_result in last_results:
rv = self.executor.execute(stmt.goal, state=last_result.state, bounded_variables=canonize_bounded_variables(last_result.scopes, scope_id))
jacinle.log_function.print('Last result:', last_result.controller_actions, 'Null Eval=', bool(rv.item()))
if bool(rv.item()):
new_results.append(last_result)
if len(new_results) > 0:
return new_results
# Branch 2: the goal is not directly satisfied. We need to refine the goal.
all_matching = match_applicable_behaviors(self.executor.domain, self.state, stmt.goal, state.scopes[scope_id])
all_matching = list(all_matching)
matchings_str = [(x.behavior.name, x.bounded_variables) for x in all_matching]
jacinle.log_function.print(jacinle.tabulate(matchings_str, headers=['Behavior', 'Bounded Variables']))
for behavior_matching in all_matching:
bounded_variables = behavior_matching.bounded_variables
for var, value in bounded_variables.items():
if isinstance(value, Variable):
bounded_variables[var] = value.clone_with_scope(scope_id)
# Create a new scope for this subgoal refinement.
new_scope_id = state.latest_scope + 1
new_scopes = state.scopes.copy()
new_scopes[new_scope_id] = bounded_variables.copy()
program = behavior_matching.behavior.assign_body_program_scope(new_scope_id)
# Branch 2.1: the goal will be refined by ignoring the promotable part.
# Create a new program by sequencing the left part and the refinement of this behavior.
if left is None:
new_program = CrowBehaviorOrderingSuite(CrowBehaviorStatementOrdering.SEQUENTIAL, program.get_flatten_body(), variable_scope_identifier=new_scope_id)
else:
new_program = CrowBehaviorOrderingSuite(CrowBehaviorStatementOrdering.SEQUENTIAL, (left,) + program.get_flatten_body(), variable_scope_identifier=new_scope_id)
# Now recursively calls the DFS to expand the new program.
# import ipdb; ipdb.set_trace()
this_new_results = self.dfs(CrowPlanningState(new_program, new_scopes, new_scope_id, [ScopedCrowExpression(behavior_matching.behavior, new_scope_id)] + state.right_statements, state.depth + 1))
if len(this_new_results) > 0:
new_results.extend(execute_behavior_effect_batch(self.executor, this_new_results, behavior_matching.behavior, new_scope_id))
continue
# Branch 2.2: If the sequential expansion fails, we try to expand the unordered part.
if self.enable_reordering:
promotable, sequential_body = program.split_promotable()
if left is not None and promotable is not None:
new_program = CrowBehaviorOrderingSuite(
CrowBehaviorStatementOrdering.SEQUENTIAL,
(CrowBehaviorOrderingSuite(CrowBehaviorStatementOrdering.UNORDERED, (left,) + promotable, variable_scope_identifier=new_scope_id), sequential_body),
variable_scope_identifier=new_scope_id
)
this_new_results = self.dfs(CrowPlanningState(new_program, new_scopes, new_scope_id, [ScopedCrowExpression(behavior_matching.behavior, new_scope_id)] + state.right_statements, state.depth + 1))
if len(this_new_results) > 0:
new_results.extend(execute_behavior_effect_batch(self.executor, this_new_results, behavior_matching.behavior, new_scope_id))
return new_results
elif isinstance(stmt, CrowBehaviorApplicationExpression):
pass
elif isinstance(stmt, (CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression)):
# These statements are "primitive" or "atomic". Therefore, all we need to do is to expand the left branch and then apply the primitive.
if left is None:
last_results = [CrowPlanningResult(self.state, ConstraintSatisfactionProblem() if self.enable_csp else None, tuple(), state.scopes)]
else:
last_results = self.dfs(CrowPlanningState(left, state.scopes, state.latest_scope, [ScopedCrowExpression(stmt, scope_id)] + state.right_statements, state.depth + 1))
for last_result in last_results:
new_results.extend(self._dfs_expand_primitive(last_result, stmt, scope_id))
return new_results
else:
raise ValueError(f'Unknown statement type: {stmt}')
def _dfs_expand_primitive(self, last_result: CrowPlanningResult, stmt: Union[CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression], scope_id: int) -> Sequence[CrowPlanningResult]:
"""Apply a primitive statement on top of a particular planning result of the left branch."""
if isinstance(stmt, CrowControllerApplicationExpression):
argument_values = [self.executor.execute(x, state=last_result.state, bounded_variables=canonize_bounded_variables(last_result.scopes, scope_id)) for x in stmt.arguments]
for i, argv in enumerate(argument_values):
if isinstance(argv, StateObjectReference):
argument_values[i] = argv.name
return [CrowPlanningResult(last_result.state, last_result.csp, last_result.controller_actions + (CrowControllerApplier(stmt.controller, argument_values),), last_result.scopes)]
elif isinstance(stmt, CrowBindExpression):
if stmt.is_object_bind:
new_results = list()
for new_scope in execute_object_bind(self.executor, stmt, last_result.state, canonize_bounded_variables(last_result.scopes, scope_id)):
new_scopes = last_result.scopes.copy()
new_scopes[scope_id] = new_scope
new_results.append(CrowPlanningResult(last_result.state, last_result.csp, last_result.controller_actions, new_scopes))
jacinle.log_function.print(f'Object binding results for {stmt} under scope {canonize_bounded_variables(last_result.scopes, scope_id)}:', len(new_results), 'states.')
return new_results
else:
raise NotImplementedError()
elif isinstance(stmt, CrowRuntimeAssignmentExpression):
raise NotImplementedError()
elif isinstance(stmt, CrowAssertExpression):
# TODO(Jiayuan Mao @ 2024/03/19): implement the CSP tracking.
rv = self.executor.execute(stmt.bool_expr, state=last_result.state, bounded_variables=canonize_bounded_variables(last_result.scopes, scope_id))
jacinle.log_function.print(f'Assert {stmt.bool_expr} under scope {canonize_bounded_variables(last_result.scopes, scope_id)}:', bool(rv.item()))
if bool(rv.item()):
return [last_result]
else:
return []
else:
raise ValueError(f'Unknown statement type: {stmt}')
[docs]
def execute_statements(executor: CrowExecutor, init_results: Sequence[CrowPlanningResult], statements: Sequence[ScopedCrowExpression]) -> List[CrowPlanningResult]:
# statement_str = ' '.join([f'{str(x)}@{i}' for x, i in state.right_statements])
# print(jacinle.colored(statement_str, 'yellow'), state.scopes)
results = init_results
for statement in statements:
stmt, scope_id = statement.statement, statement.scope_id
new_results = list()
try:
# Invalid ordering of find statement.
bounded_variables = canonize_bounded_variables(results[0].scopes, scope_id)
except KeyError:
return list()
if isinstance(stmt, CrowAssertExpression):
for result in results:
rv = executor.execute(stmt.bool_expr, state=result.state, bounded_variables=canonize_bounded_variables(result.scopes, scope_id))
if bool(rv.item()):
new_results.append(result)
elif isinstance(stmt, CrowControllerApplicationExpression):
for result in results:
argument_values = [executor.execute(x, state=result.state, bounded_variables=canonize_bounded_variables(result.scopes, scope_id)) for x in stmt.arguments]
for i, argv in enumerate(argument_values):
if isinstance(argv, StateObjectReference):
argument_values[i] = argv.name
new_results.append(CrowPlanningResult(result.state, result.csp, result.controller_actions + (CrowControllerApplier(stmt.controller, argument_values),), result.scopes))
elif isinstance(stmt, CrowBindExpression):
for result in results:
for new_scope in execute_object_bind(executor, stmt, result.state, canonize_bounded_variables(result.scopes, scope_id)):
# print('!!!Object binding result:', new_scope)
new_scopes = result.scopes.copy()
new_scopes[scope_id] = new_scope
new_results.append(CrowPlanningResult(result.state, result.csp, result.controller_actions, new_scopes))
elif isinstance(stmt, CrowBehavior):
new_results = [execute_behavior_effect(executor, stmt, result.state, canonize_bounded_variables(result.scopes, scope_id)) for result in results]
else:
raise ValueError(f'Unknown statement type: {stmt}')
results = new_results
if len(results) == 0:
break
return results