Source code for concepts.dm.crow.controller

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : controller.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/15/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

from typing import Optional, Union, Sequence, Tuple, TYPE_CHECKING

from jacinle.utils.meta import repr_from_str
from jacinle.utils.printing import indent_text
from concepts.dsl.dsl_types import ObjectType, ValueType, Variable
from concepts.dsl.constraint import OptimisticValue
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.tensor_state import StateObjectReference
from concepts.dsl.expression import ObjectOrValueOutputExpression

if TYPE_CHECKING:
    from concepts.dm.crow.behavior import CrowBehaviorOrderingSuite

__all__ = ['CrowController', 'CrowControllerApplier', 'CrowControllerApplicationExpression']


[docs] class CrowController(object): """A controller is a class that defines a primitive action in the environment."""
[docs] def __init__(self, name: str, arguments: Sequence[Variable], effect_body: Optional['CrowBehaviorOrderingSuite'] = None): self.name = name self.arguments = tuple(arguments) self.effect_body = effect_body
name: str """The name of the controller.""" arguments: Tuple[Variable, ...] """The arguments of the controller.""" effect_body: Optional['CrowBehaviorOrderingSuite'] """The effect body of the controller.""" @property def argument_names(self) -> Tuple[str, ...]: return tuple(arg.name for arg in self.arguments) @property def argument_types(self) -> Tuple[Union[ObjectType, ValueType], ...]: return tuple(arg.dtype for arg in self.arguments)
[docs] def short_str(self): return f'{self.name}({", ".join(str(arg) for arg in self.arguments)})'
[docs] def long_str(self): fmt = f'controller {self.name}({", ".join(str(arg) for arg in self.arguments)})' if self.effect_body is not None: effect_string = '\n'.join(map(str, self.effect_body.statements)) fmt += ':\n' fmt += f' effects:\n{indent_text(effect_string, 2)}' return fmt
def __str__(self): return self.short_str() __repr__ = repr_from_str
[docs] class CrowControllerApplier(object):
[docs] def __init__(self, controller: CrowController, arguments: Sequence[Union[str, StateObjectReference, TensorValue, OptimisticValue]]): self.controller = controller self.arguments = tuple(arguments)
controller: CrowController """The controller to be applied.""" arguments: Tuple[Union[str, TensorValue], ...] """The arguments of the controller application.""" @property def name(self) -> str: return self.controller.name def __str__(self): return f'{self.controller.name}({", ".join(_argument_string(arg) for arg in self.arguments)})' __repr__ = repr_from_str
[docs] class CrowControllerApplicationExpression(object):
[docs] def __init__(self, controller: CrowController, arguments: Sequence[ObjectOrValueOutputExpression]): self.controller = controller self.arguments = tuple(arguments)
controller: CrowController """The controller to be applied.""" arguments: Tuple[ObjectOrValueOutputExpression, ...] """The arguments of the controller application.""" def __str__(self): return f'{self.controller.name}({", ".join(str(arg) for arg in self.arguments)})' __repr__ = repr_from_str
def _argument_string(arg: Union[str, StateObjectReference, TensorValue, OptimisticValue]) -> str: if isinstance(arg, bool): return str(arg) if isinstance(arg, str): return arg if isinstance(arg, StateObjectReference): return arg.name if isinstance(arg, TensorValue): if arg.is_single_elem: return str(arg.single_elem()) return str(arg) if isinstance(arg, OptimisticValue): return str(arg) raise TypeError(f'Unsupported argument type: {type(arg)}')