#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : atomic_strips_domain.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 04/27/2023
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
from typing import Optional, Union, Sequence, Tuple, Mapping, Dict
import jacinle
from jacinle.utils.cache import cached_property
from concepts.dsl.dsl_types import ObjectType, Variable, ObjectConstant
from concepts.dsl.expression import VariableExpression, FunctionApplicationExpression, BoolOpType, BoolExpression, AssignExpression, ConstantExpression, is_and_expr
from concepts.dm.pdsketch.predicate import Predicate
from concepts.dm.pdsketch.operator import Operator, OperatorApplicationExpression
from concepts.dm.pdsketch.regression_rule import AchieveExpression, RegressionRule
from concepts.dm.pdsketch.domain import Domain, Problem
from concepts.dm.pdsketch.parsers.pdsketch_parser import load_domain_file, load_domain_string, load_problem_file, load_problem_string
from concepts.dm.pdsketch.strips.strips_expression import SBoolPredicateApplicationExpression, SProposition, SState, SStateCompatible, make_sproposition_from_function_application
# TODO(Jiayuan Mao @ 2023/04/27): implement ADL operator.
class AtomicStripsOperator(object):
    """A lifted STRIPS operator whose preconditions and effects are tuples of predicate applications.

    The operator keeps a reference to the PDSketch operator it was derived from and takes its
    name from that raw operator.
    """

    def __init__(
        self,
        arguments: Sequence[Variable],
        preconditions: Sequence[SBoolPredicateApplicationExpression],
        add_effects: Sequence[SBoolPredicateApplicationExpression],
        del_effects: Sequence[SBoolPredicateApplicationExpression],
        raw_operator: Operator
    ):
        """Initialize the operator.

        Args:
            arguments: the parameters of the operator.
            preconditions: the predicate applications that form the precondition.
            add_effects: the predicate applications added by the operator.
            del_effects: the predicate applications deleted by the operator.
            raw_operator: the PDSketch operator this operator is derived from. Its name is used as the name of this operator.
        """
        self.name = raw_operator.name
        self.arguments = tuple(arguments)
        self.preconditions = tuple(preconditions)
        self.add_effects = tuple(add_effects)
        self.del_effects = tuple(del_effects)
        self.raw_operator = raw_operator

    name: str
    """The name of the operator, copied from the raw operator."""

    arguments: Tuple[Variable, ...]
    """The arguments of the operator, as a tuple of :class:`~concepts.dsl.dsl_types.Variable`."""

    preconditions: Tuple[SBoolPredicateApplicationExpression, ...]
    """The precondition of the operator, as a tuple of :class:`~concepts.dm.pdsketch.strips.strips_expression.SBoolPredicateApplicationExpression`."""

    add_effects: Tuple[SBoolPredicateApplicationExpression, ...]
    """The add effects of the operator, as a tuple of :class:`~concepts.dm.pdsketch.strips.strips_expression.SBoolPredicateApplicationExpression`."""

    del_effects: Tuple[SBoolPredicateApplicationExpression, ...]
    """The delete effects of the operator, as a tuple of :class:`~concepts.dm.pdsketch.strips.strips_expression.SBoolPredicateApplicationExpression`."""

    raw_operator: Operator
    """The raw operator that this strips operator is derived from."""

    def relax(self) -> 'AtomicStripsOperator':
        """Return a copy of this operator without delete effects.

        Returns:
            a new operator of the same class with identical arguments, preconditions and add effects, and an empty tuple of delete effects.
        """
        return type(self)(
            arguments=self.arguments,
            preconditions=self.preconditions,
            add_effects=self.add_effects,
            del_effects=[],
            raw_operator=self.raw_operator
        )

    @classmethod
    def from_operator(cls, operator: Operator) -> 'AtomicStripsOperator':
        """Create a strips operator from a PDSketch operator.

        Every effect must be an assignment of a constant to a predicate application. A value
        greater than 0.5 makes the predicate an add effect, any other value a delete effect.

        Args:
            operator: the PDSketch operator.

        Returns:
            the strips operator.

        Raises:
            AssertionError: if an effect is not an assignment of an integer constant.
        """
        to_sexpr = SBoolPredicateApplicationExpression.from_function_application_expression

        preconditions = [to_sexpr(precondition.bool_expr) for precondition in operator.preconditions]

        add_effects = list()
        del_effects = list()
        for effect in operator.effects:
            assign = effect.assign_expr
            assert isinstance(assign, AssignExpression), f'Not supported assign expression: {assign}.'
            sassign = to_sexpr(assign.predicate)
            assert isinstance(assign.value, ConstantExpression), f'Not supported assign expression: {assign}.'
            value = assign.value.constant.item()
            # bool is a subclass of int, so boolean constants pass this check as well.
            assert isinstance(value, int), f'Not supported assign value: {value!r}.'
            if value > 0.5:
                add_effects.append(sassign)
            else:
                del_effects.append(sassign)
        return cls(operator.arguments, preconditions, add_effects, del_effects, operator)

    def __str__(self) -> str:
        return f'{self.name}({", ".join(arg.name for arg in self.arguments)})'

    def __repr__(self) -> str:
        # Use the actual class name and label every field so the output is unambiguous.
        return (
            f'{type(self).__name__}({self.name}, arguments={self.arguments}, preconditions={self.preconditions}, '
            f'add_effects={self.add_effects}, del_effects={self.del_effects})'
        )

    def to_applier_str(self, bound_arguments: Dict[str, str]) -> str:
        """Format an application of this operator as ``name(arg=value, ...)``.

        Args:
            bound_arguments: a mapping from argument name to the value bound to it.

        Returns:
            the formatted string, listing the arguments in declaration order.
        """
        bindings = ', '.join(f'{arg.name}={bound_arguments[arg.name]}' for arg in self.arguments)
        return f'{self.name}({bindings})'

    def to_applier_pddl_str(self, bound_arguments: Dict[str, str]) -> str:
        """Format an application of this operator as ``(name value ...)``.

        Args:
            bound_arguments: a mapping from argument name to the value bound to it.

        Returns:
            the formatted string, listing the values in argument declaration order.
        """
        values = ' '.join(bound_arguments[arg.name] for arg in self.arguments)
        return f'({self.name} {values})'

    def ground(self, variable_dict: Dict[str, str], state: Optional[SStateCompatible] = None) -> 'AtomicStripsOperatorApplier':
        """Ground the operator with a binding of its arguments.

        Args:
            variable_dict: a mapping from argument name to the value bound to it.
            state: currently unused.

        Returns:
            the grounded operator.
        """
        # TODO(Jiayuan Mao @ 2023/04/27): include the state in the grounding.
        return AtomicStripsOperatorApplier(self, variable_dict)

    def __call__(self, *args: str) -> 'AtomicStripsOperatorApplier':
        """Ground the operator with positional argument values.

        Args:
            *args: one value per argument, in declaration order.

        Returns:
            the grounded operator.
        """
        assert len(args) == len(self.arguments), f'Expected {len(self.arguments)} arguments, got {len(args)}.'
        bound_arguments = {arg.name: argv for arg, argv in zip(self.arguments, args)}
        return self.ground(bound_arguments)
class AtomicStripsOperatorApplier(object):
    """An :class:`AtomicStripsOperator` together with a concrete binding of its arguments.

    The preconditions and effects of the operator are grounded when the applier is constructed.
    """

    def __init__(self, operator: AtomicStripsOperator, bound_arguments: Dict[str, str]):
        """Ground ``operator`` with the given binding.

        Args:
            operator: the lifted operator.
            bound_arguments: a mapping from argument name to the value bound to it. It must contain an entry for every argument of ``operator``.
        """
        def ground_all(expressions):
            # Ground each lifted expression with the same binding.
            return tuple(expression.ground(bound_arguments, return_proposition=True) for expression in expressions)

        self.operator = operator
        self.arguments = tuple(bound_arguments[variable.name] for variable in operator.arguments)
        self.bound_arguments = bound_arguments
        self.preconditions = ground_all(operator.preconditions)
        self.add_effects = ground_all(operator.add_effects)
        self.del_effects = ground_all(operator.del_effects)

    operator: AtomicStripsOperator
    """The operator that this applier is derived from."""

    arguments: Tuple[str, ...]
    """The arguments of the operator, as a tuple of strings."""

    bound_arguments: Dict[str, str]
    """The bound arguments of the operator."""

    preconditions: Tuple[SProposition, ...]
    """The preconditions of the operator, as a tuple of :class:`~concepts.dm.pdsketch.strips.strips_expression.SProposition`."""

    add_effects: Tuple[SProposition, ...]
    """The add effects of the operator, as a tuple of :class:`~concepts.dm.pdsketch.strips.strips_expression.SProposition`."""

    del_effects: Tuple[SProposition, ...]
    """The delete effects of the operator, as a tuple of :class:`~concepts.dm.pdsketch.strips.strips_expression.SProposition`."""

    def __str__(self) -> str:
        # Arguments are listed in the declaration order of the operator.
        bindings = ', '.join(
            f'{variable.name}={self.bound_arguments[variable.name]}'
            for variable in self.operator.arguments
        )
        return f'{self.operator.name}({bindings})'

    __repr__ = jacinle.repr_from_str
class AtomicStripsOperatorApplicationExpression(object):
    """The application of an :class:`AtomicStripsOperator` to a tuple of variables."""

    def __init__(self, operator: AtomicStripsOperator, arguments: Sequence[Variable]):
        """Initialize the expression.

        Args:
            operator: the operator to be applied.
            arguments: the variables the operator is applied to, in the order of the operator's parameters.
        """
        self.operator = operator
        self.arguments = tuple(arguments)

    operator: AtomicStripsOperator
    """The operator to be applied."""

    arguments: Tuple[Variable, ...]
    """The arguments of the operator."""

    def __str__(self) -> str:
        return f'{self.operator.name}({", ".join(arg.name if isinstance(arg, Variable) else arg for arg in self.arguments)})'

    __repr__ = jacinle.repr_from_str

    @classmethod
    def from_operator_application_expression(cls, expression: OperatorApplicationExpression, atomic_strips_operators: Mapping[str, AtomicStripsOperator]) -> 'AtomicStripsOperatorApplicationExpression':
        """Create a strips operator application expression from a PDSketch operator application expression.

        Args:
            expression: the PDSketch operator application expression. All of its arguments must be variable expressions.
            atomic_strips_operators: a mapping from operator name to the corresponding strips operator. It is used to look up the operator applied by ``expression``.

        Returns:
            the strips operator application expression.

        Raises:
            AssertionError: if an argument of ``expression`` is not a variable expression.
            KeyError: if the applied operator is not in ``atomic_strips_operators``.
        """
        operator = atomic_strips_operators[expression.operator.name]
        arguments = list()
        for arg in expression.arguments:
            assert isinstance(arg, VariableExpression), f'Not supported argument expression: {arg}.'
            arguments.append(arg.variable)
        return cls(operator, arguments)

    def ground(self, variable_dict: Dict[str, str], state: Optional[SStateCompatible] = None) -> AtomicStripsOperatorApplier:
        """Ground the expression with a binding of its variables.

        Args:
            variable_dict: a mapping from the names of the variables in :attr:`arguments` to the values bound to them.
            state: currently unused.

        Returns:
            the grounded operator.
        """
        # The applier looks values up by the names of the operator's own parameters, which
        # need not coincide with the names (or the order) of the variables used at the call
        # site. Bind positionally: the i-th argument of this expression provides the value
        # of the i-th parameter of the operator.
        bound_arguments = {
            parameter.name: variable_dict[argument.name]
            for parameter, argument in zip(self.operator.arguments, self.arguments)
        }
        return AtomicStripsOperatorApplier(self.operator, bound_arguments)

    def __call__(self, *args) -> AtomicStripsOperatorApplier:
        """Ground the expression with positional values.

        Args:
            *args: one value per entry of :attr:`arguments`, in the same order.

        Returns:
            the grounded operator.
        """
        assert len(args) == len(self.arguments), f'Expected {len(self.arguments)} arguments, got {len(args)}.'
        bound_arguments = {arg.name: argv for arg, argv in zip(self.arguments, args)}
        return self.ground(bound_arguments)
class AtomicStripsAchieveExpression(object):
    """A lifted "achieve" statement: a goal to be reached and a tuple of expressions to be maintained."""

    def __init__(
        self,
        goal: SBoolPredicateApplicationExpression,
        maintains: Sequence[SBoolPredicateApplicationExpression],
        arguments: Optional[Sequence[Variable]] = None
    ):
        """Initialize the expression.

        Args:
            goal: the goal of the achieve expression.
            maintains: the expressions to be maintained.
            arguments: the variables bound by :meth:`__call__`, in positional order. When omitted, they are collected from ``goal`` and ``maintains`` in order of first occurrence.
        """
        self.goal = goal
        self.maintains = tuple(maintains)
        if arguments is None:
            arguments = self._collect_variables((self.goal, *self.maintains))
        self.arguments = tuple(arguments)

    goal: SBoolPredicateApplicationExpression
    """The goal of the achieve expression."""

    maintains: Tuple[SBoolPredicateApplicationExpression, ...]
    """A list of expression to be maintained."""

    arguments: Tuple[Variable, ...]
    """The variables bound positionally by :meth:`__call__`."""

    @staticmethod
    def _collect_variables(expressions) -> Tuple[Variable, ...]:
        """Collect the distinct variables used by ``expressions``, in order of first occurrence."""
        # NOTE(review): assumes each expression exposes its arguments as ``.arguments``;
        # expressions without that attribute contribute no variables -- confirm against
        # SBoolPredicateApplicationExpression.
        variables = list()
        seen_names = set()
        for expression in expressions:
            for arg in getattr(expression, 'arguments', ()):
                if isinstance(arg, Variable) and arg.name not in seen_names:
                    seen_names.add(arg.name)
                    variables.append(arg)
        return tuple(variables)

    def __str__(self) -> str:
        # Same layout as AtomicStripsGroundedAchieveExpression.__str__.
        return f'achieve({self.goal}, maintains={{{", ".join(map(str, self.maintains))}}})'

    __repr__ = jacinle.repr_from_str

    @classmethod
    def from_achieve_expression(cls, expression: AchieveExpression) -> 'AtomicStripsAchieveExpression':
        """Create a strips achieve expression from a PDSketch achieve expression.

        Args:
            expression: the PDSketch achieve expression.

        Returns:
            the strips achieve expression.
        """
        to_sexpr = SBoolPredicateApplicationExpression.from_function_application_expression
        goal = to_sexpr(expression.goal)
        maintains = tuple(to_sexpr(maintain) for maintain in expression.maintains)
        return cls(goal, maintains)

    def ground(self, variable_dict: Dict[str, str], state: Optional[SStateCompatible] = None) -> 'AtomicStripsGroundedAchieveExpression':
        """Ground the goal and the maintained expressions with a variable binding.

        Args:
            variable_dict: a mapping from variable name to the value bound to it.
            state: currently unused.

        Returns:
            the grounded achieve expression.
        """
        return AtomicStripsGroundedAchieveExpression(
            self.goal.ground(variable_dict, return_proposition=True),
            tuple(maintain.ground(variable_dict, return_proposition=True) for maintain in self.maintains)
        )

    def __call__(self, *args) -> 'AtomicStripsGroundedAchieveExpression':
        """Ground the expression with positional values.

        Args:
            *args: one value per entry of :attr:`arguments`, in the same order.

        Returns:
            the grounded achieve expression.
        """
        assert len(args) == len(self.arguments), f'Expected {len(self.arguments)} arguments, got {len(args)}.'
        bound_arguments = {arg.name: argv for arg, argv in zip(self.arguments, args)}
        return self.ground(bound_arguments)
class AtomicStripsGroundedAchieveExpression(object):
    """A grounded "achieve" statement: a goal proposition and a tuple of propositions to be maintained."""

    def __init__(self, goal: SProposition, maintains: Sequence[SProposition]):
        """Initialize the expression.

        Args:
            goal: the goal proposition.
            maintains: the propositions to be maintained. They are stored as a tuple.
        """
        self.goal, self.maintains = goal, tuple(maintains)

    goal: SProposition
    """The goal of the achieve expression."""

    maintains: Tuple[SProposition, ...]
    """A list of expression to be maintained."""

    def __str__(self) -> str:
        maintained = ', '.join(str(proposition) for proposition in self.maintains)
        return f'achieve({self.goal}, maintains={{{maintained}}})'

    __repr__ = jacinle.repr_from_str
class AtomicStripsRegressionRule(object):
    """A lifted regression rule of the form ``goal <- body``.

    The body is a sequence of operator applications and achieve expressions. The rule takes
    its name from the PDSketch regression rule it was derived from.
    """

    def __init__(
        self,
        arguments: Sequence[Variable],
        preconditions: Sequence[SBoolPredicateApplicationExpression],
        goal: SBoolPredicateApplicationExpression,
        body: Sequence[Union[AtomicStripsOperatorApplicationExpression, AtomicStripsAchieveExpression]],
        raw_regression_rule: RegressionRule,
    ):
        """Initialize the regression rule.

        Args:
            arguments: the parameters of the rule.
            preconditions: the preconditions of the rule.
            goal: the goal of the rule.
            body: the items of the rule body, in order.
            raw_regression_rule: the PDSketch regression rule this rule is derived from. Its name is used as the name of this rule.
        """
        self.raw_regression_rule = raw_regression_rule
        self.name = raw_regression_rule.name
        self.arguments = tuple(arguments)
        self.preconditions = tuple(preconditions)
        self.goal = goal
        self.body = tuple(body)

    arguments: Tuple[Variable, ...]
    """The arguments of the regression rule."""

    preconditions: Tuple[SBoolPredicateApplicationExpression, ...]
    """The preconditions of the regression rule."""

    goal: SBoolPredicateApplicationExpression
    """The goal of the regression rule."""

    body: Tuple[Union[AtomicStripsOperatorApplicationExpression, AtomicStripsAchieveExpression], ...]
    """The body of the regression rule."""

    raw_regression_rule: RegressionRule
    """The raw regression rule."""

    def __str__(self) -> str:
        rendered_body = ', '.join(map(str, self.body))
        return f'{self.goal} <- {rendered_body}'

    __repr__ = jacinle.repr_from_str

    @classmethod
    def from_regression_rule(cls, regression_rule: RegressionRule, atomic_strips_operators: Mapping[str, AtomicStripsOperator]):
        """Create a strips regression rule from a PDSketch regression rule.

        The goal expression of the rule must be a conjunction. Only its first conjunct is
        converted and used as the goal of the strips rule.

        Args:
            regression_rule: the PDSketch regression rule.
            atomic_strips_operators: a mapping from operator name to strips operator, used to convert the operator applications in the rule body.

        Returns:
            the strips regression rule.

        Raises:
            AssertionError: if the goal expression is not an AND expression.
            NotImplementedError: if a body item is neither an operator application nor an achieve expression.
        """
        to_sexpr = SBoolPredicateApplicationExpression.from_function_application_expression

        def convert_body_item(item):
            # Dispatch on the kind of the PDSketch body statement.
            if isinstance(item, OperatorApplicationExpression):
                return AtomicStripsOperatorApplicationExpression.from_operator_application_expression(item, atomic_strips_operators)
            if isinstance(item, AchieveExpression):
                return AtomicStripsAchieveExpression.from_achieve_expression(item)
            raise NotImplementedError(f'Not supported body expression: {item}.')

        arguments = regression_rule.arguments
        preconditions = tuple(to_sexpr(item.bool_expr) for item in regression_rule.preconditions)

        goal_expression = regression_rule.goal_expression
        assert isinstance(goal_expression, BoolExpression) and goal_expression.bool_op is BoolOpType.AND
        goal = to_sexpr(goal_expression.arguments[0])

        body = [convert_body_item(item) for item in regression_rule.body]
        return cls(arguments, preconditions, goal, body, regression_rule)

    def ground(self, variable_dict: Dict[str, str], state: Optional[SStateCompatible] = None):
        """Ground the rule with a binding of its arguments.

        Args:
            variable_dict: a mapping from argument name to the value bound to it.
            state: currently unused.

        Returns:
            the grounded regression rule, as an :class:`AtomicStripsRegressionRuleApplier`.
        """
        # TODO(Jiayuan Mao @ 2023/07/12): include the state in the grounding.
        return AtomicStripsRegressionRuleApplier(self, variable_dict)

    def __call__(self, *args: str):
        """Ground the rule with positional argument values.

        Args:
            *args: one value per argument, in declaration order.

        Returns:
            the grounded regression rule.
        """
        assert len(args) == len(self.arguments), f'Expected {len(self.arguments)} arguments, got {len(args)}.'
        names = (variable.name for variable in self.arguments)
        return self.ground(dict(zip(names, args)))
class AtomicStripsRegressionRuleApplier(object):
    """An :class:`AtomicStripsRegressionRule` together with a concrete binding of its arguments.

    The preconditions, the goal and every body item of the rule are grounded when the applier
    is constructed, and stored as ``preconditions``, ``goal`` and ``body``.
    """

    def __init__(self, regression_rule: AtomicStripsRegressionRule, bound_arguments: Dict[str, str]):
        """Ground ``regression_rule`` with the given binding.

        Args:
            regression_rule: the lifted regression rule.
            bound_arguments: a mapping from variable name to the value bound to it. It must contain an entry for every argument of ``regression_rule``.
        """
        def as_proposition(expression):
            # Ground a lifted predicate application with the binding of this applier.
            return expression.ground(bound_arguments, return_proposition=True)

        self.regression_rule = regression_rule
        self.arguments = tuple(bound_arguments[variable.name] for variable in regression_rule.arguments)
        self.bound_arguments = bound_arguments
        self.preconditions = tuple(as_proposition(precondition) for precondition in regression_rule.preconditions)
        self.goal = as_proposition(regression_rule.goal)
        self.body = tuple(statement.ground(bound_arguments) for statement in regression_rule.body)

    regression_rule: AtomicStripsRegressionRule
    """The regression rule."""

    arguments: Tuple[str, ...]
    """The arguments of the operator, as a tuple of strings."""

    bound_arguments: Dict[str, str]
    """The bound arguments of the operator."""

    def __str__(self) -> str:
        # Every entry of the binding is printed, in the iteration order of the dictionary.
        bindings = ', '.join(f'{name}={value}' for name, value in self.bound_arguments.items())
        return f'{self.regression_rule.name}({bindings})'

    __repr__ = jacinle.repr_from_str
class AtomicStripsDomain(object):
    """The domain of the atomic STRIPS planning problem."""

    def __init__(self, types: Dict[str, ObjectType], predicates: Dict[str, Predicate], operators: Dict[str, AtomicStripsOperator], regression_rules: Dict[str, AtomicStripsRegressionRule], constants: Dict[str, ObjectConstant]):
        """Initialize the domain.

        Args:
            types: a mapping from type name to type.
            predicates: a mapping from predicate name to predicate.
            operators: a mapping from operator name to strips operator.
            regression_rules: a mapping from regression rule name to strips regression rule.
            constants: a mapping from constant name to constant.
        """
        self.types = types
        self.predicates = predicates
        self.operators = operators
        self.regression_rules = regression_rules
        self.constants = constants

    types: Dict[str, ObjectType]
    """The types of the domain, as a dictionary from type name to :class:`~concepts.dm.pdsketch.strips.strips_expression.ObjectType`."""

    predicates: Dict[str, Predicate]
    """The predicates of the domain, as a dictionary from predicate name to :class:`~concepts.dm.pdsketch.predicate.Predicate`."""

    operators: Dict[str, AtomicStripsOperator]
    """The operators of the domain, as a dictionary from operator name to :class:`~concepts.dm.pdsketch.strips.atomic_strips_domain.AtomicStripsOperator`."""

    regression_rules: Dict[str, AtomicStripsRegressionRule]
    """The regression rules of the domain, as a dictionary from regression rule name to :class:`~concepts.dm.pdsketch.strips.atomic_strips_domain.AtomicStripsRegressionRule`."""

    constants: Dict[str, ObjectConstant]
    """The constants of the domain, as a dictionary from constant name to :class:`~concepts.dsl.dsl_types.ObjectConstant`."""

    @classmethod
    def from_domain(cls, domain: Domain) -> 'AtomicStripsDomain':
        """Create an atomic STRIPS domain from a PDSketch domain.

        Args:
            domain: the PDSketch domain.

        Returns:
            the strips domain. Its type, predicate and constant dictionaries are shallow copies of the ones in ``domain``; the predicates are taken from ``domain.functions``.
        """
        operators = dict()
        for raw_operator in domain.operators.values():
            # Only plain, non-macro operators are converted.
            # TODO(Jiayuan Mao @ 2023/03/19): support macro operator here.
            if not isinstance(raw_operator, Operator) or raw_operator.is_macro:
                continue
            operators[raw_operator.name] = AtomicStripsOperator.from_operator(raw_operator)

        # The regression rules refer to the converted operators by name.
        regression_rules = dict()
        for raw_rule in domain.regression_rules.values():
            regression_rules[raw_rule.name] = AtomicStripsRegressionRule.from_regression_rule(raw_rule, operators)

        return cls(
            types=domain.types.copy(),
            predicates=domain.functions.copy(),
            operators=operators,
            regression_rules=regression_rules,
            constants=domain.constants.copy(),
        )
class AtomicStripsProblem(object):
    """An atomic STRIPS planning problem: a domain, typed objects, an initial state and a conjunctive goal."""

    def __init__(self, domain: AtomicStripsDomain, objects: Dict[str, str], initial_state: SStateCompatible, conjunctive_goal: Sequence[SProposition]):
        """Initialize the problem.

        Args:
            domain: the domain of the problem.
            objects: a mapping from object name to the name of its type.
            initial_state: the initial state. It is wrapped into an :class:`~concepts.dm.pdsketch.strips.strips_expression.SState`.
            conjunctive_goal: the propositions of the goal. They are stored as a tuple.
        """
        self.domain = domain
        self.objects = objects
        self.initial_state = SState(initial_state)
        self.conjunctive_goal = tuple(conjunctive_goal)

    domain: AtomicStripsDomain
    """The domain of the problem, as a :class:`~concepts.dm.pdsketch.strips.strips_domain.AtomicStripsDomain`."""

    objects: Dict[str, str]
    """The objects of the problem, as a dictionary from object name to object type."""

    initial_state: SState
    """The initial state of the problem, as a :class:`~concepts.dm.pdsketch.strips.strips_expression.SState`."""

    conjunctive_goal: Tuple[SProposition, ...]
    """The conjunctive goal of the problem, as a tuple of :class:`~concepts.dm.pdsketch.strips.strips_expression.SProposition`."""

    @cached_property
    def objects_type2names(self) -> Dict[str, Tuple[str, ...]]:
        """A mapping from type name to the names of the objects of that type, in the iteration order of :attr:`objects`."""
        grouped = dict()
        for object_name, type_name in self.objects.items():
            if type_name not in grouped:
                grouped[type_name] = list()
            grouped[type_name].append(object_name)
        return {type_name: tuple(object_names) for type_name, object_names in grouped.items()}

    @classmethod
    def from_domain_and_problem(cls, domain: AtomicStripsDomain, problem: Problem) -> 'AtomicStripsProblem':
        """Create an atomic STRIPS problem from a strips domain and a PDSketch problem.

        Args:
            domain: the strips domain. The object types and predicates used by ``problem`` are validated against it.
            problem: the PDSketch problem. Its goal must be a conjunction of function applications.

        Returns:
            the strips problem.

        Raises:
            ValueError: if an object has an unknown type, if a proposition uses an unknown predicate, if the goal is not a conjunction, or if a conjunct of the goal is not a function application.
        """
        objects = problem.objects.copy()

        def to_sproposition(proposition):
            # Reject predicates that are not declared in the domain before converting.
            predicate_name = proposition.function.name
            if predicate_name not in domain.predicates:
                raise ValueError(f'Unknown predicate {predicate_name}.')
            return make_sproposition_from_function_application(proposition, objects)

        for typename in objects.values():
            if typename not in domain.types:
                raise ValueError(f'Unknown type {typename}.')

        initial_state = [to_sproposition(proposition) for proposition in problem.predicates]

        if not is_and_expr(problem.goal):
            raise ValueError(f'Expected conjunctive goal, got {problem.goal}.')

        conjunctive_goal = list()
        for conjunct in problem.goal.arguments:
            if not isinstance(conjunct, FunctionApplicationExpression):
                raise ValueError(f'Expected proposition, got {conjunct}.')
            conjunctive_goal.append(to_sproposition(conjunct))

        return cls(
            domain,
            objects=objects,
            initial_state=initial_state,
            conjunctive_goal=conjunctive_goal,
        )
def load_astrips_domain_file(filename: str, return_raw_domain: bool = False) -> Union[AtomicStripsDomain, Tuple[Domain, AtomicStripsDomain]]:
    """Parse a domain file and convert the result into an atomic STRIPS domain.

    Args:
        filename: the path of the domain file.
        return_raw_domain: if True, the parsed PDSketch domain is returned together with the strips domain.

    Returns:
        the strips domain, or the tuple ``(raw domain, strips domain)`` when ``return_raw_domain`` is True.
    """
    raw_domain = load_domain_file(filename)
    converted = AtomicStripsDomain.from_domain(raw_domain)
    return (raw_domain, converted) if return_raw_domain else converted
def load_astrips_domain_string(domain_string: str, return_raw_domain: bool = False) -> Union[AtomicStripsDomain, Tuple[Domain, AtomicStripsDomain]]:
    """Parse a domain definition given as a string and convert the result into an atomic STRIPS domain.

    Args:
        domain_string: the text of the domain definition.
        return_raw_domain: if True, the parsed PDSketch domain is returned together with the strips domain.

    Returns:
        the strips domain, or the tuple ``(raw domain, strips domain)`` when ``return_raw_domain`` is True.
    """
    raw_domain = load_domain_string(domain_string)
    converted = AtomicStripsDomain.from_domain(raw_domain)
    return (raw_domain, converted) if return_raw_domain else converted
def load_astrips_problem_file(domain: Domain, astrips_domain: AtomicStripsDomain, filename: str) -> AtomicStripsProblem:
    """Parse a problem file and convert the result into an atomic STRIPS problem.

    Args:
        domain: the PDSketch domain, passed to the problem parser.
        astrips_domain: the strips domain that the returned problem is built against.
        filename: the path of the problem file.

    Returns:
        the strips problem.
    """
    # The parser is asked not to return a tensor state.
    raw_problem = load_problem_file(filename, domain=domain, return_tensor_state=False)
    return AtomicStripsProblem.from_domain_and_problem(astrips_domain, raw_problem)
def load_astrips_problem_string(domain: Domain, astrips_domain: AtomicStripsDomain, problem_string: str) -> AtomicStripsProblem:
    """Parse a problem definition given as a string and convert the result into an atomic STRIPS problem.

    Args:
        domain: the PDSketch domain, passed to the problem parser.
        astrips_domain: the strips domain that the returned problem is built against.
        problem_string: the text of the problem definition.

    Returns:
        the strips problem.
    """
    # The parser is asked not to return a tensor state.
    raw_problem = load_problem_string(problem_string, domain=domain, return_tensor_state=False)
    return AtomicStripsProblem.from_domain_and_problem(astrips_domain, raw_problem)