Source code for concepts.dm.crow.parsers.cdl_symbolic_execution

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : cdl_symbolic_execution.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 08/15/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

from dataclasses import dataclass, field
from typing import Any, Optional, Union, Tuple, List, Dict

import jacinle
from jacinle.utils.enum import JacEnum

import concepts.dsl.expression as E
from concepts.dm.crow.behavior import (
    CrowAchieveExpression, CrowAssertExpression, CrowBehaviorApplicationExpression, CrowBindExpression,
    CrowFeatureAssignmentExpression, CrowRuntimeAssignmentExpression, CrowUntrackExpression
)
from concepts.dm.crow.behavior import CrowBehaviorBodyItem, CrowBehaviorBodyPrimitiveBase, CrowBehaviorBodySuiteBase, CrowBehaviorCommit
from concepts.dm.crow.behavior import CrowBehaviorConditionSuite, CrowBehaviorForeachLoopSuite, CrowBehaviorOrderingSuite, CrowBehaviorWhileLoopSuite
from concepts.dm.crow.controller import CrowControllerApplicationExpression
from concepts.dm.crow.crow_expression_utils import crow_replace_expression_variables
from concepts.dm.crow.crow_generator import CrowGeneratorApplicationExpression
from concepts.dsl.dsl_types import AutoType, UnnamedPlaceholder, Variable


[docs] class SymbolicExecutionMode(JacEnum): """The execution mode of the symbolic execution engine. Different modes have different supported instructions. - FUNCTIONAL: the functional mode, which supports only assignments, function calls, and return statements. - DERIVED: the derived mode, which supports only function calls and if-else conditions. - BEHAVIOR: the behavior mode corresponds to the "body" of a behavior rule, which supports everything except for return statements and feature assignments. - EFFECT: the effect mode corresponds to the "effect" of a behavior rule, which supports only variable / feature assignments and function calls. """ FUNCTIONAL = 'functional' DERIVED = 'derived' BEHAVIOR = 'behavior' EFFECT = 'effect' HEURISTIC = 'heuristic' @property def support_achieve_statements(self) -> bool: return self in (SymbolicExecutionMode.BEHAVIOR, SymbolicExecutionMode.HEURISTIC) @property def support_misc_behavior_body_statements(self): return self in (SymbolicExecutionMode.BEHAVIOR, ) @property def support_ordering_statements(self) -> bool: return self in (SymbolicExecutionMode.BEHAVIOR, ) @property def support_assign_statements(self) -> bool: return self in (SymbolicExecutionMode.EFFECT, SymbolicExecutionMode.HEURISTIC) @property def support_if_statements(self) -> bool: return self in (SymbolicExecutionMode.FUNCTIONAL, SymbolicExecutionMode.DERIVED, SymbolicExecutionMode.BEHAVIOR, SymbolicExecutionMode.EFFECT, SymbolicExecutionMode.HEURISTIC) @property def support_foreach_statements(self) -> bool: return self in (SymbolicExecutionMode.BEHAVIOR, SymbolicExecutionMode.EFFECT, SymbolicExecutionMode.HEURISTIC) @property def support_while_statements(self) -> bool: return self in (SymbolicExecutionMode.BEHAVIOR, SymbolicExecutionMode.HEURISTIC) @property def support_expr_statements(self) -> bool: return self in (SymbolicExecutionMode.DERIVED, ) @property def support_return_statements(self) -> bool: return self in (SymbolicExecutionMode.FUNCTIONAL, SymbolicExecutionMode.DERIVED)
[docs] @dataclass class ArgumentsList(object): """A list of argument values. They can be variables, function calls, or other expressions.""" arguments: Tuple[Union['Suite', E.ValueOutputExpression, E.ListExpansionExpression, E.VariableExpression, bool, int, float, complex, str], ...]
[docs] @dataclass class FunctionCall(object): """A function call. This is used as the intermediate representation of the parsed expressions. Note that this includes not only function calls but also primitive operators and control flow statements. """ name: str args: ArgumentsList annotations: Optional[Dict[str, Any]] = None def __str__(self): annotation_str = '' if self.annotations is not None: annotation_str = f'[[' + ', '.join(f'{k}={v}' for k, v in self.annotations.items()) + ']] ' arg_strings = [str(arg) for arg in self.args.arguments] if sum(len(arg) for arg in arg_strings) > 80: arg_strings = [jacinle.indent_text(arg) for arg in arg_strings] return f'{annotation_str}{self.name}:\n' + '\n'.join(arg_strings) return f'{annotation_str}{self.name}(' + ', '.join(arg_strings) + ')' def __repr__(self): return f'FunctionCall{{{str(self)}}}'
[docs] @dataclass class Suite(object): """A suite of statements. This is used as the intermediate representation of the parsed expressions.""" items: Tuple[Any, ...] local_variables: Dict[str, Any] = field(default_factory=dict) tracker: Optional['FunctionCallSymbolicExecutor2'] = None def _init_tracker(self, mode: SymbolicExecutionMode): self.tracker = FunctionCallSymbolicExecutor2(self, dict(), mode).run()
[docs] def get_effect_statements(self) -> List[Union[CrowFeatureAssignmentExpression, CrowBehaviorForeachLoopSuite, CrowBehaviorConditionSuite]]: self._init_tracker(SymbolicExecutionMode.EFFECT) return self.tracker.statements
[docs] def get_behavior_body_statements(self) -> List[Any]: self._init_tracker(SymbolicExecutionMode.BEHAVIOR) return self.tracker.statements
[docs] def get_heuristic_statements(self) -> List[Any]: self._init_tracker(SymbolicExecutionMode.HEURISTIC) return self.tracker.statements
[docs] def get_derived_expression(self) -> Optional[Union[E.ValueOutputExpression, Tuple[E.ValueOutputExpression, ...]]]: self._init_tracker(SymbolicExecutionMode.DERIVED) if len(self.tracker.statements) > 0: raise ValueError('No body statements are allowed in a derived expression.') return self.tracker.return_expression
def __str__(self): if len(self.items) == 0: return 'Suite{}' if len(self.items) == 1: return f'Suite{{{self.items[0]}}}' return 'Suite{\n' + '\n'.join(jacinle.indent_text(str(item)) for item in self.items) + '\n}' def __repr__(self): return self.__str__()
[docs] class FunctionCallSymbolicExecutor(object): """This class is used to track the function calls and other statements in a suite. It supports simulating the execution of the program and generating the post-condition of the program."""
[docs] def __init__(self, suite: Suite, init_local_variables: Optional[Dict[str, Any]] = None): """Initialize the function call tracker. Args: suite: the suite to be tracked. init_local_variables: the initial local variables. If None, an empty dictionary will be used. """ self.suite = suite self.local_variables = dict() if init_local_variables is None else init_local_variables self.assign_expressions = list() self.assign_expression_signatures = dict() self.expr_expressions = list() self.behavior_expressions = list() self.return_expression = None
local_variables_stack: List[Dict[str, Any]] """The assignments of local variables.""" assign_expressions: List[Union[CrowFeatureAssignmentExpression, CrowBehaviorForeachLoopSuite, CrowBehaviorConditionSuite]] """A list of assign expressions.""" assign_expression_signatures: Dict[Tuple[str, ...], E.VariableAssignmentExpression] """A dictionary of assign expressions, indexed by their signatures.""" check_expressions: List[E.ValueOutputExpression] """A list of check expressions.""" expr_expressions: List[E.ValueOutputExpression] """A list of expr expressions (i.e. bare expressions in the body).""" return_expression: Optional[E.ValueOutputExpression] """The return expression. This is either None or a single expression.""" behavior_expressions: List[CrowBehaviorBodyItem] def _g( self, expr: Union[E.Expression, UnnamedPlaceholder, CrowBehaviorBodyItem] ) -> Union[E.Expression, UnnamedPlaceholder, CrowBehaviorBodyItem]: if isinstance(expr, (CrowControllerApplicationExpression, CrowBehaviorApplicationExpression, CrowGeneratorApplicationExpression)): return expr if isinstance(expr, (CrowBehaviorBodyPrimitiveBase, CrowBehaviorBodySuiteBase)): return expr if not isinstance(expr, E.Expression): raise ValueError(f'Invalid expression: {expr}') return crow_replace_expression_variables(expr, { E.VariableExpression(Variable(k, AutoType)): v for k, v in self.local_variables.items() }) def _get_deictic_signature(self, e, known_deictic_vars=tuple()) -> Optional[Tuple[str, ...]]: if isinstance(e, E.DeicticAssignExpression): known_deictic_vars = known_deictic_vars + (e.variable.name,) return self._get_deictic_signature(e.expression, known_deictic_vars) elif isinstance(e, E.AssignExpression): args = [x.name if x.name not in known_deictic_vars else '?' for x in e.predicate.arguments] return tuple((e.predicate.function.name, *args)) else: return None def _mark_assign(self, *exprs: E.VariableAssignmentExpression, annotations: Optional[Dict[str, Any]] = None): if annotations is None: annotations = dict() for expr in exprs: signature = self._get_deictic_signature(expr) if signature is not None: if signature in self.assign_expression_signatures: raise ValueError(f'Duplicate assign expression: {expr} vs {self.assign_expression_signatures[signature]}') self.assign_expressions.append((expr, annotations)) self.assign_expression_signatures[signature] = expr else: self.assign_expressions.append((expr, annotations))
[docs] @jacinle.log_function(verbose=False) def run(self): """Simulate the execution of the program and generates an equivalent return statement. This function handles if-else conditions. However, loops are not allowed.""" # jacinle.log_function.print('Current suite:', self.suite) from concepts.dm.crow.parsers.cdl_parser import FunctionCall current_return_statement = None current_return_statement_condition_neg = None for item in self.suite.items: assert isinstance(item, FunctionCall), f'Invalid item in suite: {item}' if item.name == 'assign': if isinstance(item.args.arguments[0], E.FunctionApplicationExpression) and item.args.arguments[1] is Ellipsis: # TODO(Jiayuan Mao @ 2024/07/17): implement this for the new CrowFeatureAssignExpression. self.assign_expressions.append(CrowFeatureAssignmentExpression( self._g(item.args.arguments[0]), E.NullExpression(item.args.arguments[0].return_type), **item.annotations if item.annotations is not None else dict() )) elif isinstance(item.args.arguments[0], (E.ListFunctionApplicationExpression, E.FunctionApplicationExpression)) and isinstance(item.args.arguments[1], E.ObjectOrValueOutputExpression): self.assign_expressions.append(CrowFeatureAssignmentExpression( self._g(item.args.arguments[0]), self._g(item.args.arguments[1]), **item.annotations if item.annotations is not None else dict() )) elif isinstance(item.args.arguments[0], E.VariableExpression) and isinstance(item.args.arguments[1], E.ObjectOrValueOutputExpression): if item.annotations is None or 'symbol' not in item.annotations: # Runtime assign / assignment to feature variables. if item.args.arguments[0].name not in self.local_variables: self.local_variables[item.args.arguments[0].name] = E.VariableExpression(item.args.arguments[0]) self.behavior_expressions.append(CrowRuntimeAssignmentExpression( item.args.arguments[0].variable, self._g(item.args.arguments[1]) )) else: self.local_variables[item.args.arguments[0].name] = self._g(item.args.arguments[1]) else: raise ValueError(f'Invalid assignment: {item}. Types: {type(item.args.arguments[0])}, {type(item.args.arguments[1])}.') elif item.name == 'check': assert isinstance(item.args.arguments[0], (E.ValueOutputExpression, E.VariableExpression)), f'Invalid check expression: {item.args.arguments[0]}' self.check_expressions.append(self._g(item.args.arguments[0])) elif item.name == 'expr': if isinstance(item.args.arguments[0], (CrowControllerApplicationExpression, CrowBehaviorApplicationExpression, E.ListExpansionExpression)): self.behavior_expressions.append(self._g(item.args.arguments[0])) else: assert isinstance( item.args.arguments[0], (E.ValueOutputExpression, E.VariableExpression, CrowGeneratorApplicationExpression) ), f'Invalid expr expression: {item.args.arguments[0]}' self.expr_expressions.append(self._g(item.args.arguments[0])) elif item.name == 'bind': arguments = item.args.arguments[0].items body = item.args.arguments[1] self.behavior_expressions.append(CrowBindExpression(arguments, body)) elif item.name == 'achieve': term = item.args.arguments[0] self.behavior_expressions.append(CrowAchieveExpression(term, **item.annotations if item.annotations is not None else dict())) elif item.name == 'pachieve': term = item.args.arguments[0] self.behavior_expressions.append(CrowAchieveExpression(term, is_policy_achieve=True, **item.annotations if item.annotations is not None else dict())) elif item.name == 'untrack': if len(item.args.arguments) == 0: self.behavior_expressions.append(CrowUntrackExpression(E.NullExpression(BOOL))) else: term = item.args.arguments[0] self.behavior_expressions.append(CrowUntrackExpression(term)) elif item.name == 'assert': term = item.args.arguments[0] self.behavior_expressions.append(CrowAssertExpression(term, **item.annotations if item.annotations is not None else dict())) elif item.name == 'commit': self.behavior_expressions.append(CrowBehaviorCommit(**item.annotations if item.annotations is not None else dict())) elif item.name == 'return': assert isinstance(item.args.arguments[0], (E.ValueOutputExpression, E.VariableExpression)), f'Invalid return expression: {item.args.arguments[0]}' self.return_expression = _make_conditional_return(current_return_statement, current_return_statement_condition_neg, self._g(item.args.arguments[0])) break elif item.name == 'ordering': suite = item.args.arguments[1] tracker = FunctionCallSymbolicExecutor(suite, self.local_variables.copy()).run() self.local_variables = tracker.local_variables behavior_expressions = tracker.behavior_expressions if item.args.arguments[0] == 'promotable unordered': prog = CrowBehaviorOrderingSuite('promotable', (CrowBehaviorOrderingSuite('unordered', behavior_expressions),)) elif item.args.arguments[0] == 'promotable sequential': prog = CrowBehaviorOrderingSuite('promotable', behavior_expressions) elif item.args.arguments[0] == 'critical unordered': prog = CrowBehaviorOrderingSuite('critical', (CrowBehaviorOrderingSuite('unordered', behavior_expressions),)) elif item.args.arguments[0] == 'critical sequential': prog = CrowBehaviorOrderingSuite('critical', behavior_expressions) else: assert ' ' not in item.args.arguments[0], f'Invalid ordering type: {item.args.arguments[0]}' prog = CrowBehaviorOrderingSuite(item.args.arguments[0], behavior_expressions) self.behavior_expressions.append(prog) elif item.name == 'if': condition = self._g(item.args.arguments[0]) neg_condition = E.NotExpression(condition) assert isinstance(condition, E.ValueOutputExpression), f'Invalid condition: {condition}. Type: {type(condition)}.' t_suite = item.args.arguments[1] f_suite = item.args.arguments[2] t_tracker = FunctionCallSymbolicExecutor(t_suite, self.local_variables.copy()).run() f_tracker = FunctionCallSymbolicExecutor(f_suite, self.local_variables.copy()).run() assert set(t_tracker.local_variables.keys()) == set(f_tracker.local_variables.keys()), f'Local variables in the true and false branches are not consistent: {t_tracker.local_variables.keys()} vs {f_tracker.local_variables.keys()}' new_local_variables = t_tracker.local_variables for k, v in t_tracker.local_variables.items(): if f_tracker.local_variables[k] != v: new_local_variables[k] = E.ConditionExpression(condition, v, f_tracker.local_variables[k]) self.local_variables = new_local_variables if len(t_tracker.assign_expressions) > 0 or len(f_tracker.assign_expressions) > 0: self.assign_expressions.append(CrowBehaviorConditionSuite(condition, t_tracker.assign_expressions, f_tracker.assign_expressions if len(f_tracker.assign_expressions) > 0 else None)) for expr in t_tracker.check_expressions: self.check_expressions.append(_make_conditional_implies(condition, expr)) for expr in f_tracker.check_expressions: raise RuntimeError(f'Check statements in the false branch are not supported: {expr}') # TODO(Jiayuan Mao @ 2024/07/17): implement false-branch check statements. self.check_expressions.append(_make_conditional_implies(neg_condition, expr)) if len(t_tracker.expr_expressions) != len(f_tracker.expr_expressions): raise ValueError(f'Number of bare expressions in the true and false branches are not consistent: {len(t_tracker.expr_expressions)} vs {len(f_tracker.expr_expressions)}') if len(t_tracker.expr_expressions) == 0: pass elif len(t_tracker.expr_expressions) == 1: self.expr_expressions.append(E.ConditionExpression(condition, t_tracker.expr_expressions[0], f_tracker.expr_expressions[0])) else: raise ValueError(f'Multiple bare expressions in the true and false branches are not supported: {t_tracker.expr_expressions} vs {f_tracker.expr_expressions}') if len(t_tracker.behavior_expressions) > 0: self.behavior_expressions.append(CrowBehaviorConditionSuite(condition, t_tracker.behavior_expressions, f_tracker.behavior_expressions)) if t_tracker.return_expression is not None and f_tracker.return_expression is not None: # Both branches have return statements. statement = E.ConditionExpression(condition, t_tracker.return_expression, f_tracker.return_expression) self.return_expression = _make_conditional_return(current_return_statement, current_return_statement_condition_neg, statement) break elif t_tracker.return_expression is not None: current_return_statement = _make_conditional_return(current_return_statement, current_return_statement_condition_neg, t_tracker.return_expression) current_return_statement_condition_neg = E.NotExpression(condition) elif f_tracker.return_expression is not None: current_return_statement = _make_conditional_return(current_return_statement, current_return_statement_condition_neg, f_tracker.return_expression) current_return_statement_condition_neg = condition else: pass elif item.name == 'while': condition = self._g(item.args.arguments[0]) suite = item.args.arguments[1] tracker = FunctionCallSymbolicExecutor(suite, self.local_variables.copy()).run() for k in tracker.local_variables: if k in self.local_variables and self.local_variables[k] != tracker.local_variables[k]: raise ValueError(f'Local variable {k} is assigned in the while statement but has been assigned before: {self.local_variables[k]} vs {tracker.local_variables[k]}') for expr in tracker.assign_expressions: raise ValueError(f'Assign statements are not allowed in a while statement: {expr}') for expr in tracker.check_expressions: raise ValueError(f'Check statements are not allowed in a while statement: {expr}') if len(tracker.expr_expressions) > 0: raise ValueError(f'Expr statements are not allowed in a while statement: {tracker.expr_expressions}') if len(tracker.behavior_expressions) > 0: self.behavior_expressions.append(CrowBehaviorWhileLoopSuite(condition, tracker.behavior_expressions, **item.annotations if item.annotations is not None else dict())) if tracker.return_expression is not None: raise ValueError(f'Return statement is not allowed in a while statement: {tracker.return_expression}') elif item.name == 'foreach': suite = item.args.arguments[1] tracker = FunctionCallSymbolicExecutor(suite, self.local_variables.copy()).run() for k in tracker.local_variables: if k in self.local_variables and self.local_variables[k] != tracker.local_variables[k]: raise ValueError(f'Local variable {k} is assigned in the foreach statement but has been assigned before: {self.local_variables[k]} vs {tracker.local_variables[k]}') statements = tracker.assign_expressions if len(statements) > 0: for var in item.args.arguments[0].items: statements = [CrowBehaviorForeachLoopSuite(var, statements)] self.assign_expressions.extend(statements) for expr in tracker.check_expressions: for var in item.args.arguments[0].items: expr = E.ForallExpression(var, expr) self.check_expressions.append(expr) if len(tracker.expr_expressions) == 0: pass else: if len(tracker.expr_expressions) == 1: merged = tracker.expr_expressions[0] else: merged = E.AndExpression(*tracker.expr_expressions) for var in item.args.arguments[0].items: merged = E.ForallExpression(var, merged) self.expr_expressions.append(merged) if len(tracker.behavior_expressions) > 0: assert len(item.args.arguments[0].items) == 1, f'Invalid number of variables in the foreach statement: {item.args.arguments[0].items}. Currently only one variable is supported.' self.behavior_expressions.append(CrowBehaviorForeachLoopSuite(item.args.arguments[0].items[0], tracker.behavior_expressions)) if tracker.return_expression is not None: raise ValueError(f'Return statement is not allowed in a foreach statement: {tracker.return_expression}') elif item.name == 'foreach_in': variables = item.args.arguments[0].items values = item.args.arguments[1].items suite = item.args.arguments[2] if len(variables) != 1 or len(values) != 1: raise NotImplementedError(f'Currently only one variable and one value are supported in a foreach_in statement: {variables} vs {values}') tracker = FunctionCallSymbolicExecutor(suite, self.local_variables.copy()).run() for k in tracker.local_variables: if k in self.local_variables and self.local_variables[k] != tracker.local_variables[k]: raise ValueError(f'Local variable {k} is assigned in the foreach_in statement but has been assigned before: {self.local_variables[k]} vs {tracker.local_variables[k]}') statements = tracker.assign_expressions if len(statements) > 0: for var, value in reversed(list(zip(variables, values))): statements = [CrowBehaviorForeachLoopSuite(var, statements, loop_in_expression=value)] self.assign_expressions.extend(statements) if len(tracker.check_expressions) > 0: raise NotImplementedError(f'Check statements are not allowed in a foreach_in statement: {tracker.check_expressions}') if len(tracker.expr_expressions) > 0: raise NotImplementedError(f'Expr statements are not allowed in a foreach_in statement: {tracker.expr_expressions}') if len(tracker.behavior_expressions) > 0: # TODO(Jiayuan Mao @ 2024/03/12): implement the rest parts of action statements. expressions = tracker.behavior_expressions for var, value in reversed(list(zip(variables, values))): expressions = [CrowBehaviorForeachLoopSuite(var, expressions, loop_in_expression=value)] self.behavior_expressions.extend(expressions) if tracker.return_expression is not None: raise ValueError(f'Return statement is not allowed in a foreach_in statement: {tracker.return_expression}') elif item.name == 'pass': pass # jacinle.log_function.print('Local variables:', self.local_variables) # jacinle.log_function.print('Assign expressions:', self.assign_expressions) # jacinle.log_function.print('Check expressions:', self.check_expressions) # jacinle.log_function.print('Expr expressions:', self.expr_expressions) # jacinle.log_function.print('Return expression:', self.return_expression) return self
def _make_conditional_implies(condition: E.ValueOutputExpression, test: E.ValueOutputExpression): if isinstance(test, E.BoolExpression) and test.op == E.BoolOpType.IMPLIES: if isinstance(test.arguments[0], E.BoolExpression) and test.arguments[0].op == E.BoolOpType.AND: return E.ImpliesExpression(E.AndExpression(condition, *test.arguments[0].arguments), test.arguments[1]) else: return E.ImpliesExpression(E.AndExpression(condition, test.arguments[0]), test.arguments[1]) else: return E.ImpliesExpression(condition, test) def _make_conditional_return(current_stmt: Optional[E.ValueOutputExpression], current_condition_neg: Optional[E.ValueOutputExpression], new_stmt: E.ValueOutputExpression): if current_stmt is None: return new_stmt return E.ConditionExpression(current_condition_neg, new_stmt, current_stmt) def _make_conditional_assign(assign_stmt: E.VariableAssignmentExpression, condition: E.ValueOutputExpression): if isinstance(assign_stmt, E.AssignExpression): return E.ConditionalAssignExpression(assign_stmt.predicate, assign_stmt.value, condition) elif isinstance(assign_stmt, E.ConditionalAssignExpression): if isinstance(assign_stmt.condition, E.BoolExpression) and assign_stmt.condition.op == E.BoolOpType.AND: return E.ConditionalAssignExpression(assign_stmt.predicate, assign_stmt.value, E.AndExpression(condition, *assign_stmt.condition.arguments)) else: return E.ConditionalAssignExpression(assign_stmt.predicate, assign_stmt.value, E.AndExpression(condition, assign_stmt.condition)) elif isinstance(assign_stmt, E.DeicticAssignExpression): return E.DeicticAssignExpression(assign_stmt.variable, _make_conditional_assign(assign_stmt.expression, condition)) else: raise ValueError(f'Invalid assign statement: {assign_stmt}')
[docs] class FunctionCallSymbolicExecutor2(object): """This class is used to track the function calls and other statements in a suite. It supports simulating the execution of the program and generating the post-condition of the program."""
[docs] def __init__(self, suite: Suite, init_local_variables: Optional[Dict[str, Any]] = None, mode: SymbolicExecutionMode = SymbolicExecutionMode.FUNCTIONAL): """Initialize the function call tracker. Args: suite: the suite to be tracked. init_local_variables: the initial local variables. If None, an empty dictionary will be used. """ self.suite = suite self.local_variables = dict() if init_local_variables is None else init_local_variables self.mode = mode self.statements = list() self.return_expression = None
[docs] def fork(self, suite: Suite, init_local_variables: Optional[Dict[str, Any]] = None) -> 'FunctionCallSymbolicExecutor2': """Make a "fork" of the current tracker. Args: suite: the new suite to be tracked. init_local_variables: the new initial local variables. If None, the current local variables will be used. Returns: A new instance of the tracker. """ return FunctionCallSymbolicExecutor2(suite, init_local_variables if init_local_variables is not None else self.local_variables.copy(), mode=self.mode)
suite: Suite """The suite to be tracked.""" local_variables_stack: List[Dict[str, Any]] """The assignments of local variables.""" mode: SymbolicExecutionMode statements: List[Union[ CrowFeatureAssignmentExpression, CrowRuntimeAssignmentExpression, CrowControllerApplicationExpression, CrowBehaviorOrderingSuite, CrowBehaviorWhileLoopSuite, CrowBehaviorForeachLoopSuite, CrowBehaviorConditionSuite ]] return_expression: Optional[E.ValueOutputExpression] """The return expression. This is either None or a single expression.""" def _replace_variables( self, expr: Union[E.Expression, UnnamedPlaceholder, CrowBehaviorBodyItem] ) -> Union[E.Expression, UnnamedPlaceholder, CrowBehaviorBodyItem]: if isinstance(expr, (CrowControllerApplicationExpression, CrowBehaviorApplicationExpression, CrowGeneratorApplicationExpression)): return expr if isinstance(expr, (CrowBehaviorBodyPrimitiveBase, CrowBehaviorBodySuiteBase)): return expr if not isinstance(expr, E.Expression): raise ValueError(f'Invalid expression: {expr}') return crow_replace_expression_variables(expr, { E.VariableExpression(Variable(k, AutoType)): v for k, v in self.local_variables.items() })
[docs] def set_unique_return(self, return_expression: E.ValueOutputExpression): """Set the return expression. This function is used to set the return expression when it is unique.""" assert self.return_expression is None, f'The return expression has already been set: {self.return_expression}' self.return_expression = return_expression
[docs] @jacinle.log_function(verbose=False) def run(self): """Simulate the execution of the program and generates an equivalent return statement. This function handles if-else conditions. However, loops are not allowed.""" # jacinle.log_function.print('Current suite:', self.suite) from concepts.dm.crow.parsers.cdl_parser import FunctionCall current_return_statement = None current_return_statement_condition_neg = None for item in self.suite.items: assert isinstance(item, FunctionCall), f'Invalid item in suite: {item}' annotations = item.annotations if item.annotations is not None else dict() if item.name == 'assign': target, value = item.args.arguments if isinstance(target, E.FunctionApplicationExpression) and value is Ellipsis: if not self.mode.support_assign_statements: raise ValueError(f'Feature assign statements are not allowed in the current mode: {self.mode}') self.statements.append(CrowFeatureAssignmentExpression(target, E.NullExpression(target.return_type), **annotations)) elif isinstance(target, E.FunctionApplicationExpression) and isinstance(value, E.ObjectOrValueOutputExpression): if not self.mode.support_assign_statements: raise ValueError(f'Feature assign statements are not allowed in the current mode: {self.mode}') self.statements.append(CrowFeatureAssignmentExpression(target, value, **annotations)) elif isinstance(target, E.VariableExpression) and isinstance(value, E.ObjectOrValueOutputExpression): use_symbol_definition = 'symbol' in item.annotations if self.mode is SymbolicExecutionMode.DERIVED: use_symbol_definition = True if use_symbol_definition: self.local_variables[target.name] = self._replace_variables(item.args.arguments[1]) else: # Runtime assign / assignment to feature variables. if target.name not in self.local_variables: # Only put a VariableExpression in the local variables. self.local_variables[target.name] = target self.statements.append(CrowRuntimeAssignmentExpression(target.variable, value)) else: raise ValueError(f'Invalid assignment: {item}. Types: {type(target)}, {type(item.args.arguments[1])}.') elif item.name == 'expr': if isinstance(item.args.arguments[0], (CrowControllerApplicationExpression, CrowBehaviorApplicationExpression, E.ListExpansionExpression)): if not self.mode.support_misc_behavior_body_statements: raise ValueError(f'Behavior body statements are not allowed in the current mode: {self.mode}') self.statements.append(self._replace_variables(item.args.arguments[0])) else: if not self.mode.support_expr_statements: raise ValueError(f'Expr statements are not allowed in the current mode: {self.mode}') assert isinstance( item.args.arguments[0], (E.ValueOutputExpression, E.VariableExpression, CrowGeneratorApplicationExpression) ), f'Invalid expr expression: {item.args.arguments[0]}' self.set_unique_return(self._replace_variables(item.args.arguments[0])) elif item.name in ('bind', 'untrack', 'assert', 'commit'): if not self.mode.support_misc_behavior_body_statements: raise ValueError(f'Behavior body statement {item} are not allowed in the current mode: {self.mode}') if item.name == 'bind': arguments = item.args.arguments[0].items body = item.args.arguments[1] self.statements.append(CrowBindExpression(arguments, body)) elif item.name == 'untrack': term = item.args.arguments[0] self.statements.append(CrowUntrackExpression(term)) elif item.name == 'assert': term = item.args.arguments[0] self.statements.append(CrowAssertExpression(term, **annotations)) elif item.name == 'commit': self.statements.append(CrowBehaviorCommit(**annotations)) elif item.name in ('achieve', 'pachieve'): if not self.mode.support_achieve_statements: raise ValueError(f'Achieve statements are not allowed in the current mode: {self.mode}') if item.name == 'achieve': term = item.args.arguments[0] self.statements.append(CrowAchieveExpression(term, **annotations)) elif item.name == 'pachieve': term = item.args.arguments[0] self.statements.append(CrowAchieveExpression(term, is_policy_achieve=True, **annotations)) elif item.name == 'return': if not self.mode.support_return_statements: raise ValueError(f'Return statements are not allowed in the current mode: {self.mode}') assert isinstance(item.args.arguments[0], (E.ValueOutputExpression, E.VariableExpression)), f'Invalid return expression: {item.args.arguments[0]}' self.return_expression = _make_conditional_return(current_return_statement, current_return_statement_condition_neg, self._replace_variables(item.args.arguments[0])) break elif item.name == 'ordering': if not self.mode.support_ordering_statements: raise ValueError(f'Ordering statements are not allowed in the current mode: {self.mode}') suite = item.args.arguments[1] tracker = self.fork(suite).run() self.local_variables = tracker.local_variables statements = tracker.statements if item.args.arguments[0] == 'promotable unordered': prog = CrowBehaviorOrderingSuite('promotable', (CrowBehaviorOrderingSuite('unordered', statements),)) elif item.args.arguments[0] == 'promotable sequential': prog = CrowBehaviorOrderingSuite('promotable', statements) elif item.args.arguments[0] == 'critical unordered': prog = CrowBehaviorOrderingSuite('critical', (CrowBehaviorOrderingSuite('unordered', statements),)) elif item.args.arguments[0] == 'critical sequential': prog = CrowBehaviorOrderingSuite('critical', statements) else: assert ' ' not in item.args.arguments[0], f'Invalid ordering type: {item.args.arguments[0]}' prog = CrowBehaviorOrderingSuite(item.args.arguments[0], statements) self.statements.append(prog) elif item.name == 'if': if not self.mode.support_if_statements: raise ValueError(f'If statements are not allowed in the current mode: {self.mode}') condition = self._replace_variables(item.args.arguments[0]) assert isinstance(condition, E.ValueOutputExpression), f'Invalid condition: {condition}. Type: {type(condition)}.' t_suite = item.args.arguments[1] f_suite = item.args.arguments[2] t_tracker = self.fork(t_suite).run() f_tracker = self.fork(f_suite).run() assert set(t_tracker.local_variables.keys()) == set(f_tracker.local_variables.keys()), f'Local variables in the true and false branches are not consistent: {t_tracker.local_variables.keys()} vs {f_tracker.local_variables.keys()}' new_local_variables = t_tracker.local_variables for k, v in t_tracker.local_variables.items(): if f_tracker.local_variables[k] != v: new_local_variables[k] = E.ConditionExpression(condition, v, f_tracker.local_variables[k]) self.local_variables = new_local_variables if len(t_tracker.statements) > 0: self.statements.append(CrowBehaviorConditionSuite(condition, t_tracker.statements, f_tracker.statements)) if t_tracker.return_expression is not None and f_tracker.return_expression is not None: # Both branches have return statements. statement = E.ConditionExpression(condition, t_tracker.return_expression, f_tracker.return_expression) self.return_expression = _make_conditional_return(current_return_statement, current_return_statement_condition_neg, statement) break elif t_tracker.return_expression is not None: current_return_statement = _make_conditional_return(current_return_statement, current_return_statement_condition_neg, t_tracker.return_expression) current_return_statement_condition_neg = E.NotExpression(condition) elif f_tracker.return_expression is not None: current_return_statement = _make_conditional_return(current_return_statement, current_return_statement_condition_neg, f_tracker.return_expression) current_return_statement_condition_neg = condition elif item.name == 'while': if not self.mode.support_while_statements: raise ValueError(f'While statements are not allowed in the current mode: {self.mode}') condition = self._replace_variables(item.args.arguments[0]) suite = item.args.arguments[1] tracker = self.fork(suite).run() for k in tracker.local_variables: if k in self.local_variables and self.local_variables[k] != tracker.local_variables[k]: raise ValueError(f'Local variable {k} is assigned in the while statement but has been assigned before: {self.local_variables[k]} vs {tracker.local_variables[k]}') if len(tracker.statements) > 0: self.statements.append(CrowBehaviorWhileLoopSuite(condition, tracker.statements, **annotations)) if tracker.return_expression is not None: raise ValueError(f'Return statement is not allowed in a while statement: {tracker.return_expression}') elif item.name == 'foreach': if not self.mode.support_foreach_statements: raise ValueError(f'Foreach statements are not allowed in the current mode: {self.mode}') variables = item.args.arguments[0].items suite = item.args.arguments[1] tracker = self.fork(suite).run() for k in tracker.local_variables: if k in self.local_variables and self.local_variables[k] != tracker.local_variables[k]: raise ValueError(f'Local variable {k} is assigned in the foreach statement but has been assigned before: {self.local_variables[k]} vs {tracker.local_variables[k]}') statements = tracker.statements if len(statements) > 0: for var in variables: statements = [CrowBehaviorForeachLoopSuite(var, statements)] self.statements.extend(statements) if tracker.return_expression is not None: raise ValueError(f'Return statement is not allowed in a foreach statement: {tracker.return_expression}') elif item.name == 'foreach_in': if not self.mode.support_foreach_statements: raise ValueError(f'Foreach_in statements are not allowed in the current mode: {self.mode}') variables = item.args.arguments[0].items values = item.args.arguments[1].items suite = item.args.arguments[2] if len(variables) != 1 or len(values) != 1: raise NotImplementedError(f'Currently only one variable and one value are supported in a foreach_in statement: {variables} vs {values}') tracker = self.fork(suite).run() for k in tracker.local_variables: if k in self.local_variables and self.local_variables[k] != tracker.local_variables[k]: raise ValueError(f'Local variable {k} is assigned in the foreach_in statement but has been assigned before: {self.local_variables[k]} vs {tracker.local_variables[k]}') statements = tracker.statements if len(statements) > 0: for var, value in reversed(list(zip(variables, values))): statements = [CrowBehaviorForeachLoopSuite(var, statements, loop_in_expression=value)] self.statements.extend(statements) if tracker.return_expression is not None: raise ValueError(f'Return statement is not allowed in a foreach_in statement: {tracker.return_expression}') elif item.name == 'pass': pass else: raise NotImplementedError(f'Unsupported item: {item}') # jacinle.log_function.print('Local variables:', self.local_variables) # jacinle.log_function.print('Assign expressions:', self.assign_expressions) # jacinle.log_function.print('Check expressions:', self.check_expressions) # jacinle.log_function.print('Expr expressions:', self.expr_expressions) # jacinle.log_function.print('Return expression:', self.return_expression) return self