Source code for

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   :
# Author : Jiayuan Mao
# Email  :
# Date   : 11/09/2023
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import warnings
from typing import Any, Optional, Union, Iterable, Iterator, Tuple, List, Dict

import jacinle

from concepts.dsl.expression import ValueOutputExpression, ListExpansionExpression, is_and_expr
from concepts.dsl.constraint import OptimisticValue, ConstraintSatisfactionProblem
from import OperatorApplicationExpression, OperatorApplier
from import PDSketchExecutor, PDSketchSGC
from import State
from import RegressionCommitFlag, AchieveExpression, BindExpression, SubgoalCSPSerializability, \
from import ground_actions
from import csp_dpll_sampling_solve
from import ground_fol_expression, evaluate_bool_scalar_expression
from import ground_operator_application_expression, gen_applicable_regression_rules, len_candidate_regression_rules
from import map_csp_placeholder_goal, map_csp_placeholder_action, map_csp_variable_state, map_csp_variable_mapping, mark_constraint_group_solver
from import create_find_expression_csp_variable, has_optimistic_constant_expression
from import PartiallyOrderedPlan, TotallyOrderedPlan

# Emit an import-time notice that this module is superseded. The message is
# split across adjacent literals only for readability; the runtime text is
# unchanged.
warnings.warn(
    message=(
        'This is an obsolete implementation of the CROW planner. '
        'It has limited support for the primitive statements supported in the regression rule bodies. '
        'Please use crow_planner_v2 instead'
    ),
    category=FutureWarning,
)

def crow_recursive(
    executor: PDSketchExecutor, state: State, goal_expr: Union[str, ValueOutputExpression], *,
    is_goal_serializable: bool = True,
    is_goal_refinement_compressible: bool = True,
    enable_reordering: bool = False,
    enable_csp: bool = True,
    max_search_depth: int = 10,
    max_csp_branching_factor: int = 5,
    max_beam_size: int = 20,
    allow_empty_plan_for_optimistic_goal: bool = False,
    verbose: bool = True
) -> Tuple[Iterable[Any], Dict[str, Any]]:
    """Compositional Regression and Optimization Wayfinder.

    Args:
        executor: the executor.
        state: the initial state.
        goal_expr: the goal expression. A string is parsed with ``executor.parse``.
        is_goal_serializable: whether the goal is serialized already. Otherwise, it will be treated as a conjunction.
        is_goal_refinement_compressible: whether the goals are refinement compressible.
        enable_reordering: whether to enable reordering of subgoals in regression rules.
        enable_csp: whether to enable CSP solving. When False, the search runs with ``csp=None``.
        max_search_depth: the maximum nesting depth of subgoal refinement. Rules are no longer
            expanded once the depth exceeds this value.
        max_csp_branching_factor: the maximum branching factor of the CSP solver (not used by this implementation).
        max_beam_size: the maximum beam size for keep tracking of CSP solutions (not used by this implementation).
        allow_empty_plan_for_optimistic_goal: whether to allow empty plans for optimistic goals.
        verbose: whether to print verbose information.

    Returns:
        A tuple ``(plans, search_stat)``. ``plans`` is a list with one entry per candidate plan; each
        entry is that plan's sequence of actions. ``search_stat`` is a dict holding ``'nr_expanded_nodes'``.
    """
    if isinstance(goal_expr, str):
        goal_expr = executor.parse(goal_expr)

    search_cache = dict()
    search_stat = {'nr_expanded_nodes': 0}

    # NB(Jiayuan Mao @ 2023/09/09): the cache only works for previous_actions == [].
    # That is, we only cache the search results that start from the initial state.
    def return_with_cache(goal_set, previous_actions, rv):
        """Store ``rv`` under the goal string (first writer wins, only for empty action prefixes) and return it."""
        if len(previous_actions) == 0:
            goal_str = goal_set.gen_string()
            if goal_str not in search_cache:
                search_cache[goal_str] = rv
        return rv

    def try_retrieve_cache(goal_set, previous_actions):
        """Return the cached result for the goal string, or None if absent or the action prefix is non-empty."""
        if len(previous_actions) == 0:
            goal_str = goal_set.gen_string()
            if goal_str in search_cache:
                return search_cache[goal_str]
        return None

    @jacinle.log_function(verbose=False)
    def dfs(
        s: State, g: PartiallyOrderedPlan, c: Tuple[ValueOutputExpression, ...],
        csp: Optional[ConstraintSatisfactionProblem],
        previous_actions: List[OperatorApplier],
        return_all_skeletons: bool = False,
        tail_csp_solve: bool = False,
        search_depth: int = 0
    ) -> List[Tuple[State, Optional[ConstraintSatisfactionProblem], List[OperatorApplier]]]:
        """Depth-first search for all possible plans.

        Args:
            s: the current state.
            g: the current goal.
            c: the list of constraints to maintain.
            csp: the current constraint satisfaction problem (None when CSP solving is disabled).
            previous_actions: the previous actions.
            return_all_skeletons: whether to return all possible plans. If False, only return the first plan found.
            tail_csp_solve: whether to solve the CSP after the expansion of the current goal.
            search_depth: the current search depth.

        Returns:
            a list of plans. Each plan is a tuple of (final_state, csp, actions).
        """
        if verbose:
            jacinle.log_function.print('Current goal', g, f'return_all_skeletons={return_all_skeletons}', f'previous_actions={previous_actions}')

        if (rv := try_retrieve_cache(g, previous_actions)) is not None:
            return rv

        all_possible_plans = list()

        flatten_goals = list(g.iter_goals())
        if not has_optimistic_constant_expression(*flatten_goals) or allow_empty_plan_for_optimistic_goal:
            # If the current goal contains no optimistic constant, we may directly solve the CSP.
            rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(executor, flatten_goals, s, dict(), csp, csp_note='goal_test')
            if rv:
                all_possible_plans.append((s, new_csp, previous_actions))
                if not is_optimistic:
                    # If there is no optimistic value, we can stop the search from here.
                    # NB(Jiayuan Mao @ 2023/09/11): note that even if `return_all_skeletons` is True, we still return here.
                    # This corresponds to an early stopping behavior that defines the space of all possible plans.
                    return return_with_cache(g, previous_actions, all_possible_plans)

        if search_depth > max_search_depth:
            return return_with_cache(g, previous_actions, all_possible_plans)

        search_stat['nr_expanded_nodes'] += 1

        candidate_regression_rules = gen_applicable_regression_rules(executor, s, g, c)
        if len_candidate_regression_rules(candidate_regression_rules) == 0:
            return return_with_cache(g, previous_actions, all_possible_plans)

        some_rule_success = False
        # If return_all_skeletons is False, we will stop the search once some rule application succeeds.
        for chain_index, subgoal_index, this_candidate_regression_rules in candidate_regression_rules:
            cur_goal = g.chains[chain_index].sequence[subgoal_index]
            other_goals = g.exclude(chain_index, subgoal_index)
            other_goals_return_all_skeletons = g.chains[chain_index].get_return_all_skeletons_flag(subgoal_index)
            if verbose:
                jacinle.log_function.print('Now trying to excluding goal', cur_goal)

            # Step 1: ground the body of every candidate rule once, using a placeholder CSP whose
            # variables are later mapped into the CSP of each concrete search branch.
            grounded_subgoals_cache = dict()
            for regression_rule_index, (rule, bounded_variables) in enumerate(this_candidate_regression_rules):
                grounded_subgoals = list()
                placeholder_csp = ConstraintSatisfactionProblem()
                placeholder_bounded_variables = bounded_variables.copy()
                for item in rule.body:
                    if isinstance(item, AchieveExpression):
                        grounded_subgoals.append(AchieveExpression(
                            ground_fol_expression(item.goal, placeholder_bounded_variables), maintains=[],
                            serializability=item.serializability, csp_serializability=item.csp_serializability
                        ))
                    elif isinstance(item, BindExpression):
                        for variable in item.variables:
                            placeholder_bounded_variables[variable] = create_find_expression_csp_variable(
                                variable, csp=placeholder_csp, bounded_variables=placeholder_bounded_variables
                            )
                        grounded_subgoals.append(BindExpression(
                            [], ground_fol_expression(item.goal, placeholder_bounded_variables),
                            serializability=item.serializability, csp_serializability=item.csp_serializability, ordered=item.ordered
                        ))
                    elif isinstance(item, OperatorApplicationExpression):
                        cur_action = ground_operator_application_expression(item, placeholder_bounded_variables, csp=placeholder_csp, add_csp_variables=False)
                        grounded_subgoals.append(cur_action)
                    elif isinstance(item, RegressionRuleApplicationExpression):
                        raise NotImplementedError('Regression rule application is not supported in the body of a regression rule.')
                    elif isinstance(item, ListExpansionExpression):
                        subgoals = executor.execute(item.expression, s, placeholder_bounded_variables, sgc=PDSketchSGC(s, g, c))
                        assert isinstance(subgoals, TotallyOrderedPlan), f'ListExpansionExpression must be used with a TotallyOrderedPlan, got {type(subgoals)}'
                        grounded_subgoals.extend(subgoals.sequence)
                    elif isinstance(item, RegressionCommitFlag):
                        grounded_subgoals.append(item)
                    else:
                        raise ValueError(f'Unknown item type {type(item)} in rule {item}.')

                # pass the serializability information to the previous subgoal.
                max_reorder_prefix_length = 0
                for i, item in enumerate(grounded_subgoals):
                    if isinstance(item, RegressionCommitFlag):
                        if i > 0 and isinstance(grounded_subgoals[i - 1], (AchieveExpression, BindExpression)):
                            grounded_subgoals[i - 1].serializability = item.goal_serializability
                    if isinstance(item, (AchieveExpression, BindExpression)):
                        if item.sequential_decomposable is False:
                            max_reorder_prefix_length = i + 1
                grounded_subgoals_cache[regression_rule_index] = (grounded_subgoals, placeholder_csp, max_reorder_prefix_length)

            # Step 2: plan for the remaining goals first (prefix length 0 starts from these results).
            if len(other_goals) == 0:
                other_goals_plans = [(s, csp, previous_actions)]
            else:
                # TODO(Jiayuan Mao @ 2023/09/09): change this list to an actual generator call.
                other_goals_plans = list()
                other_goals_plans_tmp = list(dfs(
                    s, other_goals, c, csp, previous_actions,
                    search_depth=search_depth, return_all_skeletons=other_goals_return_all_skeletons
                ))
                for cur_state, cur_csp, cur_actions in other_goals_plans_tmp:
                    rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(executor, [cur_goal], cur_state, dict(), cur_csp, csp_note='goal_test_shortcut')
                    if rv:
                        all_possible_plans.append((cur_state, new_csp, cur_actions))
                        if not is_optimistic:
                            # NB(Jiayuan Mao @ 2023/09/11): another place where we stop the search and ignores the `return_all_skeletons` flag.
                            continue
                    other_goals_plans.append((cur_state, cur_csp, cur_actions))

            if len(this_candidate_regression_rules) == 0 or len(other_goals_plans) == 0:
                continue

            if len(other_goals) == 0:
                max_prefix_length = 0
            else:
                max_prefix_length = 0 if not enable_reordering else max([x[2] for x in grounded_subgoals_cache.values()])

            # Step 3: apply each rule, optionally promoting a prefix of its body into the other goals.
            prefix_stop_mark = dict()
            for prefix_length in range(max_prefix_length + 1):
                for regression_rule_index, (rule, bounded_variables) in enumerate(this_candidate_regression_rules):
                    grounded_subgoals, placeholder_csp, max_reorder_prefix_length = grounded_subgoals_cache[regression_rule_index]

                    if prefix_length > max_reorder_prefix_length:
                        continue
                    if regression_rule_index in prefix_stop_mark and prefix_stop_mark[regression_rule_index]:
                        continue

                    if verbose:
                        jacinle.log_function.print('Applying rule', rule, 'for', cur_goal, 'and prefix length', prefix_length, 'goal is', g)

                    if prefix_length == 0:
                        previous_possible_branches = other_goals_plans
                        start_csp_variable_mapping = dict()
                    else:
                        start_csp_variable_mapping = dict()
                        # The prefix is planned from (s, csp, previous_actions), so it must start from
                        # this call's own CSP and state, not from loop variables left over from
                        # unrelated branches.
                        new_csp = csp.clone() if csp is not None else None
                        new_chain_subgoals = list()
                        new_chain_flags = list()
                        for item in grounded_subgoals[:prefix_length]:
                            if isinstance(item, AchieveExpression):
                                subgoal, start_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, start_csp_variable_mapping)
                                new_chain_subgoals.append(item.goal)
                                new_chain_flags.append(not item.refinement_compressible or return_all_skeletons)
                            elif isinstance(item, BindExpression):
                                # TODO(Jiayuan Mao @ 2023/12/06): implement this for bypassing FindExpressions that can be commited...
                                if new_csp is None:
                                    raise RuntimeError('FindExpression must be used with a CSP.')
                                subgoal, start_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, start_csp_variable_mapping)
                                with new_csp.with_group(subgoal) as group:
                                    rv = executor.execute(subgoal, s, {}, csp=new_csp).item()
                                    if isinstance(rv, OptimisticValue):
                                        new_csp.add_equal_constraint(rv)
                                        # NOTE(review): nesting of this call under the OptimisticValue check was
                                        # inferred from whitespace-collapsed source; confirm against upstream.
                                        mark_constraint_group_solver(executor, state, bounded_variables, group)
                            elif isinstance(item, RegressionCommitFlag):
                                continue
                            else:
                                raise TypeError(f'Unsupported item type {type(item)} in rule {item}.')

                        cur_other_goals = other_goals.add_chain(new_chain_subgoals, new_chain_flags)
                        cur_other_goals_return_all_skeletons = new_chain_flags[-1] if len(new_chain_flags) > 0 else return_all_skeletons
                        previous_possible_branches = list(dfs(
                            s, cur_other_goals, c, new_csp, previous_actions,
                            search_depth=search_depth, return_all_skeletons=cur_other_goals_return_all_skeletons
                        ))

                    if len(previous_possible_branches) == 0:
                        if verbose:
                            jacinle.log_function.print('Prefix planning failed!!! Stop.')
                        # If it's not possible to achieve the subset of goals, then it's not possible to achieve the whole goal.
                        # The rule is therefore marked as stopped so that longer prefixes skip it as well.
                        prefix_stop_mark[regression_rule_index] = True
                        continue

                    for prev_state, prev_csp, prev_actions in previous_possible_branches:
                        # construct the new csp and the sequence of grounded subgoals.
                        possible_branches = [(prev_state, prev_csp, prev_actions, start_csp_variable_mapping)]
                        for i in range(prefix_length, len(grounded_subgoals)):
                            item = grounded_subgoals[i]
                            next_possible_branches = list()

                            if isinstance(item, (AchieveExpression, BindExpression)):
                                if not return_all_skeletons and item.refinement_compressible and len(possible_branches) > 1:
                                    # TODO(Jiayuan Mao @ 2023/12/06): implement this for the case of CSP solving --- we may need to keep multiple variable bindings!
                                    # Keep only the branch with the fewest actions so far.
                                    possible_branches = [min(possible_branches, key=lambda x: len(x[2]))]

                            for cur_state, cur_csp, cur_actions, cur_csp_variable_mapping in possible_branches:
                                if isinstance(item, AchieveExpression):
                                    new_csp = cur_csp.clone() if cur_csp is not None else None
                                    subgoal, new_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, cur_csp_variable_mapping)
                                    subgoal_return_all_skeletons_flag = not item.refinement_compressible or return_all_skeletons
                                    this_next_possible_branches = [(*x, new_csp_variable_mapping) for x in dfs(
                                        cur_state, PartiallyOrderedPlan.from_single_goal(subgoal, subgoal_return_all_skeletons_flag),
                                        c + item.maintains, new_csp, cur_actions,
                                        return_all_skeletons=subgoal_return_all_skeletons_flag,
                                        search_depth=search_depth + 1
                                    )]
                                elif isinstance(item, BindExpression):
                                    if cur_csp is None:
                                        raise RuntimeError('FindExpression must be used with a CSP.')
                                    new_csp = cur_csp.clone()
                                    subgoal, new_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, cur_csp_variable_mapping)
                                    with new_csp.with_group(subgoal) as group:
                                        rv = executor.execute(subgoal, cur_state, {}, csp=new_csp).item()
                                        if isinstance(rv, OptimisticValue):
                                            new_csp.add_equal_constraint(rv)
                                            # NOTE(review): nesting inferred, as in the prefix branch above.
                                            mark_constraint_group_solver(executor, state, bounded_variables, group)
                                    this_next_possible_branches = [(cur_state, new_csp, cur_actions, new_csp_variable_mapping)]
                                elif isinstance(item, OperatorApplier):
                                    # TODO(Jiayuan Mao @ 2023/09/11): vectorize this operation, probably only useful when `return_all_skeletons` is True.
                                    new_csp = cur_csp.clone() if cur_csp is not None else None
                                    subaction, new_csp_variable_mapping = map_csp_placeholder_action(item, new_csp, placeholder_csp, cur_csp_variable_mapping)
                                    succ, new_state = executor.apply(subaction, cur_state, csp=new_csp, clone=True, action_index=len(cur_actions))
                                    if succ:
                                        this_next_possible_branches = [(new_state, new_csp, cur_actions + [subaction], new_csp_variable_mapping)]
                                    else:
                                        this_next_possible_branches = []
                                elif isinstance(item, RegressionCommitFlag):
                                    this_next_possible_branches = [(cur_state, cur_csp, cur_actions, cur_csp_variable_mapping)]
                                else:
                                    raise TypeError(f'Unknown item: {item}')

                                commit_csp = False
                                if isinstance(item, RegressionCommitFlag) and item.csp_serializability in (SubgoalCSPSerializability.FORALL, SubgoalCSPSerializability.SOME):
                                    commit_csp = True
                                elif isinstance(item, (AchieveExpression, BindExpression)) and item.csp_serializability in (SubgoalCSPSerializability.FORALL, SubgoalCSPSerializability.SOME):
                                    commit_csp = True

                                if commit_csp:
                                    # Solve and ground each branch produced by this item. Using the per-branch
                                    # state/CSP/actions (rather than the pre-item `cur_*` values) keeps the work
                                    # the item just did; for a commit flag the two are the same objects.
                                    for branch_state, branch_csp, branch_actions, branch_mapping in this_next_possible_branches:
                                        assignments = csp_dpll_sampling_solve(executor, branch_csp)
                                        if assignments is not None:
                                            next_possible_branches.append((
                                                map_csp_variable_state(branch_state, branch_csp, assignments),
                                                ConstraintSatisfactionProblem(),  # committed: continue with a fresh CSP
                                                ground_actions(executor, branch_actions, assignments),
                                                map_csp_variable_mapping(branch_mapping, branch_csp, assignments),
                                            ))
                                            # TODO(Jiayuan Mao @ 2023/11/27): okay we need to implement some kind of tracking of "bounded_variables."
                                            # This need to be done by tracking some kind of mapping for optimistic variables in "grounded_subgoals."
                                else:
                                    next_possible_branches.extend(this_next_possible_branches)

                            possible_branches = next_possible_branches

                        found_plan = False
                        # TODO(Jiayuan Mao @ 2023/09/11): implement this via maintains checking.
                        for cur_state, cur_csp, actions, _ in possible_branches:
                            rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(
                                executor, flatten_goals, cur_state, dict(), csp=cur_csp,
                                csp_note=f'subgoal_test: {"; ".join([str(x) for x in flatten_goals])}'
                            )
                            if rv:
                                if verbose:
                                    jacinle.log_function.print('Found a plan', [str(x) for x in actions], 'for goal', g)
                                if is_optimistic and tail_csp_solve:
                                    assignments = csp_dpll_sampling_solve(executor, new_csp, verbose=True)
                                    if assignments is not None:
                                        # Keep the (state, csp, actions) shape used by every other result.
                                        all_possible_plans.append((cur_state, new_csp, ground_actions(executor, actions, assignments)))
                                        found_plan = True
                                else:
                                    all_possible_plans.append((cur_state, new_csp, actions))
                                    found_plan = True

                        if found_plan:
                            prefix_stop_mark[regression_rule_index] = True
                            some_rule_success = True
                            # TODO(Jiayuan Mao @ 2023/09/06): since we have changed the order of prefix_length for-loop and the regression rule for-loop.
                            # We need to use an additional dictionary to store whether we have found a plan for a particular regression rule.
                            # Right now this doesn't matter because we only use the first plan.

                        if not return_all_skeletons and some_rule_success:
                            break  # Break for-loop for `for prev_state in previous_possible_branches`.
                    if not return_all_skeletons and some_rule_success:
                        break  # Break for-loop for `for rule in regression_rules`
                if not return_all_skeletons and some_rule_success:
                    break  # Break for-loop for `for prefix_length in range(1, rule.max_rule_prefix_length + 1):`
            if not return_all_skeletons and some_rule_success:
                break  # Break for-loop over the candidate (chain, subgoal) choices.

        if len(all_possible_plans) == 0:
            if verbose:
                jacinle.log_function.print('No possible plans for goal', g)
            return return_with_cache(g, previous_actions, [])

        # TODO(Jiayuan Mao @ 2023/11/19): add unique back.
        # unique_all_possible_plans = _unique_plans(all_possible_plans)
        unique_all_possible_plans = all_possible_plans
        if len(unique_all_possible_plans) != len(all_possible_plans):
            if verbose:
                jacinle.log_function.print('Warning: there are duplicate plans for goal', g, f'({len(unique_all_possible_plans)} unique plans vs {len(all_possible_plans)} total plans)')

        # Shortest plans first.
        unique_all_possible_plans = sorted(unique_all_possible_plans, key=lambda x: len(x[2]))
        return return_with_cache(g, previous_actions, unique_all_possible_plans)

    # A conjunction is split into its conjuncts, unless it wraps a single list-typed argument.
    if is_and_expr(goal_expr):
        if len(goal_expr.arguments) == 1 and goal_expr.arguments[0].return_type.is_list_type:
            goal_set = [goal_expr]
        else:
            goal_set = list(goal_expr.arguments)
    else:
        goal_set = [goal_expr]

    goal_set = PartiallyOrderedPlan((
        TotallyOrderedPlan(
            goal_set,
            return_all_skeletons_flags=[(not is_goal_refinement_compressible) for _ in goal_set],
            is_ordered=is_goal_serializable
        ),
    ))

    candidate_plans = dfs(
        state, goal_set, tuple(),
        csp=ConstraintSatisfactionProblem() if enable_csp else None,
        previous_actions=list(), tail_csp_solve=True
    )
    candidate_plans = [actions for final_state, csp, actions in candidate_plans]
    return candidate_plans, search_stat