Source code for concepts.dm.pdsketch.planners.optimistic_search

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : optimistic_search.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 11/20/2022
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

"""CSP-based PDSketch planner. At the high-level,
this planner defers the decision of all continuous parameters to a CSP solver. During forward search,
the planner only make decisions about discrete parameters (mostly object-typed parameters), and record
all preconditions and goal tests that involve continuous parameters to an accumulated CSP data structure.

At each step, the planner will internally call a CSP solver to solve the accumulated CSP. If the CSP solver
returns a solution, the planner will generate the corresponding fully-grounded trajectory based on the
discrete parameters and the CSP solution.
"""

import heapq as hq
from dataclasses import dataclass
from typing import Any, Optional, Union, Sequence, Tuple, List, Dict, Callable

import jactorch

from concepts.dsl.dsl_types import NamedTensorValueType, PyObjValueType, UnnamedPlaceholder
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.expression import ValueOutputExpression
from concepts.dsl.constraint import OptimisticValue, EqualityConstraint, ConstraintSatisfactionProblem, AssignmentDict, ground_assignment_value
from concepts.algorithm.search.heuristic_search import QueueNode
from concepts.dm.pdsketch.operator import OperatorApplier, gen_all_partially_grounded_actions
from concepts.dm.pdsketch.domain import State, Domain
from concepts.dm.pdsketch.executor import PDSketchExecutor
from concepts.dm.pdsketch.csp_solvers.dpll_sampling import csp_dpll_sampling_solve, csp_dpll_simplify, CSPNoGenerator
from concepts.dm.pdsketch.planners.solution_score_tracker import MostPromisingTrajectoryTracker
from concepts.dm.pdsketch.strips.strips_expression import SState
from concepts.dm.pdsketch.strips.strips_heuristics import StripsHeuristic
from concepts.dm.pdsketch.strips.strips_grounding import GStripsTranslatorOptimistic
from concepts.dm.pdsketch.strips.strips_search import get_priority_func


__all__ = [
    'instantiate_action', 'ground_action', 'ground_actions', 'apply_action', 'goal_test',
    'optimistic_search_domain_check', 'prepare_optimistic_search',
    'construct_csp_from_optimistic_plan', 'solve_optimistic_plan', 'optimistic_search', 'optimistic_search_strips'
]


[docs] def instantiate_action(csp: ConstraintSatisfactionProblem, action: OperatorApplier) -> OperatorApplier: """Instantiate an action by replacing all placeholder values with a new optimistic variable. Args: csp: the CSP to which the new optimistic variable will be added. action: the action to be instantiated. Returns: the instantiated action. """ new_arguments = list() for arg in action.arguments: if isinstance(arg, UnnamedPlaceholder): assert isinstance(arg.dtype, (NamedTensorValueType, PyObjValueType)) new_arguments.append(TensorValue.from_optimistic_value(csp.new_actionable_var(arg.dtype, wrap=True))) else: new_arguments.append(arg) return OperatorApplier(action.operator, new_arguments)
[docs] def ground_action(executor: PDSketchExecutor, action: OperatorApplier, assignments: AssignmentDict) -> OperatorApplier: """Ground a single action with a given assignment to the underlying CSP. Basically, this function looks up the assigned value of each optimistic variable that appear in action parameters. Args: executor: the executor. action: the action to be grounded. assignments: the solution to the underlying CSP. Returns: the grounded action. """ new_arguments = list() for arg in action.arguments: if isinstance(arg, TensorValue): if arg.tensor_optimistic_values is not None: argv = arg.tensor_optimistic_values.item() assert isinstance(argv, int) argv = ground_assignment_value(assignments, argv) new_arguments.append(argv) else: new_arguments.append(arg) elif isinstance(arg, str): new_arguments.append(arg) else: raise TypeError(f'Unsupported argument type: {type(arg)}.') return OperatorApplier(action.operator, new_arguments)
[docs] def ground_actions(executor: PDSketchExecutor, actions: Sequence[OperatorApplier], assignments: AssignmentDict) -> List[OperatorApplier]: """Ground a list of actions with a given assignment to the underlying CSP. Basically, this function looks up the assigned value of each optimistic variable that appear in action parameters. Args: executor: the executor. actions: the list of actions to be grounded. assignments: the solution to the underlying CSP. Returns: the grounded actions. """ return [ground_action(executor, action, assignments) for action in actions]
[docs] def apply_action(executor: PDSketchExecutor, state: State, action: OperatorApplier, csp: ConstraintSatisfactionProblem) -> Tuple[Tuple[bool, State], ConstraintSatisfactionProblem]: """Apply an action to a state. Args: executor: the executor. state: the state to be updated. action: the action to be applied. csp: the CSP to which the new constraints will be added. Returns: a tuple of (goal test result, the updated state), and the updated CSP. """ csp = csp.clone() succ, state = executor.apply(action, state, csp=csp) return (succ, state), csp
[docs] def goal_test( executor: PDSketchExecutor, state: State, goal_expr: ValueOutputExpression, csp: ConstraintSatisfactionProblem, trajectory: Sequence[OperatorApplier], csp_max_generator_trials: int = 3, mpt_tracker: Optional[MostPromisingTrajectoryTracker] = None, verbose: bool = False ) -> Optional[List[OperatorApplier]]: """Test whether a state satisfies a goal expression with DPLL+Sampling CSP solver. Args: executor: the executor. state: the state to be tested. goal_expr: the goal expression. csp: the CSP that contains all constraints that has been accumulated so far. trajectory: the trajectory that leads to the current state. csp_max_generator_trials: the maximum number of trials for calling CSP generators. mpt_tracker: the tracker for the most promising trajectory. verbose: whether to print verbose information. Returns: the trajectory of grounded actions that leads to the goal state if the goal is satisfied, otherwise None. """ csp = csp.clone() rv = executor.execute(goal_expr, state=state, csp=csp).item() # print(" opt::goal_test", *trajectory, sep='\n ') # print(" opt::goal_test", rv) if isinstance(rv, OptimisticValue): csp.add_constraint(EqualityConstraint.from_bool(rv, True), note='goal_test') try: if verbose: pass # print(" opt::final_csp_solve", *trajectory, sep='\n ') # print(" opt::final_csp_solve", jacinle.indent_text(csp).lstrip()) assignments = csp_dpll_sampling_solve(executor, csp, max_generator_trials=csp_max_generator_trials) except CSPNoGenerator: return None if assignments is not None: # TODO(Jiayuan Mao @ 2022/12/16): implement mpt_tracker? return ground_actions(executor, trajectory, assignments) else: return None else: rv = float(rv) threshold = mpt_tracker.threshold if mpt_tracker is not None else 0.5 if rv > threshold or mpt_tracker is not None: if rv <= threshold: if not mpt_tracker.check(rv): return None try: assignments = csp_dpll_sampling_solve(executor, csp, max_generator_trials=csp_max_generator_trials) except CSPNoGenerator: raise None if assignments is not None: plan = ground_actions(executor, trajectory, assignments) if mpt_tracker is not None: mpt_tracker.update(rv, plan) if rv > threshold: return ground_actions(executor, trajectory, assignments) else: return None else: return None
[docs] def optimistic_search_domain_check(domain: Domain): """Check whether a domain is suitable for optimistic search. Args: domain: the domain to be checked. Raises: NotImplementedError: if the domain is not suitable for optimistic search. """ for op in domain.operators.values(): if op.is_axiom: raise NotImplementedError('Optimistic search does not support axioms.')
[docs] @jactorch.no_grad_func def construct_csp_from_optimistic_plan( executor: PDSketchExecutor, state: State, goal_expr: Union[str, ValueOutputExpression], actions: Sequence[OperatorApplier], *, simplify: bool = False, verbose: bool = False ) -> Tuple[List[OperatorApplier], ConstraintSatisfactionProblem]: """Construct a CSP from an optimistic plan. Args: executor: the executor. state: the initial state. goal_expr: the goal expression. actions: a list of partially grounded actions. simplify: whether to simplify the CSP. verbose: whether to print verbose information. Returns: the list of grounded actions, and the constructed CSP. """ optimistic_search_domain_check(executor.domain) csp = ConstraintSatisfactionProblem() if isinstance(goal_expr, str): goal_expr = executor.parse(goal_expr) else: assert isinstance(goal_expr, ValueOutputExpression) action_groundings = list() for a in actions: action_grounding = instantiate_action(csp, a) action_groundings.append(action_grounding) (succ, state), csp = apply_action(executor, state, action_grounding, csp) if succ: pass else: raise ValueError(f'Unable to perform action {action_grounding} at state {state}.') rv = executor.execute(goal_expr, state=state, csp=csp).item() if isinstance(rv, OptimisticValue): csp.add_constraint(EqualityConstraint.from_bool(rv, True), note='goal_test') if simplify: return action_groundings, csp_dpll_simplify(executor, csp) return action_groundings, csp
[docs] @jactorch.no_grad_func def solve_optimistic_plan( executor: PDSketchExecutor, state: State, goal_expr: Union[str, ValueOutputExpression], actions: Sequence[OperatorApplier], *, csp_max_generator_trials: int = 3, verbose: bool = False ) -> Tuple[State, ConstraintSatisfactionProblem, Optional[List[OperatorApplier]]]: """Solve an optimistic plan using the DPLL+Sampling algorithm. Args: executor: the executor. state: the initial state. goal_expr: the goal expression. actions: a sequence of partially grounded actions. csp_max_generator_trials: the maximum number of trials for calling CSP generators. verbose: whether to print verbose information. Returns: a list of actions that solves the plan, or None if no solution is found. """ optimistic_search_domain_check(executor.domain) csp = ConstraintSatisfactionProblem() if isinstance(goal_expr, str): goal_expr = executor.parse(goal_expr) else: assert isinstance(goal_expr, ValueOutputExpression) action_groundings = list() for a in actions: action_grounding = instantiate_action(csp, a) action_groundings.append(action_grounding) (succ, state), csp = apply_action(executor, state, action_grounding, csp) if succ: pass else: raise ValueError(f'Unable to perform action {action_grounding} at state {state}.') plan = goal_test(executor, state, goal_expr, csp, trajectory=action_groundings, csp_max_generator_trials=csp_max_generator_trials, verbose=verbose) if plan is not None: return state, csp, plan return state, csp, None
[docs] @dataclass class OptHeuristicSearchState(object): """The state for heuristic search.""" state: State """The state.""" strips_state: SState """The STRIPS state.""" trajectory: Tuple[OperatorApplier, ...] """The trajectory.""" csp: ConstraintSatisfactionProblem """The CSP that has been accumulated so far.""" g: float """The cost so far."""
[docs] @jactorch.no_grad_func def optimistic_search_strips( executor: PDSketchExecutor, state: State, goal_expr: Union[str, ValueOutputExpression], strips_heuristic: str = 'hff', *, max_expansions=100000, max_depth=100, # search related parameters. csp_max_generator_trials: int = 3, heuristic_weight: float = float('inf'), # heuristic related parameters. actions: Optional[Sequence[OperatorApplier]] = None, action_filter: Callable[[OperatorApplier], bool] = None, use_strips_op: bool = False, use_tuple_desc: bool = False, use_csp_pruning: bool = True, # pruning related parameters. forward_state_variables: bool = True, forward_derived: bool = False, # initialization related parameters. track_most_promising_trajectory: bool = False, prob_goal_threshold: float = 0.5, # non-optimal trajectory tracking related parameters. verbose: bool = False, return_extra_info: bool = False ): """Perform heuristic search with DPLL+Sampling algorithm and STRIPS-based heuristics, for mixed discrete-continuous domains. Args: executor: the executor. state: the initial state. goal_expr: the goal expression. strips_heuristic: the heuristic to use. Should be a string. max_expansions: the maximum number of expanded nodes. max_depth: the maximum depth of the search. csp_max_generator_trials: the maximum number of trials for calling CSP generators. heuristic_weight: the weight of the heuristic. Use float('inf') to do greedy best-first search. actions: the actions to use. If None, use all possible actions. Partially grounded actions are allowed. action_filter: the action filter. If None, use all possible actions. It should be a function that takes in an action and returns a boolean. use_strips_op: whether to use STRIPS operators when applying actions. Recommended to be False. use_tuple_desc: whether to use tuple description to prune the search space. forward_state_variables: whether to forward state variables before the search starts. forward_derived: whether to forward derived predicates after applying actions. track_most_promising_trajectory: whether to track the most promising trajectory. prob_goal_threshold: the probability threshold for the most promising trajectory. When there is a trajectory with probability greater than this threshold, the search will stop. verbose: whether to print verbose information. return_extra_info: whether to return extra information, such as the number of expanded nodes. Returns: the trajectory if succeeded, otherwise None. When `return_extra_info` is True, return a tuple of (trajectory, extra_info), where extra_info is a dictionary. """ assert not forward_derived, 'Not implemented.' optimistic_search_domain_check(executor.domain) state, goal_expr, actions = prepare_optimistic_search( 'optsstrips', executor, state, goal_expr, actions=actions, action_filter=action_filter, verbose=verbose, forward_state_variables=forward_state_variables, forward_derived=forward_derived ) # TODO(Jiayuan Mao @ 2022/12/16): Relevance analysis for optimistic planning tasks. strips_translator = GStripsTranslatorOptimistic(executor, use_string_name=verbose) strips_task = strips_translator.compile_task( state, goal_expr, actions, is_relaxed=False, forward_relevance_analysis=False, backward_relevance_analysis=False ) # TODO(Jiayuan Mao @ 2022/12/16): Support external heuristics. mpt_tracker = None if track_most_promising_trajectory: mpt_tracker = MostPromisingTrajectoryTracker(True, prob_goal_threshold) initial_state = OptHeuristicSearchState(state, strips_task.state, tuple(), ConstraintSatisfactionProblem(), 0) queue: List[QueueNode] = list() visited = set() heuristic = StripsHeuristic.from_type(strips_heuristic, strips_task, strips_translator) def heuristic_fn(state: OptHeuristicSearchState) -> int: return heuristic.compute(state.strips_state) priority_func = get_priority_func(heuristic_fn, heuristic_weight) def push_node(node: OptHeuristicSearchState): added = False if use_tuple_desc: nst = node.state.generate_tuple_description(executor.domain) if nst not in visited: added = True visited.add(nst) else: # unconditionally expand added = True if added: hq.heappush(queue, QueueNode(priority_func(node, node.g), node)) if optimistic_search_strips.DEBUG: print(' optsstrips::push_node:', *node.trajectory, sep='\n ') print(' ', 'heuristic =', heuristic.compute(node.strips_state), 'g =', node.g) push_node(initial_state) nr_expanded_states = 0 nr_tested_actions = 0 def wrap_extra_info(trajectory): if return_extra_info: return trajectory, {'nr_expansions': nr_expanded_states, 'nr_tested_actions': nr_tested_actions} return trajectory is_initial_state = True while len(queue) > 0 and nr_expanded_states < max_expansions: priority, node = hq.heappop(queue) nr_expanded_states += 1 s, ss, traj, csp = node.state, node.strips_state, node.trajectory, node.csp if optimistic_search_strips.DEBUG: print('optsstrips::pop_node:') print(' trajectory:', *traj, sep='\n ') print(' priority =', priority, 'g =', node.g) if optimistic_search_strips.DEBUG_INTERACTIVE: input(' Continue?') if track_most_promising_trajectory and is_initial_state: is_initial_state = False else: plan = goal_test( executor, s, goal_expr, csp, trajectory=traj, csp_max_generator_trials=csp_max_generator_trials, verbose=verbose, mpt_tracker=mpt_tracker ) if plan is not None: if verbose: print('optsstrips::search succeeded.') print('optsstrips::total_expansions:', nr_expanded_states) return wrap_extra_info(plan) if len(traj) >= max_depth: continue for sa in strips_task.operators: nr_tested_actions += 1 a = sa.raw_operator ncsp = csp.clone() action_grounding = instantiate_action(ncsp, a) (succ, ns), ncsp = apply_action(executor, s, action_grounding, ncsp) nt = traj + (action_grounding, ) if succ: if use_csp_pruning: try: if optimistic_search_strips.DEBUG: print(' optsstrips::running CSP pruning...', *nt, sep='\n ') if not csp_dpll_sampling_solve(executor, ncsp, solvable_only=True, max_generator_trials=csp_max_generator_trials): if optimistic_search_strips.DEBUG: print(' optsstrips::pruned:', *nt, sep='\n ') continue except CSPNoGenerator: pass nss = sa.apply(ss) if use_strips_op else strips_translator.compile_state(ns.clone(), forward_derived=False) nnode = OptHeuristicSearchState(ns, nss, nt, ncsp, node.g + 1) push_node(nnode) if verbose: print('optsstrips::search failed.') print('optsstrips::total_expansions:', nr_expanded_states) if mpt_tracker is not None: return mpt_tracker.solution return None
optimistic_search_strips.DEBUG = False optimistic_search_strips.set_debug = lambda x = True: setattr(optimistic_search_strips, 'DEBUG', x) optimistic_search_strips.DEBUG_INTERACTIVE = False optimistic_search_strips.set_debug_interactive = lambda x = True: setattr(optimistic_search_strips, 'DEBUG_INTERACTIVE', x)