Source code for cosapp.drivers.utils

import enum
from typing import Optional, Union, Dict, Tuple, List, Set, NamedTuple
from collections.abc import Collection

from cosapp.core.numerics.boundary import Unknown
from cosapp.core.numerics.basics import MathematicalProblem
from cosapp.core.eval_str import EvalString
from cosapp.utils.helpers import check_arg
from cosapp.utils.parsing import multi_split

import logging
logger = logging.getLogger(__name__)


[docs]class SystemAnalyzer: """Class containing data collected on a system, to be shared between different drivers. """ __slots__ = ('__system', '__data') def __init__(self, system: Optional["cosapp.systems.System"] = None): self.__reset() self.system = system @property def data(self) -> Dict: return self.__data @property def system(self) -> "cosapp.systems.System": "System: system of interest" return self.__system @system.setter def system(self, system) -> None: if system is self.__system: return from cosapp.systems.system import System check_arg(system, 'system', (System, type(None))) self.__reset(system) def __reset(self, system=None) -> None: """Reset system and data, with no type check""" self.__system = system self.__data = dict()
[docs] def clear_data(self) -> None: self.__data.clear()
[docs] def check_system(self) -> None: if self.system is None: raise ValueError("object is not associated to any system")
[docs]class UnknownAnalyzer(SystemAnalyzer): """Class used to resolve unknown aliasing """ def __init__(self, system: "cosapp.systems.System"): super().__init__(system) @property def input_mapping(self): return self.system.input_mapping
[docs] def filter_problem(self, problem: MathematicalProblem, name=None) -> MathematicalProblem: self.check_system() context = self.system if problem.context is not context: raise ValueError(f"problem is not defined on system {context.name!r}") if not name: name = f"{problem.name}[filtered]" filtered = MathematicalProblem(name, problem.context) # Add equations filtered.extend(problem, unknowns=False) # Add unknowns input_mapping = self.input_mapping def get_free_unknown(unknown: Unknown, key: str) -> Union[Unknown, None]: """Checks if `unknown` is aliased by pulling. If so, returns alias unknown; else, returns original unknown. """ try: alias = input_mapping[key] except KeyError: logger.warning(f"Skip connected unknown {key!r}") return None if alias.mapping is not unknown.port: if alias.context is not context: alias_name = f"{alias.mapping.contextual_name}.{alias.key}" contextual_name = f"{unknown.context.name}.{key}" logger.warning( f"Unknown {contextual_name!r} is aliased by {alias_name!r}" f", defined outside the context of {context.name!r}" f"; it is likely to be overwritten after the computation." ) else: alias_name = f"{alias.mapping.name}.{alias.key}" logger.info(f"Replace unknown {key!r} by {alias_name!r}") unknown = unknown.transfer(alias.context, alias_name) return unknown for key, unknown in problem.unknowns.items(): free_unknown = get_free_unknown(unknown, key) if free_unknown is None: continue aliased = (free_unknown is not unknown) if aliased: key = free_unknown.name filtered.unknowns[key] = free_unknown else: filtered.unknowns[key] = unknown.copy() return filtered
[docs]class DesignProblemHandler: """Class managing tied design and off-design problems, including unknown aliasing. """ def __init__(self, system: "cosapp.systems.System"): self.__handler = UnknownAnalyzer(system) self.reset()
[docs] @classmethod def make(cls, design: MathematicalProblem, offdesign: MathematicalProblem) -> "DesignProblemHandler": handler = cls(design.context) handler.problems = (design, offdesign) return handler
@property def system(self) -> "cosapp.systems.System": return self.__handler.system @system.setter def system(self, system) -> None: self.__handler.system = system for name in ('design', 'offdesign'): problem = getattr(self, name) try: problem.context = system except ValueError: setattr(self, name, self.new_problem(name)) @property def problems(self) -> Tuple[MathematicalProblem, MathematicalProblem]: """Tuple[MathematicalProblem, MathematicalProblem]: design and off-design problems as a tuple""" return self.design, self.offdesign @problems.setter def problems(self, problems) -> None: """Setter for (design, offdesign) tuple""" design, offdesign = problems if design is None: design = self.new_problem('design') if offdesign is None: offdesign = self.new_problem('offdesign') if design.context is not offdesign.context: raise ValueError("Design and off-design problems must be defined in the same context") self.design, self.offdesign = problems self.__handler.system = design.context
[docs] def reset(self) -> None: """Reset handler""" self.design = self.new_problem('design') self.offdesign = self.new_problem('offdesign')
[docs] def new_problem(self, name: str) -> MathematicalProblem: """Create new `MathematicalProblem` instance""" return MathematicalProblem(name, self.system)
[docs] def export_problems(self, prune=True) -> Tuple[MathematicalProblem, MathematicalProblem]: """Export design and off-design problems. Parameters: ----------- - prune, Optional[bool]: If `True` (default), resolve unknown aliasing first. If `False`, returned problems are copies of object attributes. Results: -------- - (design, offdesign): Filtered design and off-design problems, as a tuple of `MathematicalProblem` objects. """ if prune: handler = self.__handler design = handler.filter_problem(self.design, name='design') offdesign = handler.filter_problem(self.offdesign, name='offdesign') else: design = self.design.copy() offdesign = self.offdesign.copy() return design, offdesign
[docs] def merged_problem(self, name="merged", offdesign_prefix="offdesign") -> MathematicalProblem: """Merge design and off-design problems into a single `MathematicalProblem` instance. """ design, offdesign = self.export_problems() def check(attr, kind): design_attr = getattr(design, attr) offdesign_attr = getattr(offdesign, attr) common = set(design_attr).intersection(offdesign_attr) if common: if len(common) > 1: names = ", ".join(repr(v) for v in sorted(common)) names = f"({names}) are" kind += "s" else: names = f"{common.pop()!r} is" raise ValueError( f"{names} defined as design and off-design {kind}" ) check('unknowns', kind='unknown') check('residues', kind='equation') merged = self.new_problem(name) no_rename = lambda name: name local_name = (lambda name: f"{offdesign_prefix}[{name}]") if offdesign_prefix else no_rename def add_problem(problem: MathematicalProblem, rename_unknowns=True) -> None: nonlocal merged rename = local_name if rename_unknowns else no_rename # Add unknowns for name in list(problem.unknowns.keys()): merged.unknowns[rename(name)] = problem.unknowns.pop(name) # Add residues for name in list(problem.residues.keys()): merged.residues[local_name(name)] = problem.residues.pop(name) for name, residue in problem.get_target_residues().items(): merged.residues[local_name(name)] = residue add_problem(design, rename_unknowns=False) add_problem(offdesign, rename_unknowns=True) return merged
[docs] def extend(self, other: "DesignProblemHandler", prune=True, copy=True, overwrite=False) -> "DesignProblemHandler": if prune or copy: design, offdesign = other.export_problems(prune) else: design, offdesign = other.problems # Extend inner problems; copy is unnecessary at this point, # as `design` and `offdesign` are consistent with argument `copy`. options = dict(copy=False, overwrite=overwrite) self.design.extend(design, **options) self.offdesign.extend(offdesign, **options) return self
[docs]@enum.unique class ConstraintType(enum.Enum): """Enum covering constraint types""" GE = { 'operator': ">=", 'sort': (lambda lhs, rhs: (lhs, rhs)), } LE = { 'operator': "<=", 'sort': (lambda lhs, rhs: (rhs, lhs)), } EQ = { 'operator': "==", 'sort': (lambda lhs, rhs: (lhs, rhs)), }
[docs] def expression(self, lhs: str, rhs: str) -> str: return "{} - ({})".format(*self.sort(lhs, rhs))
[docs] def sort(self, lhs: str, rhs: str) -> Tuple[str, str]: return self.value['sort'](lhs, rhs)
def __str__(self) -> str: return self.description @property def operator(self) -> str: return self.value['operator'] @property def description(self) -> str: return f"Constraint of the kind `lhs {self.operator} rhs`" @property def is_inequality(self) -> bool: return self.operator != "=="
[docs]class Constraint(NamedTuple): """Named tuple representing a non-negative constraint of the kind `lhs <op> rhs`, where `<op>` is either `==` (equality) or `>=` (inequality constraint), depending on Boolean attribute `is_inequality`. Attributes: ----------- - lhs [str]: left-hand side. - rhs [str]: right-hand side. - is_inequality [bool]. Properties: ----------- - expression [str]: non-negative constraint `lhs - rhs`. """ lhs: str rhs: str is_inequality: bool = True @property def expression(self) -> str: return f"{self.lhs} - ({self.rhs})" def __str__(self) -> str: op = ">=" if self.is_inequality else "==" return f"{self.expression} {op} 0"
[docs]class ConstraintParser:
[docs] @classmethod def parse(cls, expression: Union[str, List[str]]) -> Set[Constraint]: """Parse a string expression or a list thereof as a set of non-negative constraints to be used in solvers. Parameters: ----------- - expression [str or List[str]]: Human-readable equality or inequality constraints, such as 'x >= y', '0 < alpha < 1', or a list thereof. Returns: -------- - constraints [Set[Constraint]]: Set of `Constraint` named tuple objects. """ check_arg(expression, 'expression', (str, Collection)) ctypes = cls.types() not_in_sides = list(ctypes) + ["="] def side_ok(side: str) -> bool: return not any(nogo in side for nogo in not_in_sides) and side.strip() def parse_single(expression: str) -> Set[Constraint]: check_arg(expression, 'expression', str) constraints = set() for operator in "<>": expression = expression.replace(f"{operator}=", operator) expressions, operators = multi_split(expression, ctypes.keys()) for lhs, rhs, operator in zip(expressions, expressions[1:], operators): ok = side_ok(lhs) and side_ok(rhs) ctype = ctypes[operator] constraint = Constraint( *ctype.sort(lhs, rhs), ctype.is_inequality, ) if not ok: raise ValueError(f"Invalid constraint {constraint.expression}") constraints.add(constraint) return constraints expressions = [expression] if isinstance(expression, str) else expression constraints = set() for expression in expressions: constraints |= parse_single(expression) return constraints
[docs] @classmethod def types(cls) -> Dict[str, ConstraintType]: return { ">" : ConstraintType.GE, "<" : ConstraintType.LE, "<=": ConstraintType.LE, "==": ConstraintType.EQ, ">=": ConstraintType.GE, }