Source code for cosapp.drivers.utils

from __future__ import annotations
import enum
from typing import Optional, Union, Dict, Tuple, List, Set, NamedTuple
from collections.abc import Collection

from cosapp.core.numerics.boundary import Unknown
from cosapp.core.numerics.basics import MathematicalProblem
from cosapp.utils.helpers import check_arg
from cosapp.utils.parsing import multi_split

import logging
logger = logging.getLogger(__name__)


class SystemAnalyzer:
    """Holder for data collected on a system, meant to be shared
    between different drivers.

    The system of interest is kept in a private slot. Assigning a
    different object to property `system` type-checks the new value,
    stores it, and calls `clear()`.
    """
    __slots__ = ('__system',)

    def __init__(self, system: Optional["cosapp.systems.System"] = None):
        # Populate the private slot first (with `None`), since the
        # property setter reads it before deciding what to do.
        self.__reset()
        self.system = system

    @property
    def system(self) -> "cosapp.systems.System":
        """System: system of interest (may be `None`)"""
        return self.__system

    @system.setter
    def system(self, system) -> None:
        if system is not self.__system:
            # Local import, performed only when the system actually changes
            from cosapp.base import System

            check_arg(system, 'system', (System, type(None)))
            self.__reset(system)

    def __reset(self, system=None) -> None:
        """Store `system` without any type check, then call `clear()`."""
        self.__system = system
        self.clear()

    def clear(self) -> None:
        """Hook called each time the object is reset, that is at
        instantiation, and whenever a new system is assigned.

        Does nothing by default; derived classes may override it
        to (re)initialize their own data.
        """
        pass

    def check_system(self) -> None:
        """Make sure the object is bound to a system.

        Raises
        ------
        `ValueError` if the stored system is `None`.
        """
        if self.__system is None:
            raise ValueError("object is not associated to any system")
def dealias_problem(problem: MathematicalProblem, name=None) -> MathematicalProblem:
    """Build a copy of `problem` in which unknowns aliased through the
    input mapping of the context system are replaced by their alias.

    Unknowns whose contextual name is absent from the input mapping
    of the context system are left out of the output problem.

    Parameters:
    -----------
    - problem [MathematicalProblem]:
        Mathematical problem to be de-aliased.
    - name [str, optional]:
        Name of the output problem. If empty or `None`,
        defaults to `"{problem.name}[filtered]"`.

    Returns:
    --------
    MathematicalProblem:
        New problem created by `problem.context.new_problem`, extended with
        the content of `problem` except its unknowns, plus de-aliased unknowns.

    Raises:
    -------
    `ValueError` if `problem.context` is `None`.
    """
    if problem.context is None:
        raise ValueError("problem is not defined on a system")

    if not name:
        name = f"{problem.name}[filtered]"

    from cosapp.base import System

    context: System = problem.context
    input_mapping = context.input_mapping

    def get_free_unknown(unknown: Unknown) -> Union[Unknown, None]:
        """Resolve `unknown` against the input mapping of `context`.

        Returns `None` if the unknown is not found in the mapping,
        a transferred unknown if it is aliased by a variable located
        in the tree of `context`, and the original unknown otherwise.
        """
        # Name of the unknown, as seen from `context`
        if unknown.context is context:
            contextual_name = unknown.basename
        else:
            contextual_name = f"{context.get_path_to_child(unknown.context)}.{unknown.basename}"

        try:
            alias = input_mapping[contextual_name]
        except KeyError:
            logger.warning(f"Skip connected unknown {contextual_name!r}")
            return None

        if alias is unknown.ref:
            # Not aliased: nothing to resolve
            return unknown

        try:
            path = context.get_path_to_child(alias.context)
        except ValueError:
            # `alias.context` is not in `system` tree: keep original unknown
            logger.warning(
                f"Unknown {unknown.contextual_name()!r} is aliased by {alias.contextual_name!r}"
                f", defined outside the context of {context.name!r}"
                "; it is likely to be overwritten after the computation."
            )
            return unknown

        alias_contextual_name = f"{path}.{alias.name}" if path else alias.name
        unknown = unknown.transfer(context, alias_contextual_name)
        logger.info(f"Replace unknown {contextual_name!r} by {alias_contextual_name!r}")
        return unknown

    # Output problem: everything from `problem`, except unknowns
    filtered = context.new_problem(name)
    filtered.extend(problem, unknowns=False)

    # Unknowns are added one by one, after alias resolution
    for key, unknown in problem.unknowns.items():
        free_unknown = get_free_unknown(unknown)
        if free_unknown is None:
            continue
        if free_unknown is unknown:
            # Unchanged unknown: store a copy under the original key
            filtered.unknowns[key] = unknown.copy()
        else:
            # Aliased unknown: stored under the name of the new unknown
            filtered.unknowns[free_unknown.name] = free_unknown

    return filtered
class DesignProblemHandler(SystemAnalyzer):
    """Handler of a pair of tied mathematical problems (design and
    off-design) defined on the same system, with support for
    unknown de-aliasing.

    Attributes `design` and `offdesign` are created by `clear()`.
    """

    def clear(self) -> None:
        """Replace both inner problems by new, empty ones"""
        self.design = self.new_problem('design')
        self.offdesign = self.new_problem('offdesign')

    @classmethod
    def make(cls, design: MathematicalProblem, offdesign: MathematicalProblem) -> DesignProblemHandler:
        """Create a handler bound to `design.context`, holding
        problems `design` and `offdesign` (not copied).
        """
        handler = cls(design.context)
        handler.problems = (design, offdesign)
        return handler

    @property
    def problems(self) -> Tuple[MathematicalProblem, MathematicalProblem]:
        """Tuple[MathematicalProblem, MathematicalProblem]: (design, off-design) problems"""
        return self.design, self.offdesign

    @problems.setter
    def problems(self, problems: Tuple[MathematicalProblem, MathematicalProblem]) -> None:
        """Set design and off-design problems from a (design, offdesign) tuple.

        Raises `TypeError` if one element is not a `MathematicalProblem`;
        context consistency is delegated to `check_context`.
        """
        if not all(isinstance(problem, MathematicalProblem) for problem in problems):
            raise TypeError(
                f"expected `MathematicalProblem` instances; got {tuple(map(type, problems))}."
            )
        self.check_context(*problems)
        self.design, self.offdesign = problems

    def check_context(self, design: MathematicalProblem, offdesign: MathematicalProblem) -> None:
        """Check that the context of both problems is the system of the handler.

        Raises
        ------
        `ValueError` if the contexts differ, or are not `self.system`.
        """
        if design.context is offdesign.context is self.system:
            return
        raise ValueError(
            f"Design and off-design problems must be defined in the context of {self.system}."
        )

    def new_problem(self, name: str) -> MathematicalProblem:
        """Return a new `MathematicalProblem` called `name`, defined on `self.system`"""
        return MathematicalProblem(name, self.system)

    def copy_problems(self, prune=True) -> Tuple[MathematicalProblem, MathematicalProblem]:
        """Export design and off-design problems.

        Parameters:
        -----------
        - prune, Optional[bool]:
            If `True` (default), exported problems are obtained by `dealias_problem`.
            If `False`, they are obtained by method `copy` of each problem.

        Results:
        --------
        - (design, offdesign): tuple of `MathematicalProblem` objects.
        """
        if not prune:
            return self.design.copy(), self.offdesign.copy()

        return (
            dealias_problem(self.design, name='design'),
            dealias_problem(self.offdesign, name='offdesign'),
        )

    def copy(self, prune=True) -> DesignProblemHandler:
        """Return a new `DesignProblemHandler` built from exported problems.

        Parameters:
        -----------
        - prune, Optional[bool]:
            Forwarded to `copy_problems` (default: `True`).
        """
        return DesignProblemHandler.make(*self.copy_problems(prune))

    def prune(self) -> None:
        """Replace inner design and off-design problems by
        their pruned version, as given by `copy_problems(prune=True)`.
        """
        self.design, self.offdesign = self.copy_problems(prune=True)

    def merged_problem(self, name="merged", offdesign_prefix="offdesign", copy=True) -> MathematicalProblem:
        """Merge design and off-design problems into a single `MathematicalProblem` instance.

        Keys of design unknowns are kept unchanged. Keys of off-design unknowns,
        and keys of the residues of *both* problems (including those returned by
        `get_target_residues`), are formatted as `"{offdesign_prefix}[{key}]"`
        when `offdesign_prefix` is neither empty nor `None`.

        Parameters
        ----------
        - name [str, optional]:
            Merged problem name (default: 'merged').
        - offdesign_prefix [str, optional]:
            Key prefix (default: 'offdesign'); no renaming occurs if empty or `None`.
        - copy [bool, optional]:
            If `True` (default), merge the output of `copy_problems()`;
            otherwise, merge inner problems directly.

        Raises
        ------
        `ValueError` if design and off-design problems
        share unknown keys or residue keys.
        """
        design, offdesign = self.copy_problems() if copy else self.problems

        def check(attr, kind):
            """Raise `ValueError` if `attr` has keys shared by both problems"""
            common = set(getattr(design, attr)).intersection(getattr(offdesign, attr))
            if not common:
                return
            if len(common) > 1:
                names = ", ".join(map(repr, sorted(common)))
                names = f"({names}) are"
                kind += "s"
            else:
                names = f"{common.pop()!r} is"
            raise ValueError(
                f"{names} defined as design and off-design {kind}"
            )

        check('unknowns', kind='unknown')
        check('residues', kind='equation')

        merged = self.new_problem(name)

        def unchanged(key):
            return key

        def prefixed(key):
            return f"{offdesign_prefix}[{key}]"

        local_name = prefixed if offdesign_prefix else unchanged

        # Design problem first, then off-design problem.
        # Only off-design unknowns are renamed; residues always are.
        for problem, rename_unknowns in ((design, False), (offdesign, True)):
            rename = local_name if rename_unknowns else unchanged
            # Add unknowns
            for key in list(problem.unknowns.keys()):
                merged.unknowns[rename(key)] = problem.unknowns.get(key)
            # Add residues
            for key in list(problem.residues.keys()):
                merged.residues[local_name(key)] = problem.residues.get(key)
            for key, residue in problem.get_target_residues().items():
                merged.residues[local_name(key)] = residue

        return merged

    def extend(self, other: DesignProblemHandler, prune=True, copy=True, overwrite=False) -> DesignProblemHandler:
        """Extend both design and off-design problems from `other` handler.

        Parameters
        ----------
        - prune [bool, optional]:
            If `True` (default), problems of `other` are pruned before being added.
        - copy [bool, optional]:
            If `True` (default), problems of `other` are copied before being added.
        - overwrite [bool, optional]:
            Overwrite option, forwarded to `MathematicalProblem.extend`.

        Returns
        -------
        The current object, for method chaining.
        """
        exported = other.copy_problems(prune) if (prune or copy) else other.problems
        design, offdesign = exported
        # No further copy is required in `MathematicalProblem.extend`,
        # since `design` and `offdesign` already account for argument `copy`.
        self.design.extend(design, copy=False, overwrite=overwrite)
        self.offdesign.extend(offdesign, copy=False, overwrite=overwrite)
        return self
@enum.unique
class ConstraintType(enum.Enum):
    """Enum covering constraint types.

    Each member value is a dictionary with two entries:
    - 'operator': comparison operator, as a string;
    - 'sort': function ordering a (lhs, rhs) pair before
      it is used in `expression`.
    """
    GE = dict(operator=">=", sort=lambda lhs, rhs: (lhs, rhs))
    # Sides are swapped for `<=`
    LE = dict(operator="<=", sort=lambda lhs, rhs: (rhs, lhs))
    EQ = dict(operator="==", sort=lambda lhs, rhs: (lhs, rhs))

    def expression(self, lhs: str, rhs: str) -> str:
        """Return string `"a - (b)"`, where `(a, b)` is the output of `sort(lhs, rhs)`"""
        first, second = self.sort(lhs, rhs)
        return f"{first} - ({second})"

    def sort(self, lhs: str, rhs: str) -> Tuple[str, str]:
        """Return `lhs` and `rhs` in the order defined by the member's 'sort' function"""
        sorter = self.value['sort']
        return sorter(lhs, rhs)

    def __str__(self) -> str:
        return self.description

    @property
    def operator(self) -> str:
        """str: comparison operator associated with the member"""
        return self.value['operator']

    @property
    def description(self) -> str:
        """str: human-readable description of the constraint type"""
        return f"Constraint of the kind `lhs {self.operator} rhs`"

    @property
    def is_inequality(self) -> bool:
        """bool: `False` if the operator is `==`, `True` otherwise"""
        return not (self.operator == "==")
class Constraint(NamedTuple):
    """Named tuple describing a constraint on `lhs - (rhs)`, of the kind
    `lhs >= rhs` (inequality) or `lhs == rhs` (equality), depending
    on Boolean attribute `is_inequality`.

    Attributes:
    -----------
    - lhs [str]: left-hand side.
    - rhs [str]: right-hand side.
    - is_inequality [bool]: `True` by default.

    Properties:
    -----------
    - expression [str]: string `"{lhs} - ({rhs})"`.
    """
    lhs: str
    rhs: str
    is_inequality: bool = True

    @property
    def expression(self) -> str:
        """str: difference between left- and right-hand sides"""
        return "{} - ({})".format(self.lhs, self.rhs)

    def __str__(self) -> str:
        if self.is_inequality:
            operator = ">="
        else:
            operator = "=="
        return f"{self.expression} {operator} 0"
class ConstraintParser:
    """Parser turning human-readable constraints into `Constraint` objects"""

    @classmethod
    def parse(cls, expression: Union[str, List[str]]) -> Set[Constraint]:
        """Parse a string expression or a list thereof as a set of
        constraints to be used in solvers.

        Parameters:
        -----------
        - expression [str or List[str]]:
            Human-readable equality or inequality constraints,
            such as 'x >= y', '0 < alpha < 1', or a list thereof.

        Returns:
        --------
        - constraints [Set[Constraint]]:
            Set of `Constraint` named tuple objects.

        Raises:
        -------
        `ValueError` if one side of a constraint is blank, or
        contains a comparison operator or character '='.
        """
        check_arg(expression, 'expression', (str, Collection))

        ctypes = cls.types()
        # Tokens that may not remain in either side, once the expression is split
        forbidden = (*ctypes, "=")

        def side_ok(side: str) -> bool:
            """`True` if `side` is not blank, and free of forbidden tokens"""
            if any(token in side for token in forbidden):
                return False
            return bool(side.strip())

        def parse_single(expression: str) -> Set[Constraint]:
            """Parse one string, possibly containing chained comparisons"""
            check_arg(expression, 'expression', str)
            # Non-strict operators are treated as their strict counterparts
            simplified = expression.replace("<=", "<").replace(">=", ">")
            sides, operators = multi_split(simplified, ctypes.keys())
            found = set()
            # Consecutive sides are paired around each operator
            for lhs, rhs, operator in zip(sides, sides[1:], operators):
                ctype = ctypes[operator]
                constraint = Constraint(
                    *ctype.sort(lhs, rhs),
                    ctype.is_inequality,
                )
                if not (side_ok(lhs) and side_ok(rhs)):
                    raise ValueError(f"Invalid constraint {constraint.expression}")
                found.add(constraint)
            return found

        items = [expression] if isinstance(expression, str) else expression
        constraints = set()
        for item in items:
            constraints.update(parse_single(item))
        return constraints

    @classmethod
    def types(cls) -> Dict[str, ConstraintType]:
        """Return the mapping between operator strings and constraint types"""
        greater = ConstraintType.GE
        lower = ConstraintType.LE
        return {
            ">" : greater,
            "<" : lower,
            "<=": lower,
            "==": ConstraintType.EQ,
            ">=": greater,
        }