Source code for concepts.dm.crow.controller

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : controller.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/15/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

from typing import Optional, Union, Sequence, Tuple, Dict, TYPE_CHECKING

from jacinle.utils.meta import repr_from_str
from jacinle.utils.printing import indent_text
from concepts.dsl.dsl_types import ObjectType, ValueType, Variable
from concepts.dsl.constraint import OptimisticValue
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.tensor_state import StateObjectReference, StateObjectList
from concepts.dsl.expression import ObjectOrValueOutputExpression, ValueOutputExpression

if TYPE_CHECKING:
    from concepts.dm.crow.behavior import CrowBehaviorOrderingSuite

__all__ = ['CrowController', 'CrowControllerApplier', 'CrowControllerApplicationExpression']


[docs] class CrowController(object): """A controller is a class that defines a primitive action in the environment."""
[docs] def __init__(self, name: str, arguments: Sequence[Variable], effect_body: Optional['CrowBehaviorOrderingSuite'] = None, python_effect: bool = False): self.name = name self.arguments = tuple(arguments) self.effect_body = effect_body self.python_effect = python_effect
name: str """The name of the controller.""" arguments: Tuple[Variable, ...] """The arguments of the controller.""" effect_body: Optional['CrowBehaviorOrderingSuite'] """The effect body of the controller.""" python_effect: bool """Whether the effect body is implemented in Python.""" @property def argument_names(self) -> Tuple[str, ...]: return tuple(arg.name for arg in self.arguments) @property def argument_types(self) -> Tuple[Union[ObjectType, ValueType], ...]: return tuple(arg.dtype for arg in self.arguments)
[docs] def short_str(self): return f'{self.name}({", ".join(str(arg) for arg in self.arguments)})'
[docs] def long_str(self): flag_string = '' if self.python_effect: flag_string = '[[python_effect]]' fmt = f'controller {flag_string}{self.name}({", ".join(str(arg) for arg in self.arguments)})' if self.effect_body is not None: effect_string = '\n'.join(map(str, self.effect_body.statements)) fmt += ':\n' fmt += f' effects:\n{indent_text(effect_string, 2)}' return fmt
def __str__(self): return self.short_str() __repr__ = repr_from_str
[docs] class CrowControllerApplier(object):
[docs] def __init__( self, controller: CrowController, arguments: Sequence[Union[str, StateObjectReference, StateObjectList, TensorValue, OptimisticValue]], global_constraints: Optional[Dict[int, Sequence[ValueOutputExpression]]] = None, local_constraints: Optional[Sequence[ValueOutputExpression]] = None ): self.controller = controller self.arguments = tuple(arguments) self.global_constraints = global_constraints self.local_constraints = local_constraints
controller: CrowController """The controller to be applied.""" arguments: Tuple[Union[str, TensorValue], ...] """The arguments of the controller application.""" global_constraints: Optional[Dict[int, Tuple[Tuple[ValueOutputExpression, ...], dict]]] """The global constraints of the controller application.""" local_constraints: Optional[Tuple[Tuple[ValueOutputExpression, ...], dict]] """The local constraints of the controller application."""
[docs] def set_constraints(self, global_constraints: Dict[int, Sequence[ValueOutputExpression]], global_scopes: Dict[int, dict], scope_id: int): # TODO(Jiayuan Mao @ 2025/01/14): set the scopes associated with these constraints... self.global_constraints = {k: (tuple(v), global_scopes[k]) for k, v in global_constraints.items()} self.local_constraints = self.global_constraints.get(scope_id, None) return self
@property def name(self) -> str: return self.controller.name
[docs] def long_str(self) -> str: from concepts.dm.crow.behavior_utils import format_behavior_statement fmt = f'{self.controller.name}({", ".join(_argument_string(arg) for arg in self.arguments)})' if self.global_constraints is not None: global_constraints_str = indent_text('\n'.join(f'{k}: {{{", ".join(str(format_behavior_statement(c, scope=scope)) for c in constraints)}}}' for k, (constraints, scope) in self.global_constraints.items()), 2) fmt += f'\n with global constraints:\n{global_constraints_str}' if self.local_constraints is not None: local_constraints_str = '{' + ', '.join(str(format_behavior_statement(c, scope=self.local_constraints[1])) for c in self.local_constraints[0]) + '}' fmt += f'\n with local constraints: {local_constraints_str}' return fmt
[docs] def short_str(self) -> str: return f'{self.controller.name}({", ".join(_argument_string(arg) for arg in self.arguments)})'
def __str__(self): return self.short_str() __repr__ = repr_from_str
[docs] class CrowControllerApplicationExpression(object):
[docs] def __init__(self, controller: CrowController, arguments: Sequence[ObjectOrValueOutputExpression]): self.controller = controller self.arguments = tuple(arguments)
controller: CrowController """The controller to be applied.""" arguments: Tuple[ObjectOrValueOutputExpression, ...] """The arguments of the controller application.""" def __str__(self): return f'{self.controller.name}({", ".join(str(arg) for arg in self.arguments)})' __repr__ = repr_from_str
def _argument_string(arg: Union[str, StateObjectReference, StateObjectList, TensorValue, OptimisticValue]) -> str: if isinstance(arg, bool): return str(arg) if isinstance(arg, str): return arg if isinstance(arg, StateObjectReference): return arg.name if isinstance(arg, StateObjectList): return f'[{", ".join([x.name for x in arg.values])}]' if isinstance(arg, TensorValue): if arg.is_single_elem: return str(arg.single_elem()) return str(arg) if isinstance(arg, OptimisticValue): return str(arg) raise TypeError(f'Unsupported argument type: {type(arg)}')