#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File : crow_regression_utils.py
# Author : Jiayuan Mao
# Email : maojiayuan@gmail.com
# Date : 03/21/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.
import itertools
from typing import Optional, Union, Iterator, Sequence, Tuple, List, Dict
from concepts.dsl.dsl_types import Variable, ObjectConstant, QINDEX
from concepts.dsl.expression import AssignExpression
from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.tensor_value import TensorValue
from concepts.dm.crow.behavior import CrowBehavior, CrowAssertExpression, CrowBindExpression
from concepts.dm.crow.behavior import CrowBehaviorOrderingSuite, CrowBehaviorForeachLoopSuite, CrowBehaviorConditionSuite, CrowFeatureAssignmentExpression
from concepts.dm.crow.controller import CrowControllerApplicationExpression
from concepts.dm.crow.crow_domain import CrowState
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.planners.regression_planning import ScopedCrowExpression, CrowPlanningResult
[docs]
def canonize_bounded_variables(scopes, scope_id):
scope = scopes[scope_id].copy()
for var, value in scope.items():
if isinstance(value, Variable):
assert value.scope > -1
for i in range(100):
if isinstance(value, Variable):
value = scopes[value.scope][value.name]
else:
break
else:
raise RuntimeError('Too deep scope reference.')
scope[var] = value
return scope
[docs]
def split_simple_sequential(program: Sequence[Union[CrowBehavior, CrowBehaviorOrderingSuite]], scope_id: int) -> Tuple[
Sequence[Union[CrowBehavior, CrowBehaviorOrderingSuite]],
List[ScopedCrowExpression]
]:
simple_part = list()
for i in reversed(range(len(program))):
if isinstance(program[i], (CrowAssertExpression, CrowControllerApplicationExpression, CrowBindExpression)):
simple_part.append(ScopedCrowExpression(program[i], scope_id))
else:
break
complex_part = program[:len(program) - len(simple_part)]
return complex_part, list(reversed(simple_part))
[docs]
def execute_object_bind(executor: CrowExecutor, stmt: CrowBindExpression, state: CrowState, scope: dict) -> Iterator[Dict[str, Union[ObjectConstant, Variable]]]:
if stmt.goal.is_null_expression:
rv = TensorValue.TRUE
else:
eval_scope = scope.copy()
for var in stmt.variables:
eval_scope[var] = QINDEX
rv = executor.execute(stmt.goal, state=state, bounded_variables=eval_scope)
typeonly_indices_variables = list()
typeonly_indices_values = list()
for v in stmt.variables:
if v.name not in rv.batch_variables:
typeonly_indices_variables.append(v.name)
typeonly_indices_values.append(range(len(state.object_type2name[v.dtype.typename])))
for indices in rv.tensor.nonzero():
for typeonly_indices in itertools.product(*typeonly_indices_values):
new_scope_variables = scope.copy()
for var in stmt.variables:
if var.name in rv.batch_variables:
new_scope_variables[var.name] = ObjectConstant(state.object_type2name[var.dtype.typename][indices[rv.batch_variables.index(var.name)]], var.dtype)
else:
new_scope_variables[var.name] = ObjectConstant(state.object_type2name[var.dtype.typename][typeonly_indices[typeonly_indices_variables.index(var.name)]], var.dtype)
yield new_scope_variables
[docs]
def execute_behavior_effect(executor: CrowExecutor, behavior: CrowBehavior, state: CrowState, scope: dict, csp: Optional[ConstraintSatisfactionProblem] = None, action_index: Optional[int] = None) -> CrowState:
new_state = state.clone()
for effect in behavior.effects:
with executor.update_effect_mode(effect.evaluation_mode, action_index=action_index):
executor.execute(effect.assign_expr, state=new_state, csp=csp, bounded_variables=scope)
return new_state
[docs]
def execute_behavior_effect_batch(executor: CrowExecutor, results: Sequence[CrowPlanningResult], behavior: CrowBehavior, scope_id: int, csp: Optional[ConstraintSatisfactionProblem] = None) -> Sequence[CrowPlanningResult]:
# print('!!!Apply behavior effect:', action.short_str(), 'with scope:', results[0].scopes[scope_id])
new_results = list()
for result in results:
state = execute_behavior_effect(executor, behavior, result.state, canonize_bounded_variables(result.scopes, scope_id), csp=csp)
new_results.append(CrowPlanningResult(state, result.csp, result.controller_actions, result.scopes))
return new_results
[docs]
def unique_results(results: Sequence[CrowPlanningResult]) -> Tuple[CrowPlanningResult, ...]:
def keyfunc(x):
return tuple(map(str, x.controller_actions))
results: Dict[Tuple[str, ...], CrowPlanningResult] = {keyfunc(x): x for x in results}
return tuple(results.values())