Source code for cosapp.systems.system

"""
Basic classes handling model resolution, system connections and conversion between level of
modelings.
"""
from __future__ import annotations
import collections
import importlib
import json
import logging
import os
import warnings
from contextlib import contextmanager
from copy import deepcopy
from enum import Enum
from io import StringIO
from numbers import Number
from pathlib import Path
from typing import (
    Any, Callable, ClassVar, Dict, FrozenSet,
    Iterable, Iterator, List, Type,
    Optional, Tuple, Union, TypeVar,
    Collection,
)
from types import MappingProxyType

import jsonschema
import numpy
import pandas

from cosapp.patterns.visitor import Visitor
from cosapp.core.module import Module, CommonPorts
from cosapp.core.eval_str import EvalString
from cosapp.core.variableref import VariableReference
from cosapp.core.numerics.basics import MathematicalProblem, TimeProblem
from cosapp.core.numerics.boundary import TimeDerivative, TimeUnknown, Unknown
from cosapp.core.time import TimeObserver
from cosapp.ports.enum import PortType, Scope, Validity
from cosapp.ports.port import BasePort, ExtensiblePort, ModeVarPort, Port
from cosapp.ports.units import UnitError
from cosapp.ports.variable import RangeValue, Types, Variable
from cosapp.ports.connectors import BaseConnector, Connector, ConnectorError
from cosapp.utils.distributions import Distribution
from cosapp.utils.context import ContextLock
from cosapp.utils.helpers import check_arg, is_number, is_numerical
from cosapp.utils.json import JSONEncoder, decode_cosapp_dict
from cosapp.utils.logging import LogFormat, LogLevel, rollover_logfile
from cosapp.utils.naming import natural_varname
from cosapp.utils.pull_variables import pull_variables
from cosapp.utils.find_variables import get_attributes
from cosapp.utils.surrogate_models import FloatKrigingSurrogate
from cosapp.systems.systemConnector import SystemConnector
from cosapp.systems.systemSurrogate import SystemSurrogate
from cosapp.multimode.event import Event, EventState, ZeroCrossing

logger = logging.getLogger(__name__)

AnyPort = TypeVar("AnyPort", bound=Port)
AnySystem = TypeVar("AnySystem", bound="System")
AnyDriver = TypeVar("AnyDriver", bound="Driver")


[docs] class ExecutionOrdering(Enum): """Enumeration of `System` algorithm to define component execution order.""" MANUAL = 'manual'
[docs] class ConversionType(Enum): """Enumeration of `System` conversion type.""" manual = "manual" best_fidelity_to_cost = "best_fidelity_to_cost_ratio" high_fidelity = "highest_fidelity" low_fidelity = "lowest_fidelity" high_cost = "highest_cost" low_cost = "lowest_cost"
# Default value use in list_inputs method to be able to tell if the caller set a value to out_stream _DEFAULT_OUT_STREAM = object()
[docs] class System(Module, TimeObserver): # TODO check and complete documentation """ A class to describe generic properties and functions of a component that can be single or made of child `System`. Parameters ---------- name: str Name of the `System` Attributes ---------- name : str `System` name inputs : :obj:`collections.OrderedDict` of :obj:`BasePort` Dictionary of `BasePort` containing the values needed to compute the `System` outputs : Dict[BasePort] Dictionary of `BasePort` containing the values computed by the `System` residues : :obj:`collections.OrderedDict` of :obj:float Dictionary of residues generated by the `System` children : Dict[System] Child `System` of this `System` parent : System Parent `System` of this `System`; None if there is no parent. exec_order : MappingView[str] Execution order in which sub-systems are computed. properties : Dict[str, Any] Dictionary of immutable parameters of the system. design_methods : Dict[Equations] `System` pre-defined design methods name2variable : Dict[str, VariableReference] Variable name mapping to its value by reference _locked : bool if True, `add_input`, `add_output`, `add_inward` and `add_outward` are desactivated. This is the default behavior outside the `setup` function. _active : bool if False, the `System` will not execute its `compute` method _is_tree_clean : bool Reflects the status of the inputs and outputs. `clean` status means the group of ports was not updated since last computation of the `System` _compute_calls: bool Store if the `System` was computed at last call or not (due to inhibition of clean status) Examples -------- To create your own `System`, you should inherit from this class and define your own `setup` and `compute` methods. >>> import numpy as np >>> >>> class RealPort(Port): >>> >>> def setup(self): >>> self.add_variable('x',0.) >>> >>> class InvertedParabola(System): >>> >>> def setup(self): >>> self.add_input(RealPort, 'iterative') >>> self.add_input(RealPort, 'target_y') >>> self.add_output(RealPort, 'max_root') >>> self.add_inward({'a': 0.01, >>> 'b': 2., >>> 'c': 4.}) >>> self.add_outward('roots', None) >>> self.add_equation("a * iterative.x**2 + b * iterative.x + c == target_y.x", "y") >>> >>> def compute(self): >>> >>> discriminant = self.b**2 - 4. * self.a * self.c >>> if discriminant >= 0.: >>> self.roots = ((-self.b + np.sqrt(discriminant)) / (2. * self.a), >>> (-self.b - np.sqrt(discriminant)) / (2. * self.a)) >>> self.max_roots.x = max(self.roots) >>> else: >>> self.roots = None >>> self.max_roots.x = np.nan """ __slots__ = ( '__context_lock', '_is_tree_clean', '_locked', '_math', '_time_pb', 'design_methods', 'drivers', 'inputs', 'outputs', 'name2variable', '__readonly', '__events', '_meta', '__runner', '__input_mapping', '__loop_problem', '__child_connectors', '__pulling_connectors', '__free_problem', ) INWARDS = CommonPorts.INWARDS.value # type: ClassVar[str] OUTWARDS = CommonPorts.OUTWARDS.value # type: ClassVar[str] MODEVARS_IN = CommonPorts.MODEVARS_IN.value # type: ClassVar[str] MODEVARS_OUT = CommonPorts.MODEVARS_OUT.value # type: ClassVar[str] COMMON_PORTS = list(CommonPorts.__members__) # type: List[str] tags = frozenset() # type: ClassVar[FrozenSet[str]] _user_context = None # type: ClassVar[Optional[Scope]] # Python packages containing components definition _components_librairies = ['', ] # type: ClassVar[List[str]] # Is the master system known (i.e. the one from which a simulation has started) __master_set: ClassVar[bool] = False
[docs] @classmethod @contextmanager def set_master(cls, name: str, type_checking: bool=True) -> bool: """Set the master System Parameters ---------- name : str Name of the System calling this context manager type_checking : bool, optional Whether to activate the type checking in the ports or not (default: True) Returns ------- bool Is the callee the master System? """ is_master = False if not cls.__master_set: logger.debug(f"System <{name}> is the execution master.") rollover_logfile() # Create a new logfile BasePort.set_type_checking(type_checking) # May deactivate type checking is_master = True cls.__master_set = True try: yield is_master finally: if is_master: cls.__master_set = False BasePort.set_type_checking(True) # Reactivate type ckecking from cosapp.drivers.optionaldriver import OptionalDriver OptionalDriver.set_inhibited(False)
def __init__(self, name: str, **kwargs): """Initialize a System Parameters ---------- - name [str]: System name """ Module.__init__(self, name) TimeObserver.__init__(self, sign_in=False) from cosapp.drivers import Driver self._math = self.new_problem(name) self._time_pb = TimeProblem(name, self) self.__loop_problem = self.new_problem('loop') self.drivers = collections.OrderedDict() # type: Dict[str, Driver] self.design_methods = dict() # type: Dict[str, MathematicalProblem] self.__readonly = dict() # type: Dict[str, Any] self.__ctor_kwargs = kwargs.copy() self.__context_lock = ContextLock() self.__free_problem = ContextLock() self.inputs = dict() # type: Dict[str, BasePort] self.outputs = dict() # type: Dict[str, BasePort] self.__events = dict() # type: Dict[str, Event] # Connectors are grouped in a dictionary where the key is the sink system i.e. the receiving system self.__child_connectors = dict() # type: Dict[str, List[SystemConnector]] self.__pulling_connectors = list() # type: List[SystemConnector] self._locked = False # type: bool self._is_tree_clean = False self._meta = None self.__runner = self # type: Union[System, SystemSurrogate] self.__input_mapping = None # type: Dict[str, VariableReference] # For efficiency purpose, links to objects are stored as reference # !! name2variable must be the latest attribute set # => lock __setattr__ on previously defined attributes self.name2variable = dict() # type: Dict[str, VariableReference] # Create extensible ports for orphan variables self._add_port(ExtensiblePort(System.INWARDS, PortType.IN)) self._add_port(ExtensiblePort(System.OUTWARDS, PortType.OUT)) self._add_port(ModeVarPort(System.MODEVARS_IN, PortType.IN)) self._add_port(ModeVarPort(System.MODEVARS_OUT, PortType.OUT)) # Customized subclass `System` before applying user wishes kwargs = self._initialize(**kwargs) # Customized the `System` according to user wishes with self.__free_problem: self.setup(**kwargs) self.update() self.__enforce_scope() self._locked = True def _update(self, dt) -> None: """Required by `TimeObserver` base class""" pass
[docs] def accept(self, visitor: Visitor) -> None: """Specifies course of action when visited by `visitor`""" visitor.visit_system(self)
[docs] def ports(self) -> Iterator[BasePort]: """Iterator on all system ports (both inputs and outputs)""" yield from self.inputs.values() yield from self.outputs.values()
[docs] def is_clean(self, direction: Optional[PortType] = None) -> bool: """Are the `System` ports with the given direction clean? If no direction is specified, checks if both directions are clean. Parameters ---------- direction : PortType, optional Direction of interest Returns ------- bool Clean status """ if direction == PortType.IN: ports = self.inputs.values() elif direction == PortType.OUT: ports = self.outputs.values() else: ports = self.ports() return all(port.is_clean for port in ports)
[docs] def set_clean(self, direction: PortType) -> None: """Set to clean ports of a certain direction. Parameters ---------- direction : PortType Direction to set """ ports = self.inputs.values() if direction == PortType.IN else self.outputs.values() for port in ports: port.set_clean()
[docs] def set_dirty(self, direction: PortType) -> None: """Set to dirty ports of a certain direction. Parameters ---------- direction : PortType Direction to set """ if direction == PortType.IN: self.touch() else: for port in self.outputs.values(): port.touch()
[docs] def touch(self): for system in self.path_to_root(): if not system._is_tree_clean: break system._is_tree_clean = False
def __enforce_scope(self) -> None: """Encapsulate input ports for which some variables are out of scope.""" if self._user_context is None: # Ensure tags is a frozenset self.tags = tags = frozenset(self.tags) def get_scope(role_sets: FrozenSet[FrozenSet[str]]) -> Scope: """Get context depending on the matching between user roles and tags: - No tags or full match with one role => `PRIVATE` - Partial match, at best, between tags and one role => `PROTECTED` - Else `PUBLIC` Parameters ---------- roles : FrozenSet[FrozenSet[str]] Returns ------- Scope Examples -------- >>> tags = frozenset() >>> roles = frozenset([frozenset(['Aerodynamics', 'Compressor'])]) >>> assert get_context(tags, roles) == Scope.PRIVATE >>> >>> tags = frozenset(['Aerodynamics', 'Compressor']) >>> roles = frozenset([frozenset(['Aerodynamics', 'Compressor'])]) >>> assert get_context(tags, roles) == Scope.PRIVATE >>> >>> tags = frozenset(['Aerodynamics', 'Compressor']) >>> roles = frozenset([frozenset(['Mechanics', 'Compressor'])]) >>> assert get_context(tags, roles) == Scope.PROTECTED >>> >>> tags = frozenset(['Aerodynamics', 'Compressor']) >>> roles = frozenset([frozenset(['Heat transfert', 'Turbine'])]) >>> assert get_context(tags, roles) == Scope.PUBLIC """ if not tags or tags in role_sets: return Scope.PRIVATE else: best = Scope.PUBLIC for roles in role_sets: if tags.issubset(roles): return Scope.PRIVATE if tags.intersection(roles): best = Scope.PROTECTED return best from cosapp.core.config import CoSAppConfiguration user_config = CoSAppConfiguration() self._user_context = get_scope(user_config.roles) # print(f"{tags = }", user_config.roles, self._user_context) scope = self._user_context if scope != Scope.PRIVATE: # Some ports may be restrained for port in self.inputs.values(): port.scope_clearance = scope def _initialize(self, **kwargs) -> Dict[str, Any]: """Hook method to add `System` member before calling `setup` method. Parameters ---------- **kwargs : Dict[str, Any] Optional keywords arguments of __init__ Returns ------- Dict[str, Any] Optional keywords arguments not consumed by `_initialize` """ return kwargs
[docs] def setup(self, **kwargs) -> None: """`System` port and/or child `System` are defined in this function. This function allows to populate a customized `System` class. The helper functions for the user are: - `add_input` : add an input port - `add_output` : add an output port - `add_inward` : add a inward variable - `add_outward` : add a outward variable - `add_child` : add a child `System` See Also -------- add_input, add_output, add_inward, add_outward, add_child Examples -------- Here is an example of `System` subclassing: >>> class AdvancedDuct(System): >>> >>> def setup(self): >>> self.add_input(FlowPort, 'in') >>> self.add_output(FlowPort, 'out') >>> self.add_inward({'heat_source': 1.0, >>> 'pressure_loss': 0.01}) >>> self.add_outward('wall_temperature', 400.) >>> >>> self.add_child(AnotherSystem('system2')) """ pass # pragma: no cover
[docs] def update(self): """Perform complex tasks due to data change. Some parameters may be a file used to initialize a complex object. Updating that parameter may imply the modification of that complex object. This should be done in this method. Examples -------- >>> class TableModel(System): >>> >>> def setup(self): >>> # Add a inwards to the file containing the table values >>> self.add_inward('table_file', '{myProject}/ressources/my_table.csv') >>> # Set a object able to interpolate the table >>> self.add_outward('table', Table(self.table_file)) >>> >>> def update(self): >>> # Update the table object as the source file may have been updated. >>> self.table = Table(self.table_file) Notes ----- This method is called systematically after the :py:meth:`~cosapp.systems.system.System.setup` call. Otherwise the user is responsible to explicitly call it when needed. """ pass # pragma: no cover
def __getattr__(self, name: str) -> Any: try: # Faster than testing `if name in self` variable_ref = self.name2variable[name] except KeyError: try: return super().__getattribute__(name) except AttributeError as error: # Last try: checkout read-only properties # Note: checked last, as access is less likely than # `name2variable` and `super().__getattribute__()`. try: return self.__readonly[name] except KeyError: raise error else: return variable_ref.value def __setattr__(self, name: str, value: Any) -> None: try: # Faster than testing `if name in self` # Faster to duplicate __setitem__ call than calling it variable_ref = super().__getattribute__("name2variable")[name] except KeyError: if name in self.__readonly: raise AttributeError(f"can't set attribute {self.name}.{name}") elif hasattr(self, name): super().__setattr__(name, value) else: raise AttributeError( f"System {self.__class__.__qualname__!r} has no attribute {name!r}." ) except AttributeError: # Exception catcher to create all initial variables defined prior to name2variable # Then KeyError will be raised and it won't be allowed to create new attributes super().__setattr__(name, value) else: variable_ref.value = value def __contains__(self, item: str) -> bool: try: self.__find(item) except AttributeError: return False else: return True def __find(self, name: str) -> Any: for collection in ( self.name2variable, self.__readonly, self.__events, ): try: return collection[name] except KeyError: continue raise AttributeError(name) def __getitem__(self, name: str) -> Any: try: return getattr(self, name) except AttributeError: raise KeyError( f"Variable {name!r} not found in the context of System {self.name!r}" ) def __setitem__(self, name: str, value: Any) -> None: try: variable_ref = self.name2variable[name] except KeyError: if name in self.__readonly: raise AttributeError(f"Can't set read-only attribute {self.name}.{name}") raise KeyError( f"Variable {name!r} not found in the context of System {self.name!r}" ) variable_ref.value = value def __repr__(self) -> str: return f"{self.name} - {type(self).__name__}" @property def input_mapping(self) -> Dict[str, VariableReference]: """Dict[str, VariableReference]: free input mapping""" if self.__input_mapping is None: self._update_input_mapping() return MappingProxyType(self.__input_mapping) def _update_input_mapping(self) -> None: """Run graph analysis to identify free inputs, and store data in self.__input_mapping dict.""" from cosapp.utils.graph_analysis import get_free_inputs self.__input_mapping = get_free_inputs(self) def __reset_input_mapping(self) -> None: self.__input_mapping = None parent = self.parent if parent is not None: parent.__reset_input_mapping()
[docs] def append_name2variable( self, additional_mapping: Iterable[Tuple[str, VariableReference]] ) -> None: """Append the `Iterable` of (`str`, `VariableReference`) `Tuple` to the lookup variables mapping. The additional mapping is also transfer upward to the parent `System`. Parameters ---------- additional_mapping: Iterable[Tuple[str, VariableReference]] Additional list of (str, VariableReference) tuple to append """ # TODO raise error if name already exists (example a inwards variable name == component name) additional_mapping = list(additional_mapping) self.name2variable.update(additional_mapping) for key, _ in additional_mapping: self._add_member(key) if self.parent is not None: name = self.name rel2absname = lambda item: (f"{name}.{item[0]}", item[1]) self.parent.append_name2variable(map(rel2absname, iter(additional_mapping)))
[docs] def pop_name2variable(self, keys: Iterable[str]) -> None: """Remove the given keys from the name mapping dictionary. The keys will be remove from the local mapping. Then the keys list will be sent upward to the parent `System` for deletion. Parameters ---------- keys: Iterable[str] Keys to remove """ keys = list(keys) name_mapping = self.name2variable for key in keys: name_mapping.pop(key) if self.parent is not None: rel2absname = lambda relname: f"{self.name}.{relname}" self.parent.pop_name2variable(map(rel2absname, keys))
[docs] def add_property(self, name: str, value: Any) -> None: """Create new read-only property `name`, set to `value`""" self.__lock_check("add_property") name = Variable.name_check(name) self.__check_attr(name, f"cannot add read-only property {name!r}") self.__readonly[name] = value self._add_member(name) cls = self.__class__ def getter(self): try: return self.__readonly[name] except KeyError: raise AttributeError(f"{cls.__name__} object {self.name!r} has no attribute {name!r}") setattr(cls, name, property(getter))
@property def properties(self) -> Dict[str, Any]: """Dict[str, Any]: list of read-only properties and associated values""" return MappingProxyType(self.__readonly)
[docs] def add_input(self, port_class: Type[AnyPort], name: str, variables: Optional[Dict[str, Any]] = None, desc: str = "", ) -> AnyPort: """Add an input `Port` to the `System`. This function cannot be called outside `System.setup`. Parameters ---------- - port_class [type[Port]] Class of the `Port` to create - name [str]: `Port` name - desc [str, optional]: `Port` description - variables [dict[str, Any], optional]: Dictionary of initial values (default: None) Returns ------- The created port Examples -------- >>> class MyModule(System): >>> def setup(self): >>> self.add_input(MyPort, 'p_in1') >>> self.add_input(MyPort, 'p_in2', {'x': 1.5}) """ self.__lock_check("add_input") # Type validation check_arg(port_class, 'port_class', type) if not issubclass(port_class, Port): raise TypeError( f"port_class should be a subclass of Port; got {type(port_class).__name__}." ) new_port = port_class(name, PortType.IN, variables=variables) self._add_port(new_port, desc) self.__reset_input_mapping() return new_port
[docs] def add_output(self, port_class: Type[AnyPort], name: str, variables: Optional[Dict[str, Any]] = None, desc: str = "", ) -> AnyPort: """Add an output `Port` to the `System`. This function cannot be called outside `System.setup`. Parameters ---------- - port_class [type[Port]] Class of the `Port` to create - name [str]: `Port` name - desc [str, optional]: `Port` description - variables [dict[str, Any], optional]: Dictionary of initial values (default: None) Returns ------- The created port Examples -------- >>> class MyModule(System): >>> def setup(self): >>> self.add_output(MyPort, 'p_out1') >>> self.add_output(MyPort, 'p_out2', {'y': 1.5}) """ self.__lock_check("add_output") # Type validation check_arg(port_class, 'port_class', type) if not issubclass(port_class, Port): raise TypeError( f"port_class should be a subclass of Port; got {type(port_class).__name__}." ) new_port = port_class(name, PortType.OUT, variables=variables) self._add_port(new_port, desc) return new_port
def _add_port(self, port: BasePort, desc="") -> None: """Add a port to the system Parameters ---------- port : `BasePort` instance of a port class """ self.__check_attr(port.name, f"cannot add {type(port).__qualname__} {port.name!r}") port.owner = self port.description = desc if port.is_input: inputs = self.inputs if port.name in inputs: raise ValueError(f"Port name {port.name!r} already exists as input") inputs[port.name] = port port_key = (port.name, VariableReference(context=self, mapping=inputs, key=port.name)) elif port.is_output: outputs = self.outputs if port.name in outputs: raise ValueError(f"Port name {port.name!r} already exists as output") outputs[port.name] = port port_key = (port.name, VariableReference(context=self, mapping=outputs, key=port.name)) else: raise ValueError(f"Unknown `PortType` {port.direction}") keys = [port_key] rel2absname = lambda relname: ( f"{port.name}.{relname}", VariableReference(context=self, mapping=port, key=relname), ) keys.extend(map(rel2absname, iter(port))) if isinstance(port, Port): attributes = get_attributes(port) - get_attributes(Port) - set(port._variables) for attr in attributes: keys.append((f"{port.name}.{attr}", VariableReference(context=self, mapping=port, key=attr))) self.append_name2variable(keys) def __lock_check(self, method: str) -> None: """Raises AttributeError if system is locked""" if self._locked: raise AttributeError(f"`{method}` cannot be called outside `setup`") def __check_attr(self, name: str, prefix: str = "") -> None: """Raises ValueError if attribute `name` already exists in system""" try: obj = self.__find(name) except AttributeError: return # attribute does not exist - OK if isinstance(obj, VariableReference): if obj.context is self: value = obj.value if isinstance(value, Port): pdir = "input" if value.is_input else "output" suffix = f"as an {pdir} {type(value).__name__}" elif isinstance(value, System): suffix = f"as a sub-system {type(value).__name__}" else: port = obj.mapping if port is self.inputs[System.INWARDS]: suffix = "as an inward" elif port is self.outputs[System.OUTWARDS]: suffix = "as an outward" else: suffix = "" elif isinstance(obj, Event): suffix = f"as an event" else: suffix = "as a read-only property" message = f"{self.name}.{name} already exists" prefix = prefix.strip() if prefix: message = f"{prefix}; {message}" if suffix: message = f"{message} {suffix}" raise ValueError(f"{message}.")
[docs] def add_inward(self, definition: Union[str, Dict[str, Any]], value: Any = 1, unit: str = "", dtype: Types = None, valid_range: RangeValue = None, invalid_comment: str = "", limits: RangeValue = None, out_of_limits_comment: str = "", desc: str = "", distribution: Optional[Distribution] = None, scope: Scope = Scope.PRIVATE, ) -> None: """Add a inward variable to the `System`. A inward variable is calculated by the `System`. But its value is not mandatory in any variables fluxes between this `System` and another one. An unique inward variable can be defined by providing directly all arguments. And multiple inward variables can be defined by passing a `dict` of pair (`str`, `Any`) with an entry for each variable. This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`. Parameters ---------- definition : str or Dict[str, Any] Name of the unique variable or a dictionary for multiple variables at once value : Any, optional Value of the variable if `definition` is a `str`; default 1 unit : str, optional Variable unit; default empty string (i.e. dimensionless) dtype : type or iterable of types, optional Variable type; default None (i.e. type of initial value) valid_range : Tuple[Any, Any], optional Validity range of the variable; default None (i.e. all values are valid) invalid_comment : str, optional Comment to show in case the value is not valid; default '' limits : Tuple[Any, Any], optional Limits over which the use of the model is wrong; default valid_range out_of_limits_comment : str, optional Comment to show in case the value is not valid; default '' desc : str, optional Variable description; default '' distribution : Distribution, optional Probability distribution of the variable; default None (no distribution) scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional Variable visibility; default PRIVATE Examples -------- To add one inward variable, arguments must be directly specified. >>> system.add_inward('data', 2.) To add multiple inward variables, a dictionary with one key per data should be provided. >>> system.add_inward({ >>> 'data1': 42., >>> 'data2': False >>> }) """ self.__lock_check("add_inward") # Type validation check_arg(definition, 'definition', (str, dict)) def add_unique_data( name: str, value: Any, unit: str = "", dtype: Types = None, valid_range: RangeValue = None, invalid_comment: str = "", limits: RangeValue = None, out_of_limits_comment: str = "", distribution: Optional[Distribution] = None, desc: str = "", scope: Scope = Scope.PRIVATE, ) -> None: self.__check_attr(name, f"cannot add inward {name!r}") inputs = self.inputs inputs[System.INWARDS].add_variable( name, value, unit=unit, dtype=dtype, valid_range=valid_range, invalid_comment=invalid_comment, limits=limits, out_of_limits_comment=out_of_limits_comment, desc=desc, distribution=distribution, scope=scope, ) reference = VariableReference(context=self, mapping=inputs[System.INWARDS], key=name) self.append_name2variable( [(f"{System.INWARDS}.{name}", reference), (name, reference)] ) self.__reset_input_mapping() if isinstance(definition, dict): for key, value in definition.items(): if isinstance(value, dict): try: add_unique_data(key, **value) except TypeError: add_unique_data(key, value) else: add_unique_data(key, value) else: add_unique_data( definition, value, unit=unit, dtype=dtype, valid_range=valid_range, invalid_comment=invalid_comment, limits=limits, out_of_limits_comment=out_of_limits_comment, desc=desc, distribution=distribution, scope=scope, )
[docs] def add_outward(self, definition: Union[str, Dict[str, Any]], value: Any = 1, unit: str = "", dtype: Types = None, valid_range: RangeValue = None, invalid_comment: str = "", limits: RangeValue = None, out_of_limits_comment: str = "", desc: str = "", scope: Scope = Scope.PUBLIC, ) -> None: """Add a outward variable to the `System`. A outward variable is calculated by the `System`. But its value is not mandatory in any variables fluxes between this `System` and another one. An unique outward variable can be defined by providing directly all arguments. And multiple outward variables can be defined by passing a `dict` of pair (`str`, `Any`) with an entry for each variable. This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`. Parameters ---------- definition : str or Dict[str, Any] Name of the unique variable or a dictionary for multiple variables at once value : Any, optional Value of the variable if `definition` is a `str`; default 1 unit : str, optional Variable unit; default empty string (i.e. dimensionless) dtype : type or iterable of types, optional Variable type; default None (i.e. type of initial value) valid_range : Tuple[Any, Any], optional Validity range of the variable; default None (i.e. all values are valid) invalid_comment : str, optional Comment to show in case the value is not valid; default '' limits : Tuple[Any, Any], optional Limits over which the use of the model is wrong; default valid_range out_of_limits_comment : str, optional Comment to show in case the value is not valid; default '' desc : str, optional Variable description; default '' scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional Variable visibility; default PUBLIC Examples -------- To add an unique variable, arguments must be directly specified. >>> system.add_outward('info', 2.) To add multiple variables, a dictionary with one key per outward variable should be provided. >>> system.add_inward({ >>> 'info1': 42., >>> 'info2': False >>> }) """ self.__lock_check("add_outward") # Type validation check_arg(definition, "definition", (str, dict)) def add_local( name: str, value: Any, unit: str = "", dtype: Types = None, valid_range: RangeValue = None, invalid_comment: str = "", limits: RangeValue = None, out_of_limits_comment: str = "", desc: str = "", scope: Scope = Scope.PUBLIC, ) -> None: self.__check_attr(name, f"cannot add outward {name!r}") outputs = self.outputs outputs[System.OUTWARDS].add_variable( name, value, unit=unit, dtype=dtype, valid_range=valid_range, invalid_comment=invalid_comment, limits=limits, out_of_limits_comment=out_of_limits_comment, desc=desc, scope=scope, ) reference = VariableReference(context=self, mapping=outputs[System.OUTWARDS], key=name) self.append_name2variable( [(f"{System.OUTWARDS}.{name}", reference), (name, reference)] ) if isinstance(definition, dict): for key, value in definition.items(): if isinstance(value, dict): try: add_local(key, **value) except TypeError: add_local(key, value) else: add_local(key, value) else: add_local( definition, value, unit=unit, dtype=dtype, valid_range=valid_range, invalid_comment=invalid_comment, limits=limits, out_of_limits_comment=out_of_limits_comment, desc=desc, scope=scope, )
[docs] def add_transient(self, name: str, der: str, # time derivative of `name` desc: str = None, max_time_step: Union[Number, str] = numpy.inf, max_abs_step: Union[Number, str] = numpy.inf, ) -> None: """ Declare a transient variable, defined implicitly by the expression of its time derivative, and add a time-dependent unknown to the mathematical problem. Transients can be used withinn the system, but their time evolution is computed outside the system, by a time driver. If `name` already exists, it must refer to an inward. If `name` does not exist, a new inward `name` is created on the fly. This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`. Parameters ---------- name : str Name of the transient variable der : str Expression of the time derivative, used for integration during time driver execution. The type and unit of the transient variable are deduced from `der`. desc : str, optional Variable description; default None max_time_step : Number or evaluable expression (str), optional Maximum time step admissible for the integration of the variable (for stability, typically). max_abs_step : Number or evaluable expression compatible with transient variable, optional Maximum variable step admissible over one time step (for stability, typically). Examples -------- >>> system.add_inward('v', numpy.zeros(3), desc='Velocity') >>> system.add_inward('a', numpy.zeros(3), desc='Acceleration') >>> system.add_inward('area', 1.0) >>> system.add_outward('flowrate', desc='Volumetric flowrate') >>> system.add_transient('x', der='v') # x defined by dx/dt = v >>> system.add_transient('v', der = 'a', max_time_step = '0.1 / max(norm(a), 1e-9)', ) >>> system.add_transient('h', der = 'flowrate / area', max_abs_step = 0.1, ) """ check_arg(name, "name", str) check_arg(der, "der", str) _, value, dtype = TimeUnknown.der_type(der, self) if name in self: if not self.is_input_var(name): raise TypeError( "Only input variables can be declared as transient variables; got {!r} in {!r}".format( name, self.name) ) # Check that existing inward is compatible with derivative expression declared_value = self[name] if isinstance(value, numpy.ndarray): ok = (numpy.shape(declared_value) == value.shape) else: ok = is_number(declared_value) if not ok: raise TypeError( "Type incompatibility: {} {!r} cannot be the time derivative of {} variable {!r}".format( dtype.__name__, der, type(declared_value).__name__, name)) else: # Create new inward on the fly logger.info(f"Creation of new time-dependent inward {name!r} within system {self.name!r}") if desc is None: str_der = self.str_der(1) desc = f"Transient variable defined as {str_der(name)} = {der}" # Need to copy the value to avoid vector linked by reference between variable and its derivative # See https://gitlab.safrantech.safran/cosapp/cosapp/issues/179 self.add_inward(name, value=deepcopy(value), desc=desc, dtype=dtype) self._time_pb.add_transient(name, der, max_time_step, max_abs_step)
[docs] def add_rate(self, name: str, source: Any, # `name` is the time derivative of `source` initial_value: Union[Number, numpy.ndarray, str] = None, desc: str = None, ) -> None: """ Add a variable monitoring the rate-of-change of a given quantity (referred to as `source`) as the system evolves. Rates are defined by their source, and are updated by the time driver computing the time evolution of the system. This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`. Parameters ---------- name : str Name of the transient variable source : Any Expression of the quantity of interest whose rate will be computed. The type and unit of the rate are deduced from `source`. desc : str, optional Variable description; default None initial_value : Number/array, or evaluable expression (str), optional. Initial value of the rate (otherwise unknown, as rates are only updated from the first time step on). If specified, it must be consistent with the type of source (scalar vs. array). Examples -------- >>> system.add_inward('U', 0.1, desc='Input voltage') >>> system.add_rate('dUdt', source='U') # dUdt defined as dU/dt """ check_arg(name, "name", str) self.__check_attr(name, f"cannot add rate {name!r}") _, src_value, dtype = TimeDerivative.source_type(source, self) def cast(expression): if expression is None: return None if isinstance(expression, str): value = EvalString(expression, self).eval() elif isinstance(expression, EvalString): value = expression.eval() else: value = expression if isinstance(src_value, numpy.ndarray): value = numpy.asarray(value) return value if initial_value is not None and isinstance(src_value, numpy.ndarray): # If provided, `initial_value` must conform to `value.shape` check_arg( cast(initial_value), 'initial_value', numpy.ndarray, value_ok = lambda array: (numpy.shape(array) == src_value.shape), ) if name in self: if not self.is_input_var(name): raise TypeError( f"Only input variables can be declared as rates; got {self.name}.{name}" ) # Check that existing inward is compatible with requested rate declared_value = self[name] if isinstance(src_value, numpy.ndarray): ok = (numpy.shape(declared_value) == src_value.shape) else: ok = is_number(declared_value) if not ok: raise TypeError( "Type incompatibility: {} {!r} cannot be the time derivative of {} {}".format( type(declared_value).__name__, name, dtype.__name__, source)) self[name] = cast(initial_value) else: # Create new inward on the fly logger.info(f"Creation of new time-dependent inward {name!r} within system {self.name!r}") if desc is None: str_der = self.str_der(1) desc = str_der(source) if initial_value is None: value = None dtype = (dtype, type(None)) else: value = cast(initial_value) self.add_inward(name, value=value, desc=desc, dtype=dtype) self._time_pb.add_rate(name, source, initial_value)
[docs] def is_input_var(self, name: str) -> bool: """Returns `True` if `name` is the name of an input variable, `False` otherwise""" return self.__check_var(name, PortType.IN)
[docs] def is_output_var(self, name: str) -> bool: """Returns `True` if `name` is the name of an output variable, `False` otherwise""" return self.__check_var(name, PortType.OUT)
def __check_var(self, name: str, direction: PortType) -> bool: """Driver function for `is_input` and `is_output`""" try: container = self.name2variable[name].mapping except KeyError: return False else: return isinstance(container, BasePort) and (container.direction == direction)
[docs] @staticmethod def str_der(order: int) -> Callable[[str,], str]: """Derivate name factory. Parameters ---------- order : int Derivative order Returns ------- Callable[[int, ], str] Function generating the derivate name from the variable name """ if order == 1: return lambda s: f"d({s})/dt" elif order > 1: return lambda s: "d^{0}({1})/dt^{0}".format(s, order) return lambda s: s
def _precompute(self) -> None: for rate in self._time_pb.rates.values(): rate.touch() # ensured that system is recomputed at first time step
[docs] def check(self, name: Optional[str] = None) -> Union[Dict[str, Validity], Validity]: """Get variable value validity for a given variable, port or system or all of them. If `name` is not provided, returns a dictionary with the validity of all `System` variables; i.e. all variables in input and output ports including inwards and outwards of this system and all its children. Else only, the validity for the given variable will be returned. Parameters ---------- name : str, optional Variable name Returns ------- Dict[str, Validity] or Validity (Dictionary of) the variable(s) value validity """ def plain_check(name) -> Union[Dict[str, Validity], Validity]: """Internal version with no attribute check""" obj = getattr(self, name) if isinstance(obj, (System, BasePort)): return obj.check() else: ref = self.name2variable[name] return ref.mapping.check(ref.key) # TODO unit tests if name is None: def variable_filter(dict_items: tuple) -> bool: key, ref = dict_items if isinstance(ref.value, (System, BasePort)): return False elif "." in key: owner_name = key.rsplit(".", maxsplit=1)[0] return isinstance(self[owner_name], BasePort) else: # Shortcut to inwards or outwards => not taken return False return { name: plain_check(name) for name, ref in filter(variable_filter, self.name2variable.items()) } elif name in self: return plain_check(name) else: raise AttributeError(f"Variable {name} does not exist in System {self.name}")
[docs] def is_running(self) -> bool: """Is this System in execution? Returns ------- bool In execution status """ return self.__context_lock.is_active
@property def residues(self): """Dict[str, Residue] : Get the residues for the current `System`.""" # MappingProxyType forbids external modification return MappingProxyType(self._math.residues) @property def unknowns(self): """Dict[str, Unknown] : Get the unknowns for the current `System`.""" # MappingProxyType forbids external modification return MappingProxyType(self._math.unknowns) @property def transients(self): """Returns a dictionary containing all transient unknowns in current system tree""" # MappingProxyType forbids external modification return MappingProxyType(self._time_pb.transients) @property def rates(self): """Returns a dictionary containing all time derivatives (rates) in current system tree""" # MappingProxyType forbids external modification return MappingProxyType(self._time_pb.rates)
[docs] def events(self) -> Iterator[Event]: """Iterator on all events locally defined on system.""" yield from self.__events.values()
[docs] def all_events(self) -> Iterator[Event]: """Recursive iterator on all events in complete system tree.""" for elem in self.tree(): yield from elem.events()
@property def child_connectors(self) -> Dict[str, List[Connector]]: """Dict[str, List[Connector]] : Connectors between sub-systems, referenced by system names.""" return MappingProxyType(self.__child_connectors)
[docs] def connectors(self) -> Dict[str, Connector]: """Constructs a dictionary of all connectors within system, referenced by connector name. """ make_items = lambda connector: (connector.name, connector) return dict(map(make_items, self.all_connectors()))
[docs] def all_connectors(self) -> Iterator[Connector]: """Iterator yielding all connectors within system.""" yield from self.__pulling_connectors for connectors in self.__child_connectors.values(): yield from connectors
[docs] def incoming_connectors(self) -> Iterator[Connector]: """Iterator yielding all connectors targetting system.""" yield from self.__pulling_connectors parent: System = self.parent if parent: yield from parent.__child_connectors.get(self.name, [])
@property def problem(self) -> MathematicalProblem: """MathematicalProblem: locally defined off-design mathematical problem, without considering sub-system tree. Raises: ------- `AttributeError` unless system is being modified by a driver. """ if not self.__free_problem.is_active: raise AttributeError("Can't access attribute `problem`") return self._math
[docs] def assembled_problem(self) -> MathematicalProblem: """Returns the consolidated mathematical problem, assembled from entire system tree, and accounting for cyclic dependency loops. Returns ------- MathematicalProblem The assembled mathematical problem. """ problem = self.new_problem('off-design') # Make shallow copy of `self._math` properties for name in ('residues', 'deferred_residues', 'unknowns'): attr = getattr(problem, name) attr.update(getattr(self._math, name)) def transfer_unknown(unknown: Unknown, new_name: str): options = { attr: getattr(unknown, attr) for attr in ('max_abs_step', 'max_rel_step', 'lower_bound', 'upper_bound') } problem.add_unknown(new_name, **options) children: dict[str, System] = self.children for child in children.values(): if child.is_standalone(): continue child_problem = child.assembled_problem() connectors = self.__child_connectors.get(child.name, []) unknowns = child_problem.unknowns for connector in connectors: # Transfer unknowns to parent (when necessary) for name in list(unknowns.keys()): unknown = unknowns[name] port = unknown.port connected = ( port.owner in self.children.values() and port is connector.sink and unknown.variable in connector.sink_variables() ) if connected: # Port is connected => remove unknown unknowns.pop(name) pulled = connector.source.owner is self if pulled: # Transfer unknown to parent level src = connector.source_variable(unknown.variable) transfer_unknown(unknown, f"{connector.source.name}.{src}") # Prune target equations defined as weak if target is connected origin = connector.source.owner deferred_residues = origin._math.deferred_residues.values() for deferred in filter(lambda target: target.weak, deferred_residues): targetted = list(deferred.variables)[0] ref = origin.name2variable[targetted] port = ref.mapping name = ref.key connected = ( port.owner in self.children.values() and port is connector.source and name in connector.source_variables() ) if connected: # Remove deferred equation key = MathematicalProblem.target_key(f"{origin.name}.{targetted}") problem.deferred_residues.pop(key) problem.extend(child_problem, copy=False) return problem.extend(self.__loop_problem)
[docs] def assembled_time_problem(self) -> TimeProblem: """Returns the consolidated transient problem, assembled from the entire system tree. Returns ------- TimeProblem The assembled time problem. """ problem = TimeProblem('off-design', self) # Make shallow copy of `self._math` properties for name in ('transients', 'rates'): attr: dict = getattr(problem, name) attr.update(getattr(self._time_pb, name)) def transfer_transient(transient: TimeUnknown, name: str): ref = transient.context.name2variable[transient.basename] problem.add_transient(name, der = transient.der, max_time_step = transient.max_time_step_expr, pulled_from = transient.pulled_from or ref, ) def transfer_rate(rate: TimeDerivative, name: str): problem.add_rate(name, source = rate.source_expr, initial_value = rate.initial_value_expr, ) popped: list[tuple[str, str]] = list() for child in self.children.values(): child_problem = child.assembled_time_problem() connectors = self.__child_connectors.get(child.name, []) transfer_items = [ (child_problem.transients, transfer_transient), (child_problem.rates, transfer_rate), ] for connector in connectors: # Transfer transients and rates to parent (when necessary) for unknowns, transfer in transfer_items: for name in list(unknowns.keys()): unknown: Unknown = unknowns[name] port = unknown.port connected = ( port.owner in self.children.values() and port is connector.sink and unknown.variable in connector.sink_variables() ) if connected: # Port is connected => remove unknown unknowns.pop(name) if connector.source.owner is self: # Transfer unknown to parent level src = connector.source_variable(unknown.variable) transfer(unknown, f"{connector.source.name}.{src}") else: popped.append(unknown.contextual_name()) problem.extend(child_problem, copy=False) if popped: if len(popped) == 1: message = f"variable {popped[0]!r} is connected to an output" else: message = f"variables {popped} are connected to outputs" warnings.warn( f"In {self.full_name()}, time-dependent {message} and will not be computed." ) return problem
[docs] def get_unsolved_problem(self) -> MathematicalProblem: """Returns the consolidated mathematical problem, assembled from entire system tree, and accounting for cyclic dependency loops. Deprecated; use `assembled_problem` instead. Returns ------- MathematicalProblem The assembled mathematical problem. """ warnings.warn( "Method `get_unsolved_problem` is deprecated; use `assembled_problem` instead." ) return self.assembled_problem()
[docs] def add_child(self, child: AnySystem, execution_index: Optional[int] = None, pulling: Optional[Union[str, Collection[str], Dict[str, str]]] = None, desc: str = "", ) -> AnySystem: """Add a child `System` to the current `System`. When adding a child `System`, it is possible to specified its position in the execution order. Child ports or individual `inwards` and `outwards` can also be pulled at the parent level by providing either the name of the port/inward/outward or a list of them or the name mapping of the child element (dictionary keys) to the parent element (dictionary values). If the argument is not a dictionary, the name in the parent system will be the same as in the child. Parameters ---------- - child [System]: `System` to add to the current `System` execution_index [int, optional]: Index of the execution order list at which the `System` should be inserted; default latest. - pulling [str or list[str] or dict[str, str], optional]: Map of child ports to pulled ports at the parent system level; default None (no pulling) - desc [str, optional]: Sub-system description in the context of its parent. Returns ------- `child` """ # Type validation check_arg(child, 'child', System) check_arg(pulling, 'pulling', (type(None), str, Collection)) child = super().add_child(child, execution_index, desc) if child.name in self.name2variable: if isinstance(self[child.name], System): logger.warning( f"A subsystem named {child.name} already exists within system {self.name}." f" Child system {child!r} will overwrite it." ) else: raise ValueError(f"An object named {child.name!r} already exists in system {self}") keys = [(child.name, VariableReference(context=self, mapping=self.children, key=child.name))] # Append child system name mapping to current mapping rel2absname = lambda item: (f"{child.name}.{item[0]}", item[1]) keys.extend(map(rel2absname, child.name2variable.items())) self.append_name2variable(keys) self.__reset_input_mapping() # Add read-only constants to parent line prefix = f"{child.name}." for system in self.path_to_root(): for name, value in child.__readonly.items(): system.__readonly[f"{prefix}{name}"] = value prefix = f"{system.name}.{prefix}" if pulling is not None: try: pull_variables(child, pulling) except (ConnectorError, UnitError): self.pop_child(child.name) raise # If child is added outside of `setup`, we must force system tree inspection # to ensure input propagation during the next system execution. self.touch() return child
[docs] def pop_child(self, name: str) -> System: """Remove the subsystem called `name`. Parameters ---------- name: str Name of the subsystem to be removed Returns ------- `System` The removed subsystem Raises ------ `AttributeError` if no match is found """ popped: System = super().pop_child(name) # Remove all connections to and from popped child child_connectors = self.__child_connectors child_connectors.pop(popped.name, None) is_valid = lambda connector: connector.source.owner is not popped for key, connectors in child_connectors.items(): child_connectors[key] = list( filter(is_valid, connectors) ) self.__pulling_connectors = list( filter(is_valid, self.__pulling_connectors) ) # Remove references to popped child from name mapping popped_attr = lambda key: key.startswith(f"{name}.") popped_keys = [name] + list( filter(popped_attr, self.name2variable.keys()) ) self.pop_name2variable(popped_keys) self.__reset_input_mapping() return popped
[docs] def add_driver(self, driver: AnyDriver) -> AnyDriver: """Add a driver to this system. Parameters ---------- driver : Driver Driver to add Returns ------- Driver The added driver """ from cosapp.drivers import Driver check_arg(driver, 'driver', Driver) name = driver.name default_name = f"default_driver_for_{hex(id(self))}" if default_name in self.drivers and name != default_name: self.drivers.clear() if name in self.drivers: logger.warning(f"Driver {name!r} already exists and will be replaced") driver.owner = self driver.parent = None # Clear driver parent in case of driver reused self.drivers[name] = driver return driver
[docs] def add_unknown(self, name: Union[str, Iterable[Union[dict, str]]], max_abs_step: Number = numpy.inf, max_rel_step: Number = numpy.inf, lower_bound: Number = -numpy.inf, upper_bound: Number = numpy.inf ) -> MathematicalProblem: """Add unknown variables. You can set variable one by one or provide a list of dictionary to set multiple variable at once. The dictionary key are the arguments of this method. Parameters ---------- - name : str or Iterable of dictionary or str Name of the variable or list of variable to add - max_rel_step : float, optional Maximal relative step by which the variable can be modified by the numerical solver; default numpy.inf - max_abs_step : float, optional Maximal absolute step by which the variable can be modified by the numerical solver; default numpy.inf - lower_bound : float, optional Lower bound on which the solver solution is saturated; default -numpy.inf - upper_bound : float, optional Upper bound on which the solver solution is saturated; default numpy.inf Returns ------- MathematicalProblem The modified mathematical problem """ def create_unknown( name: str, max_abs_step: Number = numpy.inf, max_rel_step: Number = numpy.inf, lower_bound: Number = -numpy.inf, upper_bound: Number = numpy.inf ): unknown = Unknown(self, name, max_abs_step, max_rel_step, lower_bound, upper_bound) # Remove existing unknown if user wants to update the parameters. if unknown.name in self._math.unknowns: self._math.unknowns.pop(unknown.name) self._math.add_unknown([unknown]) if isinstance(name, str): create_unknown(name, max_abs_step, max_rel_step, lower_bound, upper_bound) else: for item in name: create_unknown(item, max_abs_step, max_rel_step, lower_bound, upper_bound) return self._math
[docs] def add_equation(self, equation: Union[str, Iterable[Union[dict, str, Tuple[str, str]]]], name: Optional[str] = None, reference: Union[Number, numpy.ndarray, str] = 1, ) -> MathematicalProblem: """Add off-design equation. Equations may be added one by one, or provided by a list of dictionaries to add multiple equations at once. The dictionary keys are the arguments of this method. Parameters ---------- - equation : str or Iterable of str of the kind 'lhs == rhs' Equation or list of equations to add - name : str, optional Name of the equation; default None => 'lhs == rhs' - reference : Number, numpy.ndarray or "norm", optional Reference value(s) used to normalize the equation; default is 1. If value is "norm", actual reference value is estimated from order of magnitude. Returns ------- MathematicalProblem The modified mathematical problem """ self.__lock_check("add_equation") return self._math.add_equation(equation, name, reference)
[docs] def add_target(self, name: Union[str, Iterable[Union[dict, str, Tuple[str, str]]]], reference: Union[Number, numpy.ndarray, str] = 1, weak = False, ) -> MathematicalProblem: """Add deferred off-design equation on a targetted quantity. Target equations may be added one by one, or provided by a list of dictionaries to add multiple equations at once. The dictionary keys are the arguments of this method. Parameters ---------- - name: str, optional Name of the equation; default None => 'lhs == rhs' - reference: Number, numpy.ndarray or "norm", optional Reference value(s) used to normalize the equation; default is 1. If value is "norm", actual reference value is estimated from order of magnitude. - weak: bool, optional If True, the target is disregarded if the corresponding variable is connected; default is `False`. Returns ------- MathematicalProblem The modified mathematical problem """ self.__lock_check("add_target") return self._math.add_target(name, reference, weak)
[docs] def add_event(self, name: str, desc: str = "", trigger: Optional[Union[str, Event, EventState, ZeroCrossing]] = None, final: bool = False, ) -> Event: """Add an event to system. An event occurrence can either be determined by the system itself, or triggered by an event from another system. This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`. Parameters ---------- - name [str]: Name of the event. - desc [str, optional]: Event description; defaults to ''. - trigger [Union[str, Event, EventState, ZeroCrossing], optional]: String, primary or derived event defining the event trigger; defaults to `None`. - final [bool, optional]: Defines whether or not event is final; defaults to `False`. Returns ------- - event [Event]: The newly created event. Examples -------- >>> import numpy as np >>> >>> class PointMassDynamics(System): >>> # Free fall of a point mass, with friction >>> def setup(self): >>> self.add_inward('mass', 1.2, desc='Mass') >>> self.add_inward('cf', 0.1, desc='Friction coefficient') >>> self.add_inward('g', np.r_[0, 0, -9.81], unit='m/s**2', desc='External acceleration field') >>> >>> self.add_outward('a', np.zeros(3), unit='m/s**2', desc='Acceleration') >>> self.add_transient('v', der='a') >>> self.add_transient('x', der='v') >>> >>> def compute(self): >>> self.a = self.g - (self.cf / self.mass * np.linalg.norm(self.v)) * self.v >>> >>> class BouncingPointMass(System): >>> def setup(self): >>> self.add_child(PointMassDynamics('dyn'), pulling=['x', 'v', 'a', 'mass', 'cf', 'g']) >>> self.add_event('rebound', trigger="x[2] <= 0") >>> >>> def transition(self): >>> if self.rebound.present: >>> self.v[2] *= -1 """ self.__lock_check("add_event") # Type and name validation check_arg(name, 'name', str) name = Variable.name_check(name) self.__check_attr(name, f"cannot add event {name!r}") # Event creation self.__events[name] = event = Event(name, self, desc, trigger, final) # Getter cls = self.__class__ def getter(self): try: return self.__events[name] except KeyError: raise AttributeError(f"{cls.__name__} object {self.name!r} has no attribute {name!r}") setattr(cls, name, property(getter)) self._add_member(name) return event
[docs] def add_inward_modevar(self, name: str, value: Any = False, unit: str = "", dtype: Types = None, desc: str = "", scope: Scope = Scope.PRIVATE, ) -> None: """Add an inward mode variable to the `System`. A mode variable is a piecewise constant variable in each continuous time phase. Like any input, inward mode variables should not be modified during system transitions. This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`. Parameters ---------- name : str Name of the variable value : Any, optional Value of the variable; default 1 unit : str, optional Variable unit; default empty string (i.e. dimensionless) dtype : type Variable type; default None (i.e. type of initial value) desc : str, optional Variable description; default '' scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional Variable visibility; default PRIVATE Examples -------- >>> class MultimodeSystem(System): >>> def setup(self): >>> self.add_inward_modevar('composite', True) >>> self.reconfig() >>> >>> def reconfig(self): >>> # Define one or two sub-systems, depending on mode >>> for name in list(self.children): >>> self.pop_child(name) >>> if self.composite: >>> a = self.add_child(Foo('a')) >>> b = self.add_child(Bar('b')) >>> self.connect(a, b, {'x', 'y'}) >>> else: >>> self.add_child(Bogus('c')) >>> >>> def transition(self): >>> self.reconfig() """ self.__lock_check("add_inward_modevar") # Type validation check_arg(name, 'name', str) self.__check_attr(name, f"cannot add inward {name!r}") port = self.inputs[System.MODEVARS_IN] port.add_mode_variable(name, value, unit, dtype, desc, scope=scope) ref = VariableReference(context=self, mapping=port, key=name) self.append_name2variable( [(f"{System.MODEVARS_IN}.{name}", ref), (name, ref)] )
[docs] def add_outward_modevar(self, name: str, value: Optional[Any] = None, unit: str = "", dtype: Types = None, desc: str = "", init: Optional[Any] = None, scope: Scope = Scope.PRIVATE, ) -> None: """Add an outward mode variable to the `System`. A mode variable is a piecewise constant variable in each continuous time phase. It may only be modified in the `transition` method of the `System`, and never inside method `compute`. Notwithstanding, its value may be used in `compute`. The modification of an output mode variable is often associated with the occurence of an event in the system. This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`. Parameters ---------- name : str Name of the variable value : Any, optional Value of the variable; default 1 unit : str, optional Variable unit; default empty string (i.e. dimensionless) dtype : type Variable type; default None (i.e. type of initial value) desc : str, optional Variable description; default '' init : Any, optional Value imposed at the beginning of time simulations. If unspecified (default), the variable remains untouched. scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional Variable visibility; default PRIVATE Examples -------- >>> class ThresholdSystem(System): >>> def setup(self): >>> self.add_inward('threshold', 1.0) >>> self.add_inward('x', 0.0) >>> self.add_outward('y', 0.0) >>> self.add_event('activates', trigger='x >= threshold') >>> self.add_outward_modevar('activated', init='x > threshold') >>> >>> def compute(self): >>> self.y = self.x if self.activated else 0.0 >>> >>> def transition(self): >>> if self.activates.present: >>> self.activated = True """ self.__lock_check("add_outward_modevar") # Type validation check_arg(name, 'name', str) self.__check_attr(name, f"cannot add outward {name!r}") port = self.outputs[System.MODEVARS_OUT] port.add_mode_variable(name, value, unit, dtype, desc, init=init, scope=scope) ref = VariableReference(context=self, mapping=port, key=name) self.append_name2variable( [(f"{System.MODEVARS_OUT}.{name}", ref), (name, ref)] )
[docs] def new_problem(self, name='problem') -> MathematicalProblem: """Create a new, empty `MathematicalProblem` in the context of system. Parameters ---------- name: str Name of the mathematical problem. Defaults to 'problem'. Returns ------- MathematicalProblem The newly created mathematical problem. """ return MathematicalProblem(name, self)
[docs] def add_design_method(self, name: str) -> MathematicalProblem: """Add a design method to the `System` A design method is a set of free variables and equations that defines a way to design the `System` It is a easy way to pre-define a design of this `System` for users The returned mathematical system is empty. It should be populated with the needed variables and equations (see Examples). Parameters ---------- name: str The name of the design method Returns ------- MathematicalProblem The newly created mathematical problem. Examples -------- >>> system1.add_design_method("method1") \ >>> .add_unknown([ >>> dict(name="x", max_rel_step=0.1), >>> "y" >>> ]) \ >>> .add_equation([ >>> "u == 0", >>> "v == 800" >>> ]) """ self.__lock_check("add_design_method") self.design_methods[name] = mathpb = self.new_problem(name) return mathpb
[docs] def design(self, method: str) -> MathematicalProblem: """Returns the chosen group of design equations. Parameters ---------- method : str Name of the group of equations to extract Returns ------- MathematicalProblem Mathematical system to solve for the chosen method """ return self.design_methods[method]
[docs] def log_debug_message( self, handler: "HandlerWithContextFilters", record: logging.LogRecord, format: LogFormat = LogFormat.RAW ) -> bool: """Callback method on the system to log more detailed information. This method will be called by the log handler when :py:meth:`~cosapp.utils.logging.LoggerContext.log_context` is active if the logging level is lower or equals to VERBOSE_LEVEL. It allows the object to send additional log message to help debugging a simulation. .. note:: logger.log method cannot be used here. Use handler.handle(record) Parameters ---------- handler : HandlerWithContextFilters Log handler on which additional message should be published. record : logging.LogRecord Log record format : LogFormat Format of the message Returns ------- bool Should the provided record be logged? """ def print_port(header, ports) -> List[str]: msg = [f"#### {header}", ] for name, port in ports.items(): if len(port) == 0: continue msg.append(f"- {name}:") for var in port: value = port[var] if is_number(value): v = f"{value:.5g}" else: v = f"{value!s}" msg.append(f" - {var} = {v}") return "\n".join(msg) message = record.getMessage() activate = getattr(record, "activate", None) emit_record = super().log_debug_message(handler, record, format) if message.endswith("call_setup_run") or message.endswith("call_clean_run"): emit_record = False elif activate == True: handler.log( LogLevel.FULL_DEBUG, print_port(f"{self!s} - Inputs", self.inputs), name=logger.name ) emit_record = False elif activate == False: handler.log( LogLevel.FULL_DEBUG, print_port(f"{self!s} - Outputs", self.outputs), name=logger.name ) emit_record = False return emit_record
[docs] def call_setup_run(self, skip_driver: bool = False) -> None: """Execute `setup_run` recursively on all modules. Parameters ---------- skip_drivers : bool Skip calling :py:meth:`cosapp.drivers.driver.Driver.setup_run` """ if not skip_driver: for driver in self.drivers.values(): driver.call_setup_run() super().call_setup_run()
[docs] def call_clean_run(self, skip_driver: bool = False) -> None: """Execute `clean_run` recursively on all modules. Parameters ---------- skip_drivers : bool Skip calling :py:meth:`cosapp.drivers.driver.Driver.clean_run` """ super().call_clean_run() if not skip_driver: for driver in self.drivers.values(): driver.call_clean_run()
def _postcompute(self) -> None: """Actions performed after the `System.compute` call.""" if self.is_clean(PortType.IN): if self._is_tree_clean: self.set_clean(PortType.OUT) else: self.set_clean(PortType.IN) self.set_dirty(PortType.OUT) # Evaluate the residues for residue in self.residues.values(): residue.update()
[docs] def transition(self) -> None: """Method describing system transition upon the occurrence of events (if any). Does not do anything by default, but should be implemented for systems with events. Examples -------- >>> import numpy as np >>> >>> class PointMassDynamics(System): >>> # Free fall of a point mass, with friction >>> def setup(self): >>> self.add_inward('mass', 1.2, desc='Mass') >>> self.add_inward('cf', 0.1, desc='Friction coefficient') >>> self.add_inward('g', np.r_[0, 0, -9.81], desc='External acceleration field') >>> self.add_outward('a', np.zeros(3), desc='Acceleration') >>> self.add_transient('v', der='a') >>> self.add_transient('x', der='v') >>> >>> def compute(self): >>> v_norm = np.linalg.norm(self.v) >>> self.a = self.g - (self.cf / self.mass * v_norm) * self.v >>> >>> class BouncingPointMass(PointMassDynamics): >>> def setup(self): >>> super().setup() >>> self.add_event('rebound', trigger="x[2] <= 0") >>> >>> def transition(self): >>> if self.rebound.present: >>> self.v[2] *= -1 """ pass
[docs] def retrieve_incoming_data(self) -> None: """Transfer data from all incoming connectors""" for connector in self.incoming_connectors(): connector.transfer()
[docs] def tree_transition(self) -> None: """Invoke transition in entire system tree""" for system in self.tree(): system.retrieve_incoming_data() with system.__free_problem: system.transition()
[docs] def run_once(self) -> None: """Run the system once. Execute the model of this `System` and its children in the execution order. Notes ----- The drivers are not executed when calling this method; only the physical model. """ with System.set_master(repr(self)) as is_master: if is_master: logger.debug("Start setup_run recursive calls.") self.call_setup_run(skip_driver=True) with self.log_context(" - run_once"): if self.is_active(): self._precompute() dirty_inputs = not self.is_clean(PortType.IN) if dirty_inputs: logger.debug(f"Call {self.name}.compute_before") with self.__context_lock: self.compute_before() else: logger.debug(f"Skip {self.name}.compute_before - Clean inputs") dirty_tree = not self._is_tree_clean any_dirty_child = False if dirty_inputs or dirty_tree: for child in self.children.values(): # Update connectors for connector in self.__child_connectors.get(child.name, []): connector.transfer() # Execute sub-system logger.debug(f"Call {self.name}.{child.name}.run_once()") child.run_once() any_dirty_child |= not child.is_clean() # Pull values from subsystems for connector in self.__pulling_connectors: connector.transfer() if dirty_inputs or any_dirty_child: logger.debug(f"Call {self.name}.compute") self._compute_calls += 1 with self.__context_lock: self.__runner.compute() else: logger.debug(f"Skip {self.name}.compute - Clean inputs") self._is_tree_clean = not any_dirty_child self._postcompute() self.computed.emit() else: logger.debug(f"Skip {self.name} execution - Inactive") if is_master: logger.debug("Start clean_run recursive calls.") self.call_clean_run(skip_driver=True)
[docs] def any_active_driver(self) -> bool: return any(driver.is_active() for driver in self.drivers.values())
[docs] def run_drivers(self) -> None: """Run the drivers defined on this `System`. """ with System.set_master(repr(self), type_checking=False) as is_master: if is_master: self.open_loops() logger.debug("Start setup_run recursive calls.") self.call_setup_run() if self.any_active_driver(): if self.is_standalone(): # System not standalone can't set the mathematical problem logger.debug(f"Exec order for {self.name}: {list(self.exec_order)}") for driver in self.drivers.values(): logger.debug(f"Call driver {driver.name}.run_once on {self.name}") driver.run_once() else: self.run_children_drivers() if is_master: logger.debug("Start clean_run recursive calls.") self.call_clean_run() self.close_loops()
[docs] def run_children_drivers(self) -> None: """Solve the children `System` in the execution order. """ with System.set_master(repr(self), type_checking=False) as is_master: if is_master: logger.debug("Start setup_run recursive calls.") self.call_setup_run() with self.log_context(" - run_children_drivers"): if self.is_active(): self._precompute() dirty_inputs = not self.is_clean(PortType.IN) if dirty_inputs: logger.debug(f"Call {self.name}.compute_before") with self.__context_lock: self.compute_before() else: logger.debug(f"Skip {self.name}.compute_before - Clean inputs") dirty_tree = not self._is_tree_clean any_dirty_child = False if dirty_inputs or dirty_tree: for child in self.children.values(): # Update connectors for connector in self.__child_connectors.get(child.name, []): connector.transfer() # Execute sub-system logger.debug(f"Call {self.name}.{child.name}.run_once()") child.run_drivers() any_dirty_child |= not child.is_clean() # Pull values from subsystems for connector in self.__pulling_connectors: connector.transfer() if dirty_inputs or any_dirty_child: self._compute_calls += 1 with self.__context_lock: self.__runner.compute() else: logger.debug(f"Skip {self.name}.compute - Clean inputs") self._is_tree_clean = not any_dirty_child self._postcompute() self.computed.emit() else: logger.debug(f"Skip {self.name} execution - Inactive") if is_master: logger.debug("Start clean_run recursive calls.") self.call_clean_run()
[docs] def is_standalone(self) -> bool: # TODO name is confusing as the real answer is I have a non-linear solver """Is this System able to solve itself? Returns ------- bool Ability to solve the system or not. """ return any(driver.is_standalone() for driver in self.drivers.values())
[docs] def connect(self, object1: Union[BasePort, System], object2: Union[BasePort, System], mapping: Union[str, List[str], Dict[str, str], None] = None, cls: Optional[Type[BaseConnector]] = None, **kwargs ) -> None: """Connect two systems or two ports. This method connects `object1` to `object2`. If no mapping is provided, connection will be made between variables based on their names. If a name mapping is provided as a list, the name should be present in both objects. If the mapping is specified as a dictionary, keys are expected to belong to `object1`, and values to `object2`. If both objects are systems, mapping is mandatory (full system connections are forbidden). In this case, each (key, value) pair of the mapping leads to a port-to-port connection. Connections are oriented (i.e. the direction of value transfer is fixed). The decision tree is as following: - If one port is of type input and the other one of type output, the connection flows from the output to the input. - Else if one port (portA) belong to the parent `System` of the other port (portB), the connection flows from portA to portB if portA is an input. And it flows from portB to portA if portA is an output. - Else if both are inputs, the port is pulled on the parent and connected to the two children. Connectors can only connect systems at the same level or a system with its parent. In all cases the connectors will be stored by the parent hosting the two connected systems. Parameters ---------- object1 : Union[BasePort, System] First end-point of connector. object2 : Union[BasePort, System] Second end-point of connector. mapping : str or List[str] or Dict[str, str], optional (List of) common name(s) or mapping name dictionary; default None (i.e. no mapping). cls : Type[BaseConnector], optional Connector type. When specified, `cls` must be a specialization of `BaseConnector`. If `cls` is `None` (default), the actual connector type is either `Connector`, or a port-specific connector, if any. Port bound connectors are automatically selected when both ports are of same type, and when port class contains inner class `Connector`, derived from `BaseConnector`. **kwargs : Additional keyword arguments forwarded to `cls`. Examples -------- Here is an example, in which we assume a port `p_in` has inputs of a system `s1` that come from some output port `p_out` of another system `s2`. In the last example, `connect` is called with two child systems, rather than ports. The container system being called `top`: >>> class DemoPort(Port): >>> def setup(self): >>> self.add_variable('a', 1.0) >>> self.add_variable('b', 2.0) >>> self.add_variable('c', 3.0) >>> >>> class DummySystem(Port): >>> def setup(self): >>> self.add_inward('x', 1.0) >>> self.add_outward('y', 0.0) >>> self.add_input(DemoPort, 'p_in') >>> self.add_output(DemoPort, 'p_out') >>> >>> top = System('top') >>> top.add(DummySystem('s1')) >>> top.add(DummySystem('s2')) >>> >>> top.connect(top.s1.p_in, top.s2.p_out, 'a') >>> top.s1.p_in.a = 4 >>> top.run_once() >>> assert top.s1.p_in.a == top.s2.p_out.a >>> >>> top.connect(top.s1.p_in, top.s2.p_out, {'b': 'c'}) >>> top.run_once() >>> assert top.s1.p_in.b == top.s2.p_out.c >>> >>> top.connect(top.s1, top.s2, {'x': 'y', 'p_in.c': 'p_out.b'}) >>> top.run_once() >>> assert top.s1.x == top.s2.y >>> assert top.s1.p_in.c == top.s2.p_out.b """ same_kind = lambda base: isinstance(object1, base) and isinstance(object2, base) if same_kind(BasePort): self.__connect_ports(object1, object2, mapping, cls, **kwargs) elif same_kind(System): self.__connect_systems(object1, object2, mapping, cls, **kwargs) else: raise TypeError( f"Connected objects must be either two ports or two systems" f"; got {type(object1).__name__!r} and {type(object2).__name__!r}" )
def __connect_systems(self, system1: System, system2: System, mapping: Union[str, List[str], Dict[str, str]], cls: Optional[Type[BaseConnector]] = None, **kwargs ) -> None: """Connect two sub-systems together. This method connects `system1` to `system2`. Full system connections are forbidden, as they are potentially ambiguous. As a consequence, name mapping is mandatory. If name mapping is provided as a collection, the name should be present in both systems. If it is specified as a dictionary, keys pertain to `system1`, values to `system2`. Each (key, value) pair of the mapping gives rise to a port-to-port connector. Connections are oriented (i.e. the direction of value transfer is fixed). The decision tree is as following: - If one port is of type input and the other one of type output, the connection flows from the output to the input. - Else if one port (portA) belong to the parent `System` of the other port (portB), the connection flows from portA to portB if portA is an input. And it flows from portB to portA if portA is an output. - Else if both are inputs, the port is pulled on the parent and connected to the two children. Connectors can only connect systems at the same level or a system with its parent. In all cases the connectors will be stored by the parent hosting the two connected systems. Parameters ---------- port1 : BasePort First end-point of connector. port2 : BasePort Second end-point of connector. mapping : str or List[str] or Dict[str, str], optional (List of) common name(s) or mapping name dictionary; default None (i.e. no mapping). cls : Type[BaseConnector], optional Connector type. When specified, `cls` must be a specialization of `BaseConnector`. If `cls` is `None` (default), the actual connector type is either `Connector`, or a port-specific connector, if any. Port bound connectors are automatically selected when both ports are of same type, and when port class contains inner class `Connector`, derived from `BaseConnector`. **kwargs : Additional keyword arguments forwarded to `cls`. """ if mapping is None: raise ConnectorError( "Full system connections are forbidden; please provide port or variable mapping." ) def get_variable(system: System, varname: str): key = varname if system is self else f"{system.name}.{varname}" try: return self.name2variable[key] except KeyError: raise AttributeError( f"{key!r} not found in {self.name!r}" ) mapping = BaseConnector.format_mapping(mapping) for name1, name2 in mapping.items(): obj1 = getattr(system1, name1) obj2 = getattr(system2, name2) same_kind = lambda base: isinstance(obj1, base) and isinstance(obj2, base) if same_kind(BasePort): self.__connect_ports(obj1, obj2, cls=cls, **kwargs) elif same_kind(System): self.__connect_systems(obj1, obj2, mapping=None) # raises ValueError else: # Mapping between variables obj1 = get_variable(system1, name1) obj2 = get_variable(system2, name2) self.__connect_ports( obj1.mapping, obj2.mapping, {obj1.key: obj2.key}, cls, **kwargs, ) def __connect_ports(self, port1: BasePort, port2: BasePort, mapping: Union[str, List[str], Dict[str, str], None] = None, cls: Optional[Type[BaseConnector]] = None, **kwargs ) -> None: """Connect two ports together. This method connects `port1` to `port2`. If no mapping is provided, connection will be made between variables based on their names. If a name mapping is provided as a list, the name should be present in both port. And if the mapping is specified as a dictionary, the keys belong to `port1` and the values to `port2`. An connection is oriented (i.e. the direction of value transfer is fixed). The decision tree is as following: - If one port is of type input and the other one of type output, the connection flows from the output to the input. - Else if one port (portA) belong to the parent `System` of the other port (portB), the connection flows from portA to portB if portA is an input. And it flows from portB to portA if portA is an output. - Else if both are inputs, the port is pulled on the parent and connected to the two children. Connectors can only connect systems at the same level or a system with its parent. In all cases the connectors will be stored by the parent hosting the two connected systems. Parameters ---------- port1 : BasePort First end-point of connector. port2 : BasePort Second end-point of connector. mapping : str or List[str] or Dict[str, str], optional (List of) common name(s) or mapping name dictionary; default None (i.e. no mapping). cls : Type[BaseConnector], optional Connector type. When specified, `cls` must be a specialization of `BaseConnector`. If `cls` is `None` (default), the actual connector type is either `Connector`, or a port-specific connector, if any. Port bound connectors are automatically selected when both ports are of same type, and when port class contains inner class `Connector`, derived from `BaseConnector`. **kwargs : Additional keyword arguments forwarded to `cls`. """ # Type validation def check_port(port: BasePort, argname: str): check_arg(port, argname, BasePort, stack_shift=1) if port.owner is None: raise ValueError(f"Cannot connect orphan port {port.contextual_name!r}") check_port(port1, 'port1') check_port(port2, 'port2') check_arg(mapping, 'mapping', (str, list, dict, MappingProxyType, type(None))) if cls is None: ptype = type(port1) if ptype is type(port2): # Check if there exists a port-specific connector try: cls = getattr(ptype, "Connector") except AttributeError: cls = Connector else: pname = ptype.__name__ logger.info( f"{port1.contextual_name!r} and {port2.contextual_name!r}" f" connected by port-specific connector `{pname}.Connector`." ) else: cls = Connector if cls is not Connector: try: SystemConnector.check(cls) except (TypeError, ValueError) as error: error.args = ( f"`cls` must be a concrete implementation of `BaseConnector`; got {cls!r}", ) raise # Generate mapping dictionary if mapping is None: mapping = dict((v, v) for v in port1 if v in port2) else: mapping = BaseConnector.format_mapping(mapping) if len(mapping) == 0: warnings.warn( f"Skipped empty connector between {port1.full_name()} and {port2.full_name()}" ) return child_connectors = self.__child_connectors def contextual_name(port: BasePort) -> str: return port.name if port.owner is self else port.contextual_name def create_connector(sink: BasePort, source: BasePort, mapping: Dict[str, str]) -> None: # Additional validation for sink Port # - Source should be Port (the opposite case is possible == connect sensor) # - If mapping does not cover all variables, print a log message source_name = contextual_name(source) sink_name = contextual_name(sink) if isinstance(sink, ModeVarPort) and sink.is_input and not isinstance(source, ModeVarPort): raise ConnectorError( f"Input mode variables cannot be connected to continuous time variables" f" ({source_name} -> {sink.owner.name})." ) if isinstance(sink, Port): if not (len(sink) == len(source) == len(mapping)): absent = [f"{sink.name}.{v}" for v in sink if v not in mapping.keys()] absent.extend(f"{source.name}.{v}" for v in source if v not in mapping.values()) logger.debug( "Partial connection between {!r} and {!r}. " "Variables ({}) are not part of the mapping.".format( sink_name, source_name, ", ".join(absent) ) ) name = f"{source_name} -> {sink_name}" new_connector = SystemConnector( cls(name, sink, source, mapping, **kwargs) ) connectors = self.connectors() # Check that variables are set only once for connector in connectors.values(): if connector.sink is sink: shared = set(connector.sink_variables()).intersection(new_connector.sink_variables()) if shared: if len(shared) == 1: variables = f"Variable {shared.pop()} is" else: variables = ", ".join(sorted(shared)) variables = f"Variables {sink_name}.{{{variables}}} are" raise ConnectorError(f"{variables} already set by {connector}") try: # Check if connector already exists connector = connectors[new_connector.name] except KeyError: # New connector if sink.owner is self: self.__pulling_connectors.append(new_connector) else: target = new_connector.sink.owner.name child_connectors.setdefault(target, []) child_connectors[target].append(new_connector) else: # Sink and source are already connected: update connector connector.update_mapping(new_connector.mapping) if sink.owner is source.owner.parent: lower = source elif source.owner is sink.owner.parent: lower = sink else: lower = sink lower.owner.__reset_input_mapping() err_msg = f"Ports {port1.contextual_name!r} and {port2.contextual_name!r} cannot be connected" if port1.owner is port2.owner: raise ConnectorError( f"{err_msg}. Connecting ports of the same system is forbidden." ) if port1.direction != port2.direction: # one port is IN and the other is OUT if port1.owner is port2.owner.parent or port2.owner is port1.owner.parent: raise ConnectorError( f"{err_msg}; parent/child connections are only allowed between same-direction ports." ) elif not (port1.owner.parent is port2.owner.parent is self): raise ConnectorError( f"{err_msg}. Only ports belonging to direct children of {self.name!r} can be connected." ) elif not ( self is port1.owner.parent is port2.owner or (self is port1.owner is port2.owner.parent) or (port1.owner in self.children.values() and port2.owner in self.children.values()) ): raise ConnectorError( f"{err_msg}. Same-direction ports can only be connected between a child and its parent, " "or between sibling sub-systems." ) reciprocal = lambda d: dict((v, k) for k, v in d.items()) if port1.is_input and port2.is_output: create_connector(port1, port2, mapping) elif port1.is_output and port2.is_input: create_connector(port2, port1, reciprocal(mapping)) else: if port1.owner is port2.owner.parent: if port1.is_input: create_connector(port2, port1, reciprocal(mapping)) else: # port1.is_output create_connector(port1, port2, mapping) elif port2.owner is port1.owner.parent: if port2.is_input: create_connector(port1, port2, mapping) else: # port2.is_output create_connector(port2, port1, reciprocal(mapping)) elif port1.is_input: # Both port are inputs # In this case the port is transfer on the parent to serve as # true source for all subsystems. # For Port, the port is duplicated in the parent. For ExtensiblePort, # the variable is added in the INWARDS port of the parent. if isinstance(port1, ModeVarPort) != isinstance(port2, ModeVarPort): for p1_var, p2_var in mapping.items(): # One variable is an inward, the other an input mode variable raise ConnectorError( f"Input mode variables cannot be connected to continuous time variables" f" ({port1.owner.name}.{p1_var} <-> {port2.owner.name}.{p2_var})." ) for p1_var, p2_var in mapping.items(): connected = False # Check if the variable is connected for c1 in filter(lambda c: c.sink is port1, child_connectors.get(port1.owner.name, [])): if p1_var in c1.sink_variables(): # Already connected # Check that the other variable is not connected for c2 in filter(lambda c: c.sink is port2, child_connectors.get(port2.owner.name, [])): if p2_var in c2.source_variables(): raise ConnectorError("{}.{} is already connected to {}.{}".format( port1.contextual_name, p1_var, port2.contextual_name, p2_var, ) ) # Connect the other variable self.connect(port2, c1.source, {p2_var: c1.source_variable(p1_var)}) connected = True break if connected: continue # Check if the other variable is connected for c2 in filter(lambda c: c.sink is port2, child_connectors.get(port2.owner.name, [])): if p2_var in c2.sink_variables(): # We know that p1_var is not connected # Connect the variable self.connect(port1, c2.source, {p1_var: c2.source_variable(p2_var)}) connected = True break if connected: continue # General case # create the pulled port if isinstance(port2, Port): pulled_port_name = f"{port2.owner.name}_{port2.name}" source_name = p2_var # Is the full port pulled or only a portion of it? if all(v in mapping.values() for v in port2): pull_variables(port2.owner, {port2.name: pulled_port_name}) else: pulled_port = port2.copy(pulled_port_name) self._add_port(pulled_port) self.connect(port2, pulled_port, p1_var) else: # Variables are both inwards, or input mode variables pulled_port_name = port2.name source_name = f"{port2.owner.name}_{p2_var}" pull_variables(port2.owner, {p2_var: source_name}) # create the other connection self.connect(port1, self.inputs[pulled_port_name], {p1_var: source_name}) else: raise ConnectorError(f"{err_msg} as they are both outputs.")
[docs] def open_loops(self): """Open closed loops in children relations.""" logger.debug(f"Call {self.name}.open_loops") connectors_to_open = list() # type: List[SystemConnector] child_connectors = self.__child_connectors self.__input_mapping = None name2var = self.name2variable loop = self.__loop_problem loop.clear() # Add top system in the execution loop so connection to it won't be opened execution_loop = [self] # type: List[System] for child in self.children.values(): execution_loop.append(child) child.open_loops() for connector in child_connectors.get(child.name, []): if connector.source.owner not in execution_loop: connectors_to_open.append(connector) for connector in connectors_to_open: logger.debug(f"Connector {connector!r} to be opened.") sink, source = connector.sink, connector.source if sink.is_output: raise ValueError( f"Connector {connector.name!r} cannot be opened, as its sink is an output." ) # Check that all variables are of numerical types for name in connector.sink_variables(): value = sink[name] if not is_numerical(value): raise TypeError( f"Cannot open connector {connector.name!r} to resolve system" f", as variable {name!r} is non-numerical (type is {type(value).__name__})." ) connector.deactivate() # Set mathematical problem sink_name = sink.contextual_name source_name = source.contextual_name owner_unknowns = dict( (unknown.ref, unknown) for unknown in sink.owner.unknowns.values() ) for target, origin in connector.mapping.items(): unknown_name = natural_varname(f"{sink_name}.{target}") var = name2var[unknown_name] options = {} try: unknown = owner_unknowns[var] except KeyError: pass else: options = { attr: getattr(unknown, attr) for attr in ( 'max_abs_step', 'max_rel_step', 'upper_bound', 'lower_bound', ) } equation = natural_varname(f"{unknown_name} == {source_name}.{origin}") loop.add_unknown(unknown_name, **options) loop.add_equation( equation, name=f"{equation} (loop)", reference=1.0, )
[docs] def close_loops(self): """Close loops opened to allow system resolution.""" logger.debug(f"Call {self.name}.close_loops") for elem in self.tree(): elem.__loop_problem.clear() for connector in elem.all_connectors(): connector.activate()
[docs] def convert_to(self, *args, **kwargs) -> None: """Convert system into another `System`. Note: not implemented for class `System`. Raises ------ TypeError If the current `System` cannot be converted. """ # TODO: this method should be removed altogether raise TypeError( f"Cannot convert {self.name!r}: system is not part of a family" )
[docs] @classmethod def check_config_dict(cls, params: dict) -> None: """Check if the provided dictionary respects the convention of a model file. Parameters ---------- params : dict Dictionary to be tested Raises ------ jsonschema.exceptions.ValidationError If the provided dictionary does not conform to the JSON schema. """ # Check that the file input file respects the JSON schema path = os.path.dirname(os.path.abspath(__file__)) with open(os.path.join(path, 'system.schema.json')) as fp: config_schema = json.load(fp) jsonschema.validate(params, config_schema)
[docs] @classmethod def load(cls, filepath: Union[str, Path, StringIO], name: Optional[str] = None) -> System: """Load configuration from file-like object. Parameters ---------- filepath : str or Path or file-like Filepath or file-like object (i.e. has a .read() method) name : str, optional Name of the newly created system. If unspecified (default), uses the name contained in `filepath`. Returns ------- System The loaded system. """ if isinstance(filepath, (str, Path)): with open(filepath) as fp: params = json.load(fp) elif hasattr(filepath, 'read'): params = json.load(filepath) else: raise TypeError("Input parameter should be a filepath or file-like object.") cls.check_config_dict(params) # Remove schema reference entry params.pop('$schema', None) # Convert variables if needed params = decode_cosapp_dict(params) system = cls.load_from_dict(*params.popitem()) if name: system.name = name return system
[docs] @classmethod def load_from_dict(cls, name: str, parameters: dict) -> System: """Instantiate a `System` from its name and its parameters. The construction of the `System` is done recursively from the lower level. Therefore this function is also called recursively returning to the upper level, the lower `System` and its connections to the upper `System` via a dictionary. If no connections are to forwarded the second returned argument is `None`. Parameters ---------- name : str Identifier of the system parameters : dict The dictionary containing the system definition Returns ------- System The generated system """ # TODO The all process of saving and reading from a file should be overhaul to take into account missing attr # - setup kwargs # - design methods (?) # Moreover what should be the source of truth JSON or Python? Currently reading from JSON duplicates Python # work as first the object is instantiated. Then its children are removed to create them (and the associated # connections from the JSON file. def get_system_class(class_name: str): if class_name == "System": return System check_arg(class_name, 'class_name', str, stack_shift=1) try: component_module, class_name = class_name.rsplit('.', maxsplit=1) except ValueError: component_module = "" system_class = None found_module = "" component_libs = System._components_librairies for lib_name in component_libs: try: if component_module: mod_name = f"{lib_name}.{component_module}" if mod_name.startswith('.'): mod_name = mod_name[1:] lib_module = importlib.import_module(mod_name) elif lib_name: lib_module = importlib.import_module(lib_name) else: raise ImportError except ImportError: # Try next possible Python package to find module continue else: # Found an existing module found_module = lib_module.__file__ try: system_class = getattr(lib_module, class_name) except AttributeError: # Class not in module continue else: break if system_class is None: raise AttributeError( f"Class {class_name!r} was not found in module: \n{found_module}" if found_module else f"Class {class_name!r} is not in one of the listed component librairies:\n{component_libs}" ) elif not issubclass(system_class, System): raise AttributeError( f"Class {system_class.__name__!r} does not inherit from base class 'System'" ) return system_class class_name = parameters.get('class', 'System') ctor_kwargs = parameters.get('setup_args', {}) system_class = get_system_class(class_name) top_system = system_class(name=name, **ctor_kwargs) for name in top_system.children: top_system.name2variable.pop(name) # Remove children and connectors --> the source of truth is the json file top_system.children.clear() top_system.__child_connectors.clear() top_system.__pulling_connectors.clear() top_system.__readonly = parameters.get('properties', {}).copy() for name, value in parameters.get('inputs', {}).items(): top_system[name] = value for name_system, params in parameters.get('subsystems', {}).items(): subsystem = cls.load_from_dict(name_system, params) top_system.add_child(subsystem) for connection in parameters.get('connections', []): if len(connection) == 2: sink, source = connection mapping = None else: sink, source, mapping = connection top_system.connect(top_system[sink], top_system[source], mapping) try: exec_order = parameters['exec_order'] except KeyError: pass else: if exec_order != list(top_system.exec_order): top_system.exec_order = exec_order return top_system
[docs] def save(self, fp, indent=2, sort_keys=True) -> None: """Serialize the `System` as a JSON formatted stream to fp. Parameters ---------- fp : str or Path or file-like A .write()-supporting file-like object or the filename indent : int, optional Indentation in the file (default: 2) sort_keys : bool, optional Sort the keys in alphabetic order (default: False) """ if isinstance(fp, (str, Path)): with open(fp, 'w') as filepath: filepath.write(self.to_json(indent, sort_keys)) else: fp.write(self.to_json(indent, sort_keys))
[docs] def export_structure(self) -> Dict: """ Export current system structure to a dictionary, which contains the definition of all ports, the structure of sub-system and the connections between ports. """ port_cls_data = {} sys_data = self.__to_dict(True, port_cls_data) return {"Ports": port_cls_data, "Systems": sys_data}
[docs] def to_dict(self) -> Dict: """ Public API to export system to a dictionary """ return self.__to_dict(with_struct=False)
def __json__(self) -> Dict[str, Dict[str, Any]]: """JSONable dictionary representing a variable. Returns ------- Dict[str, Any] The dictionary """ return self.to_dict() def __to_dict(self, with_struct: bool, port_cls_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Convert the `System` to a dictionary. In default mode, `with_struct` flag is `False`, only export input ports and its variable values. In other case where `with_struct` flag is `True`, export all ports with values and class name. `port_cls_data` can be provided to recover the definitions of all ports inside system. Parameters ---------- with_struct : bool, optional Flag to export output ports and the class name of ports (default: False). port_cls_data : dict, optional A dictionary to hold the definition of all ports inside system. Returns ------- dict A dictionary defining fully the `System` """ # TODO Fred - BUG Information missing from dictionary conversion # - kwargs of setup - e.g. Shaft system of librairies_sae has its ports functions of kwargs # - design equations - are not saved (should they be?) data = dict() if self.__class__.__qualname__ == 'System': data['class'] = 'System' # Trick to allow container `System` without special type else: data['class'] = f"{self.__module__}.{self.__class__.__qualname__}" ctor_kwargs = self.__ctor_kwargs if ctor_kwargs: data["setup_args"] = ctor_kwargs properties = self.__readonly if properties: data['properties'] = properties port_data_required = isinstance(port_cls_data, dict) if with_struct: for direction in ["inputs", "outputs"]: ports: Dict[str, BasePort] = getattr(self, direction) temp_dict = {} for port in ports.values(): if port_data_required and isinstance(port, Port): key = port.__class__.__qualname__ if key not in port_cls_data: port_cls_data[key] = { name: variable.to_dict() for name, variable in port._variables.items() } port_dict = port.to_dict(with_struct) temp_dict.update(port_dict) if temp_dict: data[direction] = temp_dict else: inputs = {} for port in self.inputs.values(): port_dict = port.to_dict() inputs.update(port_dict) if inputs: data['inputs'] = inputs connections = [ connector.info() for connector in self.all_connectors() ] if connections: data['connections'] = connections if len(self.children) > 0: data['subsystems'] = { name: component.__to_dict(with_struct, port_cls_data)[name] for name, component in self.children.items() } data['exec_order'] = list(self.exec_order) return {self.name: data}
[docs] def to_json(self, indent=2, sort_keys=True) -> str: """Return a string in JSON format representing the `System`. Parameters ---------- indent : int, optional Indentation of the JSON string (default: 2) sort_keys : bool, optional Sort keys in alphabetic order (default: True) Returns ------- str String in JSON format """ dict_repr = self.to_dict() # If this is updated => workspace template should be updated too. dict_repr['$schema'] = "0-3-0/system.schema.json" # Add self referencing to system JSON version - provision return json.dumps(dict_repr, indent=indent, sort_keys=sort_keys, cls=JSONEncoder)
[docs] def to_d3(self, show=True, size=435) -> "IPython.display.IFrame": """Returns the hierarchical representation of this system in HTML format. Returns ------- IPython.display.IFrame IFrame to the HTML formatted representation """ from cosapp.tools.views.d3js import to_d3 return to_d3(self, show, size)
[docs] def to_html(self, filename: str, embeddable=False) -> None: """Save the `System` as HTML using vis.JS library. Parameters ---------- filename : str Filename to write to embeddable: bool, optional Is the HTML to be embedded in an existing page? Default: False """ from cosapp.tools.views.visjs import VisJsRenderer renderer = VisJsRenderer(self, embeddable) renderer.to_file(filename)
def _repr_html_(self) -> str: """Returns the representation of this system in HTML format. Returns ------- str HTML formatted representation """ # TODO unit tests filename = f"{self.name}.html" self.to_html(filename) from IPython.display import IFrame return IFrame(filename, "810px", "650px")._repr_html_() def _repr_markdown_(self) -> str: """Returns the representation of this system attributes in Markdown format. Returns ------- str Markdown formatted representation """ from cosapp.tools.views.markdown import system_to_md return system_to_md(self)
[docs] def make_surrogate(self, data_in: Union[pandas.DataFrame, Dict[str, List[float]]], model = FloatKrigingSurrogate, activate = True, data_out: Optional[Union[pandas.DataFrame, Dict[str, Any]]] = None, postsynch: Union[str, List[str]] = '*', *args, **kwargs) -> SystemSurrogate: """ Creates a surrogate model superseding the normal behaviour of `compute()`. The surrogate model is trained from datasets `data_in` and `data_out`, given as `pandas.DataFrame` objects, or dictionnaries interpretable as so. If no output data are provided (default), they are computed as the response of system to design-of-experiment `data_in`. Parameters ---------- - data_in: pandas.DataFrame or Dict[str, List[float]] Design of experiments for variables in dataframe columns or dict keys. Column/key names must match variable contextual names, as in 'wing.geom.length', e.g. - model: type Model class, implementing methods `train(x, y)` and `predict(x)`, where x and y are 1D arrays. Default is `cosapp.utils.surrogate_models.FloatKrigingSurrogate`. - activate: bool, optional Boolean determining whether or not surrogate model should be activated once created. Default is `True`. - data_out: pandas.DataFrame or Dict[str, List[float]], optional Y-dataset used for surrogate model training. Default is None (dataset is computed). - postsynch: str or List[str] List of output variable names to synchronize after each surrogate model execution. Default is '*', meaning all outputs are post-synchronized. - *args, **kwargs: Any Additional parameters passed to `model` constructor. Returns: -------- SystemSurrogate System surrogate attached to the system. """ check_arg(data_in, 'data_in', (pandas.DataFrame, dict)) surrogate = SystemSurrogate(self, data_in, model, data_out, postsynch, *args, **kwargs) self.__set_surrogate(surrogate, activate) return surrogate
def __set_surrogate(self, surrogate: SystemSurrogate, activate=True) -> None: """Private setter for surrogate model""" if self.has_surrogate: warnings.warn(f"Existing surrogate model of {self.name} has been overwritten by a new one.") self._meta = surrogate self.active_surrogate = activate
[docs] def load_surrogate(self, filename: str, activate=True) -> None: """ Loads a surrogate model from a binary file created by `dump_surrogate`. Parameters ---------- - filename: str Destination file name. - activate: bool, optional Boolean determining whether or not surrogate model should be activated once loaded. Default is `True`. """ surrogate = SystemSurrogate.load(self, filename) self.__set_surrogate(surrogate, activate)
[docs] def dump_surrogate(self, filename: str) -> None: """Dumps system surrogate model (if any) into a binary file.""" if not self.has_surrogate: raise AttributeError(f"{self.name!r} has no surrogate model") self._meta.dump(filename)
@property def active_surrogate(self) -> bool: """bool: True if surrogate model is activated, False otherwise.""" return self.__runner is self._meta @active_surrogate.setter def active_surrogate(self, activate: bool) -> None: """Activation boolean setter for surrogate model.""" check_arg(activate, "active_surrogate", bool) if activate == self.active_surrogate: return # no change - quit if activate: if self._meta is None: raise AttributeError( "Can't set `active_surrogate` if no surrogate model has been created" " by either `make_surrogate` or `load_surrogate`." ) # Deactivate owner, children, and all drivers self._set_recursive_active_status(False) # Reactivate owner after upper recursion self._active = True self.__runner = self._meta else: self._set_recursive_active_status(True) self.__runner = self self.touch() for port in self.inputs.values(): port.touch() @property def has_surrogate(self) -> bool: """bool: True if system has a surrogate model (even if inactive), False otherwise.""" return self._meta is not None def _set_recursive_active_status(self, active_status: bool) -> None: #TODO save and recover drivers original status logger.debug(f"Starting recursive active status modifying of {self.name}, status to be set is {active_status}") self._active = active_status for driver in self.drivers.values(): logger.debug(f"Targeted driver for driver recursive deactivate is {driver}") driver._set_children_active_status(active_status) for child in self.children.values(): logger.debug(f"Targeted child for system recursive deactivate is {child}") child._set_recursive_active_status(active_status)