#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : strips_heuristics.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 03/20/2022
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
from typing import Optional, Union, Tuple, List, Set, Dict, Callable
from concepts.dm.pdsketch.strips.strips_expression import SProposition, SState
from concepts.dm.pdsketch.strips.strips_grounded_expression import GSBoolForwardDiffReturn, GSBoolOutputExpression, GSSimpleBoolAssignExpression, GSConditionalAssignExpression
from concepts.dm.pdsketch.strips.strips_grounding import GStripsProblem, GStripsTranslatorBase
__all__ = ['StripsHeuristic', 'StripsBlindHeuristic', 'StripsRPGHeuristic', 'StripsHFFHeuristic']
[docs]
class StripsHeuristic(object):
"""The base class for STRIPS heuristics."""
[docs]
def __init__(self, task: GStripsProblem, translator: Optional[GStripsTranslatorBase] = None):
"""Initialize the heuristic.
Args:
task: the task to compute the heuristic value (including the goal and the operators)
translator: the translator to translate the task representation into the `GS*` (grounded STRIPS) representation.
"""
self._task = task
self._goal_func = task.goal.compile()
self._translator = translator
@property
def task(self) -> GStripsProblem:
"""The grounded STRIPS planning task."""
return self._task
@property
def goal_func(self) -> Callable[[SState], Union[bool, GSBoolForwardDiffReturn]]:
"""The (compiled version) of goal function of the task."""
return self._goal_func
@property
def translator(self) -> Optional[GStripsTranslatorBase]:
"""The translator used to translate the task representation into the `GS*` (grounded STRIPS) representation."""
return self._translator
[docs]
@classmethod
def from_type(cls, type_identifier: str, task: GStripsProblem, translator: Optional[GStripsTranslatorBase] = None, **kwargs) -> 'StripsHeuristic':
"""Create a heuristic from the given type identifier.
Args:
type_identifier: the type identifier of the heuristic.
task: the grounded STRIPS planning task.
translator: the translator used to translate the planning task to the `GS*` (grounded STRIPS) representation.
**kwargs: additional arguments to pass to the constructor.
Returns:
the created heuristic.
"""
if type_identifier == 'hff':
return StripsHFFHeuristic(task, translator, **kwargs)
elif type_identifier == 'blind':
return StripsBlindHeuristic(task, translator)
else:
raise ValueError(f'Unknown heuristic type {type_identifier}.')
[docs]
def compute(self, state: SState) -> int:
"""Compute the heuristic value of the given state.
Args:
state: the state to compute the heuristic value.
Returns:
the heuristic value of the given state.
"""
raise NotImplementedError()
[docs]
class StripsBlindHeuristic(StripsHeuristic):
"""A blind heuristic that always returns 1 if the goal is not satisfied, and 0 otherwise."""
[docs]
def compute(self, state: SState) -> int:
goal_rv = self._goal_func(state)
return 0 if goal_rv else 1
[docs]
class StripsRPGHeuristic(StripsHeuristic):
"""RPG heuristic (relaxed planning graph)."""
[docs]
def __init__(self, task: GStripsProblem, translator: Optional[GStripsTranslatorBase] = None, forward_relevance_analysis: bool = True, backward_relevance_analysis: bool = True):
"""Initialize an RPG heuristic.
Args:
task: the grounded STRIPS planning task.
translator: the translator used to translate the planning task to the `GS*` (grounded STRIPS) representation.
forward_relevance_analysis: whether to perform forward relevance analysis.
backward_relevance_analysis: whether to perform backward relevance analysis.
"""
super().__init__(task, translator)
self._forward_relevance_analysis = forward_relevance_analysis
self._backward_relevance_analysis = backward_relevance_analysis
if task.is_relaxed:
self._relaxed = task
else:
assert translator is not None
self._relaxed = translator.recompile_relaxed_task(task, forward_relevance_analysis=forward_relevance_analysis, backward_relevance_analysis=backward_relevance_analysis)
@property
def relaxed(self) -> GStripsProblem:
"""The relaxed version of the task."""
return self._relaxed
[docs]
def compute_rpg_forward_diff(self, state: SState, relaxed_task: Optional[GStripsProblem] = None) -> Tuple[
List[Set[SState]],
Dict[SProposition, int],
List[List[Tuple[int, int, Set[SProposition]]]],
List[List[Tuple[int, int, Set[SProposition]]]],
GSBoolForwardDiffReturn
]:
"""Compute the relaxed planning graph using forward differentiation.
Args:
state: the state to compute the relaxed planning graph.
relaxed_task: the relaxed version of the task. If not provided, the task will be relaxed using the translator.
Returns:
the relaxed planning graph, which is represented as a tuple of the following elements:
- a list of sets of states, where each set contains all the states that can be reached from the initial state in the given number of steps.
- a dictionary that maps propositions to their level.
- a list of lists of tuples, where each tuple contains the following elements:
- the index of the action in the relaxed task operator list.
- the index of the effect in the operator.
- the propositions that contributes to the effect (computed using Forward Diff).
- a list of lists of tuples, where each tuple contains the following elements:
- the index of the action that achieves the proposition.
- the level of the proposition.
- the set of propositions that are achieved by the action.
- the result of forward differentiation of the goal function.
"""
if relaxed_task is None:
relaxed_task = self._relaxed
with GSBoolOutputExpression.enable_forward_diff_ctx():
F_sets = [set(state)]
A_sets = []
D_sets = []
used_operators = set()
used_derived_predicates = set()
# print('rpginit', F_sets[-1])
goal_rv = self._goal_func(F_sets[-1])
while not goal_rv.rv:
# for op in relaxed_task.operators:
# print(' ', op.raw_operator, op.precondition, op.applicable(F_sets[-1]))
new_ops: List[Tuple[int, int, Set[SProposition]]] = list() # op_index, eff_index, op_precondition
# print('Starting new step')
for i, op in enumerate(relaxed_task.operators):
op_pred_rv = None
for j, e in enumerate(op.effects):
if (i, j) not in used_operators:
if op_pred_rv is None:
op_pred_rv = op.applicable(F_sets[-1])
# print('Evaluating op', op.raw_operator, op_pred_rv)
if op_pred_rv.rv:
if isinstance(e, GSSimpleBoolAssignExpression):
# TODO:: check if it should be (i, j) or (i, 0, op_pred_rv.propositions)
new_ops.append((i, j, op_pred_rv.propositions))
used_operators.add((i, j))
elif isinstance(e, GSConditionalAssignExpression):
eff_pred_rv = e.applicable(F_sets[-1])
if eff_pred_rv.rv:
new_ops.append((i, j, op_pred_rv.propositions | eff_pred_rv.propositions))
# print(' Use operator', i, j)
used_operators.add((i, j))
else:
break
new_F = F_sets[-1].copy()
for op_index, effect_index, _ in new_ops:
op = relaxed_task.operators[op_index]
eff = op.effects[effect_index]
new_F.update(eff.add_effects)
new_dps: List[Tuple[int, int, Set[SProposition]]] = list() # dp_index, eff_index, dp_precondition
for i, dp in enumerate(relaxed_task.derived_predicates):
for j, e in enumerate(dp.effects):
if (i, j) not in used_derived_predicates:
dp_pred_rv = e.applicable(new_F)
# print((i, j), dp_pred_rv)
if dp_pred_rv.rv:
used_derived_predicates.add((i, j))
new_dps.append((i, j, dp_pred_rv.propositions))
for dp_index, effect_index, _ in new_dps:
dp = relaxed_task.derived_predicates[dp_index]
eff = dp.effects[effect_index]
new_F.update(eff.add_effects)
# print('depth', len(F_sets), new_F)
if len(new_F) == len(F_sets[-1]):
break
A_sets.append(new_ops)
D_sets.append(new_dps)
F_sets.append(new_F)
goal_rv = self._goal_func(F_sets[-1])
F_levels = {}
for i in range(0, len(F_sets)):
for f in F_sets[i]:
if f not in F_levels:
F_levels[f] = i
return F_sets, F_levels, A_sets, D_sets, goal_rv
[docs]
def compute(self, state: SState) -> int:
raise NotImplementedError()
[docs]
class StripsHFFHeuristic(StripsRPGHeuristic):
[docs]
def compute(self, state: SState) -> int:
"""Compute the hFF heuristic value for the given state.
Args:
state: the state to compute the heuristic value for.
Returns:
the heuristic value.
"""
# NB(Jiayuan Mao @ 12/04): We don't need to do relevance analysis because the actual computation for hFF will do similar things anyway...
relaxed = self._translator.recompile_task_new_state(self._relaxed, state, forward_relevance_analysis=False, backward_relevance_analysis=False)
state = relaxed.state
F_sets, F_levels, A_sets, D_sets, goal_rv = self.compute_rpg_forward_diff(state, relaxed_task=relaxed)
if not goal_rv.rv:
return int(1e9)
selected: Set[Tuple[int, int]] = set()
selected_facts = [set() for _ in F_sets]
goal_propositions = goal_rv.propositions
for pred in goal_propositions:
if pred not in F_levels:
return int(1e9)
selected_facts[F_levels[pred]].add(pred)
for t in reversed(list(range(len(A_sets)))):
for dp_index, effect_index, propositions in D_sets[t]:
dp = self._relaxed.derived_predicates[dp_index]
eff = dp.effects[effect_index]
if eff.add_effects.intersection(selected_facts[t+1]):
# print('Selecting Derived Predicate', dp_index, effect_index, propositions, eff.add_effects)
selected_facts[t + 1] -= eff.add_effects
for pred in propositions:
# print(' Selecting predicate', pred)
selected_facts[F_levels[pred]].add(pred)
for op_index, effect_index, propositions in A_sets[t]:
op = self._relaxed.operators[op_index]
eff = op.effects[effect_index]
if eff.add_effects.intersection(selected_facts[t+1]):
# print('Selecting Action', op_index, effect_index, op.raw_operator, propositions, eff.add_effects)
selected.add((op_index, effect_index)) # only add the operator or both? Maybe at each level, just add the operator? or add a flag?
selected_facts[t + 1] -= eff.add_effects
for pred in propositions:
# print(' Selecting predicate', pred)
selected_facts[F_levels[pred]].add(pred)
return len(selected)