#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : crow_planner_v2.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 01/18/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import itertools
import jacinle
from typing import Any, Optional, Union, Iterator, Sequence, Tuple, List, Dict, NamedTuple
from concepts.dsl.constraint import OptimisticValue, ConstraintSatisfactionProblem
from concepts.dsl.dsl_types import QINDEX, ObjectConstant
from concepts.dsl.expression import ValueOutputExpression, is_and_expr
from concepts.dsl.executors.tensor_value_executor import BoundedVariablesDictCompatible
from concepts.pdsketch.executor import PDSketchExecutor, GeneratorManager
from concepts.pdsketch.simulator_interface import PDSketchSimulatorInterface
from concepts.pdsketch.domain import State
from concepts.pdsketch.operator import OperatorApplier
from concepts.pdsketch.regression_rule import RegressionRule, RegressionRuleApplier, AchieveExpression, BindExpression, RuntimeAssignExpression, RegressionCommitFlag
from concepts.pdsketch.regression_rule import SubgoalCSPSerializability
from concepts.pdsketch.planners.optimistic_search import ground_actions
from concepts.pdsketch.planners.optimistic_search_with_simulation import csp_dpll_sampling_solve_with_simulation
from concepts.pdsketch.csp_solvers.dpll_sampling import csp_dpll_sampling_solve
from concepts.pdsketch.crow.crow_state import TotallyOrderedPlan, PartiallyOrderedPlan
from concepts.pdsketch.regression_utils import has_optimistic_constant_expression, evaluate_bool_scalar_expression
from concepts.pdsketch.regression_utils import gen_applicable_regression_rules, len_candidate_regression_rules, gen_grounded_subgoals_with_placeholders
from concepts.pdsketch.regression_utils import map_csp_placeholder_goal, mark_constraint_group_solver, map_csp_placeholder_action, map_csp_placeholder_regression_rule_applier, map_csp_variable_state, map_csp_variable_mapping
__all__ = ['CROWPlanTreeNode', 'CROWSearchNode', 'CROWRecursiveSearcherV2', 'crow_recursive_v2']
[docs]
class CROWPlanTreeNode(object):
    """A node in the hierarchical plan tree built by the CROW planner.

    Interior nodes carry a (sub)goal; leaves are primitive actions or
    regression-rule applications appended via :meth:`add_child`.
    """

    def __init__(self, goal: PartiallyOrderedPlan, always: bool):
        # goal: the (partially ordered) goal this subtree is responsible for.
        # always: whether this goal carries the [[always]] flag.
        self.goal = goal
        self.always = always
        self.children = []
        self.parent = None

    def add_child(self, child: Union['CROWPlanTreeNode', OperatorApplier, RegressionRuleApplier]):
        """Append ``child`` to this node and record this node as its parent."""
        child.parent = self
        self.children.append(child)

    def iter_actions(self):
        """Yield the non-tree leaves (actions) of this subtree, left to right."""
        for entry in self.children:
            if not isinstance(entry, CROWPlanTreeNode):
                yield entry
            else:
                yield from entry.iter_actions()

    def __str__(self):
        fmt = f'CROWPlanTreeNode({self.goal} always={self.always})\n'
        for child in self.children:
            fmt += f'- {jacinle.indent_text(str(child)).lstrip()}\n'
        return fmt
[docs]
class CROWSearchNode(object):
    """A single node in the CROW recursive search tree.

    Bundles the current state, the remaining goal set, accumulated constraints,
    the CSP built so far, and bookkeeping flags used by :meth:`CROWRecursiveSearcherV2.dfs`.
    """

    def __init__(
        self, state: State, goal_set: PartiallyOrderedPlan, constraints: Tuple[ValueOutputExpression, ...], csp: Optional[ConstraintSatisfactionProblem],
        plan_tree: Union[CROWPlanTreeNode, list],
        track_all_skeletons: bool,
        associated_regression_rule: Optional[RegressionRule] = None,
        is_top_level: bool = False,
        is_leftmost_branch: bool = False,
        is_all_always: bool = True,
        depth: int = 0,
    ):
        """Initialize search node for the CROW planner.

        Args:
            state: the current when the node is expanded.
            goal_set: the current goal (subgoal) to achieve.
            constraints: the constraints to be satisfied while achieving the goal.
            csp: the constraint satisfaction problem to be solved, associated with all the steps accumulated so far.
            plan_tree: the plan tree associated with the current node.
            track_all_skeletons: whether to track all the skeletons when we are refining the branch.
            associated_regression_rule: the regression rule associated with the current node.
            is_top_level: whether the current node is the top level node (directly instantiated from the goal).
            is_leftmost_branch: whether the current node is the leftmost branch of the search tree.
            is_all_always: whether all the subgoals from the root to the current node are [[always]].
            depth: the depth of the current node.
        """
        self.state = state
        self.goal = goal_set
        self.constraints = constraints
        self.csp = csp
        self.plan_tree = plan_tree
        self.track_all_skeletons = track_all_skeletons
        self.associated_regression_rule = associated_regression_rule
        self.is_top_level = is_top_level
        self.is_leftmost_branch = is_leftmost_branch
        self.is_all_always = is_all_always
        self.depth = depth

    @property
    def previous_actions(self):
        """The actions accumulated so far: flattened from the plan tree, or the raw list."""
        plan = self.plan_tree
        return list(plan.iter_actions()) if isinstance(plan, CROWPlanTreeNode) else plan
[docs]
class PossibleSearchBranch(NamedTuple):
    """A partial refinement branch tracked while applying a single regression rule."""
    # The state reached after executing `actions`.
    state: State
    # The CSP accumulated along this branch; None when CSP solving is disabled.
    csp: Optional[ConstraintSatisfactionProblem]
    # The actions accumulated so far along this branch.
    actions: List[OperatorApplier]
    # Mapping from placeholder-CSP variable indices to their counterparts in `csp`
    # (maintained by the map_csp_placeholder_* helpers).
    csp_variable_mapping: Dict[int, Any]
    # Mapping from regression-rule variable names to their current bindings
    # (e.g., ObjectConstant values produced by Bind expressions).
    reg_variable_mapping: Dict[str, Any]
[docs]
class SearchReturn(NamedTuple):
    """A single search result returned by :meth:`CROWRecursiveSearcherV2.dfs`."""
    # The final state after executing `actions`.
    state: State
    # The CSP accumulated along the plan; None when CSP solving is disabled.
    csp: Optional[ConstraintSatisfactionProblem]
    # The action sequence found for the node's goal.
    actions: List[OperatorApplier]
[docs]
class CROWRecursiveSearcherV2(object):
    """Recursive goal-regression searcher (CROW planner, v2).

    Construct with an executor, an initial state, and a goal expression, then
    call :meth:`main` to search. See :meth:`dfs` for the search procedure.
    """

    def __init__(
        self, executor: PDSketchExecutor, state: State, goal_expr: Union[str, ValueOutputExpression], *,
        enable_reordering: bool = False,
        max_search_depth: int = 10,
        max_beam_size: int = 20,
        # Group 1: goal serialization and refinements.
        is_goal_serializable: bool = True,
        is_goal_refinement_compressible: bool = True,
        # Group 2: CSP solver.
        enable_csp: bool = True,
        max_csp_trials: int = 10,
        max_global_csp_trials: int = 100,
        max_csp_branching_factor: int = 5,
        use_generator_manager: bool = False,
        store_generator_manager_history: bool = False,
        # Group 3: simulation.
        enable_simulation: bool = False,
        simulator: Optional[PDSketchSimulatorInterface] = None,
        # Group 4: dirty derived predicates.
        enable_dirty_derived_predicates: bool = False,
        enable_greedy_execution: bool = False,
        allow_empty_plan_for_optimistic_goal: bool = False,
        verbose: bool = True
    ):
        """Initialize the CROW planner.

        Args:
            executor: the executor used to execute the expressions.
            state: the current state.
            goal_expr: the goal expression to achieve.
            enable_reordering: whether to enable reordering in regression rule applications.
            max_search_depth: the maximum search depth.
            max_beam_size: the maximum beam size when trying different refinements of the same goal.
            is_goal_serializable: whether the goal has been serialized.
            is_goal_refinement_compressible: whether the serialized goals are refinement-compressible (i.e., for each goal item, do we need to track all the skeletons?)
            enable_csp: whether to enable CSP solving.
            max_csp_trials: the maximum number of CSP trials.
            max_global_csp_trials: the maximum number of CSP trials for the global CSP (i.e., the CSP associated with the root node of the search tree.)
            max_csp_branching_factor: the maximum branching factor for CSP.
            use_generator_manager: whether to use the generator manager.
            store_generator_manager_history: whether to store the history of the generator calls in the generator manager.
            enable_simulation: whether to enable simulation in CSP solving.
            simulator: the simulator used for simulation.
            enable_dirty_derived_predicates: whether to enable dirty derived predicates.
            enable_greedy_execution: whether to enable greedy execution.
            allow_empty_plan_for_optimistic_goal: whether to allow empty plan for optimistic goal.
            verbose: whether to print verbose information.
        """
        self.executor = executor
        self.state = state
        self.goal_expr = goal_expr
        # A string goal is parsed into an expression by the executor's parser.
        if isinstance(self.goal_expr, str):
            self.goal_expr = executor.parse(self.goal_expr)
        self.max_search_depth = max_search_depth
        self.max_beam_size = max_beam_size
        self.enable_reordering = enable_reordering
        self.is_goal_serializable = is_goal_serializable
        self.is_goal_refinement_compressible = is_goal_refinement_compressible
        self.enable_simulation = enable_simulation
        self.simulator = simulator
        if self.enable_simulation and self.simulator is not None:
            # TODO(Jiayuan Mao @ 2024-01-22): after we perform partial grounding for the states, we probably need to update the initial state here.
            self.simulator.set_init_state(self.state)
        self.enable_csp = enable_csp
        self.max_csp_trials = max_csp_trials
        self.max_global_csp_trials = max_global_csp_trials
        self.max_csp_branching_factor = max_csp_branching_factor
        self.use_generator_manager = use_generator_manager
        self.store_generator_manager_history = store_generator_manager_history
        self.enable_dirty_derived_predicates = enable_dirty_derived_predicates
        self.enable_greedy_execution = enable_greedy_execution
        self.allow_empty_plan_for_optimistic_goal = allow_empty_plan_for_optimistic_goal
        self.verbose = verbose
        # Cache of search results keyed by goal string; only populated for nodes
        # that start from the initial state (see _return_with_cache).
        self._search_cache = dict()
        self._search_stat = {'nr_expanded_nodes': 0}
        if self.use_generator_manager:
            self._generator_manager = GeneratorManager(self.executor, store_history=self.store_generator_manager_history)
        else:
            self._generator_manager = None
@property
def search_stat(self) -> Dict[str, Any]:
    # Read-only view of the search statistics (currently only 'nr_expanded_nodes').
    return self._search_stat
@property
def generator_manager(self) -> Optional[GeneratorManager]:
    # The generator manager used during CSP solving; None unless `use_generator_manager` was set.
    return self._generator_manager
[docs]
def main(self) -> Tuple[list, dict]:
    """Run the search from the initial state.

    Returns:
        a tuple of (candidate action sequences, search statistics dict).
    """
    # Split a top-level conjunction into individual subgoals, except when it is a
    # single list-typed conjunct, which must be kept as one goal item.
    goal_items = [self.goal_expr]
    if is_and_expr(self.goal_expr):
        arguments = self.goal_expr.arguments
        if not (len(arguments) == 1 and arguments[0].return_type.is_list_type):
            goal_items = list(arguments)
    chain = TotallyOrderedPlan(
        goal_items,
        return_all_skeletons_flags=[not self.is_goal_refinement_compressible] * len(goal_items),
        is_ordered=self.is_goal_serializable,
    )
    root = CROWSearchNode(
        self.state, PartiallyOrderedPlan([chain]), tuple(),
        csp=ConstraintSatisfactionProblem() if self.enable_csp else None,
        plan_tree=list(), track_all_skeletons=False,
        is_top_level=True, is_leftmost_branch=True,
        is_all_always=True,
        depth=0,
    )
    candidate_plans = [actions for _, _, actions in self.dfs(root)]
    return candidate_plans, self._search_stat
[docs]
@jacinle.log_function(verbose=False)
def dfs(self, node: CROWSearchNode) -> Sequence[SearchReturn]:
    """The main entrance of the CROW planner.

    Recursively refines the goal set of ``node`` via applicable regression rules
    and returns a sequence of :class:`SearchReturn` results (final state,
    accumulated CSP, action sequence). Results may be cached per goal string
    when the node starts from the initial state.
    """
    if self.verbose:
        jacinle.log_function.print('Current goal', node.goal, f'track_all_skeletons={node.track_all_skeletons}', f'previous_actions={node.previous_actions}')
    # jacinle.log_function.print('Current goal', node.goal, f'track_all_skeletons={node.track_all_skeletons}', f'previous_actions={node.previous_actions}')
    if (rv := self._try_retrieve_cache(node)) is not None:
        return rv
    if node.depth >= self.max_search_depth:
        # NOTE(review): the depth limit only warns (and drops into a debugger when
        # verbose); the search continues past the limit — confirm this is intended.
        jacinle.log_function.print(jacinle.colored('Warning: search depth exceeded.', color='red'))
        if self.verbose:
            import ipdb; ipdb.set_trace()
    self._search_stat['nr_expanded_nodes'] += 1
    all_possible_plans = list()
    flatten_goals = list(node.goal.iter_goals())
    if not has_optimistic_constant_expression(*flatten_goals) or self.allow_empty_plan_for_optimistic_goal:
        """If the current goal contains no optimistic constant, we may directly solve the CSP."""
        rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(self.executor, flatten_goals, node.state, dict(), node.csp, csp_note='goal_test')
        if rv:
            all_possible_plans.append(SearchReturn(node.state, new_csp, node.previous_actions))
            if not is_optimistic:  # If there is no optimistic value, we can stop the search from here.
                # note that even if `track_all_skeletons` is True, we still return here.
                # This corresponds to an early stopping behavior that defines the space of all possible plans.
                return self._return_with_cache(node, all_possible_plans)
    all_candidate_regression_rules = gen_applicable_regression_rules(self.executor, node.state, node.goal, node.constraints, return_all_candidates=node.track_all_skeletons)
    if len_candidate_regression_rules(all_candidate_regression_rules) == 0:
        return self._return_with_cache(node, all_possible_plans)
    if self.verbose:
        # Log a table of all (goal, rule) candidates before expanding any of them.
        rows = list()
        for chain_index, subgoal_index, candidate_regression_rules in all_candidate_regression_rules:
            cur_goal = node.goal.chains[chain_index].sequence[subgoal_index]
            for rule_item in candidate_regression_rules:
                rows.append((cur_goal, rule_item[0]))
        jacinle.log_function.print('All candidate regression rules:')
        jacinle.log_function.print(jacinle.tabulate(rows, headers=['Goal', 'Rule'], tablefmt='rst'))
    for chain_index, subgoal_index, candidate_regression_rules in all_candidate_regression_rules:
        # Pick one subgoal to refine with a regression rule; the rest ("other goals")
        # are achieved first by a recursive dfs call.
        cur_goal, other_goals = node.goal.chains[chain_index].sequence[subgoal_index], node.goal.exclude(chain_index, subgoal_index)
        other_goals_track_all_skeletons = node.goal.chains[chain_index].get_return_all_skeletons_flag(subgoal_index)
        candidate_grounded_subgoals = gen_grounded_subgoals_with_placeholders(self.executor, node.state, cur_goal, node.constraints, candidate_regression_rules, enable_csp=self.enable_csp)
        if self.verbose:
            jacinle.log_function.print('Now trying to excluding goal', cur_goal)
        other_goals_plans: List[Tuple[State, Optional[ConstraintSatisfactionProblem], List[OperatorApplier]]]
        if len(other_goals) == 0:
            other_goals_plans = [(node.state, node.csp, node.plan_tree)]
        else:
            other_goals_plans = list()
            other_goals_plans_tmp = self.dfs(CROWSearchNode(
                node.state, other_goals, node.constraints, node.csp, node.plan_tree,
                track_all_skeletons=other_goals_track_all_skeletons,
                is_leftmost_branch=node.is_leftmost_branch,
                is_all_always=False,
                depth=node.depth + 1
            ))
            for cur_state, cur_csp, cur_actions in other_goals_plans_tmp:
                # Shortcut: the excluded goal may already hold after achieving the others.
                rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(self.executor, [cur_goal], cur_state, dict(), cur_csp, csp_note='goal_test_shortcut')
                if rv:
                    all_possible_plans.append(SearchReturn(cur_state, new_csp, cur_actions))
                    if not is_optimistic:
                        # another place where we stop the search and ignores the `track_all_skeletons` flag.
                        continue
                other_goals_plans.append((cur_state, cur_csp, cur_actions))
        if len(other_goals_plans) == 0 or len(candidate_grounded_subgoals) == 0:
            continue
        if len(other_goals) == 0:
            max_prefix_length = 0
        else:
            # With reordering enabled, a prefix of the rule body may be applied
            # before the "other goals" are achieved.
            max_prefix_length = 0 if not self.enable_reordering else max(x[2] for x in candidate_grounded_subgoals.values())
        prefix_stop_mark = dict()
        for prefix_length in range(max_prefix_length + 1):
            for regression_rule_index, (rule, bounded_variables) in enumerate(candidate_regression_rules):
                grounded_subgoals, placeholder_csp, max_reorder_prefix_length = candidate_grounded_subgoals[regression_rule_index]
                if prefix_length > max_reorder_prefix_length or (regression_rule_index in prefix_stop_mark and prefix_stop_mark[regression_rule_index]):
                    continue
                # If the "new" item to be added in the prefix is a Find expression, we should just skip this prefix length.
                if prefix_length > 0 and isinstance(grounded_subgoals[prefix_length - 1], BindExpression):
                    continue
                if self.verbose:
                    jacinle.log_function.print('Applying rule', rule, bounded_variables, 'for subgoal', cur_goal, 'under prefix length', prefix_length)
                # jacinle.log_function.print('Applying rule', rule, bounded_variables, 'for subgoal', cur_goal, 'under prefix length', prefix_length)
                if prefix_length == 0:
                    start_csp_variable_mapping = dict()
                    previous_possible_branches = ([PossibleSearchBranch(x[0], x[1], x[2], start_csp_variable_mapping, {}) for x in other_goals_plans])
                else:
                    previous_possible_branches = list()
                    search_goals = self.apply_regression_rule_prefix(node, grounded_subgoals, placeholder_csp, prefix_length, bounded_variables)
                    for new_chain_subgoals, new_chain_flags, new_csp, start_csp_variable_mapping, cur_reg_variable_mapping in search_goals:
                        if len(new_chain_subgoals) == 0:
                            previous_possible_branches = ([PossibleSearchBranch(x[0], x[1], x[2], start_csp_variable_mapping, cur_reg_variable_mapping) for x in other_goals_plans])
                            break
                        cur_other_goals = other_goals.add_chain(new_chain_subgoals, new_chain_flags)
                        cur_other_goals_track_all_skeletons = new_chain_flags[-1] if len(new_chain_flags) > 0 else node.track_all_skeletons
                        previous_possible_branches.extend([PossibleSearchBranch(x[0], x[1], x[2], start_csp_variable_mapping, cur_reg_variable_mapping) for x in self.dfs(CROWSearchNode(
                            node.state, cur_other_goals, node.constraints, new_csp,
                            node.plan_tree, track_all_skeletons=cur_other_goals_track_all_skeletons,
                            is_leftmost_branch=node.is_leftmost_branch,
                            is_all_always=False,
                            depth = node.depth + 1
                        ))])
                if len(previous_possible_branches) == 0:
                    if self.verbose:
                        jacinle.log_function.print('Prefix planning failed!!! Stop.')
                    # If it's not possible to achieve the subset of goals, then it's not possible to achieve the whole goal.
                    # Therefore, this is a break, not a "continue".
                    prefix_stop_mark[regression_rule_index] = True
                    continue
                # Expand the remaining rule-body items one by one, breadth-wise over branches.
                possible_branches = previous_possible_branches
                for i in range(prefix_length, len(grounded_subgoals)):
                    item = grounded_subgoals[i]
                    next_possible_branches = list()
                    if isinstance(item, (AchieveExpression, BindExpression)):
                        if not node.track_all_skeletons and item.refinement_compressible and len(possible_branches) > 1:
                            # TODO(Jiayuan Mao @ 2023/12/06): implement this for the case of CSP solving --- we may need to keep multiple variable bindings!
                            possible_branches = [min(possible_branches, key=lambda x: len(x.actions))]
                    prev_next_possible_branches_length = 0
                    for branch_index, (cur_state, cur_csp, cur_actions, cur_csp_variable_mapping, cur_reg_variable_mapping) in enumerate(possible_branches):
                        # prev_next_possible_branches_length = len(next_possible_branches)
                        is_leftmost_branch = node.is_leftmost_branch and all(not isinstance(x, (AchieveExpression, RegressionRuleApplier)) for x in grounded_subgoals[:i])
                        if isinstance(item, AchieveExpression):
                            # Recursively achieve the subgoal of an Achieve expression.
                            new_csp = cur_csp.clone() if cur_csp is not None else None
                            subgoal, new_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, cur_csp_variable_mapping, cur_reg_variable_mapping)
                            subgoal_track_all_skeletons_flag = not item.refinement_compressible or node.track_all_skeletons
                            this_next_possible_branches = ([PossibleSearchBranch(x[0], x[1], x[2], new_csp_variable_mapping, cur_reg_variable_mapping) for x in self.dfs(CROWSearchNode(
                                cur_state, PartiallyOrderedPlan.from_single_goal(subgoal, subgoal_track_all_skeletons_flag), node.constraints + item.maintains,
                                new_csp, cur_actions,
                                track_all_skeletons=subgoal_track_all_skeletons_flag, is_leftmost_branch=is_leftmost_branch,
                                is_all_always=node.is_all_always and rule.always,
                                depth = node.depth + 1
                            ))])
                        elif isinstance(item, RegressionRuleApplier):
                            new_csp = cur_csp.clone() if cur_csp is not None else None
                            subgoal, new_csp_variable_mapping = map_csp_placeholder_regression_rule_applier(item, new_csp, placeholder_csp, cur_csp_variable_mapping, cur_reg_variable_mapping)
                            subgoal_track_all_skeletons_flag = node.track_all_skeletons
                            # NOTE(review): unlike the Achieve branch, the recursive call passes
                            # `cur_csp` (not the cloned `new_csp`) — verify this is intended.
                            this_next_possible_branches = ([PossibleSearchBranch(x[0], x[1], x[2], new_csp_variable_mapping, cur_reg_variable_mapping) for x in self.dfs(CROWSearchNode(
                                cur_state, PartiallyOrderedPlan.from_single_goal(item, subgoal_track_all_skeletons_flag), node.constraints + item.maintains,
                                cur_csp, cur_actions,
                                track_all_skeletons=subgoal_track_all_skeletons_flag, is_leftmost_branch=is_leftmost_branch,
                                is_all_always=node.is_all_always and rule.always,
                                depth=node.depth + 1
                            ))])
                        elif isinstance(item, OperatorApplier):
                            # TODO(Jiayuan Mao @ 2023/09/11): vectorize this operation, probably only useful when `track_all_skeletons` is True.
                            new_csp = cur_csp.clone() if cur_csp is not None else None
                            action, new_csp_variable_mapping = map_csp_placeholder_action(item, new_csp, placeholder_csp, cur_csp_variable_mapping, cur_reg_variable_mapping)
                            succ, new_state = self.executor.apply(action, cur_state, csp=new_csp, clone=True, action_index=len(cur_actions))
                            if succ:
                                this_next_possible_branches = [PossibleSearchBranch(new_state, new_csp, cur_actions + [action], new_csp_variable_mapping, cur_reg_variable_mapping)]
                            else:
                                self.executor.apply_precondition_debug(action, cur_state, csp=new_csp)
                                if self.verbose:
                                    jacinle.log_function.print('Warning: action', action, 'failed.')
                                this_next_possible_branches = []
                        elif isinstance(item, BindExpression) and item.is_object_bind_expression:
                            # Enumerate object bindings: batched variables come from the nonzero
                            # entries of the executed goal; type-only variables enumerate all
                            # objects of the variable's type.
                            variables = cur_reg_variable_mapping.copy()
                            variables.update({x: QINDEX for x in item.variables})
                            rv = self.executor.execute(item.goal, cur_state, variables, csp=cur_csp)
                            this_next_possible_branches = list()
                            typeonly_indices_variables = list()
                            typeonly_indices_values = list()
                            for v in item.variables:
                                if v.name not in rv.batch_variables:
                                    typeonly_indices_variables.append(v.name)
                                    typeonly_indices_values.append(range(len(node.state.object_type2name[v.dtype.typename])))
                            for indices in rv.tensor.nonzero():
                                for typeonly_indices in itertools.product(*typeonly_indices_values):
                                    new_reg_variable_mapping = cur_reg_variable_mapping.copy()
                                    for var in item.variables:
                                        if var.name in rv.batch_variables:
                                            new_reg_variable_mapping[var.name] = ObjectConstant(
                                                node.state.object_type2name[var.dtype.typename][indices[rv.batch_variables.index(var.name)]],
                                                var.dtype
                                            )
                                        else:
                                            new_reg_variable_mapping[var.name] = ObjectConstant(
                                                node.state.object_type2name[var.dtype.typename][typeonly_indices[typeonly_indices_variables.index(var.name)]],
                                                var.dtype
                                            )
                                    this_next_possible_branches.append(PossibleSearchBranch(cur_state, cur_csp, cur_actions, cur_csp_variable_mapping, new_reg_variable_mapping))
                                    if item.refinement_compressible:
                                        # NOTE(review): this only breaks the inner (type-only) loop,
                                        # not the outer nonzero loop — confirm intended.
                                        break
                        elif isinstance(item, BindExpression) and not item.is_object_bind_expression:
                            if cur_csp is None:
                                raise RuntimeError('FindExpression must be used with a CSP.')
                            new_csp = cur_csp.clone()
                            subgoal, new_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, cur_csp_variable_mapping)
                            with new_csp.with_group(subgoal) as group:
                                rv = self.executor.execute(subgoal, cur_state, cur_reg_variable_mapping, csp=new_csp).item()
                                if isinstance(rv, OptimisticValue):
                                    new_csp.add_equal_constraint(rv)
                                mark_constraint_group_solver(self.executor, node.state, cur_reg_variable_mapping, group)
                            this_next_possible_branches = [PossibleSearchBranch(cur_state, new_csp, cur_actions, new_csp_variable_mapping, cur_reg_variable_mapping)]
                        elif isinstance(item, RuntimeAssignExpression):
                            new_reg_variable_mapping = cur_reg_variable_mapping.copy()
                            rv = self.executor.execute(item.value, cur_state, new_reg_variable_mapping, csp=cur_csp)
                            new_reg_variable_mapping[item.variable] = rv
                            this_next_possible_branches = [PossibleSearchBranch(cur_state, cur_csp, cur_actions, cur_csp_variable_mapping, new_reg_variable_mapping)]
                        elif isinstance(item, RegressionCommitFlag):
                            this_next_possible_branches = [PossibleSearchBranch(cur_state, cur_csp, cur_actions, cur_csp_variable_mapping, cur_reg_variable_mapping)]
                        else:
                            raise TypeError(f'Unknown item: {item}')
                        # TODO(Jiayuan Mao @ 2024/01/23): this is a hack to handle partial observability, and execution-based constraints.
                        # In general, there should be a more generic way to select if we can directly return.
                        if self.enable_greedy_execution and node.is_all_always:
                            if i == len(grounded_subgoals) - 1 or isinstance(grounded_subgoals[i + 1], (AchieveExpression, BindExpression)):
                                found_plan = False
                                # NOTE(review): this unpacks 4 names from PossibleSearchBranch,
                                # which has 5 fields — looks like it would raise ValueError when
                                # greedy execution triggers; verify.
                                for new_state, new_csp, new_actions, new_csp_variable_mapping in this_next_possible_branches:
                                    if len(new_actions) > 0:
                                        if self.verbose:
                                            jacinle.log_function.print(jacinle.colored('Greedy execution for', new_actions, color='green'))
                                        all_possible_plans.append(SearchReturn(new_state, new_csp, new_actions))
                                        found_plan = True
                                        break
                                if found_plan:
                                    break
                        if self.enable_csp:
                            # Decide whether to commit (solve) the CSP at this point of the rule body.
                            commit_csp = False
                            if isinstance(item, RegressionCommitFlag) and item.csp_serializability in (SubgoalCSPSerializability.FORALL, SubgoalCSPSerializability.SOME):
                                commit_csp = True
                            elif isinstance(item, (AchieveExpression, BindExpression)) and item.csp_serializability in (SubgoalCSPSerializability.FORALL, SubgoalCSPSerializability.SOME):
                                commit_csp = True
                            if commit_csp:
                                # NOTE(review): same 4-vs-5-field unpacking concern as above, and
                                # the appended tuple below drops reg_variable_mapping — verify.
                                for new_state, new_csp, new_actions, new_csp_variable_mapping in this_next_possible_branches:
                                    assignments = self.solve_csp(new_csp, self.max_csp_trials, actions=new_actions)
                                    if assignments is not None:
                                        new_state = map_csp_variable_state(new_state, new_csp, assignments)
                                        new_csp = ConstraintSatisfactionProblem()
                                        new_actions = ground_actions(self.executor, new_actions, assignments)
                                        new_csp_variable_mapping = map_csp_variable_mapping(new_csp_variable_mapping, node.csp, assignments)
                                        next_possible_branches.append((new_state, new_csp, new_actions, new_csp_variable_mapping))
                            else:
                                next_possible_branches.extend(this_next_possible_branches)
                        else:
                            next_possible_branches.extend(this_next_possible_branches)
                        if self.verbose:
                            jacinle.log_function.print(f'Branch {branch_index + 1} of {len(possible_branches)} for {item} has {len(next_possible_branches) - prev_next_possible_branches_length} branches.')
                            prev_next_possible_branches_length = len(next_possible_branches)
                    possible_branches = next_possible_branches
                    if self.verbose:
                        jacinle.log_function.print(f'Finished search subgoal {i + 1} of {len(grounded_subgoals)}: {item}. Possible branches (length={len(possible_branches)}):')
                        for x in possible_branches:
                            if len(x[2]) > 0:
                                action_string = '[' + ', '.join([str(x) for x in x[2]]) + ']'
                            else:
                                action_string = '[empty]'
                            jacinle.log_function.print(jacinle.indent_text(action_string))
                    # TODO(Jiayuan Mao @ 2024/01/23): this is related to the hack above for the partial observability and execution-based constraints.
                    if not node.track_all_skeletons and len(all_possible_plans) > 1:
                        return self.postprocess_plans(node, all_possible_plans)
                if self.enable_dirty_derived_predicates:
                    # NOTE(review): 4-name unpacking of 5-field PossibleSearchBranch here too — verify.
                    updated_possible_branches = list()
                    for cur_state, cur_csp, cur_actions, _mapping in possible_branches:
                        cur_state = self.apply_regression_rule_effect(cur_state, rule, bounded_variables)
                        updated_possible_branches.append((cur_state, cur_csp, cur_actions, _mapping))
                    possible_branches = updated_possible_branches
                found_plan = False
                # TODO(Jiayuan Mao @ 2023/09/11): implement this via maintains checking.
                for cur_state, cur_csp, cur_actions, _, cur_reg_variable_mapping in possible_branches:
                    rv, is_optimistic, new_csp = evaluate_bool_scalar_expression(self.executor, flatten_goals, cur_state, cur_reg_variable_mapping, csp=cur_csp, csp_note=f'subgoal_test: {"; ".join([str(x) for x in flatten_goals])}')
                    if self.verbose:
                        jacinle.log_function.print(f'Goal test for {[str(x) for x in cur_actions]} (optimistic={is_optimistic}): {rv}')
                    if rv:
                        if self.verbose:
                            jacinle.log_function.print('Found a plan', [str(x) for x in cur_actions], 'for goal', node.goal)
                        if self.enable_csp and is_optimistic and node.is_top_level:
                            # At the top level, the accumulated (global) CSP must be solved
                            # before the plan can be grounded and returned.
                            assignments = self.solve_csp(new_csp, self.max_global_csp_trials, actions=cur_actions)
                            if assignments is not None:
                                grounded_actions = ground_actions(self.executor, cur_actions, assignments)
                                all_possible_plans.append(SearchReturn(cur_state, cur_csp, grounded_actions))
                                found_plan = True
                                if self.verbose:
                                    jacinle.log_function.print(jacinle.colored('Global csp solving found a plan', [str(x) for x in grounded_actions], color='green'))
                            else:
                                if self.verbose:
                                    jacinle.log_function.print(jacinle.colored('Global csp solving failed for the plan', [str(x) for x in cur_actions], color='red'))
                        else:
                            all_possible_plans.append(SearchReturn(cur_state, new_csp, cur_actions))
                            found_plan = True
                if found_plan:
                    # Since we have changed the order of prefix_length for-loop and the regression rule for-loop,
                    # we need to use an additional dictionary to store whether we have found a plan for a particular regression rule.
                    prefix_stop_mark[regression_rule_index] = True
                    if not node.track_all_skeletons and len(all_possible_plans) > 1:
                        return self.postprocess_plans(node, all_possible_plans)
    return self.postprocess_plans(node, all_possible_plans)
[docs]
def apply_regression_rule_prefix(
    self, node: CROWSearchNode, grounded_subgoals, placeholder_csp: ConstraintSatisfactionProblem, prefix_length: int,
    bounded_variables: BoundedVariablesDictCompatible
) -> List[Tuple[
    List[ValueOutputExpression], List[bool], Optional[ConstraintSatisfactionProblem], Dict[int, Any], Dict[str, Any]
]]:
    """Apply the regression rule for a prefix of the subgoal.

    Returns a list of candidate tuples (new_chain_subgoals, new_chain_flags, new_csp,
    csp_variable_mapping, reg_variable_mapping), one per enumerated variable binding
    produced by Bind expressions in the prefix.
    """
    start_csp_variable_mapping = dict()
    cur_reg_variable_mapping = dict()
    new_csp = node.csp.clone() if node.csp is not None else None
    new_chain_subgoals = list()
    new_chain_flags = list()
    candidate_rvs = [(new_chain_subgoals, new_chain_flags, new_csp, start_csp_variable_mapping, cur_reg_variable_mapping)]
    for i, item in enumerate(grounded_subgoals[:prefix_length]):
        next_candidate_rvs = list()
        # Object-Bind items fan out into `next_candidate_rvs` (one per binding);
        # every other item type mutates the current candidate tuples in place.
        inplace_update = True
        for new_chain_subgoals, new_chain_flags, new_csp, start_csp_variable_mapping, cur_reg_variable_mapping in candidate_rvs:
            if isinstance(item, AchieveExpression):
                subgoal, start_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, start_csp_variable_mapping, cur_reg_variable_mapping)
                new_chain_subgoals.append(subgoal)
                new_chain_flags.append(not item.refinement_compressible or node.track_all_skeletons)
            elif isinstance(item, BindExpression):
                # TODO(Jiayuan Mao @ 2024/03/08): handle the case of FindExpression with CSP.
                if item.is_object_bind_expression:
                    variables = {x: QINDEX for x in item.variables}
                    rv = self.executor.execute(item.goal, node.state, variables, csp=new_csp)
                    typeonly_indices_variables = list()
                    typeonly_indices_values = list()
                    for v in item.variables:
                        if v.name not in rv.batch_variables:
                            typeonly_indices_variables.append(v.name)
                            typeonly_indices_values.append(range(len(node.state.object_type2name[v.dtype.typename])))
                    for indices in rv.tensor.nonzero():
                        for typeonly_indices in itertools.product(*typeonly_indices_values):
                            new_reg_variable_mapping = cur_reg_variable_mapping.copy()
                            for var in item.variables:
                                if var.name in rv.batch_variables:
                                    new_reg_variable_mapping[var.name] = ObjectConstant(
                                        node.state.object_type2name[var.dtype.typename][indices[rv.batch_variables.index(var.name)]],
                                        var.dtype
                                    )
                                else:
                                    new_reg_variable_mapping[var.name] = ObjectConstant(
                                        node.state.object_type2name[var.dtype.typename][typeonly_indices[typeonly_indices_variables.index(var.name)]],
                                        var.dtype
                                    )
                            # NOTE(review): the appended tuple aliases new_chain_subgoals /
                            # new_chain_flags across candidates — confirm that sharing is intended.
                            next_candidate_rvs.append((new_chain_subgoals, new_chain_flags, new_csp, start_csp_variable_mapping, new_reg_variable_mapping))
                            if item.refinement_compressible:
                                # NOTE(review): only breaks the inner type-only loop — verify.
                                break
                    inplace_update = False
                else:
                    # TODO(Jiayuan Mao @ 2023/12/06): implement this for bypassing FindExpressions that can be committed...
                    subgoal, start_csp_variable_mapping = map_csp_placeholder_goal(item.goal, new_csp, placeholder_csp, start_csp_variable_mapping, cur_reg_variable_mapping)
                    with new_csp.with_group(subgoal) as group:
                        rv = self.executor.execute(subgoal, node.state, cur_reg_variable_mapping, csp=new_csp).item()
                        if isinstance(rv, OptimisticValue):
                            new_csp.add_equal_constraint(rv)
                        mark_constraint_group_solver(self.executor, node.state, bounded_variables, group)
            elif isinstance(item, RuntimeAssignExpression):
                rv = self.executor.execute(item.value, node.state, cur_reg_variable_mapping, csp=new_csp)
                cur_reg_variable_mapping[item.variable] = rv
            elif isinstance(item, RegressionCommitFlag):
                continue
            else:
                raise TypeError(f'Unsupported item type {type(item)} in rule {item}.')
        if not inplace_update:
            candidate_rvs = next_candidate_rvs
    return candidate_rvs
[docs]
def postprocess_plans(self, node, all_possible_plans):
    """Sort the collected plans by length, cache them for ``node``, and return them.

    Deduplication is currently disabled (see the TODO below).
    """
    if not all_possible_plans:
        if self.verbose:
            jacinle.log_function.print('No possible plans for goal', node.goal)
        return self._return_with_cache(node, all_possible_plans)
    # TODO(Jiayuan Mao @ 2023/11/19): add unique back.
    # unique_all_possible_plans = _unique_plans(all_possible_plans)
    unique_all_possible_plans = all_possible_plans
    if len(unique_all_possible_plans) != len(all_possible_plans):
        if self.verbose:
            jacinle.log_function.print('Warning: there are duplicate plans for goal', node.goal, f'({len(unique_all_possible_plans)} unique plans vs {len(all_possible_plans)} total plans)')
        # import ipdb; ipdb.set_trace()
    ordered = sorted(unique_all_possible_plans, key=lambda plan: len(plan.actions))
    return self._return_with_cache(node, ordered)
[docs]
def solve_csp(self, csp, max_csp_trials, actions=None):
    """Attempt to solve ``csp`` up to ``max_csp_trials`` times.

    Returns the first non-None assignment dict found, or None if every trial fails.
    """
    for _attempt in range(max_csp_trials):
        if self.enable_simulation:
            assert self.simulator is not None and actions is not None, 'If simulation is enabled, you must provide the simulator, the state, and the actions.'
            # NB(Jiayuan Mao @ 2024-01-22): state is actually not used in the csp_dpll_sampling_solve_with_simulation function. So, we just provide the initial state here.
            assignments = csp_dpll_sampling_solve_with_simulation(self.executor, self.simulator, csp, self.state, actions, generator_manager=self.generator_manager, max_generator_trials=self.max_csp_branching_factor, verbose=True)
        else:
            assignments = csp_dpll_sampling_solve(self.executor, csp, generator_manager=self.generator_manager, max_generator_trials=self.max_csp_branching_factor, verbose=True)
        if assignments is not None:
            return assignments
    return None
[docs]
def apply_regression_rule_effect(self, state, rule: RegressionRule, bounded_variables: BoundedVariablesDictCompatible):
    # Apply the effect of `rule` under `bounded_variables`; clone=True keeps `state` unmodified
    # and returns the updated copy. Used for the dirty-derived-predicates path in dfs.
    return self.executor.apply_effect(rule, state, bounded_variables=bounded_variables, clone=True)
def _return_with_cache(self, node: CROWSearchNode, rv):
    """The cache only works for previous_actions == []. That is, we only cache the search results that start from the initial state."""
    if not node.previous_actions:
        # First result for this goal string wins; later results never overwrite it.
        self._search_cache.setdefault(node.goal.gen_string(), rv)
    return rv
def _try_retrieve_cache(self, node: CROWSearchNode):
    """Return the cached result for ``node``'s goal, or None.

    Mirrors :meth:`_return_with_cache`: only nodes that start from the initial
    state (empty previous_actions) are ever cached.
    """
    if node.previous_actions:
        return None
    return self._search_cache.get(node.goal.gen_string())
[docs]
def crow_recursive_v2(
    executor: PDSketchExecutor, state: State, goal_expr: Union[str, ValueOutputExpression], *,
    enable_reordering: bool = False,
    max_search_depth: int = 10,
    max_beam_size: int = 20,
    # Group 1: goal serialization and refinements.
    is_goal_serializable: bool = True,
    is_goal_refinement_compressible: bool = True,
    # Group 2: CSP solver.
    enable_csp: bool = True,
    max_csp_trials: int = 10,
    max_global_csp_trials: int = 100,
    max_csp_branching_factor: int = 5,
    use_generator_manager: bool = False,
    store_generator_manager_history: bool = False,
    # Group 3: simulation.
    enable_simulation: bool = False,
    simulator: Optional[PDSketchSimulatorInterface] = None,
    # Group 4: dirty derived predicates.
    enable_dirty_derived_predicates: bool = False,
    enable_greedy_execution: bool = False,
    allow_empty_plan_for_optimistic_goal: bool = False,
    verbose: bool = True,
):
    """Functional entry point: build a :class:`CROWRecursiveSearcherV2` and run it.

    All keyword arguments are forwarded verbatim to the searcher's constructor;
    see :meth:`CROWRecursiveSearcherV2.__init__` for their meaning. Returns the
    (candidate_plans, search_stat) tuple produced by :meth:`CROWRecursiveSearcherV2.main`.
    """
    searcher = CROWRecursiveSearcherV2(
        executor, state, goal_expr,
        enable_reordering=enable_reordering,
        max_search_depth=max_search_depth,
        max_beam_size=max_beam_size,
        is_goal_serializable=is_goal_serializable,
        is_goal_refinement_compressible=is_goal_refinement_compressible,
        enable_csp=enable_csp,
        max_csp_trials=max_csp_trials,
        max_global_csp_trials=max_global_csp_trials,
        max_csp_branching_factor=max_csp_branching_factor,
        use_generator_manager=use_generator_manager,
        store_generator_manager_history=store_generator_manager_history,
        enable_simulation=enable_simulation,
        simulator=simulator,
        enable_dirty_derived_predicates=enable_dirty_derived_predicates,
        enable_greedy_execution=enable_greedy_execution,
        allow_empty_plan_for_optimistic_goal=allow_empty_plan_for_optimistic_goal,
        verbose=verbose,
    )
    return searcher.main()