Source code for concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_astar_v3

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : crow_regression_planner_astar_v3.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 04/13/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import queue
from typing import Optional, Union, Iterator, Sequence, Tuple, List, Dict, NamedTuple

import jacinle

from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.dsl_types import Variable
from concepts.dsl.expression import Expression
from concepts.dsl.tensor_state import StateObjectReference

from concepts.dm.crow.domain import CrowState
from concepts.dm.crow.controller import CrowControllerApplier, CrowControllerApplicationExpression
from concepts.dm.crow.action import CrowAction, CrowAchieveExpression, CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowActionApplicationExpression
from concepts.dm.crow.action import CrowActionOrderingSuite
from concepts.dm.crow.action_utils import match_applicable_actions, ApplicableActionItem

from concepts.dm.crow.planners.regression_planning import CrowRegressionPlanner, CrowPlanningResult
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_utils import canonize_bounded_variables, unique_results, execute_object_bind, execute_action_effect

from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_astar_v2 import CrowActionEffectApplication, RegressionStatement2, UNSET, CrowPlanningState2, CrowPlanningStateWithHeuristic, SolutionFound


LOG_GRAPH_STATES = False
LOG_ENQUEUE_DEQUEUE = False


[docs]class CrowRegressionPlannerAStarv3(CrowRegressionPlanner):
[docs] def main_entry(self, program: CrowActionOrderingSuite) -> List[Tuple[CrowControllerApplier, ...]]: state = CrowPlanningState2.make_empty(program, self.state, ConstraintSatisfactionProblem()) results = self.bfs(state) return [x.actions for x in results]
def _compute_heuristic(self, state: CrowPlanningState2) -> float: h = 0 if state.program is not None: for expression in state.program.iter_statements(): if isinstance(expression, CrowAchieveExpression) or isinstance(expression, CrowControllerApplicationExpression): h += 1 for item in state.right_statements: if isinstance(item.statement, (CrowAchieveExpression, CrowActionApplicationExpression)): h += 1 elif isinstance(item.statement, CrowActionOrderingSuite): for stmt in item.statement.iter_statements(): if isinstance(stmt, (CrowAchieveExpression, CrowActionApplicationExpression)): h += 1 return h queue: queue.PriorityQueue _graph: dict _current_queue_state: Optional[CrowPlanningStateWithHeuristic] = None _expanded_right_first: Dict[int, CrowPlanningStateWithHeuristic] _expanded_queue_nodes: Dict[str, CrowPlanningStateWithHeuristic] _expanded_queue_node_to_children: Dict[int, List[CrowPlanningStateWithHeuristic]]
[docs] def bfs(self, planning_state: CrowPlanningState2) -> Sequence[CrowPlanningResult]: self.queue = queue.PriorityQueue() self._graph = {'nodes': {}, 'edges': list()} self._expanded_right_first = dict() self._expanded_queue_nodes = dict() self._expanded_queue_node_to_children = dict() self.bfs_add_queue(planning_state) try: while not self.queue.empty(): self._search_stat['nr_expanded_nodes'] += 1 if self._search_stat['nr_expanded_nodes'] % 1000 == 0: print('Expanded nodes:', self._search_stat['nr_expanded_nodes']) if self._search_stat['nr_expanded_nodes'] > 1000000: print('Too many expanded nodes.') import ipdb; ipdb.set_trace() break queue_state = self.queue.get() self._current_queue_state = queue_state self._expanded_queue_node_to_children[id(queue_state)] = list() self.bfs_expand(queue_state.state) except SolutionFound as e: return e.results return []
[docs] def bfs_add_queue(self, planning_state: CrowPlanningState2): if LOG_ENQUEUE_DEQUEUE: print(jacinle.colored('Enqueue' + '-' * 60, 'blue')) planning_state.print() # input('Press Enter to continue...') g = len(planning_state.left_actions) h = self._compute_heuristic(planning_state) if planning_state.program is None and len(planning_state.right_statements) == 0: raise SolutionFound([CrowPlanningResult(planning_state.state, planning_state.csp, planning_state.left_actions, planning_state.scopes)]) queue_state = CrowPlanningStateWithHeuristic(planning_state, g, h, self._current_queue_state) if LOG_GRAPH_STATES: self._graph['nodes'][id(queue_state)] = queue_state if self._current_queue_state is not None: self._graph['edges'].append((id(self._current_queue_state), id(queue_state))) state_hash = str(queue_state.state) if state_hash in self._expanded_queue_nodes: if id(self._current_queue_state.state.state) == id(self._expanded_queue_nodes[state_hash].state.state): print('Already expanded queue node.', hash(state_hash)) return self._expanded_queue_nodes[state_hash] = queue_state if self._current_queue_state is not None: self._expanded_queue_node_to_children[id(self._current_queue_state)].append(queue_state) self.queue.put(queue_state)
[docs] def bfs_expand(self, planning_state: CrowPlanningState2): """The BFS algorithm is simulating a hierarchical planning algorithm. The current state can be encoded as: .. code-block:: python left_actions = (a1, a2, a3, ...) middle_program = CrowActionOrderingSuite(...) right_statements = [RegressionStatement2(...), RegressionStatement2(...), ...] It corresponds to this figure: .. code-block:: python a1 -> a2 -> a3 -> ... -> {middle_program} -> [right_statements] Therefore, - state.left_actions: the actions that have been executed (a1, a2, a3, ...). - state.program: the current program that is being expanded ({middle_program}). - state.right_statements: the statements that are waiting to be expanded ([right_statements]). At each step, - If the program is empty, we will pop up the first right statement and expand it. - If the program is not empty, we will randomly pop a statement from the middle program, and prepend it to the right statements. """ if LOG_ENQUEUE_DEQUEUE: print(jacinle.colored('Dequeue ' + '-' * 60, 'red')) planning_state.print() input('Press Enter to continue...') if planning_state.program is None: # The current main program body is empty. We will pop up the first right statement and expand it. stmt = planning_state.right_statements[0] right_stmts = planning_state.right_statements[1:] planning_state = planning_state.clone(program=None, right_statements=right_stmts) self._bfs_expand_inner(planning_state, None, stmt.statement, stmt.expanded_state, stmt.scope_id, stmt_id=id(stmt)) else: all_satisfied = self._bfs_is_all_satisfied(planning_state) if all_satisfied: self.bfs_add_queue(planning_state.clone(program=None)) else: # The current main program body is not empty. We will randomly pop a statement from the middle program, and prepend it to the right statements. # A special case is that after popping the statement, the middle program becomes empty. In this case, we will directly expand the right statements. for middle, stmt, scope_id in planning_state.program.pop_right_statement(): new_state = planning_state.clone(program=middle) self._bfs_expand_inner(planning_state, middle, stmt, new_state, scope_id, stmt_id=None)
def _bfs_is_all_satisfied(self, planning_state: CrowPlanningState2) -> bool: for stmt, scope_id in planning_state.program.iter_statements_with_scope(): if id(stmt) in planning_state.statements_evaluations: if planning_state.statements_evaluations[id(stmt)]: continue else: return False if isinstance(stmt, CrowAchieveExpression) or isinstance(stmt, CrowAssertExpression): expr = stmt.goal if isinstance(stmt, CrowAchieveExpression) else stmt.bool_expr rv = self.executor.execute(expr, state=planning_state.state, bounded_variables=canonize_bounded_variables(planning_state.scopes, scope_id), optimistic_execution=True) rv = rv.item() if isinstance(rv, OptimisticValue): rv = False planning_state.statements_evaluations[id(stmt)] = bool(rv) if not bool(rv): return False else: return False return True def _bfs_expand_inner( self, planning_state: CrowPlanningState2, middle: Optional[CrowActionOrderingSuite], stmt: Union[CrowAchieveExpression, CrowActionApplicationExpression, CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression], expanded_planning_state: Optional[CrowPlanningState2], scope_id: int, stmt_id: Optional[int] = None ): """Expand the tree by extracting the right-most statement and recursively expanding the left part or refine the current statement. Args: planning_state: the current planning state. middle: the rest of the middle programs that are being expanded. stmt: the statement to be expanded at this point. expanded_planning_state: if the stmt is a CrowAchieveExpression, this is the state where the achieve statement was serialized to the right stack. scope_id: the current scope id. stmt_id: the unique id of the statement. If it is None, it means that the statement is not a part of the main program body. This is used to prune some unnecessary expansions. """ # print('Expanding inner:', stmt, 'with scope', scope_id, 'and left program', middle) # print('Expanded_state:', expanded_state) # import ipdb; ipdb.set_trace() if isinstance(stmt, CrowAchieveExpression): if middle is not None: new_state = planning_state.clone(program=middle) self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(RegressionStatement2(stmt, scope_id, expanded_state=new_state),) + planning_state.right_statements)) else: rv, csp = self.evaluate(stmt.goal, state=planning_state.state, csp=planning_state.csp, bounded_variables=canonize_bounded_variables(planning_state.scopes, scope_id)) if isinstance(rv, OptimisticValue): # If the value is optimistic, we will add a constraint to the CSP and continue the search. # But we also need to consider the case where the optimistic value is False, so we do not stop the branching here (no return). self.bfs_add_queue(planning_state.clone(program=None, csp=csp.add_equal_constraint(rv, True))) else: if bool(rv): self.bfs_add_queue(planning_state.clone(program=None)) return first_time_expand = stmt_id not in self._expanded_right_first or stmt_id is None if stmt_id is not None: self._expanded_right_first[stmt_id] = self._current_queue_state # if not first_time_expand: # print('Already expanded. {{{', '-' * 60) for action_matching in match_applicable_actions(self.executor.domain, self.state, stmt.goal, planning_state.scopes[scope_id]): self._bfs_expand_inner_action(expanded_planning_state, expanded_planning_state.program, action_matching, scope_id, prefix_expanded_planning_state=planning_state, first_time_expand=first_time_expand) # if not first_time_expand: # print(' }}}', '-' * 60) elif isinstance(stmt, CrowActionApplicationExpression): if middle is not None: new_state = planning_state.clone(program=middle) self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(RegressionStatement2(stmt, scope_id, expanded_state=new_state),) + planning_state.right_statements)) else: first_time_expand = stmt_id not in self._expanded_right_first if stmt_id is not None: self._expanded_right_first[stmt_id] = self._current_queue_state # if not first_time_expand: # print('Already expanded. {{{', '-' * 60) # TODO(Jiayuan Mao @ 2024/03/27): think about which state should these actions be grounded on and how we should handle the CSP. argument_values = [self.executor.execute(x, state=planning_state.state, bounded_variables=canonize_bounded_variables(planning_state.scopes, scope_id)) for x in stmt.arguments] action_matching = ApplicableActionItem(stmt.action, {k.name: v for k, v in zip(stmt.action.arguments, argument_values)}) self._bfs_expand_inner_action(expanded_planning_state, expanded_planning_state.program, action_matching, scope_id, prefix_expanded_planning_state=planning_state, first_time_expand=first_time_expand) # if not first_time_expand: # print(' }}}', '-' * 60) elif isinstance(stmt, CrowActionOrderingSuite): assert middle is None, 'The middle part should be empty for a program.' self.bfs_add_queue(planning_state.clone(program=stmt)) elif isinstance(stmt, (CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression)): if middle is not None: self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(RegressionStatement2(stmt, scope_id),) + planning_state.right_statements)) else: self._bfs_expand_inner_primitive(planning_state, stmt, scope_id) elif isinstance(stmt, CrowActionEffectApplication): if middle is not None: self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(RegressionStatement2(stmt, scope_id),) + planning_state.right_statements)) else: self._bfs_expand_inner_action_effect(planning_state, stmt.action, scope_id) else: raise ValueError(f'Unknown statement type: {stmt}') def _bfs_expand_inner_action(self, planning_state: CrowPlanningState2, middle: Optional[CrowActionOrderingSuite], action_matching: ApplicableActionItem, scope_id: int, prefix_expanded_planning_state: CrowPlanningState2, first_time_expand: bool): """Expand the tree by refining a particular action. Args: planning_state: the current planning state. middle: the rest of the middle programs that are being expanded. action_matching: the action to be refined, including the action and the bounded variables. scope_id: the current scope id. prefix_expanded_planning_state: the planning search state assuming everything in the middle program has been refined separately without considering the current action. first_time_expand: whether this is the first time to expand the action. If it is not the first time, we will skip the expansion. """ bounded_variables = action_matching.bounded_variables for var, value in bounded_variables.items(): if isinstance(value, Variable): bounded_variables[var] = value.clone_with_scope(scope_id) if action_matching.action.is_sequential_only(): new_scope_id = prefix_expanded_planning_state.latest_scope + 1 program = action_matching.action.assign_body_program_scope(new_scope_id) preamble, promotable, rest = program.split_preamble_and_promotable() new_scopes = prefix_expanded_planning_state.scopes.copy() new_scopes[new_scope_id] = bounded_variables.copy() program = CrowActionOrderingSuite.make_sequential(rest, variable_scope_identifier=new_scope_id) self.bfs_add_queue(prefix_expanded_planning_state.clone(program=program, scopes=new_scopes, latest_scope=new_scope_id, right_statements=(RegressionStatement2(CrowActionEffectApplication(action_matching.action), new_scope_id),) + prefix_expanded_planning_state.right_statements)) return if not first_time_expand: return new_scope_id = planning_state.latest_scope + 1 program = action_matching.action.assign_body_program_scope(new_scope_id) preamble, promotable, rest = program.split_preamble_and_promotable() new_scopes = planning_state.scopes.copy() new_scopes[new_scope_id] = bounded_variables.copy() if preamble is not None: new_left_program = CrowActionOrderingSuite.make_sequential(preamble, variable_scope_identifier=new_scope_id) if middle is not None: new_middle_program = CrowActionOrderingSuite.make_unordered(middle, CrowActionOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id)) if promotable is not None else middle else: new_middle_program = CrowActionOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id) if promotable is not None else None new_right_program = CrowActionOrderingSuite.make_sequential(rest, variable_scope_identifier=new_scope_id) new_right_statements = [RegressionStatement2(new_right_program, new_scope_id), RegressionStatement2(CrowActionEffectApplication(action_matching.action), new_scope_id)] if new_middle_program is not None: new_right_statements.insert(0, RegressionStatement2(new_middle_program, new_scope_id)) self.bfs_add_queue(planning_state.clone(program=new_left_program, scopes=new_scopes, latest_scope=new_scope_id, right_statements=tuple(new_right_statements) + planning_state.right_statements)) else: if promotable is not None: new_left_program = CrowActionOrderingSuite.make_unordered(middle, CrowActionOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id)) if middle is not None else CrowActionOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id) new_right_program = CrowActionOrderingSuite.make_sequential(rest, variable_scope_identifier=new_scope_id) new_right_statements = (RegressionStatement2(new_right_program, new_scope_id), RegressionStatement2(CrowActionEffectApplication(action_matching.action), new_scope_id)) + planning_state.right_statements self.bfs_add_queue(planning_state.clone(program=new_left_program, scopes=new_scopes, latest_scope=new_scope_id, right_statements=new_right_statements)) else: raise RuntimeError('Should not reach here. This case should have been already handled by the action.is_sequential_only() check.') def _bfs_expand_inner_primitive(self, state: CrowPlanningState2, stmt: Union[CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression], scope_id: int): """Expand the tree by refining a particular primitive statement.""" if isinstance(stmt, CrowControllerApplicationExpression): new_csp = state.csp.clone() argument_values = [self.evaluate(x, state=state.state, csp=new_csp, bounded_variables=canonize_bounded_variables(state.scopes, scope_id), clone_csp=False)[0] for x in stmt.arguments] for i, argv in enumerate(argument_values): if isinstance(argv, StateObjectReference): argument_values[i] = argv.name self.bfs_add_queue(state.clone(program=None, left_actions=state.left_actions + (CrowControllerApplier(stmt.controller, argument_values),), csp=new_csp)) elif isinstance(stmt, CrowBindExpression): if stmt.is_object_bind: for new_scope in execute_object_bind(self.executor, stmt, state.state, canonize_bounded_variables(state.scopes, scope_id)): new_scopes = state.scopes.copy() new_scopes[scope_id] = new_scope self.bfs_add_queue(state.clone(program=None, scopes=new_scopes)) else: raise NotImplementedError() elif isinstance(stmt, CrowRuntimeAssignmentExpression): rv, new_csp = self.evaluate(stmt.value, state=state.state, csp=state.csp, bounded_variables=canonize_bounded_variables(state.scopes, scope_id)) new_scopes = state.scopes.copy() new_scopes[scope_id] = state.scopes[scope_id].copy() new_scopes[scope_id][stmt.variable.name] = rv self.bfs_add_queue(state.clone(program=None, scopes=new_scopes, csp=new_csp)) elif isinstance(stmt, CrowAssertExpression): rv, new_csp = self.evaluate(stmt.bool_expr, state=state.state, csp=state.csp, bounded_variables=canonize_bounded_variables(state.scopes, scope_id)) if isinstance(rv, OptimisticValue): self.bfs_add_queue(state.clone(program=None, csp=new_csp.add_equal_constraint(rv, True))) else: if bool(rv): self.bfs_add_queue(state.clone(program=None)) else: raise ValueError(f'Unknown statement type: {stmt}') def _bfs_expand_inner_action_effect(self, planning_state: CrowPlanningState2, stmt: CrowAction, scope_id: int): new_csp = planning_state.csp.clone() if planning_state.csp is not None else None new_state = execute_action_effect(self.executor, stmt, planning_state.state, canonize_bounded_variables(planning_state.scopes, scope_id), csp=new_csp) self.bfs_add_queue(planning_state.clone(program=None, state=new_state, csp=new_csp))