# Source code for concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_astar_v1

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : crow_regression_planner_astar_v1.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/21/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import queue
from typing import Optional, Union, Iterator, Sequence, Tuple, List, Dict, NamedTuple

import jacinle
from jacinle.utils.meta import UNSET

from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.dsl_types import Variable
from concepts.dsl.expression import ValueOutputExpression
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.tensor_state import StateObjectReference

from concepts.dm.crow.crow_domain import CrowState
from concepts.dm.crow.controller import CrowControllerApplier, CrowControllerApplicationExpression
from concepts.dm.crow.behavior import CrowBehavior, CrowAchieveExpression, CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowBehaviorApplicationExpression
from concepts.dm.crow.behavior import CrowBehaviorOrderingSuite
from concepts.dm.crow.behavior_utils import match_applicable_behaviors, ApplicableBehaviorItem, execute_object_bind

from concepts.dm.crow.planners.regression_planning import CrowPlanningResult, CrowRegressionPlanner
from concepts.dm.crow.planners.regression_utils import canonize_bounded_variables
from concepts.dm.crow.planners.regression_planning_impl.crow_regression_planner_dfs_v1_utils import execute_behavior_effect
from concepts.dm.crow.csp_solver.csp_utils import csp_ground_action_list

__all__ = ['CrowRegressionPlannerAStarv1']


class _CrowBehaviorEffectApplication(NamedTuple):
    """Apply the effect of an action."""
    behavior: CrowBehavior


class _ScopedStatement2(NamedTuple):
    """A statement in the right stack of the planning state."""
    statement: Union[CrowBehaviorOrderingSuite, CrowBindExpression, CrowAssertExpression, CrowControllerApplicationExpression, _CrowBehaviorEffectApplication, CrowAchieveExpression, CrowBehaviorApplicationExpression]
    """The statement."""

    scope_id: int
    """The scope id of the statement."""

    expanded_state: Optional['_AStarNode'] = None
    """The state where this statement was serialized to the right stack."""


class _AStarNode(NamedTuple):
    """A search node of the A* regression planner.

    A node is laid out as ``left_statements -> program -> right_statements``: the statements already emitted,
    the middle program currently being expanded, and the stack of statements still waiting to be expanded.
    """

    program: Optional[CrowBehaviorOrderingSuite]
    """The middle program that is currently being expanded (None when it is empty)."""

    state: CrowState
    """The world state associated with this node."""

    csp: ConstraintSatisfactionProblem
    """The constraint satisfaction problem accumulated so far."""

    scopes: Dict[int, dict]
    """Variable scopes, indexed by scope id."""

    latest_scope: int
    """The most recently allocated scope id."""

    left_statements: Tuple[CrowControllerApplier, ...]
    """The (left) statements that have already been executed."""

    right_statements: Tuple[_ScopedStatement2, ...]
    """The (right) statements that are still waiting to be expanded."""

    statements_evaluations: Dict[int, bool]
    """Cached evaluation result per statement, keyed by ``id(statement)``."""

    commit_sketch: bool = False
    """Whether to commit the sketch."""

    commit_csp: bool = False
    """Whether to commit the CSP."""

    commit_execution: bool = False
    """Whether to commit the behavior execution."""

    def clone(
        self, program=UNSET, state=UNSET, csp=UNSET, scopes=UNSET, latest_scope=UNSET, left_statements=UNSET, right_statements=UNSET, statements_evaluations=UNSET,
        commit_sketch=UNSET, commit_csp=UNSET, commit_execution=UNSET
    ):
        """Return a new node in which every argument that is not ``UNSET`` replaces the corresponding field.

        When ``program`` is given but ``statements_evaluations`` is not, the evaluation cache of the new node
        starts out empty instead of being shared with this node.
        """

        def pick(candidate, current):
            # An explicitly supplied value (including None) always wins over the current one.
            return current if candidate is UNSET else candidate

        if statements_evaluations is UNSET:
            # Replacing the program invalidates the per-statement evaluation cache.
            if program is UNSET:
                statements_evaluations = self.statements_evaluations
            else:
                statements_evaluations = dict()

        return _AStarNode(
            program=pick(program, self.program),
            state=pick(state, self.state),
            csp=pick(csp, self.csp),
            scopes=pick(scopes, self.scopes),
            latest_scope=pick(latest_scope, self.latest_scope),
            left_statements=pick(left_statements, self.left_statements),
            right_statements=pick(right_statements, self.right_statements),
            statements_evaluations=statements_evaluations,
            commit_sketch=pick(commit_sketch, self.commit_sketch),
            commit_csp=pick(commit_csp, self.commit_csp),
            commit_execution=pick(commit_execution, self.commit_execution),
        )

    @classmethod
    def make_empty(cls, program, state, csp, commit_execution=False, commit_sketch=False, commit_csp=False):
        """Build the initial node: a single empty scope ``0``, no left/right statements, and an empty evaluation cache."""
        return cls(
            program=program,
            state=state,
            csp=csp,
            scopes={0: {}},
            latest_scope=0,
            left_statements=(),
            right_statements=(),
            statements_evaluations={},
            commit_sketch=commit_sketch,
            commit_csp=commit_csp,
            commit_execution=commit_execution,
        )

    def print(self):
        """Print a colored, human-readable dump of this node to stdout."""
        if len(self.left_statements) > 0:
            left_text = ' '.join(map(str, self.left_statements))
        else:
            left_text = '<empty left>'

        program_text = '<empty program>' if self.program is None else str(self.program)

        if len(self.right_statements) > 0:
            # One right statement per line, newlines stripped, suffixed with its scope id.
            right_text = '\n'.join(
                str(item.statement).replace('\n', '') + f'@{item.scope_id}'
                for item in self.right_statements
            )
        else:
            right_text = '<empty right>'

        print('HASH:  ', hash(str(self)), f'!sketch={self.commit_sketch}', f'!csp={self.commit_csp}', f'!exe={self.commit_execution}')
        sections = (
            'LEFT:  ' + jacinle.colored(left_text, 'green'),
            'PROG:  ' + jacinle.colored(program_text, 'blue'),
            'RIGHT: \n' + jacinle.indent_text(jacinle.colored(right_text, 'yellow')),
            'SCOPE: ' + str(self.scopes),
        )
        print(*sections, sep='\n')

    def iter_program_statements(self) -> Iterator[Union[CrowAchieveExpression, CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression, CrowBehaviorApplicationExpression]]:
        """Iterate over the statements of the middle program; yields nothing when the program is None."""
        if self.program is None:
            return
        yield from self.program.iter_statements()


class _AStarNodeWithHeuristic(NamedTuple):
    state: _AStarNode

    g: float
    h: float

    previous_state: Optional['_AStarNodeWithHeuristic'] = None

    def __lt__(self, other):
        return self.g + self.h < other.g + other.h

    def print_history(self):
        self.state.print()
        queue_state = self
        while queue_state.previous_state is not None:
            print('<-' * 30)
            queue_state = queue_state.previous_state
            queue_state.state.print()


class SolutionFound(Exception):
    """Control-flow exception raised when the search reaches a complete plan.

    It is raised from inside the search and carries the planning results in :attr:`results`.
    """

    def __init__(self, results: Sequence[CrowPlanningResult]):
        """Store the planning results.

        Args:
            results: the planning results found by the search.
        """
        # Forward the argument so that ``args`` stays ``(results,)``.
        super().__init__(results)
        self.results = results
# Debug switches for the search.
# LOG_GRAPH_STATES: record every queue node and parent->child edge of the search graph.
LOG_GRAPH_STATES = False
# LOG_ENQUEUE_DEQUEUE: print every node when it is enqueued/dequeued (dequeue waits for Enter).
LOG_ENQUEUE_DEQUEUE = False
class CrowRegressionPlannerAStarv1(CrowRegressionPlanner):
    """Regression planner that explores planning states in best-first order.

    Search nodes are :class:`_AStarNode` instances wrapped in :class:`_AStarNodeWithHeuristic` and kept in a
    priority queue ordered by ``g + h``, where ``g`` is the number of left statements already emitted and ``h``
    is the value returned by :meth:`_compute_heuristic`.
    """

    def main_entry(self, program: CrowBehaviorOrderingSuite, minimize: Optional[ValueOutputExpression] = None) -> List[Tuple[CrowControllerApplier, ...]]:
        """Plan for ``program`` starting from ``self.state``.

        Args:
            program: the program to be refined into controller actions.
            minimize: currently unused by this implementation.

        Returns:
            the ``controller_actions`` of every result returned by :meth:`bfs`.
        """
        state = _AStarNode.make_empty(program, self.state, ConstraintSatisfactionProblem())
        results = self.bfs(state)
        return [x.controller_actions for x in results]

    def _compute_heuristic(self, state: _AStarNode) -> float:
        """Compute the heuristic value for a planning state.

        The current implementation uses a simple (non-admissible) heuristic that counts the number of achieve
        statements in the program and the right statements. Roughly speaking, it is the number of "subgoals"
        that haven't been achieved yet.
        """
        h = 0
        if state.program is not None:
            for expression in state.program.iter_statements():
                if isinstance(expression, (CrowAchieveExpression, CrowControllerApplicationExpression)):
                    h += 1
        for item in state.right_statements:
            if isinstance(item.statement, (CrowAchieveExpression, CrowBehaviorApplicationExpression)):
                h += 1
            elif isinstance(item.statement, CrowBehaviorOrderingSuite):
                for stmt in item.statement.iter_statements():
                    if isinstance(stmt, (CrowAchieveExpression, CrowBehaviorApplicationExpression)):
                        h += 1
        return h

    # Per-search bookkeeping; all of these are (re-)initialized at the start of :meth:`bfs`.
    queue: queue.PriorityQueue
    _graph: dict
    _current_queue_state: Optional[_AStarNodeWithHeuristic] = None
    _expanded_right_first: Dict[int, _AStarNodeWithHeuristic]
    _expanded_queue_nodes: Dict[str, _AStarNodeWithHeuristic]
    _expanded_queue_node_to_children: Dict[int, List[_AStarNodeWithHeuristic]]

    def bfs(self, planning_state: _AStarNode) -> Sequence[CrowPlanningResult]:
        """Run the best-first search from ``planning_state``.

        Args:
            planning_state: the initial planning state.

        Returns:
            the results carried by the first :class:`SolutionFound` raised during the search, or an empty list
            if the queue is exhausted or the expansion limit is exceeded.
        """
        self.queue = queue.PriorityQueue()
        self._graph = {'nodes': {}, 'edges': list()}
        self._expanded_right_first = dict()
        self._expanded_queue_nodes = dict()
        self._expanded_queue_node_to_children = dict()
        # Forget the node of any previous search: otherwise the initial node would be linked to a stale parent
        # that is not registered in the freshly reset ``_expanded_queue_node_to_children``.
        self._current_queue_state = None

        try:
            # The initial enqueue is inside the ``try`` because it raises SolutionFound when the initial state
            # is already complete.
            self.bfs_add_queue(planning_state)
            while not self.queue.empty():
                self._search_stat['nr_expanded_nodes'] += 1
                if self._search_stat['nr_expanded_nodes'] % 1000 == 0:
                    print('Expanded nodes:', self._search_stat['nr_expanded_nodes'])
                if self._search_stat['nr_expanded_nodes'] > 1000000:
                    # Give up instead of dropping into an interactive debugger.
                    print('Too many expanded nodes.')
                    break

                queue_state = self.queue.get()
                self._current_queue_state = queue_state
                self._expanded_queue_node_to_children[id(queue_state)] = list()
                self.bfs_expand(queue_state.state)
        except SolutionFound as e:
            return e.results
        return []

    def bfs_add_queue(self, planning_state: _AStarNode):
        """Add a planning state to the queue, or finish the search if the state is complete.

        A state with no middle program and no right statements is complete. If its CSP is non-empty, the CSP is
        solved first; an unsatisfiable CSP prunes the state silently.

        Args:
            planning_state: the planning state to be enqueued.

        Raises:
            SolutionFound: when ``planning_state`` is complete (and its CSP, if any, has a solution).
        """
        if LOG_ENQUEUE_DEQUEUE:
            print(jacinle.colored('Enqueue' + '-' * 60, 'blue'))
            planning_state.print()

        if planning_state.program is None and len(planning_state.right_statements) == 0:
            if not planning_state.csp.empty():
                from concepts.dm.crow.csp_solver.dpll_sampling import dpll_solve
                solution = dpll_solve(self.executor, planning_state.csp, simulation_interface=self.simulation_interface, actions=planning_state.left_statements)
                if solution is None:
                    # If the CSP is unsatisfiable, we will prune this branch.
                    return
                actions = csp_ground_action_list(self.executor, planning_state.left_statements, solution)
                raise SolutionFound([CrowPlanningResult(planning_state.state, planning_state.csp, actions, planning_state.scopes)])
            raise SolutionFound([CrowPlanningResult(planning_state.state, planning_state.csp, planning_state.left_statements, planning_state.scopes)])

        g = len(planning_state.left_statements)
        h = self._compute_heuristic(planning_state)
        queue_state = _AStarNodeWithHeuristic(planning_state, g, h, self._current_queue_state)

        if LOG_GRAPH_STATES:
            self._graph['nodes'][id(queue_state)] = queue_state
            if self._current_queue_state is not None:
                self._graph['edges'].append((id(self._current_queue_state), id(queue_state)))

        # Nodes are de-duplicated by their string form; a duplicate is dropped only when it is generated from
        # the very same world-state object as the node recorded earlier.
        state_hash = str(queue_state.state)
        if state_hash in self._expanded_queue_nodes and self._current_queue_state is not None:
            if id(self._current_queue_state.state.state) == id(self._expanded_queue_nodes[state_hash].state.state):
                print('Already expanded queue node.', hash(state_hash))
                return

        self._expanded_queue_nodes[state_hash] = queue_state
        if self._current_queue_state is not None:
            self._expanded_queue_node_to_children[id(self._current_queue_state)].append(queue_state)
        self.queue.put(queue_state)

    def bfs_expand(self, planning_state: _AStarNode):
        """The BFS algorithm is simulating a hierarchical planning algorithm. The current state can be encoded as:

        .. code-block:: python

            left_statements = (a1, a2, a3, ...)
            program = CrowBehaviorOrderingSuite(...)
            right_statements = [_ScopedStatement2(...), _ScopedStatement2(...), ...]

        It corresponds to this figure:

        .. code-block:: python

            a1 -> a2 -> a3 -> ... -> {program} -> [right_statements]

        Therefore,

        - state.left_statements: the actions that have been executed (a1, a2, a3, ...).
        - state.program: the current program that is being expanded ({program}).
        - state.right_statements: the statements that are waiting to be expanded ([right_statements]).

        At each step,

        - If the program is empty, we will pop up the first right statement and expand it.
        - If the program is not empty, we will randomly pop a statement from the middle program, and prepend it to the right statements.
        """
        if LOG_ENQUEUE_DEQUEUE:
            print(jacinle.colored('Dequeue ' + '-' * 60, 'red'))
            planning_state.print()
            input('Press Enter to continue...')

        if planning_state.program is None:
            # The current main program body is empty. We will pop up the first right statement and expand it.
            stmt = planning_state.right_statements[0]
            right_stmts = planning_state.right_statements[1:]
            planning_state = planning_state.clone(program=None, right_statements=right_stmts)
            self._bfs_expand_inner(planning_state, None, stmt.statement, stmt.expanded_state, stmt.scope_id, stmt_id=id(stmt))
        else:
            all_satisfied = self._bfs_is_all_satisfied(planning_state)
            if all_satisfied:
                self.bfs_add_queue(planning_state.clone(program=None))
            else:
                # The current main program body is not empty. We will randomly pop a statement from the middle program, and prepend it to the right statements.
                # A special case is that after popping the statement, the middle program becomes empty. In this case, we will directly expand the right statements.
                for middle, stmt, scope_id in planning_state.program.pop_right_statement():
                    new_state = planning_state.clone(program=middle)
                    self._bfs_expand_inner(planning_state, middle, stmt, new_state, scope_id, stmt_id=None)

    def _bfs_is_all_satisfied(self, planning_state: _AStarNode) -> bool:
        """Check whether every statement of the middle program is an achieve/assert statement that already holds.

        Results are cached in ``planning_state.statements_evaluations`` (keyed by ``id(stmt)``). An optimistic
        (undetermined) value counts as not satisfied, and so does any statement that is neither an achieve nor
        an assert statement.
        """
        for stmt, scope_id in planning_state.program.iter_statements_with_scope():
            if id(stmt) in planning_state.statements_evaluations:
                if planning_state.statements_evaluations[id(stmt)]:
                    continue
                return False

            if isinstance(stmt, (CrowAchieveExpression, CrowAssertExpression)):
                expr = stmt.goal if isinstance(stmt, CrowAchieveExpression) else stmt.bool_expr
                rv = self.executor.execute(expr, state=planning_state.state, bounded_variables=canonize_bounded_variables(planning_state.scopes, scope_id), optimistic_execution=True)
                rv = rv.item()
                if isinstance(rv, OptimisticValue):
                    rv = False
                planning_state.statements_evaluations[id(stmt)] = bool(rv)
                if not bool(rv):
                    return False
            else:
                return False
        return True

    def _bfs_expand_inner(
        self,
        planning_state: _AStarNode, middle: Optional[CrowBehaviorOrderingSuite],
        stmt: Union[CrowAchieveExpression, CrowBehaviorApplicationExpression, CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression],
        expanded_planning_state: Optional[_AStarNode], scope_id: int, stmt_id: Optional[int] = None
    ):
        """Expand the tree by extracting the right-most statement and recursively expanding the left part or refine the current statement.

        Args:
            planning_state: the current planning state.
            middle: the rest of the middle programs that are being expanded.
            stmt: the statement to be expanded at this point.
            expanded_planning_state: if the stmt is a CrowAchieveExpression, this is the state where the achieve statement was serialized to the right stack.
            scope_id: the current scope id.
            stmt_id: the unique id of the statement. If it is None, it means that the statement is not a part of the main program body.
                This is used to prune some unnecessary expansions.

        Raises:
            ValueError: if ``stmt`` is of an unknown type.
        """
        if isinstance(stmt, CrowAchieveExpression):
            if middle is not None:
                new_state = planning_state.clone(program=middle)
                self.bfs_add_queue(planning_state.clone(
                    program=middle,
                    right_statements=(_ScopedStatement2(stmt, scope_id, expanded_state=new_state),) + planning_state.right_statements
                ))
            else:
                rv, csp = self.evaluate(stmt.goal, state=planning_state.state, csp=planning_state.csp, bounded_variables=canonize_bounded_variables(planning_state.scopes, scope_id))
                if isinstance(rv, OptimisticValue):
                    # If the value is optimistic, we will add a constraint to the CSP and continue the search.
                    # But we also need to consider the case where the optimistic value is False, so we do not stop the branching here (no return).
                    self.bfs_add_queue(planning_state.clone(program=None, csp=csp.add_equal_constraint(rv, True)))
                elif bool(rv):
                    # The goal already holds; nothing needs to be refined.
                    self.bfs_add_queue(planning_state.clone(program=None))
                    return

                first_time_expand = stmt_id not in self._expanded_right_first or stmt_id is None
                if stmt_id is not None:
                    self._expanded_right_first[stmt_id] = self._current_queue_state
                for action_matching in match_applicable_behaviors(self.executor.domain, planning_state.state, stmt.goal, planning_state.scopes[scope_id]):
                    self._bfs_expand_inner_action(
                        expanded_planning_state, expanded_planning_state.program, action_matching, scope_id,
                        prefix_expanded_planning_state=planning_state, first_time_expand=first_time_expand
                    )
        elif isinstance(stmt, CrowBehaviorApplicationExpression):
            if middle is not None:
                new_state = planning_state.clone(program=middle)
                self.bfs_add_queue(planning_state.clone(
                    program=middle,
                    right_statements=(_ScopedStatement2(stmt, scope_id, expanded_state=new_state),) + planning_state.right_statements
                ))
            else:
                first_time_expand = stmt_id not in self._expanded_right_first
                if stmt_id is not None:
                    self._expanded_right_first[stmt_id] = self._current_queue_state
                # TODO(Jiayuan Mao @ 2024/03/27): think about which state should these actions be grounded on and how we should handle the CSP.
                argument_values = [
                    self.executor.execute(x, state=planning_state.state, bounded_variables=canonize_bounded_variables(planning_state.scopes, scope_id))
                    for x in stmt.arguments
                ]
                action_matching = ApplicableBehaviorItem(stmt.behavior, {k.name: v for k, v in zip(stmt.behavior.arguments, argument_values)})
                self._bfs_expand_inner_action(
                    expanded_planning_state, expanded_planning_state.program, action_matching, scope_id,
                    prefix_expanded_planning_state=planning_state, first_time_expand=first_time_expand
                )
        elif isinstance(stmt, CrowBehaviorOrderingSuite):
            assert middle is None, 'The middle part should be empty for a program.'
            self.bfs_add_queue(planning_state.clone(program=stmt))
        elif isinstance(stmt, (CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression)):
            if middle is not None:
                self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(_ScopedStatement2(stmt, scope_id),) + planning_state.right_statements))
            else:
                self._bfs_expand_inner_primitive(planning_state, stmt, scope_id)
        elif isinstance(stmt, _CrowBehaviorEffectApplication):
            if middle is not None:
                self.bfs_add_queue(planning_state.clone(program=middle, right_statements=(_ScopedStatement2(stmt, scope_id),) + planning_state.right_statements))
            else:
                self._bfs_expand_inner_action_effect(planning_state, stmt.behavior, scope_id)
        else:
            raise ValueError(f'Unknown statement type: {stmt}')

    def _bfs_expand_inner_action(self, planning_state: _AStarNode, middle: Optional[CrowBehaviorOrderingSuite], action_matching: ApplicableBehaviorItem, scope_id: int, prefix_expanded_planning_state: _AStarNode, first_time_expand: bool):
        """Expand the tree by refining a particular action.

        Args:
            planning_state: the current planning state.
            middle: the rest of the middle programs that are being expanded.
            action_matching: the action to be refined, including the action and the bounded variables.
            scope_id: the current scope id.
            prefix_expanded_planning_state: the planning search state assuming everything in the middle program has been refined separately without considering the current action.
            first_time_expand: whether this is the first time to expand the action. If it is not the first time, we will skip the expansion.

        Raises:
            RuntimeError: if the behavior is not sequential-only but has neither a preamble nor a promotable part.
        """
        # Re-scope the variables into a fresh dict so that the caller's ``action_matching`` is left untouched.
        bounded_variables = {
            var: value.clone_with_scope(scope_id) if isinstance(value, Variable) else value
            for var, value in action_matching.bounded_variables.items()
        }
        effect_application = _CrowBehaviorEffectApplication(action_matching.behavior)

        if action_matching.behavior.is_sequential_only():
            # The behavior body is appended after the already refined prefix, followed by its effect.
            new_scope_id = prefix_expanded_planning_state.latest_scope + 1
            program = action_matching.behavior.assign_body_program_scope(new_scope_id)
            preamble, promotable, rest = program.split_preamble_and_promotable()

            new_scopes = prefix_expanded_planning_state.scopes.copy()
            new_scopes[new_scope_id] = bounded_variables.copy()
            program = CrowBehaviorOrderingSuite.make_sequential(rest, variable_scope_identifier=new_scope_id)
            self.bfs_add_queue(prefix_expanded_planning_state.clone(
                program=program, scopes=new_scopes, latest_scope=new_scope_id,
                right_statements=(_ScopedStatement2(effect_application, new_scope_id),) + prefix_expanded_planning_state.right_statements
            ))
            return

        if not first_time_expand:
            return

        new_scope_id = planning_state.latest_scope + 1
        program = action_matching.behavior.assign_body_program_scope(new_scope_id)
        preamble, promotable, rest = program.split_preamble_and_promotable()

        new_scopes = planning_state.scopes.copy()
        new_scopes[new_scope_id] = bounded_variables.copy()

        if preamble is not None:
            # Layout: {preamble} -> [middle (+ promotable)] -> [rest] -> [effect] -> existing right statements.
            new_left_program = CrowBehaviorOrderingSuite.make_sequential(preamble, variable_scope_identifier=new_scope_id)
            if middle is not None:
                new_middle_program = CrowBehaviorOrderingSuite.make_unordered(middle, CrowBehaviorOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id)) if promotable is not None else middle
            else:
                new_middle_program = CrowBehaviorOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id) if promotable is not None else None

            new_right_program = CrowBehaviorOrderingSuite.make_sequential(rest, variable_scope_identifier=new_scope_id)
            new_right_statements = [
                _ScopedStatement2(new_right_program, new_scope_id),
                _ScopedStatement2(effect_application, new_scope_id)
            ]
            if new_middle_program is not None:
                new_right_statements.insert(0, _ScopedStatement2(new_middle_program, new_scope_id))
            self.bfs_add_queue(planning_state.clone(
                program=new_left_program, scopes=new_scopes, latest_scope=new_scope_id,
                right_statements=tuple(new_right_statements) + planning_state.right_statements
            ))
        else:
            if promotable is None:
                raise RuntimeError('Should not reach here. This case should have been already handled by the action.is_sequential_only() check.')

            # Layout: {middle (+) promotable} -> [rest] -> [effect] -> existing right statements.
            if middle is not None:
                new_left_program = CrowBehaviorOrderingSuite.make_unordered(middle, CrowBehaviorOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id))
            else:
                new_left_program = CrowBehaviorOrderingSuite.make_sequential(promotable, variable_scope_identifier=new_scope_id)

            new_right_program = CrowBehaviorOrderingSuite.make_sequential(rest, variable_scope_identifier=new_scope_id)
            new_right_statements = (
                _ScopedStatement2(new_right_program, new_scope_id),
                _ScopedStatement2(effect_application, new_scope_id)
            ) + planning_state.right_statements
            self.bfs_add_queue(planning_state.clone(program=new_left_program, scopes=new_scopes, latest_scope=new_scope_id, right_statements=new_right_statements))

    def _bfs_expand_inner_primitive(self, state: _AStarNode, stmt: Union[CrowBindExpression, CrowRuntimeAssignmentExpression, CrowAssertExpression, CrowControllerApplicationExpression], scope_id: int):
        """Expand the tree by refining a particular primitive statement.

        Args:
            state: the current planning state (its middle program is empty at this point).
            stmt: the primitive statement to be refined.
            scope_id: the scope in which ``stmt`` is evaluated.

        Raises:
            ValueError: if ``stmt`` is of an unknown type.
        """
        if isinstance(stmt, CrowControllerApplicationExpression):
            # All arguments are evaluated against one cloned CSP (``clone_csp=False``).
            new_csp = state.csp.clone()
            argument_values = [
                self.evaluate(x, state=state.state, csp=new_csp, bounded_variables=canonize_bounded_variables(state.scopes, scope_id), clone_csp=False)[0]
                for x in stmt.arguments
            ]
            for i, argv in enumerate(argument_values):
                if isinstance(argv, StateObjectReference):
                    argument_values[i] = argv.name
            self.bfs_add_queue(state.clone(program=None, left_statements=state.left_statements + (CrowControllerApplier(stmt.controller, argument_values),), csp=new_csp))
        elif isinstance(stmt, CrowBindExpression):
            if stmt.is_object_bind:
                # One successor per scope produced by the object bind.
                for new_scope in execute_object_bind(self.executor, stmt, state.state, canonize_bounded_variables(state.scopes, scope_id)):
                    new_scopes = state.scopes.copy()
                    new_scopes[scope_id] = new_scope
                    self.bfs_add_queue(state.clone(program=None, scopes=new_scopes))
            else:
                new_csp = state.csp.clone()
                new_scopes = state.scopes.copy()
                # ``scopes.copy()`` is shallow: copy the inner scope before writing to it, otherwise the new
                # variables leak into the parent node and into sibling branches of the search.
                new_scopes[scope_id] = state.scopes[scope_id].copy()
                for var in stmt.variables:
                    # NOTE(review): the runtime-assignment branch below keys the scope by ``variable.name``
                    # while this branch keys it by the variable object — confirm which one is intended.
                    new_scopes[scope_id][var] = TensorValue.from_optimistic_value(new_csp.new_var(var.dtype, wrap=True))
                rv, new_csp = self.evaluate(stmt.goal, state=state.state, csp=new_csp, bounded_variables=canonize_bounded_variables(new_scopes, scope_id))
                self.bfs_add_queue(state.clone(program=None, scopes=new_scopes, csp=new_csp.add_equal_constraint(rv, True)))
        elif isinstance(stmt, CrowRuntimeAssignmentExpression):
            rv, new_csp = self.evaluate(stmt.value, state=state.state, csp=state.csp, bounded_variables=canonize_bounded_variables(state.scopes, scope_id))
            new_scopes = state.scopes.copy()
            new_scopes[scope_id] = state.scopes[scope_id].copy()
            new_scopes[scope_id][stmt.variable.name] = rv
            self.bfs_add_queue(state.clone(program=None, scopes=new_scopes, csp=new_csp))
        elif isinstance(stmt, CrowAssertExpression):
            rv, new_csp = self.evaluate(stmt.bool_expr, state=state.state, csp=state.csp, bounded_variables=canonize_bounded_variables(state.scopes, scope_id))
            if isinstance(rv, OptimisticValue):
                self.bfs_add_queue(state.clone(program=None, csp=new_csp.add_equal_constraint(rv, True)))
            elif bool(rv):
                self.bfs_add_queue(state.clone(program=None))
            # A failed assertion prunes this branch: nothing is enqueued.
        else:
            raise ValueError(f'Unknown statement type: {stmt}')

    def _bfs_expand_inner_action_effect(self, planning_state: _AStarNode, stmt: CrowBehavior, scope_id: int):
        """Apply the effect of behavior ``stmt`` to the state of ``planning_state`` and enqueue the resulting node.

        Args:
            planning_state: the current planning state.
            stmt: the behavior whose effect is applied.
            scope_id: the scope holding the bounded variables of the behavior.
        """
        new_csp = planning_state.csp.clone() if planning_state.csp is not None else None
        new_state = execute_behavior_effect(
            self.executor, stmt, planning_state.state,
            canonize_bounded_variables(planning_state.scopes, scope_id),
            csp=new_csp, action_index=len(planning_state.left_statements) - 1
        )
        self.bfs_add_queue(planning_state.clone(program=None, state=new_state, csp=new_csp))