# Source code for concepts.dm.pdsketch.crow.compat.crow_planner_v1

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : crow_planner_v1.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 11/09/2023
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import warnings
from typing import Any, Optional, Union, Iterable, Iterator, Tuple, List, Dict

import jacinle

from concepts.dsl.expression import ValueOutputExpression, ListExpansionExpression, is_and_expr
from concepts.dsl.constraint import OptimisticValue, ConstraintSatisfactionProblem
from concepts.dm.pdsketch.operator import OperatorApplicationExpression, OperatorApplier
from concepts.dm.pdsketch.executor import PDSketchExecutor, PDSketchSGC
from concepts.dm.pdsketch.domain import State
from concepts.dm.pdsketch.regression_rule import RegressionCommitFlag, AchieveExpression, BindExpression, SubgoalCSPSerializability, \
    RegressionRuleApplicationExpression
from concepts.dm.pdsketch.planners.optimistic_search import ground_actions
from concepts.dm.pdsketch.csp_solvers.dpll_sampling import csp_dpll_sampling_solve
from concepts.dm.pdsketch.regression_utils import ground_fol_expression, evaluate_bool_scalar_expression
from concepts.dm.pdsketch.regression_utils import ground_operator_application_expression, gen_applicable_regression_rules, len_candidate_regression_rules
from concepts.dm.pdsketch.regression_utils import map_csp_placeholder_goal, map_csp_placeholder_action, map_csp_variable_state, map_csp_variable_mapping, mark_constraint_group_solver
from concepts.dm.pdsketch.regression_utils import create_find_expression_csp_variable, has_optimistic_constant_expression
from concepts.dm.pdsketch.crow.crow_state import PartiallyOrderedPlan, TotallyOrderedPlan

# Emit a deprecation notice once at import time: this module is kept only for
# backward compatibility and should not be used for new code.
warnings.warn('This is an obsolete implementation of the CROW planner. It has limited support for the primitive statements supported in the regression rule bodies. Please use crow_planner_v2 instead', FutureWarning)


def crow_recursive(
    executor: PDSketchExecutor,
    state: State,
    goal_expr: Union[str, ValueOutputExpression],
    *,
    is_goal_serializable: bool = True,
    is_goal_refinement_compressible: bool = True,
    enable_reordering: bool = False,
    enable_csp: bool = True,
    max_search_depth: int = 10,
    max_csp_branching_factor: int = 5,
    max_beam_size: int = 20,
    allow_empty_plan_for_optimistic_goal: bool = False,
    verbose: bool = True
) -> Tuple[Iterable[Any], Dict[str, Any]]:
    """Compositional Regression and Optimization Wayfinder.

    Performs a recursive depth-first regression search over the goal expression,
    interleaved with CSP construction/solving for continuous values.

    Args:
        executor: the executor.
        state: the initial state.
        goal_expr: the goal expression.
        is_goal_serializable: whether the goal is serialized already. Otherwise, it will be treated as a conjunction.
        is_goal_refinement_compressible: whether the goals are refinement compressible.
        enable_reordering: whether to enable reordering of subgoals in regression rules.
        enable_csp: whether to enable CSP solving.
        max_search_depth: the maximum number of actions in a plan.
        max_csp_branching_factor: the maximum branching factor of the CSP solver.
            NOTE(review): this parameter is never read in this function body — confirm whether it is obsolete.
        max_beam_size: the maximum beam size for keep tracking of CSP solutions.
            NOTE(review): this parameter is never read in this function body — confirm whether it is obsolete.
        allow_empty_plan_for_optimistic_goal: whether to allow empty plans for optimistic goals.
        verbose: whether to print verbose information.

    Returns:
        A tuple ``(candidate_plans, search_stat)``: a list of plans (each plan is a
        list of grounded actions) and the search statistics dictionary
        (currently only ``'nr_expanded_nodes'``).
    """
    if isinstance(goal_expr, str):
        goal_expr = executor.parse(goal_expr)

    # Memoization table keyed by the goal-set string; see the NB below on its scope.
    search_cache = dict()
    search_stat = {'nr_expanded_nodes': 0}

    # NB(Jiayuan Mao @ 2023/09/09): the cache only works for previous_actions == [].
    # That is, we only cache the search results that start from the initial state.
    def return_with_cache(goal_set, previous_actions, rv):
        # Store `rv` in the cache (first-writer wins) when searching from the
        # initial state, then return it unchanged so callers can `return` through it.
        if len(previous_actions) == 0:
            goal_str = goal_set.gen_string()
            if goal_str not in search_cache:
                search_cache[goal_str] = rv
        return rv

    def try_retrieve_cache(goal_set, previous_actions):
        # Return the cached result for this goal set, or None on a cache miss
        # (also whenever we are not at the initial state).
        if len(previous_actions) == 0:
            goal_str = goal_set.gen_string()
            if goal_str in search_cache:
                return search_cache[goal_str]
        return None

    @jacinle.log_function(verbose=False)
    def dfs(
        s: State,
        g: PartiallyOrderedPlan,
        c: Tuple[ValueOutputExpression, ...],
        csp: Optional[ConstraintSatisfactionProblem],
        previous_actions: List[OperatorApplier],
        return_all_skeletons: bool = False,
        tail_csp_solve: bool = False,
        search_depth: int = 0
    ) -> Iterator[Tuple[State, ConstraintSatisfactionProblem, List[OperatorApplier]]]:
        """Depth-first search for all possible plans.

        Args:
            s: the current state.
            g: the current goal.
            c: the list of constraints to maintain.
            csp: the current constraint satisfaction problem.
            previous_actions: the previous actions.
            return_all_skeletons: whether to return all possible plans. If False, only return the first plan found.
            tail_csp_solve: whether to solve the CSP after the expansion of the current goal.
            search_depth: the current search depth.

        Returns:
            a list of plans (despite the ``Iterator`` annotation, a concrete list
            is returned). Each plan is a tuple of (final_state, csp, actions).
        """
        if verbose:
            jacinle.log_function.print('Current goal', g, f'return_all_skeletons={return_all_skeletons}', f'previous_actions={previous_actions}')

        # Cache hit: reuse the previously computed list of plans for this goal set.
        if (rv := try_retrieve_cache(g, previous_actions)) is not None:
            return rv

        all_possible_plans = list()
        flatten_goals = list(g.iter_goals())
        if not has_optimistic_constant_expression(*flatten_goals) or allow_empty_plan_for_optimistic_goal:
            """If the current goal contains no optimistic constant, we may directly solve the CSP."""
            rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(executor, flatten_goals, s, dict(), csp, csp_note='goal_test')
            if rv:
                # The goal already holds in `s`: the empty continuation (i.e. just
                # `previous_actions`) is a valid plan.
                all_possible_plans.append((s, new_csp, previous_actions))
                if not is_optimistic:
                    # If there is no optimistic value, we can stop the search from here.
                    # NB(Jiayuan Mao @ 2023/09/11): note that even if `return_all_skeletons` is True, we still return here.
                    # This corresponds to an early stopping behavior that defines the space of all possible plans.
                    return return_with_cache(g, previous_actions, all_possible_plans)

        if search_depth > max_search_depth:
            # Depth cutoff: return whatever (possibly empty) plans we have found so far.
            return return_with_cache(g, previous_actions, all_possible_plans)

        search_stat['nr_expanded_nodes'] += 1

        candidate_regression_rules = gen_applicable_regression_rules(executor, s, g, c)
        if len_candidate_regression_rules(candidate_regression_rules) == 0:
            return return_with_cache(g, previous_actions, all_possible_plans)

        some_rule_success = False  # If return_all_skeletons is False, we will stop the search once some rule application succeeds.
        for chain_index, subgoal_index, this_candidate_regression_rules in candidate_regression_rules:
            # Pick one subgoal out of the partially ordered plan; `other_goals` is
            # everything else, which must be achieved "first".
            cur_goal = g.chains[chain_index].sequence[subgoal_index]
            other_goals = g.exclude(chain_index, subgoal_index)
            other_goals_return_all_skeletons = g.chains[chain_index].get_return_all_skeletons_flag(subgoal_index)
            if verbose:
                jacinle.log_function.print('Now trying to excluding goal', cur_goal)

            # Ground the body of each candidate rule once, against a *placeholder*
            # CSP; placeholder variables get remapped into the live CSP later via
            # map_csp_placeholder_* calls.
            grounded_subgoals_cache = dict()
            for regression_rule_index, (rule, bounded_variables) in enumerate(this_candidate_regression_rules):
                grounded_subgoals = list()
                placeholder_csp = ConstraintSatisfactionProblem()
                placeholder_bounded_variables = bounded_variables.copy()
                for i, item in enumerate(rule.body):
                    if isinstance(item, AchieveExpression):
                        grounded_subgoals.append(AchieveExpression(ground_fol_expression(item.goal, placeholder_bounded_variables), maintains=[], serializability=item.serializability, csp_serializability=item.csp_serializability))
                    elif isinstance(item, BindExpression):
                        # Each bound variable becomes a fresh placeholder CSP variable.
                        for variable in item.variables:
                            placeholder_bounded_variables[variable] = create_find_expression_csp_variable(variable, csp=placeholder_csp, bounded_variables=placeholder_bounded_variables)
                        grounded_subgoals.append(BindExpression([], ground_fol_expression(item.goal, placeholder_bounded_variables), serializability=item.serializability, csp_serializability=item.csp_serializability, ordered=item.ordered))
                    elif isinstance(item, OperatorApplicationExpression):
                        cur_action = ground_operator_application_expression(item, placeholder_bounded_variables, csp=placeholder_csp, add_csp_variables=False)
                        grounded_subgoals.append(cur_action)
                    elif isinstance(item, RegressionRuleApplicationExpression):
                        raise NotImplementedError('Regression rule application is not supported in the body of a regression rule.')
                    elif isinstance(item, ListExpansionExpression):
                        # Evaluate the expression to a totally ordered plan and splice
                        # its subgoals into the body in place.
                        subgoals = executor.execute(item.expression, s, placeholder_bounded_variables, sgc=PDSketchSGC(s, g, c))
                        assert isinstance(subgoals, TotallyOrderedPlan), f'ListExpansionExpression must be used with a TotallyOrderedPlan, got {type(subgoals)}'
                        grounded_subgoals.extend(subgoals.sequence)
                    elif isinstance(item, RegressionCommitFlag):
                        grounded_subgoals.append(item)
                    else:
                        raise ValueError(f'Unknown item type {type(item)} in rule {item}.')

                # pass the serializability information to the previous subgoal.
                max_reorder_prefix_length = 0
                for i, item in enumerate(grounded_subgoals):
                    if isinstance(item, RegressionCommitFlag):
                        if i > 0 and isinstance(grounded_subgoals[i - 1], (AchieveExpression, BindExpression)):
                            grounded_subgoals[i - 1].serializability = item.goal_serializability
                    if isinstance(item, (AchieveExpression, BindExpression)):
                        if item.sequential_decomposable is False:
                            max_reorder_prefix_length = i + 1
                grounded_subgoals_cache[regression_rule_index] = (grounded_subgoals, placeholder_csp, max_reorder_prefix_length)

            if len(other_goals) == 0:
                other_goals_plans = [(s, csp, previous_actions)]
            else:
                # TODO(Jiayuan Mao @ 2023/09/09): change this list to an actual generator call.
                other_goals_plans = list()
                other_goals_plans_tmp = list(dfs(s, other_goals, c, csp, previous_actions, search_depth=search_depth, return_all_skeletons=other_goals_return_all_skeletons))
                for cur_state, cur_csp, cur_actions in other_goals_plans_tmp:
                    # Shortcut: the current goal may already hold after achieving
                    # the other goals, in which case the plan is complete as-is.
                    rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(executor, [cur_goal], cur_state, dict(), cur_csp, csp_note='goal_test_shortcut')
                    if rv:
                        all_possible_plans.append((cur_state, new_csp, cur_actions))
                        if not is_optimistic:
                            # NB(Jiayuan Mao @ 2023/09/11): another place where we stop the search and ignores the `return_all_skeletons` flag.
                            continue
                    other_goals_plans.append((cur_state, cur_csp, cur_actions))

            if len(this_candidate_regression_rules) == 0 or len(other_goals_plans) == 0:
                continue

            if len(other_goals) == 0:
                max_prefix_length = 0
            else:
                # Reordering allows moving a prefix of the rule body *before* the
                # other goals; bounded by the per-rule max_reorder_prefix_length.
                max_prefix_length = 0 if not enable_reordering else max([x[2] for x in grounded_subgoals_cache.values()])
            prefix_stop_mark = dict()
            for prefix_length in range(max_prefix_length + 1):
                for regression_rule_index, (rule, bounded_variables) in enumerate(this_candidate_regression_rules):
                    grounded_subgoals, placeholder_csp, max_reorder_prefix_length = grounded_subgoals_cache[regression_rule_index]
                    if prefix_length > max_reorder_prefix_length:
                        continue
                    if regression_rule_index in prefix_stop_mark and prefix_stop_mark[regression_rule_index]:
                        continue
                    if verbose:
                        jacinle.log_function.print('Applying rule', rule, 'for', cur_goal, 'and prefix length', prefix_length, 'goal is', g)

                    if prefix_length == 0:
                        previous_possible_branches = other_goals_plans
                        start_csp_variable_mapping = dict()
                    else:
                        # Reordering case: plan the first `prefix_length` body items
                        # jointly with the other goals as one extended goal chain.
                        start_csp_variable_mapping = dict()
                        # NOTE(review): `cur_csp`/`cur_state` here refer to the last
                        # values bound by the earlier `other_goals_plans_tmp` loop —
                        # looks like they may have been meant to be `csp`/`s`; confirm.
                        new_csp = cur_csp.clone()
                        new_chain_subgoals = list()
                        new_chain_flags = list()
                        for i, item in enumerate(grounded_subgoals[:prefix_length]):
                            if isinstance(item, AchieveExpression):
                                subgoal, start_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, start_csp_variable_mapping)
                                new_chain_subgoals.append(item.goal)
                                new_chain_flags.append(not item.refinement_compressible or return_all_skeletons)
                            elif isinstance(item, BindExpression):
                                # TODO(Jiayuan Mao @ 2023/12/06): implement this for bypassing FindExpressions that can be commited...
                                subgoal, start_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, start_csp_variable_mapping)
                                with new_csp.with_group(subgoal) as group:
                                    rv = executor.execute(subgoal, cur_state, {}, csp=new_csp).item()
                                    if isinstance(rv, OptimisticValue):
                                        new_csp.add_equal_constraint(rv)
                                # NOTE(review): uses the outer initial `state`, while the
                                # parallel branch below does the same — confirm whether
                                # `cur_state` was intended.
                                mark_constraint_group_solver(executor, state, bounded_variables, group)
                            elif isinstance(item, RegressionCommitFlag):
                                continue
                            else:
                                raise TypeError(f'Unsupported item type {type(item)} in rule {item}.')
                        cur_other_goals = other_goals.add_chain(new_chain_subgoals, new_chain_flags)
                        cur_other_goals_return_all_skeletons = new_chain_flags[-1] if len(new_chain_flags) > 0 else return_all_skeletons
                        previous_possible_branches = list(dfs(s, cur_other_goals, c, new_csp, previous_actions, search_depth=search_depth, return_all_skeletons=cur_other_goals_return_all_skeletons))

                    if len(previous_possible_branches) == 0:
                        if verbose:
                            jacinle.log_function.print('Prefix planning failed!!! Stop.')
                        # If it's not possible to achieve the subset of goals, then it's not possible to achieve the whole goal.
                        # Therefore, this is a break, not a continue.
                        prefix_stop_mark[regression_rule_index] = True
                        continue

                    for prev_state, prev_csp, prev_actions in previous_possible_branches:
                        # construct the new csp and the sequence of grounded subgoals.
                        # Each branch: (state, csp, actions, placeholder->live CSP variable mapping).
                        possible_branches = [(prev_state, prev_csp, prev_actions, start_csp_variable_mapping)]
                        for i in range(prefix_length, len(grounded_subgoals)):
                            item = grounded_subgoals[i]
                            next_possible_branches = list()
                            if isinstance(item, (AchieveExpression, BindExpression)):
                                if not return_all_skeletons and item.refinement_compressible and len(possible_branches) > 1:
                                    # TODO(Jiayuan Mao @ 2023/12/06): implement this for the case of CSP solving --- we may need to keep multiple variable bindings!
                                    # Keep only the shortest-action branch (x[2] is the action list).
                                    possible_branches = [min(possible_branches, key=lambda x: len(x[2]))]
                            for branch_index, (cur_state, cur_csp, cur_actions, cur_csp_variable_mapping) in enumerate(possible_branches):
                                # prev_next_possible_branches_length = len(next_possible_branches)
                                if isinstance(item, AchieveExpression):
                                    # Recurse into the subgoal as a single-goal plan.
                                    new_csp = cur_csp.clone() if cur_csp is not None else None
                                    subgoal, new_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, cur_csp_variable_mapping)
                                    subgoal_return_all_skeletons_flag = not item.refinement_compressible or return_all_skeletons
                                    this_next_possible_branches = ([(*x, new_csp_variable_mapping) for x in dfs(
                                        cur_state,
                                        PartiallyOrderedPlan.from_single_goal(subgoal, subgoal_return_all_skeletons_flag),
                                        c + item.maintains,
                                        new_csp,
                                        cur_actions,
                                        return_all_skeletons=subgoal_return_all_skeletons_flag,
                                        search_depth=search_depth + 1
                                    )])
                                elif isinstance(item, BindExpression):
                                    if cur_csp is None:
                                        raise RuntimeError('FindExpression must be used with a CSP.')
                                    new_csp = cur_csp.clone()
                                    subgoal, new_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, cur_csp_variable_mapping)
                                    with new_csp.with_group(subgoal) as group:
                                        rv = executor.execute(subgoal, cur_state, {}, csp=new_csp).item()
                                        if isinstance(rv, OptimisticValue):
                                            new_csp.add_equal_constraint(rv)
                                    # NOTE(review): passes the outer initial `state`, not
                                    # `cur_state` — confirm intended.
                                    mark_constraint_group_solver(executor, state, bounded_variables, group)
                                    this_next_possible_branches = [(cur_state, new_csp, cur_actions, new_csp_variable_mapping)]
                                elif isinstance(item, OperatorApplier):
                                    # TODO(Jiayuan Mao @ 2023/09/11): vectorize this operation, probably only useful when `return_all_skeletons` is True.
                                    new_csp = cur_csp.clone() if cur_csp is not None else None
                                    subaction, new_csp_variable_mapping = map_csp_placeholder_action(item, new_csp, placeholder_csp, cur_csp_variable_mapping)
                                    succ, new_state = executor.apply(subaction, cur_state, csp=new_csp, clone=True, action_index=len(cur_actions))
                                    if succ:
                                        this_next_possible_branches = [(new_state, new_csp, cur_actions + [subaction], new_csp_variable_mapping)]
                                    else:
                                        # Action precondition failed: this branch dies.
                                        this_next_possible_branches = []
                                elif isinstance(item, RegressionCommitFlag):
                                    this_next_possible_branches = [(cur_state, cur_csp, cur_actions, cur_csp_variable_mapping)]
                                else:
                                    raise TypeError(f'Unknown item: {item}')

                                # Decide whether to "commit" (eagerly solve) the CSP now.
                                commit_csp = False
                                if isinstance(item, RegressionCommitFlag) and item.csp_serializability in (SubgoalCSPSerializability.FORALL, SubgoalCSPSerializability.SOME):
                                    commit_csp = True
                                elif isinstance(item, (AchieveExpression, BindExpression)) and item.csp_serializability in (SubgoalCSPSerializability.FORALL, SubgoalCSPSerializability.SOME):
                                    commit_csp = True

                                if commit_csp:
                                    for new_state, new_csp, new_actions, new_csp_variable_mapping in this_next_possible_branches:
                                        assignments = csp_dpll_sampling_solve(executor, cur_csp)
                                        if assignments is not None:
                                            # Bake the sampled assignments into the state/actions
                                            # and restart with a fresh, empty CSP.
                                            new_state = map_csp_variable_state(cur_state, cur_csp, assignments)
                                            new_csp = ConstraintSatisfactionProblem()
                                            new_actions = ground_actions(executor, cur_actions, assignments)
                                            # NOTE(review): passes the outer `csp`, not `cur_csp`,
                                            # unlike the surrounding calls — confirm intended.
                                            new_csp_variable_mapping = map_csp_variable_mapping(cur_csp_variable_mapping, csp, assignments)
                                            next_possible_branches.append((new_state, new_csp, new_actions, new_csp_variable_mapping))
                                    # TODO(Jiayuan Mao @ 2023/11/27): okay we need to implement some kind of tracking of "bounded_variables."
                                    # This need to be done by tracking some kind of mapping for optimistic variables in "grounded_subgoals."
                                else:
                                    next_possible_branches.extend(this_next_possible_branches)
                                # jacinle.log_function.print(f'Branch {branch_index + 1} of {len(possible_branches)} for {item} has {len(next_possible_branches) - prev_next_possible_branches_length} branches.')

                            possible_branches = next_possible_branches
                            # jacinle.log_function.print(f'Finished search subgoal {i + 1} of {len(grounded_subgoals)}: {item}. Possible branches (length={len(possible_branches)}):')
                            # for x in possible_branches:
                            #     jacinle.log_function.print(jacinle.indent_text(str(x[2])))

                        # all_possible_plans.extend(possible_branches)
                        found_plan = False
                        # TODO(Jiayuan Mao @ 2023/09/11): implement this via maintains checking.
                        for cur_state, cur_csp, actions, _ in possible_branches:
                            rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(executor, flatten_goals, cur_state, dict(), csp=cur_csp, csp_note=f'subgoal_test: {"; ".join([str(x) for x in flatten_goals])}')
                            if rv:
                                if verbose:
                                    jacinle.log_function.print('Found a plan', [str(x) for x in actions], 'for goal', g)
                                if is_optimistic and tail_csp_solve:
                                    assignments = csp_dpll_sampling_solve(executor, new_csp, verbose=True)
                                    if assignments is not None:
                                        # NOTE(review): tuple is (state, raw_actions, grounded_actions),
                                        # breaking the (state, csp, actions) convention used elsewhere.
                                        # The top-level caller unpacks slot [2], so it receives the
                                        # grounded actions — confirm this asymmetry is intended.
                                        all_possible_plans.append((cur_state, actions, ground_actions(executor, actions, assignments)))
                                        found_plan = True
                                else:
                                    all_possible_plans.append((cur_state, new_csp, actions))
                                    found_plan = True
                        if found_plan:
                            prefix_stop_mark[regression_rule_index] = True
                            some_rule_success = True
                            # TODO(Jiayuan Mao @ 2023/09/06): since we have changed the order of prefix_length for-loop and the regression rule for-loop.
                            # We need to use an additional dictionary to store whether we have found a plan for a particular regression rule.
                            # Right now this doesn't matter because we only use the first plan.
                        if not return_all_skeletons and some_rule_success:
                            break  # Break for-loop for `for prev_state in previous_possible_branches`.
                    if not return_all_skeletons and some_rule_success:
                        break  # Break for-loop for `for rule in regression_rules`
                if not return_all_skeletons and some_rule_success:
                    break  # Break for-loop for `for prefix_length in range(1, rule.max_rule_prefix_length + 1):`
            if not return_all_skeletons and some_rule_success:
                break

        if len(all_possible_plans) == 0:
            if verbose:
                jacinle.log_function.print('No possible plans for goal', g)
            return return_with_cache(g, previous_actions, [])

        # TODO(Jiayuan Mao @ 2023/11/19): add unique back.
        # unique_all_possible_plans = _unique_plans(all_possible_plans)
        unique_all_possible_plans = all_possible_plans
        if len(unique_all_possible_plans) != len(all_possible_plans):
            if verbose:
                jacinle.log_function.print('Warning: there are duplicate plans for goal', g, f'({len(unique_all_possible_plans)} unique plans vs {len(all_possible_plans)} total plans)')
                # import ipdb; ipdb.set_trace()

        # Prefer shorter plans (fewest actions) first.
        unique_all_possible_plans = sorted(unique_all_possible_plans, key=lambda x: len(x[2]))
        return return_with_cache(g, previous_actions, unique_all_possible_plans)

    # Split a top-level conjunction into separate subgoals, except for the special
    # case of a single list-typed argument, which is kept as one goal.
    if is_and_expr(goal_expr):
        if len(goal_expr.arguments) == 1 and goal_expr.arguments[0].return_type.is_list_type:
            goal_set = [goal_expr]
        else:
            goal_set = list(goal_expr.arguments)
    else:
        goal_set = [goal_expr]

    goal_set = PartiallyOrderedPlan((TotallyOrderedPlan(goal_set, return_all_skeletons_flags=[(not is_goal_refinement_compressible) for _ in goal_set], is_ordered=is_goal_serializable),))

    candidate_plans = dfs(state, goal_set, tuple(), csp=ConstraintSatisfactionProblem() if enable_csp else None, previous_actions=list(), tail_csp_solve=True)
    # Keep only the action sequences; the final states and CSPs are discarded.
    candidate_plans = [actions for final_state, csp, actions in candidate_plans]
    return candidate_plans, search_stat