#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : optimistic_search_bilevel_legacy.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 09/10/2023
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
from typing import Union, Sequence, Iterable, Tuple, List, Set
import jacinle
from concepts.dsl.dsl_types import QINDEX
from concepts.dsl.expression import ValueOutputExpression, is_and_expr
from concepts.dsl.constraint import ConstraintSatisfactionProblem
from concepts.dm.pdsketch.domain import State
from concepts.dm.pdsketch.executor import PDSketchExecutor
from concepts.dm.pdsketch.operator import OperatorApplier, OperatorApplicationExpression
from concepts.dm.pdsketch.regression_rule import AchieveExpression
from concepts.dm.pdsketch.regression_utils import ground_fol_expression, ground_operator_application_expression, surface_fol_downcast
from concepts.dm.pdsketch.crow.crow_state import TotallyOrderedPlan, PartiallyOrderedPlan
from concepts.dm.pdsketch.planners.optimistic_search_bilevel_utils import extract_bounded_variables_from_nonzero, extract_bounded_variables_from_nonzero_dc
__all__ = [
'enumerate_possible_symbolic_plans_regression_c_v1', 'gen_applicable_regression_rules_v1',
'enumerate_possible_symbolic_plans_regression_c_v2', 'gen_applicable_regression_rules_v2'
]
[docs]
def enumerate_possible_symbolic_plans_regression_c_v1(
executor: PDSketchExecutor, state: State, goal_expr: Union[str, ValueOutputExpression],
max_actions: int = 10,
) -> Iterable[Tuple[Sequence[OperatorApplier], ConstraintSatisfactionProblem, State, State]]:
"""Enumerate all possible plans that can achieve the goal.
Args:
executor: the executor.
state: the initial state.
goal_expr: the goal expression.
max_actions: the maximum number of actions in a plan.
Returns:
A list of plans. Each plan is a tuple of (actions, csp, initial_state, final_state).
"""
if isinstance(goal_expr, str):
goal_expr = executor.parse(goal_expr)
@jacinle.log_function(verbose=False)
def dfs(state, goal_set, maintains, csp, previous_actions, nr_high_level_actions=0):
jacinle.log_function.print('Current goal_set', goal_set)
rv = min([executor.execute(goal, state=state, bounded_variables=dict()).item() for goal in goal_set])
if rv > 0.5:
return [(state, None, previous_actions)]
if nr_high_level_actions > max_actions:
return []
# jacinle.log_function.print('Current state', state)
# import ipdb; ipdb.set_trace()
all_possible_plans = list()
candidate_regression_rules = gen_applicable_regression_rules_v1(executor, state, goal_set, maintains)
jacinle.log_function.print('Candidate regression rules', candidate_regression_rules)
if len(candidate_regression_rules) == 0:
print('Entering a debug mode: no candidate regression rules.')
import ipdb; ipdb.set_trace()
return []
for goal_index, rule, bounded_variables in candidate_regression_rules:
grounded_subgoals = list()
for item in rule.body[:-1]:
assert isinstance(item, AchieveExpression)
grounded_subgoals.append(ground_fol_expression(item.goal, bounded_variables))
assert isinstance(rule.body[-1], OperatorApplicationExpression)
if len(goal_set) == 1: # If there is only one single goal left, we should just directly try this regression rule.
max_prefix_length = 0
else:
# TODO(Jiayuan Mao @ 2023/09/06): find the last [always] achievement statement.
max_prefix_length = len(rule.body) - 1
for prefix_length in range(0, max_prefix_length + 1): # assuming the last item is OperatorApplicationExpression
jacinle.log_function.print('Applying rule', rule, 'for', goal_set[goal_index], 'and prefix length', prefix_length)
other_goals = goal_set[:goal_index] + goal_set[goal_index + 1:] + grounded_subgoals[:prefix_length]
if len(other_goals) > 0:
# jacinle.log_function.print('Other goals', other_goals)
possible_branches = dfs(state, other_goals, maintains, csp, previous_actions, nr_high_level_actions=nr_high_level_actions + 1)
else:
possible_branches = [(state, csp, previous_actions)]
if len(possible_branches) == 0:
jacinle.log_function.print('Prefix planning failed!!! Stop.')
# If it's not possible to achieve the subset of goals, then it's not possible to achieve the whole goal set.
# Therefore, this is a break, not a continue.
break
for i in range(prefix_length, len(rule.body)):
item = rule.body[i]
next_possible_branches = list()
jacinle.log_function.print(f'Now enter item #{i}: {item}')
for cur_state, cur_csp, actions in possible_branches:
if isinstance(item, AchieveExpression):
subgoal = grounded_subgoals[i]
# jacinle.log_function.print('Achieve subgoal', subgoal)
next_possible_branches.extend(dfs(cur_state, [subgoal], maintains, None, actions, nr_high_level_actions=nr_high_level_actions + 1))
elif isinstance(item, OperatorApplicationExpression):
cur_action = ground_operator_application_expression(item, bounded_variables)
succ, cur_state = executor.apply(cur_action, cur_state, csp=None, clone=True)
jacinle.log_function.print('Applying action', cur_action, 'with result', succ)
if succ:
next_possible_branches.append((cur_state, None, actions + [cur_action]))
else:
raise TypeError()
possible_branches = next_possible_branches
# all_possible_plans.extend(possible_branches)
found_plan = False
for plan in possible_branches:
rv = min([executor.execute(goal, state=plan[0], bounded_variables=dict()).item() for goal in goal_set])
if rv > 0.5:
jacinle.log_function.print('Found plan', plan[2])
found_plan = True
all_possible_plans.append(plan)
else:
jacinle.log_function.print('Plan', plan[2], 'failed')
if found_plan:
break
# TODO(Jiayuan Mao @ 2023/09/06): think about what's the semantics for this early return.
# The idea is that, as soon as we found a plan using one of the regression rules, we return it, and don't worry about the other rules.
# Therefore, it is not guaranteed that the returned plan is the shortest (optimal) one.
if len(all_possible_plans) > 0:
break
if len(all_possible_plans) == 0:
jacinle.log_function.print('No possible plans for goal set', goal_set)
return []
shortest_plan = min(all_possible_plans, key=lambda x: len(x[2]))
return [shortest_plan]
if is_and_expr(goal_expr):
goal_set = list(goal_expr.arguments)
else:
goal_set = [goal_expr]
candidate_plans = dfs(state, goal_set, None, None, list())
candidate_plans = [(actions, csp, state, final_state) for final_state, csp, actions in candidate_plans]
return candidate_plans
[docs]
def gen_applicable_regression_rules_v1(executor: PDSketchExecutor, state: State, goal_set: List[ValueOutputExpression], maintains: Set[ValueOutputExpression]):
"""Generate applicable regression rules for the given goal set and maintains set."""
# TODO(Jiayuan Mao @ 2023/09/04): implement CSP.
# TODO(Jiayuan Mao @ 2023/09/04): implement "maintains".
candidate_regression_rules = list()
for i, goal in enumerate(goal_set):
for regression_rule in executor.domain.regression_rules.values():
goal_expr = regression_rule.goal_expression
if (variable_binding := surface_fol_downcast(goal_expr, goal)) is None:
continue
bounded_variables = dict()
for v in regression_rule.goal_arguments:
bounded_variables[v] = variable_binding[v.name].name
if len(regression_rule.binding_arguments) > 0:
for v in regression_rule.binding_arguments:
bounded_variables[v] = QINDEX
if len(regression_rule.preconditions_conjunction.arguments) > 0:
rv = executor.execute(regression_rule.preconditions_conjunction, state=state, bounded_variables=bounded_variables)
else:
rv = None
if rv is None:
candidate_regression_rules.append((i, regression_rule, bounded_variables))
else:
all_forall, candidate_bounded_variables = extract_bounded_variables_from_nonzero(state, rv, regression_rule, default_bounded_variables=bounded_variables)
if all_forall and regression_rule.always:
candidate_regression_rules.append((i, regression_rule, candidate_bounded_variables[0]))
else:
candidate_regression_rules.extend([(i, regression_rule, cbv) for cbv in candidate_bounded_variables])
return candidate_regression_rules
[docs]
def enumerate_possible_symbolic_plans_regression_c_v2(
executor: PDSketchExecutor, state: State, goal_expr: Union[str, ValueOutputExpression],
max_depth: int = 10, enable_reordering: bool = True,
verbose: bool = False
) -> Iterable[Tuple[Sequence[OperatorApplier], ConstraintSatisfactionProblem, State, State]]:
"""Enumerate all possible plans that can achieve the goal.
Args:
executor: the executor.
state: the initial state.
goal_expr: the goal expression.
max_depth: the maximum depth of goal regression search.
enable_reordering: whether to enable reordering of the subgoals coming from different chains.
verbose: whether to print verbose information.
Returns:
A list of plans. Each plan is a tuple of (actions, csp, initial_state, final_state).
"""
if isinstance(goal_expr, str):
goal_expr = executor.parse(goal_expr)
search_cache = dict()
search_stat = {'nr_expanded_nodes': 0}
def return_with_cache(goal_set, previous_actions, rv):
if len(previous_actions) == 0:
goal_str = goal_set.gen_string()
if goal_str not in search_cache:
search_cache[goal_str] = rv
return rv
def try_retrive_cache(goal_set, previous_actions):
if len(previous_actions) == 0:
goal_str = goal_set.gen_string()
if goal_str in search_cache:
return search_cache[goal_str]
return None
@jacinle.log_function(verbose=False)
def dfs(state: State, goal_set: PartiallyOrderedPlan, maintains, csp, previous_actions, nr_high_level_actions=0):
if verbose:
jacinle.log_function.print('Current goal_set', goal_set)
jacinle.log_function.print('Previous actions', previous_actions)
if (rv := try_retrive_cache(goal_set, previous_actions)) is not None:
return rv
if len(goal_set) == 0:
return return_with_cache(goal_set, previous_actions, [(state, None, previous_actions)])
rv = min([executor.execute(goal, state=state, bounded_variables=dict()).item() for goal in goal_set.iter_goals()])
if rv > 0.5:
return return_with_cache(goal_set, previous_actions, [(state, None, previous_actions)])
if nr_high_level_actions > max_depth:
return return_with_cache(goal_set, previous_actions, [])
search_stat['nr_expanded_nodes'] += 1
all_possible_plans = list()
candidate_regression_rules = gen_applicable_regression_rules_v2(executor, state, goal_set, maintains)
if verbose:
jacinle.log_function.print('Candidate regression rules', candidate_regression_rules)
if sum(len(r) for _, _, r in candidate_regression_rules) == 0:
# print('Entering a debug mode: no candidate regression rules.')
# import ipdb; ipdb.set_trace()
return return_with_cache(goal_set, previous_actions, [])
for chain_index, subgoal_index, this_candidate_regression_rules in candidate_regression_rules:
other_goals = goal_set.exclude(chain_index, subgoal_index)
cur_goal = goal_set.chains[chain_index].sequence[subgoal_index]
if verbose:
jacinle.log_function.print('Now trying to excluding goal', cur_goal)
if len(other_goals) == 0:
other_goals_plans = [(state, None, previous_actions)]
else:
other_goals_plans = dfs(state, other_goals, maintains, csp, previous_actions, nr_high_level_actions=nr_high_level_actions)
for cur_state, cur_csp, cur_actions in other_goals_plans:
rv = executor.execute(cur_goal, state=state, bounded_variables=dict()).item()
if rv > 0.5: # Found a plan that directly achieve the goal without any regression.
all_possible_plans.append((cur_state, cur_csp, cur_actions))
if len(all_possible_plans) > 0:
break
if len(other_goals) == 0 or not enable_reordering:
max_prefix_length = 0
else:
max_prefix_length = max(len(rule.body) - 1 for rule, _ in this_candidate_regression_rules)
grounded_subgoals_cache = dict()
for prefix_length in range(max_prefix_length + 1):
for regression_rule_index, (rule, bounded_variables) in enumerate(this_candidate_regression_rules):
if prefix_length > len(rule.body) - 1:
continue
if verbose:
jacinle.log_function.print('Applying rule', rule, 'for', cur_goal, 'and prefix length', prefix_length, 'goal set is', goal_set)
# construct grounded_subgoals
if regression_rule_index in grounded_subgoals_cache:
grounded_subgoals = grounded_subgoals_cache[regression_rule_index]
else:
grounded_subgoals = list()
for item in rule.body:
if isinstance(item, AchieveExpression):
grounded_subgoals.append(ground_fol_expression(item.goal, bounded_variables))
grounded_subgoals_cache[regression_rule_index] = grounded_subgoals
if prefix_length == 0:
possible_branches = other_goals_plans
else:
cur_other_goals = other_goals.add_chain(grounded_subgoals[:prefix_length])
possible_branches = dfs(state, cur_other_goals, maintains, csp, previous_actions, nr_high_level_actions=nr_high_level_actions + 1)
if len(possible_branches) == 0:
if verbose:
jacinle.log_function.print('Prefix planning failed!!! Stop.')
# If it's not possible to achieve the subset of goals, then it's not possible to achieve the whole goal set.
# Therefore, this is a break, not a continue.
break
for i in range(prefix_length, len(rule.body)):
item = rule.body[i]
next_possible_branches = list()
if verbose:
jacinle.log_function.print(f'Now enter item #{i}: {item}')
for cur_state, cur_csp, actions in possible_branches:
if isinstance(item, AchieveExpression):
subgoal = grounded_subgoals[i]
# jacinle.log_function.print('Achieve subgoal', subgoal)
next_possible_branches.extend(dfs(cur_state, PartiallyOrderedPlan.from_single_goal(subgoal), maintains, None, actions, nr_high_level_actions=nr_high_level_actions + 1))
elif isinstance(item, OperatorApplicationExpression):
cur_action = ground_operator_application_expression(item, bounded_variables)
succ, cur_state = executor.apply(cur_action, cur_state, csp=None, clone=True)
# if verbose:
# jacinle.log_function.print('Applying action', cur_action, 'with result', succ)
if succ:
next_possible_branches.append((cur_state, None, actions + [cur_action]))
else:
raise TypeError()
possible_branches = next_possible_branches
# all_possible_plans.extend(possible_branches)
found_plan = False
for plan in possible_branches:
rv = min([executor.execute(goal, state=plan[0], bounded_variables=dict()).item() for goal in goal_set.iter_goals()])
if rv > 0.5:
if verbose:
jacinle.log_function.print('Found plan', plan[2], 'for goal', goal_set)
found_plan = True
all_possible_plans.append(plan)
else:
if verbose:
jacinle.log_function.print('Plan', plan[2], 'failed')
if found_plan:
# TODO(Jiayuan Mao @ 2023/09/06): since we have changed the order of prefix_length for-loop and the regression rule for-loop.
# We need to use an additional dictionary to store whether we have found a plan for a particular regression rule.
# Right now this doens't matter because we only use the first plan.
break
# TODO(Jiayuan Mao @ 2023/09/06): think about what's the semantics for this early return.
# The idea is that, as soon as we found a plan using one of the regression rules, we return it, and don't worry about the other rules.
# Therefore, it is not guaranteed that the returned plan is the shortest (optimal) one.
if len(all_possible_plans) > 0:
break
if len(all_possible_plans) > 0:
break
if len(all_possible_plans) == 0:
if verbose:
jacinle.log_function.print('No possible plans for goal set', goal_set)
return return_with_cache(goal_set, previous_actions, [])
shortest_plan = min(all_possible_plans, key=lambda x: len(x[2]))
return return_with_cache(goal_set, previous_actions, [shortest_plan])
if is_and_expr(goal_expr):
goal_set = list(goal_expr.arguments)
else:
goal_set = [goal_expr]
goal_set = PartiallyOrderedPlan((TotallyOrderedPlan(goal_set, is_ordered=True),))
candidate_plans = dfs(state, goal_set, None, None, list())
candidate_plans = [(actions, csp, state, final_state) for final_state, csp, actions in candidate_plans]
return candidate_plans, search_stat
[docs]
def gen_applicable_regression_rules_v2(executor: PDSketchExecutor, state: State, goals: PartiallyOrderedPlan, maintains: Set[ValueOutputExpression]):
# TODO(Jiayuan Mao @ 2023/09/10): implement maintains.
candidate_regression_rules = list()
for chain_index, chain in goals.iter_feasible_chains():
if chain.is_ordered:
subgoal_indices = [len(chain) - 1]
else:
subgoal_indices = list(range(len(chain)))
for subgoal_index in subgoal_indices:
subgoal = chain.sequence[subgoal_index]
this_chain_candidate_regression_rules = list()
for regression_rule in executor.domain.regression_rules.values():
goal_expr = regression_rule.goal_expression
if (variable_binding := surface_fol_downcast(goal_expr, subgoal)) is None:
continue
bounded_variables = dict()
for v in regression_rule.goal_arguments:
bounded_variables[v] = variable_binding[v.name].name
if len(regression_rule.binding_arguments) > 0:
for v in regression_rule.binding_arguments:
bounded_variables[v] = QINDEX
if len(regression_rule.preconditions_conjunction.arguments) > 0:
if len(regression_rule.binding_arguments) > 4 and len(regression_rule.preconditions_conjunction.arguments) >= 2:
rv = extract_bounded_variables_from_nonzero_dc(executor, state, regression_rule, bounded_variables, use_optimistic=False)
else:
rv = executor.execute(regression_rule.preconditions_conjunction, state=state, bounded_variables=bounded_variables)
rv = extract_bounded_variables_from_nonzero(state, rv, regression_rule, default_bounded_variables=bounded_variables, use_optimistic=False)
else:
rv = None
if rv is None:
this_chain_candidate_regression_rules = [(regression_rule, bounded_variables)]
if regression_rule.always:
break
else:
all_forall, candidate_bounded_variables = rv
if all_forall and regression_rule.always:
this_chain_candidate_regression_rules = [(regression_rule, candidate_bounded_variables[0])]
break
else:
this_chain_candidate_regression_rules.extend([(regression_rule, cbv) for cbv in candidate_bounded_variables])
candidate_regression_rules.append((chain_index, subgoal_index, this_chain_candidate_regression_rules))
return candidate_regression_rules