Source code for cosapp.core.numerics.boundary

from __future__ import annotations

import abc
import copy
import numpy
import warnings
from numpy.typing import ArrayLike

from numbers import Number
from collections.abc import MutableSequence
from typing import (
    Any, Union, Optional,
    Collection, NamedTuple, T,
    MutableSequence, TYPE_CHECKING,
)

from cosapp.core.eval_str import EvalString
from cosapp.core.variableref import VariableReference
from cosapp.ports.port import BasePort
from cosapp.ports.exceptions import ScopeError
from cosapp.utils.helpers import check_arg
from cosapp.utils.naming import natural_varname
from cosapp.utils.parsing import find_selector
from cosapp.utils.state_io import object__getstate__

if TYPE_CHECKING:
    from cosapp.systems import System


[docs] class AttrRef: """Attribute Reference for scalar object. In addition to System and its derivatives, manage also complex object which could be included in the evaluation context. Parameters ---------- obj: cosapp.systems.System System in which the boundary name is defined. key: str Name of the boundary """ def __init__(self, context: System, key: str) -> None: self._obj: Union[BasePort, Any] name = key try: base, key = key.rsplit(".", maxsplit=1) except ValueError: obj = context base = "" else: obj = eval(f"context.{base}", {"context": context}) from cosapp.systems import System if isinstance(obj, System): self._obj = obj.name2variable[key].mapping else: self._obj = obj if isinstance(self._obj, dict): raise ValueError("Only variables can be used in mathematical algorithms") self._key: str = key self._base: str = base self._name: str = natural_varname(name)
[docs] def update_shape(self) -> None: pass
@property def value(self) -> Number: return getattr(self._obj, self._key) @value.setter def value(self, val: Number) -> None: setattr(self._obj, self._key, val) def __copy__(self) -> AttrRef: return AttrRef(self._obj, self._key) def __eq__(self, other: AttrRef) -> bool: try: return self._obj is other._obj and self._key == other._key except: return False def __getstate__(self) -> dict[str, Any]: """Creates a state of the object. The state type does NOT match type specified in https://docs.python.org/3/library/pickle.html#object.__getstate__ to allow custom serialization. Returns ------- dict[str, Any]: state """ return object__getstate__(self) def __json__(self) -> dict[str, Any]: """Creates a JSONable dictionary representation of the object. Returns ------- dict[str, Any] The dictionary """ state = self.__getstate__().copy() # state.pop[""] return state
[docs] class MaskedAttrRef(AttrRef): """Masked Attribute Reference for MutableSequence-like object. Include a mask applying to an evaluation context vector. Parameters ---------- obj: cosapp.systems.System System in which the boundary name is defined. key: str Name of the boundary mask: numpy.ndarray Mask of the values in the vector boundary. """ def __init__(self, obj: System, key: str, mask: numpy.ndarray) -> None: super().__init__(obj, key) self.set_attributes(mask)
[docs] def set_attributes(self, mask: numpy.ndarray) -> None: self._mask = numpy.atleast_1d(mask) self._mask_nonzeros = self._mask.nonzero() array = getattr(self._obj, self._key) self._ref_shape = (len(array),) self._ref_size = len(array)
@property def value(self) -> MutableSequence: obj = getattr(self._obj, self._key) return list(map(obj.__getitem__, self._mask_nonzeros[0])) @value.setter def value(self, val: MutableSequence): obj = getattr(self._obj, self._key) for i, new in zip(self._mask_nonzeros[0], val): obj.__setitem__(i, new)
[docs] def set_mask(self, mask: numpy.ndarray) -> None: self._mask[:] = mask self._mask_nonzeros = (mask.nonzero()[0],)
def __copy__(self) -> MaskedAttrRef: return MaskedAttrRef(self._obj, self._key, self._mask.copy()) def __eq__(self, other: MaskedAttrRef) -> bool: try: return super().__eq__(other) and numpy.array_equal(self._mask, other._mask) except: return False
[docs] @classmethod def make_from_attr_ref( cls: MaskedAttrRef, attr_ref: AttrRef, obj: Union[BasePort, Any], name: str, mask: numpy.ndarray ) -> MaskedAttrRef: mask_ref = attr_ref.__new__(cls, obj, name, mask) for key, value in vars(attr_ref).items(): setattr(mask_ref, key, value) mask_ref.set_attributes(mask) return mask_ref
[docs] class NumpyArrayAttrRef(AttrRef): """Attribute Reference for unmasked numpy arrays. Parameters ---------- obj: cosapp.systems.System System in which the boundary name is defined. key: str Name of the boundary """ def __init__(self, obj: System, key: str) -> None: super().__init__(obj, key) array: numpy.ndarray = self.value self._ref_shape = array.shape self._ref_size = array.size self._mask_nonzeros = numpy.ones_like(array).nonzero() @property def value(self): array = getattr(self._obj, self._key) return array.view() @value.setter def value(self, val: ArrayLike): array = getattr(self._obj, self._key) array.ravel()[:] = numpy.ravel(val)
[docs] def update_shape(self) -> None: array: numpy.ndarray = getattr(self._obj, self._key) self._ref_shape = array.shape self._ref_size = array.size
def __copy__(self) -> NumpyArrayAttrRef: return NumpyArrayAttrRef(self._obj, self._key)
[docs] class NumpyMaskedAttrRef(AttrRef): """ Masked Attribute Reference for numpy arrays. Include a mask applying to an evaluation context vector. Parameters ---------- obj: cosapp.systems.System System in which the boundary name is defined. key: str Name of the boundary mask: numpy.ndarray Mask of the values in the vector boundary. """ def __init__(self, obj: System, key: str, mask: numpy.ndarray) -> None: super().__init__(obj, key) self.set_attributes(mask)
[docs] def set_attributes(self, mask: numpy.ndarray) -> None: self._mask = numpy.asarray(mask) self._mask_nonzeros = self._mask.nonzero() array: numpy.ndarray = getattr(self._obj, self._key) self._ref_shape = array.shape self._ref_size = array.size
@property def value(self) -> numpy.ndarray: return getattr(self._obj, self._key)[self._mask] @value.setter def value(self, val: Union[numpy.ndarray, MutableSequence]): getattr(self._obj, self._key)[self._mask] = numpy.asarray(val)
[docs] def update_shape(self) -> None: array: numpy.ndarray = getattr(self._obj, self._key) if array.shape != self._ref_shape: raise ValueError( f"The shape of masked unknown {self._name!r} has changed" f" from {self._ref_shape} to {array.shape};" f" the original mask cannot be applied anymore." )
[docs] def set_mask(self, mask: numpy.ndarray) -> None: self._mask[:] = mask
def __copy__(self) -> NumpyMaskedAttrRef: return NumpyMaskedAttrRef(self._obj, self._key, self._mask.copy()) def __eq__(self, other: MaskedAttrRef) -> bool: try: return super().__eq__(other) and numpy.array_equal(self._mask, other._mask) except: return False
[docs] @classmethod def make_from_attr_ref( cls: NumpyMaskedAttrRef, attr_ref: AttrRef, obj: Union[BasePort, Any], name: str, mask: numpy.ndarray ) -> NumpyMaskedAttrRef: mask_ref = attr_ref.__new__(cls, obj, name, mask) for key, value in vars(attr_ref).items(): setattr(mask_ref, key, value) mask_ref.set_attributes(mask) return mask_ref
[docs] class MaskedVarInfo(NamedTuple): basename: str selector: str = "" mask: Optional[numpy.ndarray] = None @property def fullname(self) -> str: return f"{self.basename}{self.selector}" def __eq__(self, other: MaskedVarInfo) -> bool: try: return self[:2] == other[:2] and numpy.array_equal(self.mask, other.mask) except: return False
[docs] class Boundary: """Numerical solver boundary. Parameters ---------- context: cosapp.systems.System System in which the boundary is defined. name: str Name of the boundary mask: numpy.ndarray or None Mask of the values in the vector boundary. default: Number, numpy.ndarray or None Default value to set the boundary with. inputs_only: bool, optional If `True` (default), output variables are regarded as invalid. """ def __init__(self, context: System, name: str, mask: Optional[numpy.ndarray] = None, default: Union[Number, numpy.ndarray, None] = None, inputs_only=True, **kwargs, ) -> None: super().__init__(**kwargs) # for collaborative inheritance self._context = context self._default_value: Union[Number, numpy.ndarray, None] = None basename, selector = Boundary.parse_expression(name) value, mask = Boundary.create_mask(context, basename, selector, mask) self._ref, self._boundary_impl, self._is_scalar = Boundary.create_attr_ref(context, basename, value, mask) self.find_port(inputs_only) self._name_info = MaskedVarInfo(basename, selector, mask) # Set default value if any if default is not None: self.update_default_value(default) @property def is_scalar(self) -> bool: """Returns whether this boundary is scalar or not.""" return self._is_scalar
[docs] def copy(self) -> Boundary: boundary = copy.copy(self) boundary._default_value = copy.copy(self._default_value) boundary._ref = self._ref.__copy__() return boundary
def __getstate__(self) -> Union[dict[str, Any], tuple[Optional[dict[str, Any]], dict[str, Any]]]: """Creates a state of the object. The state may take various forms depending on the object, see https://docs.python.org/3/library/pickle.html#object.__getstate__ for further details. Returns ------- Union[dict[str, Any], tuple[Optional[dict[str, Any]], dict[str, Any]]]: state """ return object__getstate__(self) def __json__(self) -> dict[str, Any]: """Creates a JSONable dictionary representation of the object. Returns ------- dict[str, Any] The dictionary """ qualname = f"{self.__module__}.{self.__class__.__qualname__}" state = self.__getstate__().copy() state.pop("_context") state.pop("_boundary_impl") return { "__class__": qualname, **state, }
[docs] @staticmethod def parse_expression(expression: str) -> MaskedVarInfo: """Decompose a variable specification into its base name and selector. Parameters ---------- expression : str Variable specification (variable name + optional array mask, if required) Returns ------- - str: variable name - str: array selector """ check_arg(expression, 'expression', str) expression = natural_varname(expression) try: basename, selector = find_selector(expression) except ValueError as error: raise SyntaxError(error) return basename, selector
[docs] @staticmethod def create_mask( system: System, varname: str, selector: str, mask: Optional[numpy.ndarray] = None, ) -> tuple[Optional[Union[Number, Collection]], Optional[numpy.ndarray]]: """Evaluate the basename expression within its context and generate a mask if a selector is specified in the fullname expression. Parameters ---------- - system: System System to which variable belongs. - basename: str Variable name without any optional array mask. - selector: str Expression corresponding to an array mask. - mask: Optional[numpy.ndarray] Imposed mask to apply on the variable; default is None (i.e. no mask). Returns ------- Optional[Union[Number, Collection]] Value of the context variable. Optional[numpy.ndarray] Imposed or generated mask to apply on the variable. """ # evaluate expression without mask if any try: value = eval(f"s.{varname}", {}, {"s": system}) except AttributeError as error: error.args = (f"{varname!r} is not known in {system.name!r}",) raise except Exception as error: error.args = (f"Can't evaluate {varname!r} in {system.name!r}",) raise # Check if the value is a scalar numpy array if isinstance(value, numpy.ndarray) and value.ndim == 0: warnings.warn( f"Variable {varname!r} is a scalar numpy array" ", which is not recommended. Use a scalar value instead." " If used in a mathematical algorithm, the value will be treated as a scalar.", category=UserWarning, ) value = value.item() # get or create mask if mask is not None: if isinstance(value, Number): raise TypeError("A mask cannot be applied on a scalar.") check_arg(mask, "mask", (list, tuple, numpy.ndarray)) return value, numpy.asarray(mask, dtype=bool) if selector: # Check value is an array if isinstance(value, numpy.ndarray) or Boundary.is_mutable_sequence(value): if isinstance(value, numpy.ndarray) and not (numpy.issubdtype(value.dtype, numpy.number) or value.size > 1): raise ValueError( f"Only non-empty numpy arrays can be partially selected; got {value}." ) elif Boundary.is_mutable_sequence(value) and not len(value) > 1: raise ValueError( f"Only non-empty MutableSequence-like arrays can be partially selected; got {value}." ) else: raise TypeError( f"Only non-empty arrays can be partially selected; got {type(value)}." ) # Set mask from selector string mask = numpy.zeros_like(value, dtype=bool) try: exec(f"mask{selector} = True", {}, {"mask": mask}) except (SyntaxError, IndexError) as error: varname = f"{system.name}.{varname}" error.args = ( f"Invalid selector {selector!r} for variable {varname!r}: {error!s}", ) raise elif isinstance(value, numpy.ndarray) or Boundary.is_mutable_sequence(value): mask = numpy.ones_like(value, dtype=bool) if isinstance(mask, numpy.ndarray) and mask.all(): mask = None return value, mask
[docs] @staticmethod def is_mutable_sequence(value: Any) -> bool: """Determine if an object is MutableSequence-like.""" mandatory_attrs = ("__getitem__", "__setitem__", "__len__") return all(hasattr(value, attr) for attr in mandatory_attrs)
[docs] @classmethod def create_attr_ref( cls, context: System, basename: str, value: Optional[Union[Number, Collection]], mask: Optional[numpy.ndarray] = None ) -> tuple[Union[AttrRef, NumpyMaskedAttrRef, MaskedAttrRef], AbstractBoundaryImpl, bool]: """ Returns an `AttrRef`, `MaskedAttrRef`, or ǸumpyMaskedAttrRef` object from a name and its evaluation context. The `NumpyMaskedAttrRef` derives from `AttrRef` if the context variable refers to a numpy.array and `MaskedAttrRef` for a variable referring to an object similar to a MutableSequence. In the two latter cases, a mask may be applied on value. Parameters ---------- - context: System System in which the boundary is defined. - basename: str Name of the boundary without its mask if any. - value: Optional[Union[Number, Collection]] Value of the context variable. - mask: Optional[numpy.ndarray] Mask to apply on the variable; default is None (i.e. no mask). Returns ------- Union[AttrRef, NumpyMaskedAttrRef, MaskedAttrRef] (Masked) Attribute Reference object. AbstractBoundaryImpl Object containing methods specific according to the variable type. bool Specify if the boundary value is a scalar. """ if mask is None: is_scalar = True ref_cls = AttrRef if isinstance(value, Number): impl_cls = ScalarBoundaryImpl elif isinstance(value, numpy.ndarray): if value.ndim > 0: ref_cls = NumpyArrayAttrRef impl_cls = NumpyBoundaryImpl is_scalar = False else: impl_cls = ScalarBoundaryImpl elif cls.is_mutable_sequence(value): impl_cls = MutableSeqBoundaryImpl is_scalar = False elif value is None: impl_cls = UndefinedBoundaryImpl else: impl_cls = GenericBoundaryImpl return ref_cls(context, basename), impl_cls(), is_scalar elif isinstance(value, numpy.ndarray): return NumpyMaskedAttrRef(context, basename, mask), NumpyBoundaryImpl(), False elif cls.is_mutable_sequence(value): return MaskedAttrRef(context, basename, mask), MutableSeqBoundaryImpl(), False raise TypeError("type of evaluated expression is incompatible as Boundary object handled type.")
[docs] def find_port(self, inputs_only=False) -> None: """ Find port associated to its `AttrRef`. In the case of a complex object, the port containing it is retrieved and checks. Parameters ---------- - inputs_only [bool, optional]: If `True`, output variables are regarded as invalid. Default is `False`. """ portname = self._ref._base portkey = self._ref._key obj = self._ref._obj if not isinstance(obj, BasePort): for i in range(len(portname.split("."))): if portname in self._context.name2variable: obj = self._context.name2variable[portname].mapping break else: portname = portname.rsplit('.', maxsplit=1)[0] else: portname = ".".join(filter(None, portname.split(".") + portkey.split("."))) if not isinstance(obj, BasePort): raise TypeError(f"Invalid port; got {type(obj)}") self._port = port = obj self._portname = portname if inputs_only and not port.is_input: raise ValueError( f"Only variables in input ports can be used as boundaries; got {portname!r} in {port.contextual_name!r}." ) if port.out_of_scope(portkey): if self._context is not port.owner: # Only owner can set its variables raise ScopeError(f"Trying to set variable {self._ref._name!r} out of your scope through a boundary.")
def __str__(self) -> str: return str(self.value) def __repr__(self) -> str: return f"{self.name} := {self!s}" def __eq__(self, other: Boundary) -> bool: try: return self._context is other._context and self._ref == other._ref except: return False @property def context(self) -> System: """cosapp.systems.System : `System` in which the boundary is defined.""" return self._context @property def port(self) -> BasePort: """BasePort: port containing the boundary.""" return self._port @property def name(self) -> str: """str : Contextual name of the boundary.""" return self._name_info.fullname @property def basename(self) -> str: """str : Base name of the boundary.""" return self._name_info.basename
[docs] def contextual_name(self, context: Optional[System] = None) -> str: """str : Contextual name of the boundary, relative to `context`. If `context` is `None` (default), uses current variable context. """ if context is None: path = self.context.name else: path = context.get_path_to_child(self.context) or self.context.name return f"{path}.{self.name}"
@property def ref(self) -> Union[AttrRef, MaskedAttrRef, NumpyMaskedAttrRef]: """AttrRef : attribute reference accessed by the boundary.""" return self._ref @property def variable_reference(self) -> VariableReference: """VariableReference : variable reference accessed by the boundary.""" return self.context.name2variable[self.portname] @property def portname(self) -> str: """str : name of the port accessed by the boundary.""" return self._portname @property def variable(self) -> str: """str : name of the variable accessed by the boundary.""" return self._ref._key
[docs] def touch(self) -> None: """Set owner port as 'dirty'.""" self.port.touch() self.port.owner.touch()
@property def mask(self) -> Optional[numpy.ndarray]: """numpy.ndarray or None : Mask of the values in the vector boundary.""" return getattr(self._ref, "_mask", None) @mask.setter def mask(self, mask: Optional[numpy.ndarray]) -> None: if not self._is_scalar and mask is not None: mask = numpy.asarray(mask) if mask.shape != self._ref._ref_shape: raise ValueError( f"Set mask does not fit the context array shape" f"; got {mask.shape!r} to set in {self._ref._ref_shape}." ) if self._default_value is not None: default_size = numpy.asarray(self._default_value).size if numpy.count_nonzero(mask) != default_size: raise ValueError( f"Set mask does not fit the current boundary value" f"; got {numpy.count_nonzero(mask)!r} mismatching {default_size}." ) self._ref.set_mask(mask)
[docs] def mask_indices(self): """Iterator over index tuples of the mask (if any).""" nonzeros: tuple[numpy.ndarray, ...] = getattr(self._ref, "_mask_nonzeros", tuple()) return zip(*nonzeros)
@property def value(self) -> Union[Number, numpy.ndarray]: return self._ref.value
[docs] def update_value(self, new: Union[Number, MutableSequence, numpy.ndarray], checks=True) -> None: if new is not None: if self._boundary_impl.update_value(self._ref.value, new, checks): self._ref.value = new self.touch()
[docs] def set_to_default(self) -> None: if self._default_value is not None: self.update_value(self._default_value, checks=False)
@property def default_value(self) -> Union[Number, numpy.ndarray]: return self._default_value
[docs] def update_default_value( self, new: Union[Number, MutableSequence, numpy.ndarray], checks: bool = True, ) -> None: if new is not None: if checks: self._boundary_impl.check_new_value(self._default_value, new) self._default_value = new
@property def size(self) -> int: return self._boundary_impl.size(self._ref.value)
[docs] class AbstractBoundaryImpl(abc.ABC): """Abstract Boundary class to manage methods specific to Boundary type."""
[docs] @abc.abstractmethod def check_new_value(value: T, new: T) -> None: ...
[docs] @abc.abstractmethod def update_value(ref_value: T, new: T, checks=True) -> bool: ...
[docs] @abc.abstractmethod def size(value: T) -> int: ...
[docs] class UndefinedBoundaryImpl(AbstractBoundaryImpl): """Class handling undefined Boundary."""
[docs] @staticmethod def check_new_value(value: T, new: T) -> None: raise NotImplementedError
[docs] @staticmethod def update_value(ref_value: T, new: T, checks=True) -> bool: raise NotImplementedError
[docs] @staticmethod def size(value: T) -> int: raise NotImplementedError
[docs] class ScalarBoundaryImpl(AbstractBoundaryImpl): """Specific methods for Number Boundary."""
[docs] @staticmethod def check_new_value(ref: Number, new: Number) -> None: """Check that reference and new values are compatible.""" if not isinstance(new, Number): raise TypeError( f"Value to set is incompatible with the boundary value type" f"; got {type(new)} mismatching {type(ref)}." )
[docs] @staticmethod def update_value(ref_value: Number, new: Number, checks=True) -> bool: if checks: ScalarBoundaryImpl.check_new_value(ref_value, new) return ref_value != new
[docs] @staticmethod def size(value: Number) -> int: return 1
[docs] class MutableSeqBoundaryImpl(AbstractBoundaryImpl): """Specific methods for MutableSequence-like Boundary."""
[docs] @staticmethod def check_new_value(value: MutableSequence, new: MutableSequence) -> None: """Check that reference and new values are compatible.""" if not Boundary.is_mutable_sequence(new): raise TypeError( f"Value to set is incompatible with the boundary value type" f"; got {type(new)} mismatching {type(value)}." ) if value is not None and len(new) != len(value): raise ValueError( f"Value to set does not fit the current boundary value" f"; got {len(new)} mismatching {len(value)}." )
[docs] @staticmethod def update_value(ref_value: MutableSequence, new: MutableSequence, checks=True) -> bool: if checks: MutableSeqBoundaryImpl.check_new_value(ref_value, new) return not numpy.array_equal(ref_value, new)
[docs] @staticmethod def size(value: MutableSequence) -> int: return len(value)
[docs] class NumpyBoundaryImpl(AbstractBoundaryImpl): """Specific methods for numpy.ndarray Boundary."""
[docs] @staticmethod def check_new_value(ref: numpy.ndarray, new: numpy.ndarray) -> None: """Check that reference and new value shapes are compatible.""" if ref is not None and not numpy.isscalar(new): new_shape = numpy.shape(new) if ref.shape != new_shape: raise ValueError( f"Value to set does not fit the current boundary shape" f"; got {new_shape!r} mismatching {ref.shape}." )
[docs] @staticmethod def update_value(ref_value: numpy.ndarray, new: numpy.ndarray, checks=True) -> bool: if checks: NumpyBoundaryImpl.check_new_value(ref_value, new) return not numpy.array_equal(ref_value, new)
[docs] @staticmethod def size(value: numpy.ndarray) -> int: return value.size
[docs] class GenericBoundaryImpl(AbstractBoundaryImpl): """Class handling undefined Boundary."""
[docs] @staticmethod def check_new_value(ref: T, new: T) -> None: """Check that reference and new values are compatible.""" if None not in (ref, new): if not isinstance(new, type(ref)): raise TypeError( f"Value to set is incompatible with the boundary value type" f"; got {type(new)} mismatching {type(ref)}." )
[docs] @staticmethod def update_value(ref_value: T, new: T, checks=True) -> bool: if checks: GenericBoundaryImpl.check_new_value(ref_value, new) return ref_value != new
[docs] @staticmethod def size(value: T) -> int: raise NotImplementedError
[docs] class Unknown(Boundary): """Numerical solver unknown. Parameters ---------- context : cosapp.systems.System System in which the unknown is defined. name : str Name of the unknown lower_bound : float Minimum value authorized; default -numpy.inf upper_bound : float Maximum value authorized; default numpy.inf max_abs_step : float Max absolute step authorized in one iteration; default numpy.inf max_rel_step : float Max relative step authorized in one iteration; default numpy.inf mask : numpy.ndarray or None Mask of unknown values in the vector variable. Attributes ---------- lower_bound : float Minimum value authorized; default -numpy.inf upper_bound : float Maximum value authorized; default numpy.inf max_abs_step : float Largest absolute step authorized in one iteration; default numpy.inf max_rel_step : float Largest relative step authorized in one iteration; default numpy.inf Notes ----- The dimensionality of the variable should be taken into account in the bounding process. """ def __init__(self, context: System, name: str, # absolute_step: Number = 1.5e-8, # TODO ? # relative_step: Number = 1.5e-8, # TODO ? max_abs_step: Number = numpy.inf, max_rel_step: Number = numpy.inf, lower_bound: Number = -numpy.inf, upper_bound: Number = numpy.inf, # reference: Union[Number, numpy.ndarray] = 1., # TODO normalize unknown mask: Optional[numpy.ndarray] = None, ): super().__init__(context, name, mask, inputs_only=True) self.check_numerical_type() check_arg(max_abs_step, 'max_abs_step', Number, lambda x: x > 0) check_arg(max_rel_step, 'max_rel_step', Number, lambda x: x > 0) check_arg(lower_bound, 'lower_bound', Number) check_arg(upper_bound, 'upper_bound', Number) # TODO take into account the variable dimension in the constructor ? self.lower_bound = lower_bound # type: Number self.upper_bound = upper_bound # type: Number self.max_abs_step = max_abs_step # type: Number self.max_rel_step = max_rel_step # type: Number def __eq__(self, other: Unknown) -> bool: try: return super().__eq__(other) and all( getattr(self, name) == getattr(other, name) for name in ("max_abs_step", "max_rel_step", "lower_bound", "upper_bound") ) except: return False def __str__(self) -> str: try: return str(self.value) except KeyError: # boundary does not exist in the current context return str(self.default_value)
[docs] def check_numerical_type(self) -> None: """Check if the unknown is suitable for a solver. Raises ------ `TypeError` If the unknown is neither a numerical variable nor a mutable sequence. Warnings -------- `UserWarning` If the unknown is declared as an array of integers. """ value = self._ref.value if isinstance(value, numpy.ndarray): if issubclass(value.dtype.type, numpy.integer): warnings.warn( f"Unknown {self.name!r} appears to be an array of integers," f" but solvers only manipulate floating point values." ) elif not isinstance(value, (Number, type(None))): if not Boundary.is_mutable_sequence(value): raise TypeError( f"Only numerical variables can be used in mathematical algorithms" f"; got {self.portname!r} in {self.context.name!r}" )
[docs] def copy(self) -> Unknown: """Copy the unknown object. Returns ------- Unknown Duplicated unknown """ return self.transfer(self.context, self.name)
[docs] def transfer(self, context: System, name: str) -> Unknown: """Transfer a copy of the unknown in a new context. Returns ------- Unknown Duplicated unknown, in new context """ new = Unknown( context, name, max_abs_step=self.max_abs_step, max_rel_step=self.max_rel_step, lower_bound=self.lower_bound, upper_bound=self.upper_bound, mask=copy.copy(self.mask), ) return new
[docs] def to_dict(self) -> dict[str, Any]: """Returns a JSONable representation of the unknown. Returns ------- dict[str, Any] JSONable representation """ mask = self.mask return { "context": self.context.contextual_name, "name": self.name, "varname": self.variable, "max_abs_step": self.max_abs_step, "max_rel_step": self.max_rel_step, "lower_bound": self.lower_bound, "upper_bound": self.upper_bound, "mask": mask.tolist() if isinstance(mask, numpy.ndarray) else None }
[docs] class AbstractTimeUnknown(abc.ABC): def __init__(self, **kwargs): super().__init__(**kwargs) # for collaborative inheritance @property @abc.abstractmethod def der(self) -> EvalString: """Expression of the time derivative, given as an EvalString""" pass @property @abc.abstractmethod def max_time_step_expr(self) -> EvalString: """Expression of the maximum admissible time step, given as an EvalString.""" pass @property @abc.abstractmethod def max_abs_step_expr(self) -> EvalString: """Expression of the maximum absolute step in one iteration, given as an EvalString.""" pass
[docs] @abc.abstractmethod def reset(self) -> None: """Reset transient unknown to a reference value""" pass
[docs] @abc.abstractmethod def touch(self) -> None: """Set owner port as 'dirty'.""" pass
@property def d_dt(self) -> Any: """Value of time derivative""" return self.der.eval() @property def max_abs_step(self) -> float: """float: Maximum absolute step in one iteration""" return self.max_abs_step_expr.eval() @property def max_time_step(self) -> float: """float: Maximum admissible time step in one iteration""" dt_max = self.max_time_step_expr.eval() dx_max = self.max_abs_step if numpy.isfinite(dx_max): step_based_dt = self.extrapolated_time_step(dx_max) dt_max = min(dt_max, step_based_dt) return dt_max @property def constrained(self) -> bool: """bool: is unknown constrained by a limiting time step?""" constrained = lambda expr: numpy.isfinite(expr.eval()) if expr.constant else True return constrained(self.max_time_step_expr) or constrained(self.max_abs_step_expr)
[docs] def extrapolated_time_step(self, step: float) -> float: """ Time step necessary to attain a variation of `step` at a rate given by current value of the time derivative. """ rate = numpy.abs(self.d_dt) step = numpy.where(rate > 0, abs(step), numpy.inf) rate = numpy.where(rate > 0, rate, 1) return numpy.min(step / rate)
[docs] class TimeUnknown(Boundary, AbstractTimeUnknown): """Time-dependent solver unknown. Parameters ---------- context : cosapp.systems.System System in which the unknown is defined. name : str Name of the unknown Attributes ---------- max_time_step : float Max time step authorized in one iteration; default numpy.inf """ def __init__(self, context: System, name: str, der: Any, max_time_step: Union[Number, str] = numpy.inf, max_abs_step: Union[Number, str] = numpy.inf, pulled_from: Optional[VariableReference] = None, ): super().__init__(context, name) self._pulled_from = pulled_from self.__type = None self.__shape = None self.__dt_max = numpy.inf self.__dx_max = numpy.inf self.d_dt = der self.max_time_step = max_time_step self.max_abs_step = max_abs_step
[docs] def update_shape(self) -> None: self._ref.update_shape()
def __str__(self) -> str: try: return str(self.value) except KeyError: # does not exist in current context return str(self.default_value) @property def der(self) -> EvalString: """Expression of time derivative, given as an EvalString""" return self.__der @AbstractTimeUnknown.d_dt.setter def d_dt(self, expression: Any): eval_string, value, dtype = self.der_type(expression, self.context) if self.__type is None: self.__type = dtype if dtype is numpy.ndarray: self.__shape = value.shape elif dtype is not Number: raise TypeError( f"Derivative expressions may only be numbers or array-like collections; got '{value}'") elif self.__type is not dtype: raise TypeError( f"Expression '{expression!s}' is incompatible with declared type {self.__type.__qualname__}") if self.__shape and numpy.shape(value) != self.__shape: raise ValueError( f"Expression '{expression!s}' should be an array of shape {self.__shape}") self.__der = eval_string @property def max_time_step_expr(self) -> EvalString: """Maximum admissible time step, given as an EvalString.""" return self.__dt_max @property def max_abs_step_expr(self) -> EvalString: """Maximum admissible step, given as an EvalString.""" return self.__dx_max @AbstractTimeUnknown.max_time_step.setter def max_time_step(self, expression: Any): self.__dt_max = self.__positive_expr(expression, "max_time_step") @AbstractTimeUnknown.max_abs_step.setter def max_abs_step(self, expression: Any): self.__dx_max = self.__positive_expr(expression, "max_abs_step") def __positive_expr(self, expression: Any, name: str) -> EvalString: eval_string, value, dtype = self.der_type(expression, self.context) check_arg(value, name, Number) # checks that expression is scalar if value <= 0 and eval_string.constant: # Note: # If expression is context-dependent (non-constant), it may turn out to be positive at time driver execution. # Therefore, an exception should only be raised for constant, non-positive expressions. raise ValueError(f"{name} must be strictly positive") return eval_string
[docs] def copy(self) -> TimeUnknown: """Copy time-dependent unknown object. Returns ------- TimeUnknown Duplicated unknown """ return TimeUnknown(self.context, self.name, self.der, self.max_time_step_expr)
[docs] @staticmethod def der_type(expression: Any, context: System) -> tuple[EvalString, Any, type]: """Static method to evaluate the type and default value of an expression used as time derivative""" if isinstance(expression, EvalString): eval_string = expression else: eval_string = EvalString(expression, context) value = eval_string.eval() if isinstance(value, (list, tuple, numpy.ndarray)): value = numpy.array(value) dtype = numpy.ndarray elif TimeUnknown.is_number(value): dtype = Number else: dtype = type(value) return eval_string, value, dtype
[docs] @staticmethod def is_number(value) -> bool: """Is value suitable for a derivative?""" return isinstance(value, Number) and not isinstance(value, bool)
@property def pulled_from(self) -> Optional[VariableReference]: """VariableReference or None: Original time unknown before pulling; None otherwise.""" return self._pulled_from @Boundary.value.setter def value(self, new: Union[Number, numpy.ndarray]) -> None: self.update_value(new)
[docs] def to_dict(self) -> dict[str, Any]: """Returns a JSONable representation of the transient unknown. Returns ------- dict[str, Any] JSONable representation """ return { "context": self.context.contextual_name, "name": self.name, "der": str(self.__der), "max_time_step": str(self.max_time_step_expr), }
[docs] def reset(self) -> None: """Reset transient unknown to a reference value. Inactive for class TimeUnknown.""" pass
[docs] class TimeDerivative(Boundary): """Explicit time derivative. Parameters ---------- context : cosapp.systems.System System in which the unknown is defined. name : str Name of the variable source : str Variable such that name = d(source)/dt initial_value : Any Time derivative initial value """ def __init__(self, context: System, name: str, source: Any, initial_value: Any = None, ): super().__init__(context, name, inputs_only=True) self.__shape = None self.__previous = None eval_string, value, self.__type = self.source_type(source, self.context) if self.__type is Number: self._boundary_impl = ScalarBoundaryImpl() self._is_scalar = True elif self.__type is numpy.ndarray: self._boundary_impl = NumpyBoundaryImpl() self.__shape = value.shape self._is_scalar = False elif Boundary.is_mutable_sequence(value): self._boundary_impl = MutableSeqBoundaryImpl() self.__shape = (len(value), ) self._is_scalar = False else: raise TypeError("type of boundary value is not handle.") # Set source & initial value self.source = source self.initial_value = initial_value self.reset()
[docs] def copy(self) -> TimeDerivative: der = super().copy() der.__src = copy.copy(self.__src) der.__shape = copy.copy(self.__shape) der.__initial = copy.copy(self.__initial) der.__previous = copy.copy(self.__previous) der.__type = self.__type return der
def __str__(self) -> str: try: return str(self.value) except KeyError: # does not exist in current context return str(self.default_value)
[docs] def reset(self, value: Any = None) -> None: self.__previous = self.source if value is not None: self.initial_value = value # NB: `value` may be an expression value = self.initial_value if value is not None: self.__set_value(value)
@property def source_expr(self) -> EvalString: """Variable whose rate is evaluated, returned as an EvalString""" return self.__src @property def source(self) -> Union[Number, numpy.ndarray]: """Value of the variable whose rate is evaluated""" return self.__src.eval() @source.setter def source(self, expression: Any): self.__src = self.__parse(expression) if self.__previous is None: self.__previous = self.source @property def initial_value_expr(self) -> EvalString: """Initial value of time derivative, returned as an EvalString""" return self.__initial @property def initial_value(self) -> Union[Number, numpy.ndarray]: """Initial value of time derivative""" return self.__initial.eval() @initial_value.setter def initial_value(self, expression): if expression is None: self.__initial = EvalString(None, self.context) else: self.__initial = self.__parse(expression)
[docs] def update(self, dt: Number) -> Number: """Evaluate rate-of-change of source over time interval `dt`""" current = self.source rate = (current - self.__previous) / dt # backward finite-difference time derivative self.__set_value(rate) self.__previous = current return rate
def __set_value(self, value: Union[Number, numpy.ndarray]): """Private setter for `value`""" if self._ref.value is None and not self._is_scalar: self.update_value(value) mask = numpy.ones_like(value, dtype=bool) self._ref = MaskedAttrRef.make_from_attr_ref(self._ref, self._ref._obj, self._ref._name, mask) else: self.update_value(value) @Boundary.value.setter def value(self, new: Union[Number, numpy.ndarray]) -> None: raise RuntimeError("Time derivatives are computed, and cannot be explicitly set")
[docs] @staticmethod def source_type(expression: Any, context: System) -> tuple: """Static method to evaluate the type and default value of an expression used as rate source""" eval_string, value, dtype = TimeUnknown.der_type(expression, context) if dtype is numpy.ndarray: value.fill(0.0) else: value = 0.0 return eval_string, value, dtype
def __parse(self, expression: Any) -> EvalString: eval_string, value, dtype = TimeUnknown.der_type(expression, self.context) if self.__type is not dtype: raise TypeError( f"Expression '{expression!s}' is incompatible with declared type {self.__type.__qualname__}") if self.__shape and value.shape != self.__shape: raise ValueError(f"Expression '{expression!s}' should be an array of shape {self.__shape}") return eval_string
[docs] def to_dict(self) -> dict[str, Any]: """Returns a JSONable representation of the time derivative. Returns ------- dict[str, Any] JSONable representation """ return { "context": self.context.contextual_name, "name": self.name, "source": str(self.source_expr), "initial_value": str(self.initial_value_expr), }