Source code for concepts.dm.crow.csp_solver.dpll_sampling

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : dpll_sampling.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 03/23/2024
#
# This file is part of Project Concepts.
# Distributed under terms of the MIT license.

"""The DPLL-Sampling algorithm for solving CSPs. This algorithm is specifically designed
for solving CSPs with mixed Boolean and continuous variables. At a high level, the algorithm
uses DPLL-style search to find a solution to Boolean variables. After the value for all Boolean
variables are fixed, the algorithm uses a sampling-based method to find a solution to the continuous variables."""

from typing import Optional, Union, Sequence, Tuple, List, Dict

import itertools
import collections
import jacinle
from jacinle.utils.context import EmptyContext

from concepts.dsl.dsl_types import BOOL, ValueType, NamedTensorValueType, TensorValueTypeBase, PyObjValueType
from concepts.dsl.dsl_functions import Function
from concepts.dsl.tensor_value import TensorValue
from concepts.dsl.constraint import Constraint, GroupConstraint, ConstraintSatisfactionProblem, OptimisticValue, OptimisticValueRecord, Assignment, AssignmentType, AssignmentDict, SimulationFluentConstraintFunction, ground_assignment_value
from concepts.dsl.expression import VariableExpression, ValueOutputExpression, BoolOpType, QuantificationOpType, BoolExpression, PredicateEqualExpression, ValueCompareExpression, CompareOpType, FunctionApplicationExpression
from concepts.dsl.expression_utils import iter_exprs

from concepts.dm.crow.crow_function import CrowFeature, CrowFunction, CrowFunctionBase
from concepts.dm.crow.crow_generator import CrowGeneratorBase, CrowDirectedGenerator, CrowUndirectedGenerator
from concepts.dm.crow.controller import CrowControllerApplier, CrowControllerApplier
from concepts.dm.crow.interfaces.controller_interface import CrowSimulationControllerInterface
from concepts.dm.crow.executors.crow_executor import CrowExecutor
from concepts.dm.crow.executors.generator_executor import CrowGeneratorExecutor
from concepts.dm.crow.csp_solver.csp_utils import csp_ground_action

# Names exported by ``from ... import *``: the two solver exceptions, the constraint-list alias,
# the DPLL simplification filters, the variable/generator finders, the generator-matching type
# aliases, and the solver entry points.
# NOTE(review): `dpll_apply_assignments_with_simulation` is defined in this module without a
# leading underscore but is not listed here -- confirm whether that omission is intentional.
__all__ = [
    'CSPNotSolvable', 'CSPNoGenerator', 'ConstraintList',
    'dpll_apply_assignments',
    'dpll_filter_deterministic_equal', 'dpll_filter_deterministic_clauses', 'dpll_filter_unused_rhs', 'dpll_filter_unused_simulation_rhs', 'dpll_filter_duplicated_constraints',
    'dpll_find_bool_variable', 'dpll_find_grounded_function_application', 'dpll_find_typegen_variable', 'dpll_find_gen_variable_combined',
    'GeneratorMatchingInputType', 'GeneratorMatchingOutputType', 'GeneratorMatchingIOReturnType', 'GeneratorMatchingReturnType',
    'dpll_solve', 'dpll_simplify'
]

# Working list of constraints used throughout the solver. An entry may be ``None``: the filters in
# this module overwrite a slot with ``None`` to mark a constraint as removed, and
# `dpll_apply_assignments` drops those slots when it rebuilds the list.
ConstraintList = List[Optional[Union[Constraint, GroupConstraint]]]


class CSPNotSolvable(Exception):
    """Signal that the constraint satisfaction problem has no solution."""
class CSPNoGenerator(Exception):
    """Signal that no generator could be matched to make progress on the CSP.

    This is weaker than :class:`CSPNotSolvable`: the CSP may still have a solution,
    the solver just has no generator available to look for it.
    """
def _determined(*args) -> bool:
    """Tell whether every given value is already determined.

    Args:
        *args: the values to inspect.

    Returns:
        False if at least one value is an :class:`~concepts.dsl.constraint.OptimisticValue`,
        True otherwise (also when called without arguments).
    """
    return not any(isinstance(value, OptimisticValue) for value in args)


def _ground_assignment_value_partial(assignments: Dict[int, Assignment], dtype: ValueType, identifier: int) -> Union[TensorValue, OptimisticValue]:
    """Look up the current value of a variable, following chains of EQUAL assignments.

    Unlike the exported :meth:`concepts.dsl.constraint.ground_assignment_value`, this helper
    does not require the variable to be resolved: when the chain ends at a variable that has
    no assignment yet, an :class:`~concepts.dsl.constraint.OptimisticValue` for that variable
    is returned instead.

    Args:
        assignments: the assignment dictionary, keyed by variable identifier.
        dtype: the type of the variable; only used to build the OptimisticValue.
        identifier: the identifier of the variable to look up.

    Returns:
        the payload (``.d``) of the first non-EQUAL assignment reached, or
        ``OptimisticValue(dtype, last_identifier)`` when the chain ends at an unassigned variable.
    """
    resolved = identifier
    while resolved in assignments:
        entry = assignments[resolved]
        if entry.t is not AssignmentType.EQUAL:
            # A concrete (non-alias) assignment: hand back whatever it stores.
            return entry.d
        # For EQUAL assignments, `d` holds the identifier of the aliased variable.
        resolved = entry.d
    return OptimisticValue(dtype, resolved)
def dpll_apply_assignments(executor: CrowExecutor, constraints: ConstraintList, assignments: Dict[int, Assignment]) -> ConstraintList:
    """Apply the assignments to the constraints.

    Every variable that has an entry in the assignment dictionary is replaced by its current value
    (following EQUAL chains). Constraints that become fully determined are checked with the executor:
    satisfied ones are dropped, violated ones abort the search.

    Args:
        executor: the executor, used to check fully grounded constraints.
        constraints: the list of constraints. ``None`` entries (constraints removed by a filter) are skipped.
        assignments: the dictionary of assignments.

    Returns:
        a new list with the constraints that are not yet satisfied. It never contains ``None``.

    Raises:
        CSPNotSolvable: when a fully grounded constraint is violated, or when a fully grounded
            constraint refers to an unresolved generator placeholder.
    """
    new_constraints = list()
    for c in constraints:
        if c is None:
            continue

        if isinstance(c, GroupConstraint):
            # Keep the group only while at least one of its member constraints is still in the list.
            # The list may contain None slots left behind by the filters, so those must be skipped
            # here as well (the original inner loop dereferenced them and raised AttributeError).
            has_unsatisfied_subconstraint = any(
                c2 is not None and not c2.is_group_constraint and c2.group is not None and c2.group == c
                for c2 in constraints
            )
            if has_unsatisfied_subconstraint:
                new_constraints.append(c)
            continue

        # If the return value of the constraint is ignored, simply ignore the entire constraint.
        if isinstance(c.rv, OptimisticValue) and c.rv.identifier in assignments and assignments[c.rv.identifier].t is AssignmentType.IGNORE:
            continue

        # Ground the arguments and the return value.
        new_args = list(c.arguments)
        for i, x in enumerate(c.arguments):
            if isinstance(x, OptimisticValue) and x.identifier in assignments:
                new_args[i] = _ground_assignment_value_partial(assignments, x.dtype, x.identifier)
        new_rv = c.rv
        if isinstance(c.rv, OptimisticValue) and c.rv.identifier in assignments:
            new_rv = _ground_assignment_value_partial(assignments, c.rv.dtype, c.rv.identifier)

        # Evaluate the constraint. Simulation constraints are never evaluated here, even when grounded.
        nc = Constraint(c.function, new_args, new_rv, note=c.note, group=c.group, timestamp=c.timestamp)
        if _determined(nc.rv) and _determined(*nc.arguments) and not isinstance(nc.function, SimulationFluentConstraintFunction):
            if isinstance(nc.function, CrowFunction) and nc.function.is_generator_placeholder:
                raise CSPNotSolvable(f'Constraint {c} is not satisfied: the generator placeholder is not resolved.')
            if executor.check_constraint(nc):
                continue
            raise CSPNotSolvable(f'Constraint {c} is not satisfied.')
        new_constraints.append(nc)
    return new_constraints
def dpll_filter_deterministic_equal(executor: CrowExecutor, constraints: ConstraintList, assignments: Dict[int, Assignment]) -> Tuple[bool, ConstraintList]:
    """Simplify equality constraints whose outcome is already fixed.

    Handled patterns (``rv`` is the return value of the equality constraint):

    - ``(x == x) == True`` is dropped; ``(x == x) == False`` is a contradiction.
    - ``(x == y) == True`` records ``x = y`` (an EQUAL assignment) or ``x = value`` and drops the constraint.
    - ``(a == b) == True`` with two determined values is checked with the executor.
    - ``(x == y) == False`` on Boolean operands is rewritten into ``NOT(x) == y``.
    - ``(x == x) == rv`` with an undetermined ``rv`` records ``rv = True``.

    A constraint that mentions a variable which already has an entry in ``assignments`` is deferred:
    it is left untouched, and the re-grounding done by :func:`dpll_apply_assignments` at the end of
    this pass substitutes the variable so the constraint is handled by the next pass. Without this,
    a second equality on the same variable would overwrite the first assignment after the first
    constraint had already been dropped, and a symmetric pair ``x == y`` / ``y == x`` would create
    a cycle of EQUAL assignments.

    Args:
        executor: the executor.
        constraints: the list of constraints. Modified in place (handled slots are set to None).
        assignments: the dictionary of assignments. Modified in place.

    Returns:
        a tuple of (whether we have made progress, the list of constraints that have not been satisfied).

    Raises:
        CSPNotSolvable: when an equality constraint is found to be unsatisfiable.
    """
    def is_pending(value) -> bool:
        # True for a variable that already received an assignment but is not yet substituted.
        return isinstance(value, OptimisticValue) and value.identifier in assignments

    progress = False
    for i, c in enumerate(constraints):
        if c.is_group_constraint or not c.is_equal_constraint:
            continue

        lhs, rhs = c.arguments[0], c.arguments[1]
        if is_pending(lhs) or is_pending(rhs) or is_pending(c.rv):
            # Force a re-grounding pass; the constraint is revisited once it has been substituted.
            progress = True
            continue

        same_variable = (
            isinstance(lhs, OptimisticValue) and isinstance(rhs, OptimisticValue)
            and lhs.identifier == rhs.identifier
        )

        if isinstance(c.rv, TensorValue):
            # If the constraint looks like `x == x`, we can simply ignore it.
            if same_variable:
                if c.rv.item():
                    constraints[i] = None
                    progress = True
                    continue
                raise CSPNotSolvable(f'Constraint {c} can not be satisfied: {c.arguments[0]} is not equal to itself.')

            if c.rv.item():
                # If the constraint looks like: (x == y) == True, then we can set x = y.
                if isinstance(lhs, OptimisticValue):
                    if isinstance(rhs, OptimisticValue):
                        assignments[lhs.identifier] = Assignment(AssignmentType.EQUAL, rhs.identifier)
                    else:
                        assignments[lhs.identifier] = Assignment(AssignmentType.VALUE, rhs)
                    constraints[i] = None
                elif isinstance(rhs, OptimisticValue):
                    # Here lhs is known to be determined, so only the VALUE case can occur.
                    assignments[rhs.identifier] = Assignment(AssignmentType.VALUE, lhs)
                    constraints[i] = None
                elif isinstance(lhs, TensorValue) and isinstance(rhs, TensorValue):
                    if executor.check_eq_constraint(lhs.dtype, lhs, rhs, True):
                        constraints[i] = None
                    else:
                        raise CSPNotSolvable(f'Constraint {c} can not be satisfied: {c.arguments[0]} is not equal to {c.arguments[1]}.')
                else:
                    raise AssertionError('Sanity check failed.')
                progress = True
            elif lhs.dtype == BOOL:
                # (x == y) == False on Booleans is the same as NOT(x) == y.
                constraints[i] = Constraint(BoolOpType.NOT, [lhs], rhs, note=c.note, group=c.group, timestamp=c.timestamp)
                progress = True
        elif same_variable:
            # (x == x) == rv with rv not a TensorValue: rv must be True.
            assignments[c.rv.identifier] = Assignment(AssignmentType.VALUE, True)
            constraints[i] = None
            progress = True

    if progress:
        return progress, dpll_apply_assignments(executor, constraints, assignments)
    return progress, constraints
def dpll_filter_unused_rhs(executor: CrowExecutor, constraints: ConstraintList, assignments: Dict[int, Assignment], index2record: Dict[int, OptimisticValueRecord]) -> ConstraintList:
    """Mark variables that occur exactly once, and only as a return value, as ignorable.

    Such a variable is never read by any constraint, so it receives an IGNORE assignment and the
    assignments are then applied through :func:`dpll_apply_assignments`.

    Args:
        executor: the executor.
        constraints: the list of constraints.
        assignments: the dictionary of assignments. Modified in place.
        index2record: the dictionary of variable records; variables whose record is ``actionable``
            always count as used.

    Returns:
        the list of constraints that have not been satisfied, after removing all unused variables.
    """
    # Scoring: any "real" use adds 100, an appearance as a return value adds 1.
    # A final score of exactly 1 therefore means "a single return-value appearance and nothing else".
    usage = collections.Counter()

    for identifier, record in index2record.items():
        if record.actionable:
            usage[identifier] += 100

    for constraint in constraints:
        if constraint.is_group_constraint:
            continue
        for argument in constraint.arguments:
            if isinstance(argument, OptimisticValue):
                usage[argument.identifier] += 100
        if isinstance(constraint.rv, OptimisticValue):
            usage[constraint.rv.identifier] += 1

    unused = [identifier for identifier, score in usage.items() if score == 1]
    for identifier in unused:
        assignments[identifier] = Assignment(AssignmentType.IGNORE, None)

    return dpll_apply_assignments(executor, constraints, assignments)
def _clause_truth_value(value) -> bool:
    """Read a determined Boolean operand as a plain Python bool.

    Determined operands seen by the clause filter are either objects exposing ``item()``
    (the form the rest of this module reads them in) or raw Python booleans, which is what this
    module itself stores via ``Assignment(AssignmentType.VALUE, True/False)``.
    Must only be called on values for which ``_determined`` is true.
    """
    item = getattr(value, 'item', None)
    return bool(item()) if callable(item) else bool(value)


def dpll_filter_deterministic_clauses(executor: CrowExecutor, constraints: ConstraintList, assignments: Dict[int, Assignment]) -> Tuple[bool, ConstraintList]:
    """Propagate Boolean constraints whose outcome is already determined.

    Two situations are handled:

    - The return value is determined, e.g. ``AND(x, y, z) == True`` forces ``x``, ``y`` and ``z`` to
      be True, and ``OR(x, y) == True`` with ``x`` known to be False forces ``y`` to be True.
    - Every argument is determined, so the return value can be computed directly.

    Args:
        executor: the executor.
        constraints: the list of constraints. Modified in place (handled slots are set to None).
        assignments: the dictionary of assignments. Modified in place.

    Returns:
        a tuple of (whether we have made progress, the list of constraints that have not been satisfied).

    Raises:
        CSPNotSolvable: when a clause is found to be unsatisfiable.
    """
    progress = False
    for i, c in enumerate(constraints):
        if c.is_group_constraint:
            continue

        if isinstance(c.function, (QuantificationOpType, BoolOpType)):
            if _determined(c.rv):
                rv_value = _clause_truth_value(c.rv)
                if (
                    (c.function in (QuantificationOpType.FORALL, BoolOpType.AND)) or
                    (c.function in (QuantificationOpType.EXISTS, BoolOpType.OR) and len(c.arguments) <= 1)
                ):
                    if rv_value:
                        # AND(x, y, z) == True: every argument must be True.
                        for x in c.arguments:
                            if isinstance(x, OptimisticValue):
                                assignments[x.identifier] = Assignment(AssignmentType.VALUE, True)
                                progress = True
                            elif not _clause_truth_value(x):
                                raise CSPNotSolvable()
                    else:
                        # AND(x, y, z) == False
                        determined_values = [_clause_truth_value(x) for x in c.arguments if _determined(x)]
                        if False in determined_values:
                            progress = True
                            constraints[i] = None
                        elif len(determined_values) == len(c.arguments):
                            raise CSPNotSolvable()
                        elif len(determined_values) == len(c.arguments) - 1:
                            # All known arguments are True, so the single unknown one must be False.
                            for x in c.arguments:
                                if not _determined(x):
                                    progress = True
                                    assignments[x.identifier] = Assignment(AssignmentType.VALUE, False)
                elif c.function in (QuantificationOpType.EXISTS, BoolOpType.OR):
                    if not rv_value:
                        # OR(x, y, z) == False: every argument must be False.
                        for x in c.arguments:
                            if isinstance(x, OptimisticValue):
                                progress = True
                                assignments[x.identifier] = Assignment(AssignmentType.VALUE, False)
                            elif _clause_truth_value(x):
                                raise CSPNotSolvable()
                    else:
                        # OR(x, y, z) == True
                        determined_values = [_clause_truth_value(x) for x in c.arguments if _determined(x)]
                        if True in determined_values:
                            progress = True
                            constraints[i] = None
                        elif len(determined_values) == len(c.arguments):
                            raise CSPNotSolvable()
                        elif len(determined_values) == len(c.arguments) - 1:
                            # All known arguments are False, so the single unknown one must be True.
                            for x in c.arguments:
                                if not _determined(x):
                                    progress = True
                                    assignments[x.identifier] = Assignment(AssignmentType.VALUE, True)
                elif c.function is BoolOpType.NOT:
                    progress = True
                    assignments[c.arguments[0].identifier] = Assignment(AssignmentType.VALUE, not rv_value)
                elif c.function is BoolOpType.IMPLIES:
                    # NOTE(review): this reads arguments[1] with `.item()` and writes arguments[0],
                    # i.e. it presumes the consequent is determined and the antecedent is not -- confirm.
                    progress = True
                    assignments[c.arguments[0].identifier] = Assignment(AssignmentType.VALUE, not rv_value or c.arguments[1].item())
                elif c.function is BoolOpType.XOR:
                    determined_values = [_clause_truth_value(x) for x in c.arguments if _determined(x)]
                    if len(determined_values) == len(c.arguments):
                        raise CSPNotSolvable()
                    elif len(determined_values) == len(c.arguments) - 1:
                        # Parity of the known arguments only; the original summed over *all*
                        # arguments, including the undetermined one it was about to assign.
                        known_parity = sum(determined_values) % 2
                        for x in c.arguments:
                            if not _determined(x):
                                progress = True
                                if rv_value:
                                    assignments[x.identifier] = Assignment(AssignmentType.VALUE, known_parity == 0)
                                else:
                                    assignments[x.identifier] = Assignment(AssignmentType.VALUE, known_parity == 1)
            elif _determined(*c.arguments):
                # All arguments are known: compute the return value and drop the constraint.
                progress = True
                constraints[i] = None
                if c.function in (QuantificationOpType.FORALL, BoolOpType.AND):
                    assignments[c.rv.identifier] = Assignment(AssignmentType.VALUE, all(_clause_truth_value(x) for x in c.arguments))
                elif c.function in (QuantificationOpType.EXISTS, BoolOpType.OR):
                    assignments[c.rv.identifier] = Assignment(AssignmentType.VALUE, any(_clause_truth_value(x) for x in c.arguments))
                elif c.function is BoolOpType.NOT:
                    assignments[c.rv.identifier] = Assignment(AssignmentType.VALUE, not _clause_truth_value(c.arguments[0]))
                elif c.function is BoolOpType.IMPLIES:
                    assignments[c.rv.identifier] = Assignment(AssignmentType.VALUE, not _clause_truth_value(c.arguments[0]) or _clause_truth_value(c.arguments[1]))
                elif c.function is BoolOpType.XOR:
                    assignments[c.rv.identifier] = Assignment(AssignmentType.VALUE, sum(_clause_truth_value(x) for x in c.arguments) % 2 == 1)
        elif c.is_equal_constraint and _determined(*c.arguments):
            # Equality between two known values: the return value is just their comparison.
            progress = True
            assignments[c.rv.identifier] = Assignment(AssignmentType.VALUE, c.arguments[0].item() == c.arguments[1].item())

    if progress:
        return progress, dpll_apply_assignments(executor, constraints, assignments)
    return progress, constraints
def dpll_filter_duplicated_constraints(executor: CrowExecutor, constraints: ConstraintList) -> Tuple[bool, ConstraintList]:
    """Drop constraints that repeat an earlier constraint.

    Two constraints are considered duplicates when their ``constraint_str()`` is identical; only
    the first occurrence is kept. Group constraints are never compared.

    Args:
        executor: the executor.
        constraints: the list of constraints. Modified in place (duplicate slots are set to None).

    Returns:
        a tuple of (whether we have made progress, the list of constraints that have not been satisfied).
    """
    seen_keys = set()
    found_duplicate = False
    for index, constraint in enumerate(constraints):
        if constraint.is_group_constraint:
            continue
        # TODO(Jiayuan Mao @ 2023/11/24): since constraint_str contains shortened encodings for TensorValues, we should not use it here as a hash.
        key = constraint.constraint_str()
        if key not in seen_keys:
            seen_keys.add(key)
            continue
        constraints[index] = None
        found_duplicate = True

    if not found_duplicate:
        return False, constraints
    # Re-apply with an empty assignment dictionary to compact the None slots out of the list.
    return True, dpll_apply_assignments(executor, constraints, {})
def dpll_find_bool_variable(executor: CrowExecutor, constraints: ConstraintList, assignments: Dict[int, Assignment]) -> Optional[int]:
    """Pick an unassigned Boolean variable to branch on.

    Heuristic: choose the variable that occurs most often (as an argument or as a return value)
    across the non-group constraints. Ties go to the variable encountered first.

    Args:
        executor: the executor (unused).
        constraints: the list of constraints.
        assignments: the dictionary of assignments; variables present in it are not candidates.

    Returns:
        the identifier of the chosen variable, or None when no unassigned Boolean variable remains.
    """
    occurrences = collections.Counter()
    for constraint in constraints:
        if constraint.is_group_constraint:
            continue
        for value in (*constraint.arguments, constraint.rv):
            if isinstance(value, OptimisticValue) and value.identifier not in assignments and value.dtype == BOOL:
                occurrences[value.identifier] += 1

    if not occurrences:
        return None
    return max(occurrences, key=occurrences.get)
def dpll_find_grounded_function_application(executor: CrowExecutor, constraints: ConstraintList) -> Optional[Constraint]:
    """Find the first function-application constraint whose arguments are all determined.

    Args:
        executor: the executor (unused).
        constraints: the list of constraints.

    Returns:
        the first non-group constraint whose ``function`` is a :class:`Function` and whose arguments
        are all determined, or None when there is no such constraint.
    """
    grounded_applications = (
        constraint
        for constraint in constraints
        if not constraint.is_group_constraint
        and _determined(*constraint.arguments)
        and isinstance(constraint.function, Function)
    )
    return next(grounded_applications, None)
def dpll_find_typegen_variable(executor: CrowExecutor, dtype: ValueType) -> Optional[CrowDirectedGenerator]:
    """Find a generator that produces a value of the given type from nothing.

    A matching generator is a :class:`CrowDirectedGenerator` with no inputs, exactly one output
    whose dtype equals ``dtype``, and exactly one ``certifies`` expression that is a null expression.

    Args:
        executor: the executor; its domain's generators are searched in iteration order.
        dtype: the type to generate. Must be a :class:`TensorValueTypeBase`.

    Returns:
        the first matching generator, or None when the domain has none.
    """
    assert isinstance(dtype, TensorValueTypeBase)
    for generator in executor.domain.generators.values():
        if not isinstance(generator, CrowDirectedGenerator):
            continue
        produces_dtype_from_nothing = (
            len(generator.inputs) == 0
            and len(generator.outputs) == 1
            and generator.outputs[0].dtype == dtype
        )
        if not produces_dtype_from_nothing:
            continue
        if len(generator.certifies) == 1 and generator.certifies[0].is_null_expression:
            return generator
    return None
# Per-generator input slots: one entry per generator input, None while the slot is unmatched.
GeneratorMatchingInputType = List[Optional[TensorValue]]
# Per-generator output slots: one entry per generator output, None while the slot is unmatched.
GeneratorMatchingOutputType = List[Optional[OptimisticValue]]
# Result of matching one constraint against one generator: (inputs, outputs), or (None, None) on failure.
GeneratorMatchingIOReturnType = Tuple[Optional[GeneratorMatchingInputType], Optional[GeneratorMatchingOutputType]]


def _match_generator(c: Constraint, g: CrowDirectedGenerator, certifies_expr: Optional[ValueOutputExpression] = None, allow_star_matching: bool = False) -> GeneratorMatchingIOReturnType:
    """Try to match a constraint against one ``certifies`` expression of a generator.

    Args:
        c: the constraint to match.
        g: the candidate generator.
        certifies_expr: the expression to match against; defaults to ``g.certifies[0]``.
        allow_star_matching: when True, a determined constraint argument may be matched against
            a generator argument named ``'??'`` without filling any input slot.

    Returns:
        ``(inputs, outputs)`` slot lists on success (slots not touched by this constraint stay None),
        or ``(None, None)`` when the constraint does not match.
    """
    def gen_input_output(func_arguments, rv_variable=None):
        # Pair the constraint's arguments positionally with the arguments of the certified expression.
        inputs: GeneratorMatchingInputType = [None for _ in range(len(g.inputs))]
        outputs: GeneratorMatchingOutputType = [None for _ in range(len(g.outputs))]
        for argc, argg in zip(c.arguments, func_arguments):
            if isinstance(argc, OptimisticValue):
                # An undetermined constraint argument must line up with a generator output.
                if argg.variable in g.outputs:
                    outputs[g.outputs.index(argg.variable)] = argc
                else:
                    return None, None
            else:
                # A determined constraint argument must line up with a generator input (or a '??' wildcard).
                if argg.variable in g.inputs:
                    inputs[g.inputs.index(argg.variable)] = argc
                elif allow_star_matching and argg.name == '??':
                    continue
                else:
                    return None, None
        if rv_variable is not None:
            if rv_variable.variable in g.inputs:
                # If the RV is an output, then this would correspond to a function application, which is already handled.
                inputs[g.inputs.index(rv_variable.variable)] = c.rv
            else:
                return None, None
        return inputs, outputs

    if certifies_expr is None:
        certifies_expr = g.certifies[0]

    if isinstance(c.rv, TensorValue) and c.rv.dtype == BOOL:
        if c.rv.item():
            # match (pred ?x ?y) == True
            if isinstance(certifies_expr, FunctionApplicationExpression):
                if c.function.name == certifies_expr.function.name:
                    return gen_input_output(certifies_expr.arguments)
        else:
            if isinstance(certifies_expr, BoolExpression) and certifies_expr.bool_op is BoolOpType.NOT:
                # match (pred ?x ?y) == False
                inner_expr = certifies_expr.arguments[0]
                if isinstance(inner_expr, FunctionApplicationExpression):
                    if c.function.name == inner_expr.function.name:
                        return gen_input_output(inner_expr.arguments)
                elif c.is_equal_constraint:
                    # match (equal ?x ?y) == False
                    inner_expr = _parse_value_compare_expr_into_predicate_equal_expr(inner_expr)
                    if isinstance(inner_expr, PredicateEqualExpression):
                        if c.arguments[0].dtype == inner_expr.predicate.return_type:
                            return gen_input_output([inner_expr.predicate, inner_expr.value])

    if isinstance(c.rv, TensorValue) and isinstance(c.function, Function):
        # match (pred ?x ?y) == ?z
        certifies_expr = _parse_value_compare_expr_into_predicate_equal_expr(certifies_expr)
        if isinstance(certifies_expr, PredicateEqualExpression) and c.function.name == certifies_expr.predicate.function.name:
            if isinstance(certifies_expr.value, VariableExpression):
                return gen_input_output(certifies_expr.predicate.arguments, certifies_expr.value)
    return None, None


def _parse_value_compare_expr_into_predicate_equal_expr(expr) -> Optional[PredicateEqualExpression]:
    """Rewrite ``(pred ...) == value`` comparison expressions into a PredicateEqualExpression.

    Only an EQ :class:`ValueCompareExpression` whose first argument is a function application is
    rewritten. NOTE(review): every other expression is returned unchanged, so the return value is
    not always a PredicateEqualExpression despite the annotation -- callers re-check with isinstance.
    """
    if isinstance(expr, ValueCompareExpression):
        if expr.compare_op is CompareOpType.EQ:
            if isinstance(expr.arguments[0], FunctionApplicationExpression):
                return PredicateEqualExpression(expr.arguments[0], expr.arguments[1])
    return expr


# List of candidate matches: (constraint or group, generator, input slots, output slots); None when nothing matched.
GeneratorMatchingReturnType = Optional[List[Tuple[Union[Constraint, GroupConstraint], CrowDirectedGenerator, Optional[GeneratorMatchingInputType], Optional[GeneratorMatchingOutputType]]]]


def _find_gen_variable_group(executor: CrowExecutor, constraints: ConstraintList) -> GeneratorMatchingReturnType:
    """Collect the candidate generators attached to the group constraints in the list.

    Each entry of a group's ``candidate_generators`` is unpacked after the group itself, i.e. the
    result tuples are ``(group, *candidate)``. Returns None when there are no candidates.
    """
    all_generators = list()
    for c in constraints:
        if c.is_group_constraint:
            for g in c.candidate_generators:
                all_generators.append((c, *g))
    if len(all_generators) == 0:
        return None
    return all_generators


def _find_gen_variable(executor: CrowExecutor, constraints: ConstraintList) -> GeneratorMatchingReturnType:
    """Match every non-group constraint against the first ``certifies`` expression of every generator.

    If some output variable is produced by exactly one match, only that match is returned.
    Otherwise all matches whose generator has the highest priority are returned, or None when
    nothing matched.
    """
    # Step 1: find all applicable generators.
    all_generators = list()
    for c in constraints:
        if c.is_group_constraint:
            continue
        for g in executor.domain.generators.values():
            i, o = _match_generator(c, g)
            if i is not None:
                all_generators.append((c, g, i, o))

    # Step 2: find if there is any variable with only one generator.
    # NOTE(review): `o` may contain None for generator outputs that the constraint did not mention
    # (see the slot initialisation in `_match_generator`); `target.identifier` would fail on such
    # an entry -- confirm whether partially matched outputs can occur here.
    target_to_generator: Dict[int, list] = collections.defaultdict(list)
    for c, g, i, o in all_generators:
        for target in o:
            target_to_generator[target.identifier].append((c, g, i, o))

    for target, generators in target_to_generator.items():
        if len(target_to_generator[target]) == 1:
            return target_to_generator[target]

    # Otherwise fall back to every match that uses a generator of maximal priority.
    if len(all_generators) > 0:
        max_priority = max([r[1].priority for r in all_generators])
        all_generators = [r for r in all_generators if r[1].priority == max_priority]
        return all_generators
    return None


def _find_gen_variable_advanced(executor: CrowExecutor, constraints: ConstraintList) -> GeneratorMatchingReturnType:
    """Match groups of constraints against generators that certify several expressions at once.

    Each constraint is matched (with star matching enabled) against every ``certifies`` expression
    of every generator; compatible partial matches for the same generator are merged greedily.
    A merged match is kept only when it covers every non-star ``certifies`` expression and leaves
    no input or output slot empty. Returns None when no complete match exists.
    """
    def match_io(list1, list2):
        # Two slot lists are compatible when every position filled in both holds the same
        # variable (by identifier) or equal values (per the executor's equality check).
        for x, y in zip(list1, list2):
            if x is None or y is None:
                continue
            if isinstance(x, OptimisticValue):
                if isinstance(y, OptimisticValue):
                    if x.identifier != y.identifier:
                        return False
                else:
                    return False
            elif isinstance(x, TensorValue):
                if isinstance(y, TensorValue):
                    if not executor.check_eq_constraint(x.dtype, x, y, True):
                        return False
                else:
                    return False
            else:
                raise TypeError(f'Invalid type: {type(x)}.')
        return True

    def is_star_expression(sub_certifies):
        # An expression is a "star" expression when it mentions a variable named '??'.
        for x in iter_exprs(sub_certifies):
            if isinstance(x, VariableExpression) and x.name == '??':
                return True
        return False

    # generator -> list of (constraint, inputs, outputs, index of the matched certifies expression)
    generator2matched = collections.defaultdict(list)
    for c in constraints:
        if c.is_group_constraint:
            continue
        for g in executor.domain.generators.values():
            for sub_certifies_index, sub_certifies in enumerate(g.certifies):
                i, o = _match_generator(c, g, sub_certifies, allow_star_matching=True)
                if i is not None:
                    generator2matched[g].append((c, i, o, sub_certifies_index))

    all_generators = list()
    # TODO: implement the exact matching algorithm.
    for g, matched in generator2matched.items():
        all_matches = list()
        for result_index in range(len(matched)):
            c, i, o, sub_certifies_index = matched[result_index]
            used = False
            # constraints, g, i, o, matched_sub_certifies_index
            for mcs, mg, mi, mo, matched_sci in all_matches:
                if match_io(i, mi) and match_io(o, mo):
                    # Merge into the first compatible match: fill its empty slots in place.
                    for j in range(len(mi)):
                        if mi[j] is None:
                            mi[j] = i[j]
                    for j in range(len(mo)):
                        if mo[j] is None:
                            mo[j] = o[j]
                    mcs.append(c)
                    matched_sci.add(sub_certifies_index)
                    used = True
                    break
            if not used:
                all_matches.append(([c], g, i, o, {sub_certifies_index}))

        this_is_star_expression = list()
        for sub_certifies in g.certifies:
            this_is_star_expression.append(is_star_expression(sub_certifies))

        for mcs, mg, mi, mo, matched_sci in all_matches:
            # the matched constraints should cover all sentences in the flatten_expression.
            match_succ = True
            for sub_certifies_index, sub_certifies in enumerate(g.certifies):
                if not this_is_star_expression[sub_certifies_index] and sub_certifies_index not in matched_sci:
                    match_succ = False
                    break
            if not match_succ:
                continue
            # Only complete matches (no empty input/output slot) are usable.
            if None not in mi and None not in mo:
                all_generators.append((mcs, mg, mi, mo))
    return all_generators if len(all_generators) > 0 else None


def _find_fancy_gen_variable(
    executor: CrowExecutor, csp: ConstraintSatisfactionProblem, constraints: ConstraintList, assignments: AssignmentDict
) -> Optional[List[Tuple[List[Constraint], Dict[int, Union[TensorValueTypeBase, PyObjValueType]], CrowUndirectedGenerator]]]:
    """Collect, for each undirected generator, the constraints it certifies and their open variables.

    Generators are visited in decreasing priority. For every :class:`CrowUndirectedGenerator`, all
    constraints whose function name equals the function name of one of its ``certifies`` expressions
    are gathered, together with the dtype of every undetermined variable they mention.

    Returns:
        a list of ``(constraints, {variable identifier: dtype}, generator)`` tuples, or None when
        no undirected generator matched any constraint.
    """
    results = list()
    for g in sorted(executor.domain.generators.values(), key=lambda generator: generator.priority, reverse=True):
        if isinstance(g, CrowUndirectedGenerator):
            this_constraints = list()
            this_variable_dtypes = dict()
            for certifies_expr in g.certifies:
                # Undirected generators are expected to certify plain predicates over '??' wildcards only.
                assert isinstance(certifies_expr, FunctionApplicationExpression)
                for arg in certifies_expr.arguments:
                    assert isinstance(arg, VariableExpression) and arg.name == '??'
                # NOTE(review): unlike the other finders, this loop does not skip group constraints
                # before reading `c.function` -- confirm group constraints cannot reach this point.
                for c in constraints:
                    if c.function.name == certifies_expr.function.name:
                        this_constraints.append(c)
                        for arg in itertools.chain(c.arguments, [c.rv]):
                            if isinstance(arg, OptimisticValue):
                                this_variable_dtypes[arg.identifier] = arg.dtype
            if len(this_constraints) == 0:
                continue
            results.append((this_constraints, this_variable_dtypes, g))
    if len(results) > 0:
        return results
    return None
def dpll_find_gen_variable_combined(executor: CrowExecutor, csp: ConstraintSatisfactionProblem, constraints: ConstraintList, assignments: AssignmentDict) -> GeneratorMatchingReturnType:
    """Find candidate generators, trying the matching strategies from most to least specific.

    The strategies are tried in this order and the first non-empty result is returned:

    1. :func:`_find_gen_variable_group`: candidate generators attached to group constraints.
    2. :func:`_find_gen_variable`: generators matched against individual constraints.
    3. :func:`dpll_find_typegen_variable`: type-only generators, one candidate for every
       unassigned variable of a tensor value type.

    Star matching (:func:`_find_gen_variable_advanced`) and undirected generators
    (:func:`_find_fancy_gen_variable`) are currently not part of the chain.

    Args:
        executor: the executor.
        csp: the constraint satisfaction problem; its ``index2record`` lists all variables.
        constraints: the list of constraints.
        assignments: the dictionary of assignments.

    Returns:
        a list of ``(constraint, generator, inputs, outputs)`` candidates, or None when no
        strategy produced one. Type-only candidates use ``None`` as their constraint.
    """
    for matcher in (_find_gen_variable_group, _find_gen_variable):
        matches = matcher(executor, constraints)
        if matches is not None:
            return matches

    type_only_matches = []
    for name, record in csp.index2record.items():
        dtype = record.dtype
        if name in assignments or not isinstance(dtype, TensorValueTypeBase):
            continue
        generator = dpll_find_typegen_variable(executor, dtype)
        if generator is None:
            continue
        type_only_matches.append((None, generator, [], [OptimisticValue(dtype, name)]))

    return type_only_matches if type_only_matches else None
def dpll_filter_unused_simulation_rhs(executor: CrowExecutor, constraints: ConstraintList, assignments: Dict[int, Assignment]) -> ConstraintList:
    """Mark simulation outputs that nothing else reads as ignorable.

    A variable that occurs exactly once, as the return value of a constraint whose function is a
    :class:`SimulationFluentConstraintFunction`, and never as an argument, receives an IGNORE
    assignment. The assignments are then applied through :func:`dpll_apply_assignments`.

    Args:
        executor: the executor.
        constraints: the list of constraints.
        assignments: the dictionary of assignments. Modified in place.

    Returns:
        the list of constraints that have not been satisfied, after removing all unused variables.
    """
    # Scoring: an appearance as an argument adds 100, an appearance as the return value of a
    # simulation constraint adds 1. A final score of exactly 1 means "one simulation output, unused".
    usage = collections.Counter()
    for constraint in constraints:
        if constraint.is_group_constraint:
            continue
        for argument in constraint.arguments:
            if isinstance(argument, OptimisticValue):
                usage[argument.identifier] += 100
        if isinstance(constraint.rv, OptimisticValue) and isinstance(constraint.function, SimulationFluentConstraintFunction):
            usage[constraint.rv.identifier] += 1

    unused = [identifier for identifier, score in usage.items() if score == 1]
    for identifier in unused:
        assignments[identifier] = Assignment(AssignmentType.IGNORE, None)

    return dpll_apply_assignments(executor, constraints, assignments)
def dpll_apply_assignments_with_simulation(
    executor: CrowExecutor, constraints: ConstraintList, assignments: Dict[int, Assignment],
    simulation_interface: Optional[CrowSimulationControllerInterface], actions: Sequence[CrowControllerApplier], verbose: bool = False
) -> Tuple[ConstraintList, AssignmentDict]:
    """Apply the assignments, then run as many pending actions in the simulator as can be grounded.

    After the plain :func:`dpll_apply_assignments` pass, the next action (according to the
    simulator's action counter) is grounded with the current assignments and executed. The
    simulation constraints attached to that action are then resolved against the resulting state:
    determined ones are checked, undetermined ones are assigned the observed value. This repeats
    until an action cannot be grounded yet or all actions have been executed.

    Args:
        executor: the executor.
        constraints: the list of constraints.
        assignments: the dictionary of assignments. It is not modified; a copy is made before any
            simulation result is recorded.
        simulation_interface: the simulation interface. When None, no simulation is performed.
        actions: the sequence of actions to simulate. ``None`` is treated as "no actions".
        verbose: whether to print verbose information.

    Returns:
        a tuple of (remaining constraints, assignments). The assignments are the input dictionary
        itself when no action was executed, and an extended copy otherwise.

    Raises:
        CSPNotSolvable: when an action fails in the simulator, or when the simulated state
            contradicts a determined simulation constraint.
        NotImplementedError: when a determined simulation constraint has a non-Boolean return value.
    """
    new_constraints = dpll_apply_assignments(executor, constraints, assignments)
    if simulation_interface is None:
        return new_constraints, assignments

    # `dpll_solve` forwards `actions=None` by default; without this guard `len(actions)` raises TypeError.
    if actions is None:
        actions = ()

    new_assignments = None
    while True:
        # TODO(Jiayuan Mao @ 2024/04/22): better formulate the simulation constraints.
        next_action_index = simulation_interface.get_action_counter()
        if next_action_index >= len(actions):
            break

        action = actions[next_action_index]
        try:
            # NOTE(review): grounding uses the caller's `assignments`, not `new_assignments`, so
            # values observed from earlier simulated steps are not visible here -- confirm intended.
            grounded_action = csp_ground_action(executor, action, assignments)
        except AssertionError:
            # The action is not applicable.
            break

        if verbose:
            jacinle.log_function.print(f'Executing grounded action: #{next_action_index}')
        succ = simulation_interface.step_without_error(grounded_action)
        if not succ:
            if verbose:
                jacinle.log_function.print(jacinle.colored(f'Action {grounded_action} failed.', 'red'))
            raise CSPNotSolvable(f'Unable to perform action {action}.')

        state = simulation_interface.get_crow_state()
        if new_assignments is None:
            # Copy lazily so the caller's dictionary is returned untouched when nothing was simulated.
            new_assignments = assignments.copy()

        # Update the assignments.
        for i, c in enumerate(new_constraints):
            # Group constraints are skipped before reading `c.function`, as everywhere else in this module.
            if c.is_group_constraint:
                continue
            if not isinstance(c.function, SimulationFluentConstraintFunction):
                continue
            function: SimulationFluentConstraintFunction = c.function
            if function.action_index != next_action_index:
                continue

            observed = state.features[function.predicate.name][function.arguments]
            if isinstance(c.rv, TensorValue):
                if c.rv.dtype != BOOL:
                    raise NotImplementedError('Only bool is supported for simulation constraints.')
                if observed.item() != c.rv.item():
                    raise CSPNotSolvable()
                new_constraints[i] = None
            else:
                new_assignments[c.rv.identifier] = Assignment(AssignmentType.VALUE, observed)
                new_constraints[i] = None

        new_constraints = dpll_apply_assignments(executor, new_constraints, new_assignments)

    return new_constraints, new_assignments if new_assignments is not None else assignments
def dpll_solve(
    executor: CrowExecutor, csp: ConstraintSatisfactionProblem, *,
    generator_manager: Optional[CrowGeneratorExecutor] = None,
    simulation_interface: Optional[CrowSimulationControllerInterface] = None,
    actions: Optional[Sequence[CrowControllerApplier]] = None,
    max_generator_trials: int = 3,
    enable_ignore: bool = False, solvable_only: bool = False,
    verbose: bool = False
) -> Optional[Union[bool, AssignmentDict]]:
    """Solve the constraint satisfaction problem using the DPLL-sampling algorithm.

    The search is a recursive depth-first procedure. At each level it first simplifies the
    constraint list with the deterministic filters, and then branches on, in this order:

    1. a Boolean variable (tries ``True`` first, then ``False``);
    2. a grounded function application, which is evaluated through the executor;
    3. a list of candidate generators, each sampled at most ``max_generator_trials`` times.

    Args:
        executor: the executor.
        csp: the constraint satisfaction problem.
        generator_manager: the generator manager. When None, a new ``CrowGeneratorExecutor`` is created
            with ``store_history=False``.
        simulation_interface: the simulation interface. When not None, its action counter is reset before
            the search starts and every branch runs inside ``simulation_interface.restore_context(...)``.
        actions: the list of actions. Only used when `simulation_interface` is not None.
        max_generator_trials: the maximum number of trials for each generator.
        enable_ignore: whether to ignore constraints whose RHS value is not determined. When False, only
            the simulation-specific filter (``dpll_filter_unused_simulation_rhs``) is applied instead.
        solvable_only: whether to only return whether the problem is solvable, without returning the solution.
        verbose: whether to print verbose information.

    Returns:
        When the problem is not solvable (i.e., the search raised ``CSPNotSolvable``), return None,
        regardless of `solvable_only`.
        When `solvable_only` is True and a solution is found, return True. Note that False is never returned.
        When `solvable_only` is False and a solution is found, return the assignment dictionary. Variables in
        ``csp.index2record`` that are not bound by the search are filled in using a type-only generator.

    Raises:
        CSPNoGenerator: when no generator can be found to solve the problem. However, the problem may still be solvable.
        NotImplementedError: when `solvable_only` is False and an unbound variable has no type-only generator.
    """
    if generator_manager is None:
        generator_manager = CrowGeneratorExecutor(executor, store_history=False)

    # Work on a copy so that the constraint list owned by `csp` is not modified.
    constraints = csp.constraints.copy()

    if simulation_interface is not None:
        simulation_interface.reset_action_counter()

    def restore_context(verbose=verbose):
        # Each branch of the search runs inside this context. Without a simulation interface
        # there is nothing to restore, so an empty context is used.
        if simulation_interface is not None:
            return simulation_interface.restore_context(verbose)
        return jacinle.EmptyContext()

    @jacinle.log_function(verbose=False)
    def dfs(constraints, assignments):
        # Returns the assignment dictionary on success and raises CSPNotSolvable (or CSPNoGenerator) otherwise.
        if len(constraints) == 0:
            return assignments

        # Simplification phase: run each deterministic filter until it reports no further progress.
        progress = True
        while progress:
            progress, constraints = dpll_filter_deterministic_equal(executor, constraints, assignments)
        if enable_ignore:
            constraints = dpll_filter_unused_rhs(executor, constraints, assignments, csp.index2record)
        else:
            # NB(Jiayuan Mao @ 2023/03/11): for simulation constraints, we can remove them if they are not used in any other constraints.
            # TODO(Jiayuan Mao @ 2023/03/15): actually, I just noticed that this is a "bug" in the implementation of `_find_bool_variable`.
            #   Basically, if the variable is a simulation variable and it only appears in the RHS of a constraint, then it will be ignored.
            #   The current handling will work, but probably I need to think about a better way to handle this.
            constraints = dpll_filter_unused_simulation_rhs(executor, constraints, assignments)
        progress = True
        while progress:
            progress, constraints = dpll_filter_deterministic_clauses(executor, constraints, assignments)

        if verbose:
            jacinle.log_function.print('Remaining constraints:', len(constraints))
            jacinle.log_function.print(*constraints, sep='\n')

        if len(constraints) == 0:
            return assignments

        if (next_bool_var := dpll_find_bool_variable(executor, constraints, assignments)) is not None:
            # Case 1: branch on a Boolean variable. Try True first; fall back to False if that branch fails.
            assignments_true = assignments.copy()
            assignments_true[next_bool_var] = Assignment(AssignmentType.VALUE, True)
            with restore_context():
                try:
                    constraints_true, assignments_true = dpll_apply_assignments_with_simulation(executor, constraints, assignments_true, simulation_interface, actions, verbose=verbose)
                    return dfs(constraints_true, assignments_true)
                except CSPNotSolvable:
                    pass

            assignments_false = assignments.copy()
            assignments_false[next_bool_var] = Assignment(AssignmentType.VALUE, False)
            with restore_context():
                try:
                    constraints_false, assignments_false = dpll_apply_assignments_with_simulation(executor, constraints, assignments_false, simulation_interface, actions, verbose=verbose)
                    return dfs(constraints_false, assignments_false)
                except CSPNotSolvable:
                    pass

            # Both truth values failed.
            raise CSPNotSolvable()
        elif (next_fapp := dpll_find_grounded_function_application(executor, constraints)) is not None:
            # Case 2: evaluate a grounded function application and bind its return variable to the result.
            function = next_fapp.function
            arguments = next_fapp.arguments
            target = next_fapp.rv
            external_function = executor.get_function_implementation(function.name)
            output = external_function(*arguments, return_type=target.dtype if isinstance(target, (TensorValue, OptimisticValue)) else None)
            new_assignments = assignments.copy()
            new_assignments[target.identifier] = Assignment(AssignmentType.VALUE, output)
            with restore_context():
                try:
                    new_constraints = constraints.copy()
                    # The consumed constraint is blanked out with None rather than removed from the list.
                    # NOTE(review): presumably the None entries are dropped downstream -- confirm.
                    new_constraints[new_constraints.index(next_fapp)] = None
                    new_constraints, new_assignments = dpll_apply_assignments_with_simulation(executor, new_constraints, new_assignments, simulation_interface, actions, verbose=verbose)
                    return dfs(new_constraints, new_assignments)
                except CSPNotSolvable:
                    pass
            # There is only one candidate value here, so a failure cannot be recovered at this level.
            raise CSPNotSolvable()
        elif (next_gen_vars := dpll_find_gen_variable_combined(executor, csp, constraints, assignments)) is not None:
            # Case 3: sample values from generators, trying the candidates in the returned order.
            if len(next_gen_vars) >= 1:
                if verbose:
                    jacinle.log_function.print('Generator orders', *[str(vv[1]).split('\n')[0] for vv in next_gen_vars], sep='\n ')

            for vv in next_gen_vars:
                c, g, args, outputs_target = vv
                # TODO(Jiayuan Mao @ 2024/04/22): implement the "not solvable" handling.
                # if g.unsolvable:
                #     raise CSPNotSolvable('Hit unsolvable generator.')
                if verbose:
                    jacinle.log_function.print(f'Generator: {g}\nArgs: {args}')
                generator = generator_manager.call(g, max_generator_trials, args, c)
                # NB(Jiayuan Mao @ 2023/03/03): I didn't write for x in generator in order to make the verbose output more readable.
                generator = iter(generator)
                for j in range(max_generator_trials):
                    if verbose:
                        jacinle.log_function.print('Running generator', g.name, f'count = {j + 1} / {max_generator_trials}')
                    try:
                        output_index, outputs = next(generator)
                    except StopIteration:
                        if verbose:
                            jacinle.log_function.print('Generator', g.name, 'exhausted.')
                        break

                    new_assignments = assignments.copy()
                    for output, target in zip(outputs, outputs_target):
                        new_assignments[target.identifier] = Assignment(AssignmentType.VALUE, output, generator_index=output_index)
                        # jacinle.log_function.print('Assigned', target, output)

                    with restore_context():
                        try:
                            new_constraints = constraints.copy()
                            if c is None:
                                # If g is a type-only generator function, c === None
                                pass
                            else:
                                # `c` is either a single constraint or a list of constraints covered by this generator.
                                if isinstance(c, list):
                                    for cc in c:
                                        new_constraints[new_constraints.index(cc)] = None
                                else:
                                    new_constraints[new_constraints.index(c)] = None
                            new_constraints, new_assignments = dpll_apply_assignments_with_simulation(executor, new_constraints, new_assignments, simulation_interface, actions, verbose=verbose)
                            return dfs(new_constraints, new_assignments)
                        except CSPNotSolvable as e:
                            # This sample failed; move on to the next trial of the same generator.
                            if verbose:
                                jacinle.log_function.print('Failed to apply assignments. Reason:', e)
                            pass
            # Every candidate generator was exhausted (or hit its trial limit) without a solution.
            raise CSPNotSolvable()
        else:
            # jacinle.log_function.print('Can not find a generator. Constraints:\n ' + '\n '.join([str(x) for x in constraints]))
            raise CSPNoGenerator('Can not find a generator. Constraints:\n ' + '\n '.join([str(x) for x in constraints]))

    try:
        assignments = dfs(constraints, {})
        if not solvable_only:
            # Fill in the variables that the search left unbound, using a type-only generator for each.
            for name, record in csp.index2record.items():
                dtype = record.dtype
                if name not in assignments:
                    g = dpll_find_typegen_variable(executor, dtype)
                    if g is None:
                        raise NotImplementedError('Can not find a generator for unbounded variable {}, type {}.'.format(name, dtype))
                    else:
                        try:
                            # Only the first output of the generator is used.
                            output_index, (output, ) = next(iter(generator_manager.call(g, 1, [], None)))
                            assignments[name] = Assignment(AssignmentType.VALUE, output, generator_index=output_index)
                        except StopIteration:
                            raise CSPNotSolvable
        if generator_manager.store_history:
            generator_manager.mark_success(assignments)
        if solvable_only:
            return True
        return assignments
    except CSPNotSolvable:
        # Unsolvable problems are reported to the caller as None instead of an exception.
        return None
    except CSPNoGenerator:
        raise
def dpll_simplify(
    executor: CrowExecutor, csp: ConstraintSatisfactionProblem,
    enable_ignore: bool = True, return_assignments: bool = False
) -> Union[ConstraintSatisfactionProblem, Tuple[ConstraintSatisfactionProblem, AssignmentDict]]:
    """Simplify the CSP using the deterministic filtering steps of the DPLL algorithm.

    The filters are applied repeatedly until a full pass no longer changes the number of
    constraints. No search (branching or sampling) is performed.

    Args:
        executor: the executor.
        csp: the CSP.
        enable_ignore: whether to ignore constraints whose RHS value is not determined.
        return_assignments: whether to return the assignments.

    Returns:
        When `return_assignments` is False, the simplified CSP.
        When `return_assignments` is True, a tuple of (simplified CSP, assignments), where the assignments
        are the ones collected by the filters during simplification.
        When the input CSP has no constraints, the input `csp` object itself is used as the simplified
        CSP (it is not cloned) and the assignments are empty.
    """
    constraints = csp.constraints.copy()
    assignments = {}

    if not constraints:
        # Nothing to simplify. Still honor `return_assignments` so that the return shape
        # does not depend on whether the CSP happens to be empty.
        if return_assignments:
            return csp, assignments
        return csp

    while True:
        nr_constraints = len(constraints)

        # Run each deterministic filter until it reports no further progress.
        progress = True
        while progress:
            progress, constraints = dpll_filter_deterministic_equal(executor, constraints, assignments)
        if enable_ignore:
            constraints = dpll_filter_unused_rhs(executor, constraints, assignments, csp.index2record)
        progress = True
        while progress:
            progress, constraints = dpll_filter_deterministic_clauses(executor, constraints, assignments)

        # Fixed point: a full pass did not change the number of constraints.
        if len(constraints) == nr_constraints:
            break

    simplified = csp.clone(constraints)
    if return_assignments:
        return simplified, assignments
    return simplified