#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : crow_planner_v1.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 11/09/2023
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import warnings
from typing import Any, Optional, Union, Iterable, Iterator, Tuple, List, Dict
import jacinle
from concepts.dsl.expression import ValueOutputExpression, ListExpansionExpression, is_and_expr
from concepts.dsl.constraint import OptimisticValue, ConstraintSatisfactionProblem
from concepts.dm.pdsketch.operator import OperatorApplicationExpression, OperatorApplier
from concepts.dm.pdsketch.executor import PDSketchExecutor, PDSketchSGC
from concepts.dm.pdsketch.domain import State
from concepts.dm.pdsketch.regression_rule import RegressionCommitFlag, AchieveExpression, BindExpression, SubgoalCSPSerializability, \
RegressionRuleApplicationExpression
from concepts.dm.pdsketch.planners.optimistic_search import ground_actions
from concepts.dm.pdsketch.csp_solvers.dpll_sampling import csp_dpll_sampling_solve
from concepts.dm.pdsketch.regression_utils import ground_fol_expression, evaluate_bool_scalar_expression
from concepts.dm.pdsketch.regression_utils import ground_operator_application_expression, gen_applicable_regression_rules, len_candidate_regression_rules
from concepts.dm.pdsketch.regression_utils import map_csp_placeholder_goal, map_csp_placeholder_action, map_csp_variable_state, map_csp_variable_mapping, mark_constraint_group_solver
from concepts.dm.pdsketch.regression_utils import create_find_expression_csp_variable, has_optimistic_constant_expression
from concepts.dm.pdsketch.crow.crow_state import PartiallyOrderedPlan, TotallyOrderedPlan
# Emitted when this module is imported, to steer users towards the maintained v2 planner.
warnings.warn('This is an obsolete implementation of the CROW planner. It has limited support for the primitive statements supported in the regression rule bodies. Please use crow_planner_v2 instead', FutureWarning)
def crow_recursive(
    executor: PDSketchExecutor, state: State, goal_expr: Union[str, ValueOutputExpression], *,
    is_goal_serializable: bool = True,
    is_goal_refinement_compressible: bool = True,
    enable_reordering: bool = False,
    enable_csp: bool = True,
    max_search_depth: int = 10,
    max_csp_branching_factor: int = 5, max_beam_size: int = 20,
    allow_empty_plan_for_optimistic_goal: bool = False,
    verbose: bool = True
) -> Tuple[Iterable[Any], Dict[str, Any]]:
    """Compositional Regression and Optimization Wayfinder.

    Plans by recursively regressing the goal through the applicable regression rules with a depth-first search.
    Optimistic values introduced along the way are accumulated in a :class:`ConstraintSatisfactionProblem`, which is
    solved either when a subgoal requests it (through its ``csp_serializability``) or once at the end of the search.

    Args:
        executor: the executor.
        state: the initial state.
        goal_expr: the goal expression. A string is parsed with ``executor.parse``.
        is_goal_serializable: whether the goal is serialized already. Otherwise, it will be treated as a conjunction.
        is_goal_refinement_compressible: whether the goals are refinement compressible.
        enable_reordering: whether to enable reordering of subgoals in regression rules.
        enable_csp: whether to enable CSP solving. When False, the search runs with ``csp=None``.
        max_search_depth: the maximum recursion depth of the regression search. The depth grows by one each time an
            ``AchieveExpression`` subgoal of a rule body is planned recursively; deeper nodes are not expanded.
        max_csp_branching_factor: the maximum branching factor of the CSP solver. Currently unused by this implementation.
        max_beam_size: the maximum beam size for keeping track of CSP solutions. Currently unused by this implementation.
        allow_empty_plan_for_optimistic_goal: whether to allow empty plans for optimistic goals.
        verbose: whether to print verbose information.

    Returns:
        A tuple ``(plans, search_stat)``. ``plans`` is a list of action sequences, sorted by the number of actions.
        ``search_stat`` is a dictionary of search statistics (currently only ``nr_expanded_nodes``).
    """
    if isinstance(goal_expr, str):
        goal_expr = executor.parse(goal_expr)

    search_cache = dict()
    search_stat = {'nr_expanded_nodes': 0}

    # NB(Jiayuan Mao @ 2023/09/09): the cache only works for previous_actions == [].
    # That is, we only cache the search results that start from the initial state.
    # NOTE(review): the cache key is only the goal string; the CSP, the maintained constraints `c`,
    # `return_all_skeletons` and `search_depth` of a call are not part of the key.
    def return_with_cache(goal_set, previous_actions, rv):
        """Cache `rv` for `goal_set` (the first stored result wins) if the search started from the initial state; return `rv`."""
        if len(previous_actions) == 0:
            goal_str = goal_set.gen_string()
            if goal_str not in search_cache:
                search_cache[goal_str] = rv
        return rv

    def try_retrieve_cache(goal_set, previous_actions):
        """Return the cached plans for `goal_set` when the search starts from the initial state, otherwise None."""
        if len(previous_actions) == 0:
            return search_cache.get(goal_set.gen_string())
        return None

    @jacinle.log_function(verbose=False)
    def dfs(
        s: State, g: PartiallyOrderedPlan, c: Tuple[ValueOutputExpression, ...],
        csp: Optional[ConstraintSatisfactionProblem], previous_actions: List[OperatorApplier],
        return_all_skeletons: bool = False,
        tail_csp_solve: bool = False,
        search_depth: int = 0
    ) -> List[Tuple[State, Optional[ConstraintSatisfactionProblem], List[OperatorApplier]]]:
        """Depth-first search for all possible plans.

        Args:
            s: the current state.
            g: the current goal.
            c: the tuple of constraints to maintain.
            csp: the current constraint satisfaction problem (None when CSP solving is disabled).
            previous_actions: the actions already executed to reach `s`.
            return_all_skeletons: whether to return all possible plans. If False, only return the first plan found.
            tail_csp_solve: whether to solve the CSP after the expansion of the current goal.
            search_depth: the current search depth.

        Returns:
            a list of plans. Each plan is a tuple of (final_state, csp, actions).
        """
        if verbose:
            jacinle.log_function.print('Current goal', g, f'return_all_skeletons={return_all_skeletons}', f'previous_actions={previous_actions}')

        if (rv := try_retrieve_cache(g, previous_actions)) is not None:
            return rv

        all_possible_plans = list()
        flatten_goals = list(g.iter_goals())

        # If the current goal contains no optimistic constant, we may directly solve the CSP.
        if not has_optimistic_constant_expression(*flatten_goals) or allow_empty_plan_for_optimistic_goal:
            rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(executor, flatten_goals, s, dict(), csp, csp_note='goal_test')
            if rv:
                all_possible_plans.append((s, new_csp, previous_actions))
                if not is_optimistic:  # If there is no optimistic value, we can stop the search from here.
                    # NB(Jiayuan Mao @ 2023/09/11): note that even if `return_all_skeletons` is True, we still return here.
                    # This corresponds to an early stopping behavior that defines the space of all possible plans.
                    return return_with_cache(g, previous_actions, all_possible_plans)

        if search_depth > max_search_depth:
            return return_with_cache(g, previous_actions, all_possible_plans)

        search_stat['nr_expanded_nodes'] += 1

        candidate_regression_rules = gen_applicable_regression_rules(executor, s, g, c)
        if len_candidate_regression_rules(candidate_regression_rules) == 0:
            return return_with_cache(g, previous_actions, all_possible_plans)

        # CSP note for the final goal test below; it only depends on the goal, so it is built once per call.
        subgoal_test_note = f'subgoal_test: {"; ".join([str(x) for x in flatten_goals])}'

        some_rule_success = False  # If return_all_skeletons is False, we will stop the search once some rule application succeeds.
        for chain_index, subgoal_index, this_candidate_regression_rules in candidate_regression_rules:
            cur_goal = g.chains[chain_index].sequence[subgoal_index]
            other_goals = g.exclude(chain_index, subgoal_index)
            other_goals_return_all_skeletons = g.chains[chain_index].get_return_all_skeletons_flag(subgoal_index)

            if verbose:
                jacinle.log_function.print('Now trying to excluding goal', cur_goal)

            # Ground the body of every candidate rule. Variables introduced by the body are created in a per-rule
            # placeholder CSP; they are mapped into the CSP of each search branch when the body is executed.
            grounded_subgoals_cache = dict()
            for regression_rule_index, (rule, bounded_variables) in enumerate(this_candidate_regression_rules):
                grounded_subgoals = list()
                placeholder_csp = ConstraintSatisfactionProblem()
                placeholder_bounded_variables = bounded_variables.copy()
                for item in rule.body:
                    if isinstance(item, AchieveExpression):
                        grounded_subgoals.append(AchieveExpression(ground_fol_expression(item.goal, placeholder_bounded_variables), maintains=[], serializability=item.serializability, csp_serializability=item.csp_serializability))
                    elif isinstance(item, BindExpression):
                        for variable in item.variables:
                            placeholder_bounded_variables[variable] = create_find_expression_csp_variable(variable, csp=placeholder_csp, bounded_variables=placeholder_bounded_variables)
                        grounded_subgoals.append(BindExpression([], ground_fol_expression(item.goal, placeholder_bounded_variables), serializability=item.serializability, csp_serializability=item.csp_serializability, ordered=item.ordered))
                    elif isinstance(item, OperatorApplicationExpression):
                        cur_action = ground_operator_application_expression(item, placeholder_bounded_variables, csp=placeholder_csp, add_csp_variables=False)
                        grounded_subgoals.append(cur_action)
                    elif isinstance(item, RegressionRuleApplicationExpression):
                        raise NotImplementedError('Regression rule application is not supported in the body of a regression rule.')
                    elif isinstance(item, ListExpansionExpression):
                        subgoals = executor.execute(item.expression, s, placeholder_bounded_variables, sgc=PDSketchSGC(s, g, c))
                        assert isinstance(subgoals, TotallyOrderedPlan), f'ListExpansionExpression must be used with a TotallyOrderedPlan, got {type(subgoals)}'
                        grounded_subgoals.extend(subgoals.sequence)
                    elif isinstance(item, RegressionCommitFlag):
                        grounded_subgoals.append(item)
                    else:
                        raise ValueError(f'Unknown item type {type(item)} in rule {item}.')

                # Pass the serializability information of commit flags to the previous subgoal, and record the
                # position right after the last subgoal whose `sequential_decomposable` is False. Prefixes up to
                # that length are candidates for reordering.
                max_reorder_prefix_length = 0
                for i, item in enumerate(grounded_subgoals):
                    if isinstance(item, RegressionCommitFlag):
                        if i > 0 and isinstance(grounded_subgoals[i - 1], (AchieveExpression, BindExpression)):
                            grounded_subgoals[i - 1].serializability = item.goal_serializability
                    if isinstance(item, (AchieveExpression, BindExpression)):
                        if item.sequential_decomposable is False:
                            max_reorder_prefix_length = i + 1
                grounded_subgoals_cache[regression_rule_index] = (grounded_subgoals, placeholder_csp, max_reorder_prefix_length)

            # Plan for the other goals first; the current goal is then regressed from each of their end states.
            if len(other_goals) == 0:
                other_goals_plans = [(s, csp, previous_actions)]
            else:
                # TODO(Jiayuan Mao @ 2023/09/09): change this list to an actual generator call.
                other_goals_plans = list()
                other_goals_plans_tmp = list(dfs(s, other_goals, c, csp, previous_actions, search_depth=search_depth, return_all_skeletons=other_goals_return_all_skeletons))
                for cur_state, cur_csp, cur_actions in other_goals_plans_tmp:
                    rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(executor, [cur_goal], cur_state, dict(), cur_csp, csp_note='goal_test_shortcut')
                    if rv:
                        # The current goal already holds once the other goals are achieved.
                        # NOTE(review): plans found here are not grounded by the `tail_csp_solve` step below.
                        all_possible_plans.append((cur_state, new_csp, cur_actions))
                        if not is_optimistic:
                            # NB(Jiayuan Mao @ 2023/09/11): another place where we stop the search and ignores the `return_all_skeletons` flag.
                            continue
                    other_goals_plans.append((cur_state, cur_csp, cur_actions))

            if len(this_candidate_regression_rules) == 0 or len(other_goals_plans) == 0:
                continue

            if len(other_goals) == 0 or not enable_reordering:
                max_prefix_length = 0
            else:
                max_prefix_length = max(x[2] for x in grounded_subgoals_cache.values())

            # regression_rule_index -> True once the rule should not be tried with longer prefixes.
            prefix_stop_mark = dict()
            for prefix_length in range(max_prefix_length + 1):
                for regression_rule_index, (rule, bounded_variables) in enumerate(this_candidate_regression_rules):
                    grounded_subgoals, placeholder_csp, max_reorder_prefix_length = grounded_subgoals_cache[regression_rule_index]
                    if prefix_length > max_reorder_prefix_length:
                        continue
                    if prefix_stop_mark.get(regression_rule_index, False):
                        continue

                    if verbose:
                        jacinle.log_function.print('Applying rule', rule, 'for', cur_goal, 'and prefix length', prefix_length, 'goal is', g)

                    if prefix_length == 0:
                        previous_possible_branches = other_goals_plans
                        start_csp_variable_mapping = dict()
                    else:
                        # Move the first `prefix_length` subgoals of the rule into a new chain that is planned together
                        # with the other goals, starting from the current state `s` and the current CSP `csp`.
                        start_csp_variable_mapping = dict()
                        new_csp = csp.clone() if csp is not None else None
                        new_chain_subgoals = list()
                        new_chain_flags = list()
                        for item in grounded_subgoals[:prefix_length]:
                            if isinstance(item, AchieveExpression):
                                subgoal, start_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, start_csp_variable_mapping)
                                # Use the mapped goal so it shares CSP variables with the remaining subgoals of the rule.
                                new_chain_subgoals.append(subgoal)
                                new_chain_flags.append(not item.refinement_compressible or return_all_skeletons)
                            elif isinstance(item, BindExpression):
                                # TODO(Jiayuan Mao @ 2023/12/06): implement this for bypassing FindExpressions that can be commited...
                                if new_csp is None:
                                    raise RuntimeError('FindExpression must be used with a CSP.')
                                subgoal, start_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, start_csp_variable_mapping)
                                with new_csp.with_group(subgoal) as group:
                                    rv = executor.execute(subgoal, s, {}, csp=new_csp).item()
                                    if isinstance(rv, OptimisticValue):
                                        new_csp.add_equal_constraint(rv)
                                    # NOTE(review): passes the planner's initial `state`, not `s` -- confirm this is intended.
                                    mark_constraint_group_solver(executor, state, bounded_variables, group)
                            elif isinstance(item, RegressionCommitFlag):
                                continue
                            else:
                                raise TypeError(f'Unsupported item type {type(item)} in rule {item}.')
                        cur_other_goals = other_goals.add_chain(new_chain_subgoals, new_chain_flags)
                        cur_other_goals_return_all_skeletons = new_chain_flags[-1] if len(new_chain_flags) > 0 else return_all_skeletons
                        previous_possible_branches = list(dfs(s, cur_other_goals, c, new_csp, previous_actions, search_depth=search_depth, return_all_skeletons=cur_other_goals_return_all_skeletons))

                    if len(previous_possible_branches) == 0:
                        if verbose:
                            jacinle.log_function.print('Prefix planning failed!!! Stop.')
                        # If it's not possible to achieve the subset of goals, then it's not possible to achieve the whole goal
                        # with a longer prefix either, so this rule is not tried again for the remaining prefix lengths.
                        prefix_stop_mark[regression_rule_index] = True
                        continue

                    for prev_state, prev_csp, prev_actions in previous_possible_branches:
                        # Execute the remaining grounded subgoals of the rule, branching on every sub-plan found.
                        # Each branch is (state, csp, actions, placeholder-variable -> csp-variable mapping).
                        possible_branches = [(prev_state, prev_csp, prev_actions, start_csp_variable_mapping)]
                        for item in grounded_subgoals[prefix_length:]:
                            next_possible_branches = list()
                            if isinstance(item, (AchieveExpression, BindExpression)):
                                if not return_all_skeletons and item.refinement_compressible and len(possible_branches) > 1:
                                    # TODO(Jiayuan Mao @ 2023/12/06): implement this for the case of CSP solving --- we may need to keep multiple variable bindings!
                                    possible_branches = [min(possible_branches, key=lambda x: len(x[2]))]

                            for cur_state, cur_csp, cur_actions, cur_csp_variable_mapping in possible_branches:
                                if isinstance(item, AchieveExpression):
                                    new_csp = cur_csp.clone() if cur_csp is not None else None
                                    subgoal, new_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, cur_csp_variable_mapping)
                                    subgoal_return_all_skeletons_flag = not item.refinement_compressible or return_all_skeletons
                                    this_next_possible_branches = [(*x, new_csp_variable_mapping) for x in dfs(
                                        # `c` is a tuple; `maintains` may be a list (see `maintains=[]` above).
                                        cur_state, PartiallyOrderedPlan.from_single_goal(subgoal, subgoal_return_all_skeletons_flag), c + tuple(item.maintains),
                                        new_csp, cur_actions,
                                        return_all_skeletons=subgoal_return_all_skeletons_flag, search_depth=search_depth + 1
                                    )]
                                elif isinstance(item, BindExpression):
                                    if cur_csp is None:
                                        raise RuntimeError('FindExpression must be used with a CSP.')
                                    new_csp = cur_csp.clone()
                                    subgoal, new_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, cur_csp_variable_mapping)
                                    with new_csp.with_group(subgoal) as group:
                                        rv = executor.execute(subgoal, cur_state, {}, csp=new_csp).item()
                                        if isinstance(rv, OptimisticValue):
                                            new_csp.add_equal_constraint(rv)
                                        # NOTE(review): passes the planner's initial `state`, not `cur_state` -- confirm this is intended.
                                        mark_constraint_group_solver(executor, state, bounded_variables, group)
                                    this_next_possible_branches = [(cur_state, new_csp, cur_actions, new_csp_variable_mapping)]
                                elif isinstance(item, OperatorApplier):
                                    # TODO(Jiayuan Mao @ 2023/09/11): vectorize this operation, probably only useful when `return_all_skeletons` is True.
                                    new_csp = cur_csp.clone() if cur_csp is not None else None
                                    subaction, new_csp_variable_mapping = map_csp_placeholder_action(item, new_csp, placeholder_csp, cur_csp_variable_mapping)
                                    succ, new_state = executor.apply(subaction, cur_state, csp=new_csp, clone=True, action_index=len(cur_actions))
                                    if succ:
                                        this_next_possible_branches = [(new_state, new_csp, cur_actions + [subaction], new_csp_variable_mapping)]
                                    else:
                                        this_next_possible_branches = []
                                elif isinstance(item, RegressionCommitFlag):
                                    this_next_possible_branches = [(cur_state, cur_csp, cur_actions, cur_csp_variable_mapping)]
                                else:
                                    raise TypeError(f'Unknown item: {item}')

                                commit_csp = (
                                    isinstance(item, (RegressionCommitFlag, AchieveExpression, BindExpression)) and
                                    item.csp_serializability in (SubgoalCSPSerializability.FORALL, SubgoalCSPSerializability.SOME)
                                )
                                if commit_csp:
                                    # Solve the CSP of every branch produced by *this* item and ground its state, actions and
                                    # variable mapping; branches for which no assignment is found are dropped.
                                    for branch_state, branch_csp, branch_actions, branch_mapping in this_next_possible_branches:
                                        if branch_csp is None:
                                            # CSP solving is disabled: there is nothing to commit.
                                            next_possible_branches.append((branch_state, branch_csp, branch_actions, branch_mapping))
                                            continue
                                        assignments = csp_dpll_sampling_solve(executor, branch_csp)
                                        if assignments is None:
                                            continue
                                        next_possible_branches.append((
                                            map_csp_variable_state(branch_state, branch_csp, assignments),
                                            ConstraintSatisfactionProblem(),
                                            ground_actions(executor, branch_actions, assignments),
                                            map_csp_variable_mapping(branch_mapping, branch_csp, assignments),
                                        ))
                                        # TODO(Jiayuan Mao @ 2023/11/27): okay we need to implement some kind of tracking of "bounded_variables."
                                        # This need to be done by tracking some kind of mapping for optimistic variables in "grounded_subgoals."
                                else:
                                    next_possible_branches.extend(this_next_possible_branches)
                            possible_branches = next_possible_branches

                        found_plan = False
                        # TODO(Jiayuan Mao @ 2023/09/11): implement this via maintains checking.
                        for cur_state, cur_csp, actions, _ in possible_branches:
                            rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(executor, flatten_goals, cur_state, dict(), csp=cur_csp, csp_note=subgoal_test_note)
                            if not rv:
                                continue
                            if verbose:
                                jacinle.log_function.print('Found a plan', [str(x) for x in actions], 'for goal', g)
                            if is_optimistic and tail_csp_solve:
                                # Solve the accumulated CSP once at the end and ground the optimistic values in the actions.
                                assignments = csp_dpll_sampling_solve(executor, new_csp, verbose=verbose)
                                if assignments is not None:
                                    # Keep the (state, csp, actions) layout shared by every plan tuple.
                                    all_possible_plans.append((cur_state, new_csp, ground_actions(executor, actions, assignments)))
                                    found_plan = True
                            else:
                                all_possible_plans.append((cur_state, new_csp, actions))
                                found_plan = True

                        if found_plan:
                            prefix_stop_mark[regression_rule_index] = True
                            some_rule_success = True
                            # TODO(Jiayuan Mao @ 2023/09/06): since we have changed the order of prefix_length for-loop and the regression rule for-loop.
                            # We need to use an additional dictionary to store whether we have found a plan for a particular regression rule.
                            # Right now this doesn't matter because we only use the first plan.

                        # Break for-loop for `for prev_state in previous_possible_branches`.
                        if not return_all_skeletons and some_rule_success:
                            break

                    # Break for-loop for `for rule in regression_rules`.
                    if not return_all_skeletons and some_rule_success:
                        break

                # Break for-loop for `for prefix_length in range(max_prefix_length + 1)`.
                if not return_all_skeletons and some_rule_success:
                    break

            # Break for-loop for `for chain_index, subgoal_index, ... in candidate_regression_rules`.
            if not return_all_skeletons and some_rule_success:
                break

        if len(all_possible_plans) == 0:
            if verbose:
                jacinle.log_function.print('No possible plans for goal', g)
            return return_with_cache(g, previous_actions, [])

        # TODO(Jiayuan Mao @ 2023/11/19): add unique back (deduplicate the plans before sorting).
        all_possible_plans = sorted(all_possible_plans, key=lambda x: len(x[2]))
        return return_with_cache(g, previous_actions, all_possible_plans)

    # A conjunctive goal is split into its conjuncts, unless its single argument is a list-typed expression.
    if is_and_expr(goal_expr):
        if len(goal_expr.arguments) == 1 and goal_expr.arguments[0].return_type.is_list_type:
            goal_list = [goal_expr]
        else:
            goal_list = list(goal_expr.arguments)
    else:
        goal_list = [goal_expr]

    goal_set = PartiallyOrderedPlan((TotallyOrderedPlan(goal_list, return_all_skeletons_flags=[(not is_goal_refinement_compressible) for _ in goal_list], is_ordered=is_goal_serializable),))
    candidate_plans = dfs(state, goal_set, tuple(), csp=ConstraintSatisfactionProblem() if enable_csp else None, previous_actions=list(), tail_csp_solve=True)
    candidate_plans = [actions for _, _, actions in candidate_plans]
    return candidate_plans, search_stat