#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : crow_regression_planner_astar_v3.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 04/13/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import queue
from typing import Optional, Union, Iterator, Sequence, Tuple, List, Dict, NamedTuple
import jacinle
from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.dsl_types import Variable
from concepts.dsl.expression import Expression
from concepts.dsl.tensor_state import StateObjectReference
from concepts.dm.crow.domain import CrowState
from concepts.dm.crow.controller import CrowControllerApplier, CrowControllerApplicationExpression
from concepts.dm.crow.action import CrowAction, CrowAchieveExpression, CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowActionApplicationExpression
from concepts.dm.crow.action import CrowActionOrderingSuite
from concepts.dm.crow.action_utils import match_applicable_actions, ApplicableActionItem
from concepts.dm.crow.planners.regression_planning import CrowRegressionPlanner, CrowPlanningResult
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_utils import canonize_bounded_variables, unique_results, execute_object_bind, execute_action_effect
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_astar_v2 import CrowActionEffectApplication, RegressionStatement2, UNSET, CrowPlanningState2, CrowPlanningStateWithHeuristic, SolutionFound
LOG_GRAPH_STATES = False
LOG_ENQUEUE_DEQUEUE = False
[docs]class CrowRegressionPlannerAStarv3(CrowRegressionPlanner):
[docs] def main_entry(self, program: CrowActionOrderingSuite) -> List[Tuple[CrowControllerApplier, ...]]:
state = CrowPlanningState2.make_empty(program, self.state, ConstraintSatisfactionProblem())
results = self.bfs(state)
return [x.actions for x in results]
def _compute_heuristic(self, state: CrowPlanningState2) -> float:
h = 0
if state.program is not None:
for expression in state.program.iter_statements():
if isinstance(expression, CrowAchieveExpression) or isinstance(expression, CrowControllerApplicationExpression):
h += 1
for item in state.right_statements:
if isinstance(item.statement, (CrowAchieveExpression, CrowActionApplicationExpression)):
h += 1
elif isinstance(item.statement, CrowActionOrderingSuite):
for stmt in item.statement.iter_statements():
if isinstance(stmt, (CrowAchieveExpression, CrowActionApplicationExpression)):
h += 1
return h
queue: queue.PriorityQueue
_graph: dict
_current_queue_state: Optional[CrowPlanningStateWithHeuristic] = None
_expanded_right_first: Dict[int, CrowPlanningStateWithHeuristic]
_expanded_queue_nodes: Dict[str, CrowPlanningStateWithHeuristic]
_expanded_queue_node_to_children: Dict[int, List[CrowPlanningStateWithHeuristic]]
[docs] def bfs(self, planning_state: CrowPlanningState2) -> Sequence[CrowPlanningResult]:
self.queue = queue.PriorityQueue()
self._graph = {'nodes': {}, 'edges': list()}
self._expanded_right_first = dict()
self._expanded_queue_nodes = dict()
self._expanded_queue_node_to_children = dict()
self.bfs_add_queue(planning_state)
try:
while not self.queue.empty():
self._search_stat['nr_expanded_nodes'] += 1
if self._search_stat['nr_expanded_nodes'] % 1000 == 0:
print('Expanded nodes:', self._search_stat['nr_expanded_nodes'])
if self._search_stat['nr_expanded_nodes'] > 1000000:
print('Too many expanded nodes.')
import ipdb; ipdb.set_trace()
break
queue_state = self.queue.get()
self._current_queue_state = queue_state
self._expanded_queue_node_to_children[id(queue_state)] = list()
self.bfs_expand(queue_state.state)
except SolutionFound as e:
return e.results
return []
[docs] def bfs_add_queue(self, planning_state: CrowPlanningState2):
if LOG_ENQUEUE_DEQUEUE:
print(jacinle.colored('Enqueue' + '-' * 60, 'blue'))
planning_state.print()
# input('Press Enter to continue...')
g = len(planning_state.left_actions)
h = self._compute_heuristic(planning_state)
if planning_state.program is None and len(planning_state.right_statements) == 0:
raise SolutionFound([CrowPlanningResult(planning_state.state, planning_state.csp, planning_state.left_actions, planning_state.scopes)])
queue_state = CrowPlanningStateWithHeuristic(planning_state, g, h, self._current_queue_state)
if LOG_GRAPH_STATES:
self._graph['nodes'][id(queue_state)] = queue_state
if self._current_queue_state is not None:
self._graph['edges'].append((id(self._current_queue_state), id(queue_state)))
state_hash = str(queue_state.state)
if state_hash in self._expanded_queue_nodes:
if id(self._current_queue_state.state.state) == id(self._expanded_queue_nodes[state_hash].state.state):
print('Already expanded queue node.', hash(state_hash))
return
self._expanded_queue_nodes[state_hash] = queue_state
if self._current_queue_state is not None:
self._expanded_queue_node_to_children[id(self._current_queue_state)].append(queue_state)
self.queue.put(queue_state)
[docs] def bfs_expand(self, planning_state: CrowPlanningState2):
"""The BFS algorithm is simulating a hierarchical planning algorithm. The current state can be encoded as:
.. code-block:: python
left_actions = (a1, a2, a3, ...)
middle_program = CrowActionOrderingSuite(...)
right_statements = [RegressionStatement2(...), RegressionStatement2(...), ...]
It corresponds to this figure:
.. code-block:: python
a1 -> a2 -> a3 -> ... -> {middle_program} -> [right_statements]
Therefore,
- state.left_actions: the actions that have been executed (a1, a2, a3, ...).
- state.program: the current program that is being expanded ({middle_program}).
- state.right_statements: the statements that are waiting to be expanded ([right_statements]).
At each step,
- If the program is empty, we will pop up the first right statement and expand it.
- If the program is not empty, we will randomly pop a statement from the middle program, and prepend it to the right statements.
"""
if LOG_ENQUEUE_DEQUEUE:
print(jacinle.colored('Dequeue ' + '-' * 60, 'red'))
planning_state.print()
input('Press Enter to continue...')
if planning_state.program is None:
# The current main program body is empty. We will pop up the first right statement and expand it.
stmt = planning_state.right_statements[0]
right_stmts = planning_state.right_statements[1:]
planning_state = planning_state.clone(program=None, right_statements=right_stmts)
self._bfs_expand_inner(planning_state, None, stmt.statement, stmt.expanded_state, stmt.scope_id, stmt_id=id(stmt))
else:
all_satisfied = self._bfs_is_all_satisfied(planning_state)
if all_satisfied:
self.bfs_add_queue(planning_state.clone(program=None))
else:
# The current main program body is not empty. We will randomly pop a statement from the middle program, and prepend it to the right statements.
# A special case is that after popping the statement, the middle program becomes empty. In this case, we will directly expand the right statements.
for middle, stmt, scope_id in planning_state.program.pop_right_statement():
new_state = planning_state.clone(program=middle)
self._bfs_expand_inner(planning_state, middle, stmt, new_state, scope_id, stmt_id=None)
def _bfs_is_all_satisfied(self, planning_state: CrowPlanningState2) -> bool:
for stmt, scope_id in planning_state.program.iter_statements_with_scope():
if id(stmt) in planning_state.statements_evaluations:
if planning_state.statements_evaluations[id(stmt)]:
continue
else:
return False
if isinstance(stmt, CrowAchieveExpression) or isinstance(stmt, CrowAssertExpression):
expr = stmt.goal if isinstance(stmt, CrowAchieveExpression) else stmt.bool_expr
rv = self.executor.execute(expr, state=planning_state.state, bounded_variables=canonize_bounded_variables(planning_state.scopes, scope_id), optimistic_execution=True)
rv = rv.item()
if isinstance(rv, OptimisticValue):
rv = False
planning_state.statements_evaluations[id(stmt)] = bool(rv)
if not bool(rv):
return False
else:
return False
return True
def _bfs_expand_inner(
self, planning_state: CrowPlanningState2, middle: Optional[CrowActionOrderingSuite],
stmt: Union[CrowAchieveExpression, CrowActionApplicationExpression, CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression],
expanded_planning_state: Optional[CrowPlanningState2],
scope_id: int, stmt_id: Optional[int] = None
):
"""Expand the tree by extracting the right-most statement and recursively expanding the left part or refine the current statement.
Args:
planning_state: the current planning state.
middle: the rest of the middle programs that are being expanded.
stmt: the statement to be expanded at this point.
expanded_planning_state: if the stmt is a CrowAchieveExpression, this is the state where the achieve statement was serialized to the right stack.
scope_id: the current scope id.
stmt_id: the unique id of the statement. If it is None, it means that the statement is not a part of the main program body. This is used to prune some unnecessary expansions.
"""
# print('Expanding inner:', stmt, 'with scope', scope_id, 'and left program', middle)
# print('Expanded_state:', expanded_state)
# import ipdb; ipdb.set_trace()
if isinstance(stmt, CrowAchieveExpression):
if middle is not None:
new_state = planning_state.clone(program=middle)
self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(RegressionStatement2(stmt, scope_id, expanded_state=new_state),) + planning_state.right_statements))
else:
rv, csp = self.evaluate(stmt.goal, state=planning_state.state, csp=planning_state.csp, bounded_variables=canonize_bounded_variables(planning_state.scopes, scope_id))
if isinstance(rv, OptimisticValue):
# If the value is optimistic, we will add a constraint to the CSP and continue the search.
# But we also need to consider the case where the optimistic value is False, so we do not stop the branching here (no return).
self.bfs_add_queue(planning_state.clone(program=None, csp=csp.add_equal_constraint(rv, True)))
else:
if bool(rv):
self.bfs_add_queue(planning_state.clone(program=None))
return
first_time_expand = stmt_id not in self._expanded_right_first or stmt_id is None
if stmt_id is not None:
self._expanded_right_first[stmt_id] = self._current_queue_state
# if not first_time_expand:
# print('Already expanded. {{{', '-' * 60)
for action_matching in match_applicable_actions(self.executor.domain, self.state, stmt.goal, planning_state.scopes[scope_id]):
self._bfs_expand_inner_action(expanded_planning_state, expanded_planning_state.program, action_matching, scope_id, prefix_expanded_planning_state=planning_state, first_time_expand=first_time_expand)
# if not first_time_expand:
# print(' }}}', '-' * 60)
elif isinstance(stmt, CrowActionApplicationExpression):
if middle is not None:
new_state = planning_state.clone(program=middle)
self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(RegressionStatement2(stmt, scope_id, expanded_state=new_state),) + planning_state.right_statements))
else:
first_time_expand = stmt_id not in self._expanded_right_first
if stmt_id is not None:
self._expanded_right_first[stmt_id] = self._current_queue_state
# if not first_time_expand:
# print('Already expanded. {{{', '-' * 60)
# TODO(Jiayuan Mao @ 2024/03/27): think about which state should these actions be grounded on and how we should handle the CSP.
argument_values = [self.executor.execute(x, state=planning_state.state, bounded_variables=canonize_bounded_variables(planning_state.scopes, scope_id)) for x in stmt.arguments]
action_matching = ApplicableActionItem(stmt.action, {k.name: v for k, v in zip(stmt.action.arguments, argument_values)})
self._bfs_expand_inner_action(expanded_planning_state, expanded_planning_state.program, action_matching, scope_id, prefix_expanded_planning_state=planning_state, first_time_expand=first_time_expand)
# if not first_time_expand:
# print(' }}}', '-' * 60)
elif isinstance(stmt, CrowActionOrderingSuite):
assert middle is None, 'The middle part should be empty for a program.'
self.bfs_add_queue(planning_state.clone(program=stmt))
elif isinstance(stmt, (CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression)):
if middle is not None:
self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(RegressionStatement2(stmt, scope_id),) + planning_state.right_statements))
else:
self._bfs_expand_inner_primitive(planning_state, stmt, scope_id)
elif isinstance(stmt, CrowActionEffectApplication):
if middle is not None:
self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(RegressionStatement2(stmt, scope_id),) + planning_state.right_statements))
else:
self._bfs_expand_inner_action_effect(planning_state, stmt.action, scope_id)
else:
raise ValueError(f'Unknown statement type: {stmt}')
def _bfs_expand_inner_action(self, planning_state: CrowPlanningState2, middle: Optional[CrowActionOrderingSuite], action_matching: ApplicableActionItem, scope_id: int, prefix_expanded_planning_state: CrowPlanningState2, first_time_expand: bool):
"""Expand the tree by refining a particular action.
Args:
planning_state: the current planning state.
middle: the rest of the middle programs that are being expanded.
action_matching: the action to be refined, including the action and the bounded variables.
scope_id: the current scope id.
prefix_expanded_planning_state: the planning search state assuming everything in the middle program has been refined separately without considering the current action.
first_time_expand: whether this is the first time to expand the action. If it is not the first time, we will skip the expansion.
"""
bounded_variables = action_matching.bounded_variables
for var, value in bounded_variables.items():
if isinstance(value, Variable):
bounded_variables[var] = value.clone_with_scope(scope_id)
if action_matching.action.is_sequential_only():
new_scope_id = prefix_expanded_planning_state.latest_scope + 1
program = action_matching.action.assign_body_program_scope(new_scope_id)
preamble, promotable, rest = program.split_preamble_and_promotable()
new_scopes = prefix_expanded_planning_state.scopes.copy()
new_scopes[new_scope_id] = bounded_variables.copy()
program = CrowActionOrderingSuite.make_sequential(rest, variable_scope_identifier=new_scope_id)
self.bfs_add_queue(prefix_expanded_planning_state.clone(program=program, scopes=new_scopes, latest_scope=new_scope_id, right_statements=(RegressionStatement2(CrowActionEffectApplication(action_matching.action), new_scope_id),) + prefix_expanded_planning_state.right_statements))
return
if not first_time_expand:
return
new_scope_id = planning_state.latest_scope + 1
program = action_matching.action.assign_body_program_scope(new_scope_id)
preamble, promotable, rest = program.split_preamble_and_promotable()
new_scopes = planning_state.scopes.copy()
new_scopes[new_scope_id] = bounded_variables.copy()
if preamble is not None:
new_left_program = CrowActionOrderingSuite.make_sequential(preamble, variable_scope_identifier=new_scope_id)
if middle is not None:
new_middle_program = CrowActionOrderingSuite.make_unordered(middle, CrowActionOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id)) if promotable is not None else middle
else:
new_middle_program = CrowActionOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id) if promotable is not None else None
new_right_program = CrowActionOrderingSuite.make_sequential(rest, variable_scope_identifier=new_scope_id)
new_right_statements = [RegressionStatement2(new_right_program, new_scope_id), RegressionStatement2(CrowActionEffectApplication(action_matching.action), new_scope_id)]
if new_middle_program is not None:
new_right_statements.insert(0, RegressionStatement2(new_middle_program, new_scope_id))
self.bfs_add_queue(planning_state.clone(program=new_left_program, scopes=new_scopes, latest_scope=new_scope_id, right_statements=tuple(new_right_statements) + planning_state.right_statements))
else:
if promotable is not None:
new_left_program = CrowActionOrderingSuite.make_unordered(middle, CrowActionOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id)) if middle is not None else CrowActionOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id)
new_right_program = CrowActionOrderingSuite.make_sequential(rest, variable_scope_identifier=new_scope_id)
new_right_statements = (RegressionStatement2(new_right_program, new_scope_id), RegressionStatement2(CrowActionEffectApplication(action_matching.action), new_scope_id)) + planning_state.right_statements
self.bfs_add_queue(planning_state.clone(program=new_left_program, scopes=new_scopes, latest_scope=new_scope_id, right_statements=new_right_statements))
else:
raise RuntimeError('Should not reach here. This case should have been already handled by the action.is_sequential_only() check.')
def _bfs_expand_inner_primitive(self, state: CrowPlanningState2, stmt: Union[CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression], scope_id: int):
"""Expand the tree by refining a particular primitive statement."""
if isinstance(stmt, CrowControllerApplicationExpression):
new_csp = state.csp.clone()
argument_values = [self.evaluate(x, state=state.state, csp=new_csp, bounded_variables=canonize_bounded_variables(state.scopes, scope_id), clone_csp=False)[0] for x in stmt.arguments]
for i, argv in enumerate(argument_values):
if isinstance(argv, StateObjectReference):
argument_values[i] = argv.name
self.bfs_add_queue(state.clone(program=None, left_actions=state.left_actions + (CrowControllerApplier(stmt.controller, argument_values),), csp=new_csp))
elif isinstance(stmt, CrowBindExpression):
if stmt.is_object_bind:
for new_scope in execute_object_bind(self.executor, stmt, state.state, canonize_bounded_variables(state.scopes, scope_id)):
new_scopes = state.scopes.copy()
new_scopes[scope_id] = new_scope
self.bfs_add_queue(state.clone(program=None, scopes=new_scopes))
else:
raise NotImplementedError()
elif isinstance(stmt, CrowRuntimeAssignmentExpression):
rv, new_csp = self.evaluate(stmt.value, state=state.state, csp=state.csp, bounded_variables=canonize_bounded_variables(state.scopes, scope_id))
new_scopes = state.scopes.copy()
new_scopes[scope_id] = state.scopes[scope_id].copy()
new_scopes[scope_id][stmt.variable.name] = rv
self.bfs_add_queue(state.clone(program=None, scopes=new_scopes, csp=new_csp))
elif isinstance(stmt, CrowAssertExpression):
rv, new_csp = self.evaluate(stmt.bool_expr, state=state.state, csp=state.csp, bounded_variables=canonize_bounded_variables(state.scopes, scope_id))
if isinstance(rv, OptimisticValue):
self.bfs_add_queue(state.clone(program=None, csp=new_csp.add_equal_constraint(rv, True)))
else:
if bool(rv):
self.bfs_add_queue(state.clone(program=None))
else:
raise ValueError(f'Unknown statement type: {stmt}')
def _bfs_expand_inner_action_effect(self, planning_state: CrowPlanningState2, stmt: CrowAction, scope_id: int):
new_csp = planning_state.csp.clone() if planning_state.csp is not None else None
new_state = execute_action_effect(self.executor, stmt, planning_state.state, canonize_bounded_variables(planning_state.scopes, scope_id), csp=new_csp)
self.bfs_add_queue(planning_state.clone(program=None, state=new_state, csp=new_csp))