Source code for concepts.dm.crow.planners.regression_planning_impl.crow_regression_utils

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : crow_regression_utils.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/21/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

import itertools
from typing import Optional, Union, Iterator, Sequence, Tuple, List, Dict

from concepts.dsl.dsl_types import Variable, ObjectConstant, QINDEX
from concepts.dsl.expression import AssignExpression
from concepts.dsl.constraint import ConstraintSatisfactionProblem, OptimisticValue
from concepts.dsl.tensor_value import TensorValue

from concepts.dm.crow.behavior import CrowBehavior, CrowAssertExpression, CrowBindExpression
from concepts.dm.crow.behavior import CrowBehaviorOrderingSuite, CrowBehaviorForeachLoopSuite, CrowBehaviorConditionSuite, CrowFeatureAssignmentExpression
from concepts.dm.crow.controller import CrowControllerApplicationExpression
from concepts.dm.crow.crow_domain import CrowState
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.planners.regression_planning import ScopedCrowExpression, CrowPlanningResult


[docs] def canonize_bounded_variables(scopes, scope_id): scope = scopes[scope_id].copy() for var, value in scope.items(): if isinstance(value, Variable): assert value.scope > -1 for i in range(100): if isinstance(value, Variable): value = scopes[value.scope][value.name] else: break else: raise RuntimeError('Too deep scope reference.') scope[var] = value return scope
[docs] def split_simple_sequential(program: Sequence[Union[CrowBehavior, CrowBehaviorOrderingSuite]], scope_id: int) -> Tuple[ Sequence[Union[CrowBehavior, CrowBehaviorOrderingSuite]], List[ScopedCrowExpression] ]: simple_part = list() for i in reversed(range(len(program))): if isinstance(program[i], (CrowAssertExpression, CrowControllerApplicationExpression, CrowBindExpression)): simple_part.append(ScopedCrowExpression(program[i], scope_id)) else: break complex_part = program[:len(program) - len(simple_part)] return complex_part, list(reversed(simple_part))
[docs] def execute_object_bind(executor: CrowExecutor, stmt: CrowBindExpression, state: CrowState, scope: dict) -> Iterator[Dict[str, Union[ObjectConstant, Variable]]]: if stmt.goal.is_null_expression: rv = TensorValue.TRUE else: eval_scope = scope.copy() for var in stmt.variables: eval_scope[var] = QINDEX rv = executor.execute(stmt.goal, state=state, bounded_variables=eval_scope) typeonly_indices_variables = list() typeonly_indices_values = list() for v in stmt.variables: if v.name not in rv.batch_variables: typeonly_indices_variables.append(v.name) typeonly_indices_values.append(range(len(state.object_type2name[v.dtype.typename]))) for indices in rv.tensor.nonzero(): for typeonly_indices in itertools.product(*typeonly_indices_values): new_scope_variables = scope.copy() for var in stmt.variables: if var.name in rv.batch_variables: new_scope_variables[var.name] = ObjectConstant(state.object_type2name[var.dtype.typename][indices[rv.batch_variables.index(var.name)]], var.dtype) else: new_scope_variables[var.name] = ObjectConstant(state.object_type2name[var.dtype.typename][typeonly_indices[typeonly_indices_variables.index(var.name)]], var.dtype) yield new_scope_variables
[docs] def execute_behavior_effect(executor: CrowExecutor, behavior: CrowBehavior, state: CrowState, scope: dict, csp: Optional[ConstraintSatisfactionProblem] = None, action_index: Optional[int] = None) -> CrowState: new_state = state.clone() for effect in behavior.effects: with executor.update_effect_mode(effect.evaluation_mode, action_index=action_index): executor.execute(effect.assign_expr, state=new_state, csp=csp, bounded_variables=scope) return new_state
[docs] def execute_behavior_effect_batch(executor: CrowExecutor, results: Sequence[CrowPlanningResult], behavior: CrowBehavior, scope_id: int, csp: Optional[ConstraintSatisfactionProblem] = None) -> Sequence[CrowPlanningResult]: # print('!!!Apply behavior effect:', action.short_str(), 'with scope:', results[0].scopes[scope_id]) new_results = list() for result in results: state = execute_behavior_effect(executor, behavior, result.state, canonize_bounded_variables(result.scopes, scope_id), csp=csp) new_results.append(CrowPlanningResult(state, result.csp, result.controller_actions, result.scopes)) return new_results
[docs] def unique_results(results: Sequence[CrowPlanningResult]) -> Tuple[CrowPlanningResult, ...]: def keyfunc(x): return tuple(map(str, x.controller_actions)) results: Dict[Tuple[str, ...], CrowPlanningResult] = {keyfunc(x): x for x in results} return tuple(results.values())