"""
Basic classes handling model resolution, system connections and conversion between level of
modelings.
"""
from __future__ import annotations
import importlib
import json
import logging
import os
import warnings
from collections import OrderedDict
from collections.abc import Iterable, Callable, Collection, Generator, Iterator
from contextlib import contextmanager
from copy import deepcopy
from enum import Enum
from io import TextIOBase
from numbers import Number
from pathlib import Path
from typing import Any, Optional, ClassVar, TypeVar, TYPE_CHECKING
from types import MappingProxyType
import jsonschema
import numpy
import pandas
from cosapp.patterns.visitor import Visitor
from cosapp.core.module import Module, CommonPorts
from cosapp.core.eval_str import EvalString
from cosapp.core.variableref import VariableReference
from cosapp.core.numerics.basics import MathematicalProblem, TimeProblem
from cosapp.core.numerics.boundary import TimeDerivative, TimeUnknown, Unknown
from cosapp.core.time import TimeObserver
from cosapp.core.execution import ExecutionPolicy
from cosapp.ports.enum import PortType, Scope, Validity
from cosapp.ports.port import BasePort, ExtensiblePort, ModeVarPort, Port
from cosapp.ports.units import UnitError
from cosapp.ports.variable import RangeValue, Types, Variable
from cosapp.ports.mode_variable import ModeVariable
from cosapp.ports.connectors import BaseConnector, Connector, ConnectorError
from cosapp.utils.distributions import Distribution
from cosapp.utils.deprecation import deprecated
from cosapp.utils.context import ContextLock
from cosapp.utils.helpers import check_arg, is_number, is_numerical
from cosapp.utils.json import (
EncodingMetadata,
from_json,
load_json,
jsonify,
)
from cosapp.utils.logging import LogFormat, LogLevel, HandlerWithContextFilters, rollover_logfile
from cosapp.utils.naming import NameChecker, natural_varname
from cosapp.utils.pull_variables import pull_variables
from cosapp.utils.find_variables import get_attributes
from cosapp.utils.surrogate_models import FloatKrigingSurrogate
from cosapp.systems.systemConnector import SystemConnector
from cosapp.systems.systemSurrogate import SystemSurrogate, SurrogateModel
from cosapp.multimode.event import Event, EventState, ZeroCrossing
logger = logging.getLogger(__name__)
AnyPort = TypeVar("AnyPort", bound=Port)
AnySystem = TypeVar("AnySystem", bound="System")
if TYPE_CHECKING:
from cosapp.drivers import Driver
AnyDriver = TypeVar("AnyDriver", bound=Driver)
[docs]
class ConversionType(Enum):
"""Enumeration of `System` conversion type."""
manual = "manual"
best_fidelity_to_cost = "best_fidelity_to_cost_ratio"
high_fidelity = "highest_fidelity"
low_fidelity = "lowest_fidelity"
high_cost = "highest_cost"
low_cost = "lowest_cost"
[docs]
class System(Module, TimeObserver):
# TODO check and complete documentation
"""
A class to describe generic properties and functions of a component that can be single or
made of child `System`.
Parameters
----------
name: str
Name of the `System`
Attributes
----------
name : str
`System` name
inputs : :obj:`OrderedDict` of :obj:`BasePort`
Dictionary of `BasePort` containing the values needed to compute the `System`
outputs : dict[BasePort]
Dictionary of `BasePort` containing the values computed by the `System`
residues : :obj:`OrderedDict` of :obj:float
Dictionary of residues generated by the `System`
children : dict[System]
Child `System` of this `System`
parent : System
Parent `System` of this `System`; None if there is no parent.
exec_order : MappingView[str]
Execution order in which sub-systems are computed.
properties : dict[str, Any]
Dictionary of immutable parameters of the system.
design_methods : dict[Equations]
`System` pre-defined design methods
name2variable : dict[str, VariableReference]
Variable name mapping to its value by reference
_locked : bool
if True, `add_input`, `add_output`, `add_inward` and `add_outward` are desactivated. This is
the default behavior outside the `setup` function.
_active : bool
if False, the `System` will not execute its `compute` method
_is_tree_clean : bool
Reflects the status of the inputs and outputs. `clean` status means the group of ports was not updated since
last computation of the `System`
_compute_calls: bool
Store if the `System` was computed at last call or not (due to inhibition of clean status)
Examples
--------
To create your own `System`, you should inherit from this class and define your own `setup` and
`compute` methods.
>>> import numpy as np
>>>
>>> class RealPort(Port):
>>>
>>> def setup(self):
>>> self.add_variable('x', 0.)
>>>
>>> class InvertedParabola(System):
>>>
>>> def setup(self):
>>> self.add_input(RealPort, 'iterative')
>>> self.add_input(RealPort, 'target_y')
>>> self.add_output(RealPort, 'max_root')
>>> self.add_inward({
>>> 'a': 0.01,
>>> 'b': 2.0,
>>> 'c': 4.0,
>>> })
>>> self.add_outward('roots', None)
>>> self.add_equation("a * iterative.x**2 + b * iterative.x + c == target_y.x", "y")
>>>
>>> def compute(self):
>>> discriminant = self.b**2 - 4. * self.a * self.c
>>> if discriminant >= 0.:
>>> self.roots = (
>>> (-self.b + np.sqrt(discriminant)) / (2. * self.a),
>>> (-self.b - np.sqrt(discriminant)) / (2. * self.a),
>>> )
>>> self.max_roots.x = max(self.roots)
>>> else:
>>> self.roots = None
>>> self.max_roots.x = np.nan
"""
__slots__ = (
"__context_lock", "_is_tree_clean", "_locked", "_math", "_time_pb",
"design_methods", "drivers", "inputs", "outputs", "name2variable",
"__properties", "__events", "_meta", "__runner", "__input_mapping",
"__loop_problem", "__child_connectors", "__pulling_connectors",
"__free_problem", "__readonly",
)
INWARDS: ClassVar = CommonPorts.INWARDS.value
OUTWARDS: ClassVar = CommonPorts.OUTWARDS.value
MODEVARS_IN: ClassVar = CommonPorts.MODEVARS_IN.value
MODEVARS_OUT: ClassVar = CommonPorts.MODEVARS_OUT.value
COMMON_PORTS: ClassVar = CommonPorts.names()
_name_check: ClassVar = NameChecker(excluded=["inputs", "outputs", *CommonPorts.names()])
tags: ClassVar[frozenset[str]] = frozenset()
_user_context: ClassVar[Optional[Scope]] = None
# Python packages containing components definition
_components_librairies: ClassVar = [""]
# Is the master system (from which a simulation has started) known?
__master_set: ClassVar = False
[docs]
@classmethod
@contextmanager
def set_master(cls, name: str, type_checking=True) -> Generator[bool, None, None]:
"""Set the master System
Parameters
----------
name : str
Name of the System calling this context manager
type_checking : bool, optional
Whether to activate the type checking in the ports or not (default: True)
Returns
-------
bool
Is the callee the master System?
"""
is_master = False
if not cls.__master_set:
logger.debug(f"System <{name}> is the execution master.")
rollover_logfile() # Create a new logfile
BasePort.set_type_checking(type_checking) # May deactivate type checking
is_master = True
cls.__master_set = True
try:
yield is_master
finally:
if is_master:
cls.__master_set = False
BasePort.set_type_checking(True) # Reactivate type ckecking
from cosapp.drivers.optionaldriver import OptionalDriver
OptionalDriver.set_inhibited(False)
def __init__(self, name: str, **kwargs):
"""Initialize a System
Parameters
----------
- name [str]:
System name
- **kwargs:
Additional keyword arguments forwarded to method `setup`
"""
self._init(name, **kwargs)
# Customized subclass `System` before applying user wishes
kwargs = self._initialize(**kwargs)
# Customized the `System` according to user wishes
with self.__free_problem:
self.setup(**kwargs)
self.update()
self.__enforce_scope()
self._locked = True
def _init(self, name: str, **kwargs):
Module.__init__(self, name)
TimeObserver.__init__(self, sign_in=False)
self._math = self.new_problem(name)
self._time_pb = TimeProblem(name, self)
self.__loop_problem = self.new_problem("loop")
self.parent: Optional[System] = None
self.children: dict[str, System] = OrderedDict()
self.drivers: dict[str, Driver] = OrderedDict()
self.design_methods: dict[str, MathematicalProblem] = dict()
self.__readonly: dict[str, Any] = dict()
self.__properties: dict[str, Any] = dict()
self.__ctor_kwargs = kwargs.copy()
self.__context_lock = ContextLock()
self.__free_problem = ContextLock()
self.inputs: dict[str, BasePort] = dict()
self.outputs: dict[str, BasePort] = dict()
self.__events: dict[str, Event] = dict()
# Connectors are grouped in a dictionary where the key is the sink system i.e. the receiving system
self.__child_connectors: dict[str, list[SystemConnector]] = dict()
self.__pulling_connectors: list[SystemConnector] = list()
self._locked = False
self._is_tree_clean = False
self._meta = None
self.__runner: System | SystemSurrogate = self
self.__input_mapping: dict[str, VariableReference] = None
# For efficiency purpose, links to objects are stored as reference
# !! name2variable must be the latest attribute set
# => lock __setattr__ on previously defined attributes
self.name2variable: dict[str, VariableReference] = dict()
# Create extensible ports for orphan variables
self._add_port(ExtensiblePort(System.INWARDS, PortType.IN), check=False)
self._add_port(ExtensiblePort(System.OUTWARDS, PortType.OUT), check=False)
self._add_port(ModeVarPort(System.MODEVARS_IN, PortType.IN), check=False)
self._add_port(ModeVarPort(System.MODEVARS_OUT, PortType.OUT), check=False)
def _update(self, dt) -> None:
"""Required by `TimeObserver` base class"""
pass
[docs]
def accept(self, visitor: Visitor) -> None:
"""Specifies course of action when visited by `visitor`"""
visitor.visit_system(self)
[docs]
def ports(self) -> Iterator[BasePort]:
"""Iterator on all system ports (both inputs and outputs)"""
yield from self.inputs.values()
yield from self.outputs.values()
[docs]
def is_clean(self, direction: Optional[PortType] = None) -> bool:
"""Are the `System` ports with the given direction clean?
If no direction is specified, checks if both directions are clean.
Parameters
----------
direction : PortType, optional
Direction of interest
Returns
-------
bool
Clean status
"""
if direction == PortType.IN:
ports = self.inputs.values()
elif direction == PortType.OUT:
ports = self.outputs.values()
else:
ports = self.ports()
return all(port.is_clean for port in ports)
[docs]
def set_clean(self, direction: PortType) -> None:
"""Set to clean ports of a certain direction.
Parameters
----------
direction : PortType
Direction to set
"""
ports = self.inputs.values() if direction == PortType.IN else self.outputs.values()
for port in ports:
port.set_clean()
[docs]
def set_dirty(self, direction: PortType) -> None:
"""Set to dirty ports of a certain direction.
Parameters
----------
direction : PortType
Direction to set
"""
if direction == PortType.IN:
self.touch()
else:
for port in self.outputs.values():
port.touch()
[docs]
def touch(self):
for system in self.path_to_root():
if not system._is_tree_clean:
break
system._is_tree_clean = False
def __enforce_scope(self) -> None:
"""Encapsulate input ports for which some variables are out of scope."""
if self._user_context is None:
# Ensure tags is a frozenset
self.tags = tags = frozenset(self.tags)
def get_scope(role_sets: frozenset[frozenset[str]]) -> Scope:
"""Get context depending on the matching between user roles and tags:
- No tags or full match with one role => `PRIVATE`
- Partial match, at best, between tags and one role => `PROTECTED`
- Else `PUBLIC`
Parameters
----------
roles : frozenset[frozenset[str]]
Returns
-------
Scope
Examples
--------
>>> tags = frozenset()
>>> roles = frozenset([frozenset(['Aerodynamics', 'Compressor'])])
>>> assert get_context(tags, roles) == Scope.PRIVATE
>>>
>>> tags = frozenset(['Aerodynamics', 'Compressor'])
>>> roles = frozenset([frozenset(['Aerodynamics', 'Compressor'])])
>>> assert get_context(tags, roles) == Scope.PRIVATE
>>>
>>> tags = frozenset(['Aerodynamics', 'Compressor'])
>>> roles = frozenset([frozenset(['Mechanics', 'Compressor'])])
>>> assert get_context(tags, roles) == Scope.PROTECTED
>>>
>>> tags = frozenset(['Aerodynamics', 'Compressor'])
>>> roles = frozenset([frozenset(['Heat transfert', 'Turbine'])])
>>> assert get_context(tags, roles) == Scope.PUBLIC
"""
if not tags or tags in role_sets:
return Scope.PRIVATE
else:
best = Scope.PUBLIC
for roles in role_sets:
if tags.issubset(roles):
return Scope.PRIVATE
if tags.intersection(roles):
best = Scope.PROTECTED
return best
from cosapp.core.config import CoSAppConfiguration
user_config = CoSAppConfiguration()
self._user_context = get_scope(user_config.roles)
# print(f"{tags = }", user_config.roles, self._user_context)
scope = self._user_context
if scope != Scope.PRIVATE: # Some ports may be restrained
for port in self.inputs.values():
port.scope_clearance = scope
def _initialize(self, **kwargs) -> dict[str, Any]:
"""Hook method to add `System` member before calling `setup` method.
Parameters
----------
**kwargs : dict[str, Any]
Optional keywords arguments of __init__
Returns
-------
dict[str, Any]
Optional keywords arguments not consumed by `_initialize`
"""
return kwargs
[docs]
def setup(self, **kwargs) -> None:
"""`System` port and/or child `System` are defined in this function.
This function allows to populate a customized `System` class. The helper functions for the
user are:
- `add_input` : add an input port
- `add_output` : add an output port
- `add_inward` : add a inward variable
- `add_outward` : add a outward variable
- `add_child` : add a child `System`
See Also
--------
add_input, add_output, add_inward, add_outward, add_child
Examples
--------
Here is an example of `System` subclassing:
>>> class AdvancedDuct(System):
>>>
>>> def setup(self):
>>> self.add_input(FlowPort, 'in')
>>> self.add_output(FlowPort, 'out')
>>> self.add_inward({'heat_source': 1.0,
>>> 'pressure_loss': 0.01})
>>> self.add_outward('wall_temperature', 400.)
>>>
>>> self.add_child(AnotherSystem('system2'))
"""
pass # pragma: no cover
[docs]
def update(self):
"""Perform complex tasks due to data change.
Some parameters may be a file used to initialize a complex object. Updating that parameter
may imply the modification of that complex object. This should be done in this method.
Examples
--------
>>> class TableModel(System):
>>>
>>> def setup(self):
>>> # Add a inwards to the file containing the table values
>>> self.add_inward('table_file', '{myProject}/ressources/my_table.csv')
>>> # Set a object able to interpolate the table
>>> self.add_outward('table', Table(self.table_file))
>>>
>>> def update(self):
>>> # Update the table object as the source file may have been updated.
>>> self.table = Table(self.table_file)
Notes
-----
This method is called systematically after the :py:meth:`~cosapp.systems.system.System.setup` call.
Otherwise the user is responsible to explicitly call it when needed.
"""
pass # pragma: no cover
def __getstate__(self) -> dict[str, Any]:
"""Creates a state of the object.
The state type does NOT match type specified in
https://docs.python.org/3/library/pickle.html#object.__getstate__
to allow custom serialization.
Returns
-------
dict[str, Any]:
state
"""
data = dict()
data["setup_args"] = self.__ctor_kwargs.copy()
data["name"] = self.name
data["problem"] = self._math
data["properties"] = self.__properties.copy()
data["events"] = self.__events.copy()
for direction in ["inputs", "outputs"]:
ports: dict[str, BasePort] = getattr(self, direction)
data[direction] = {
name: port
for name, port in ports.items()
}
data["child_connectors"] = self.__child_connectors
data["pulling_connectors"] = self.__pulling_connectors
data["name2variable"] = self.name2variable
data["children"] = self.children
data["exec_order"] = list(self.exec_order)
data["drivers"] = self.drivers
data["__master_set"] = self.__master_set
return data
def __reduce_ex__(self, protocol: Any) -> tuple[Callable, tuple, dict]:
"""Defines how to serialize/deserialize the object.
Parameters
----------
_ : Any
Protocol used
Returns
-------
tuple[Callable, tuple, dict]
A tuple of the reconstruction method, the arguments to pass to
this method, and the state of the object
"""
state = self.__getstate__()
state["setup_args"]["name"] = self.name
return self._make_new, (state["setup_args"], ), state
@classmethod
def _make_new(cls, setup_args: dict[str, Any]) -> System:
"""Reconstructs a `System` from the state related to the topology.
The object deserialization must be performed in 2 stages because
some objects will inspect the systems at initialization (e.g. `EvalString`).
"""
name = setup_args.pop("name")
system: System = object.__new__(cls, name)
system._init(name, **setup_args)
return system
def __setstate__(self, state: dict[str, Any]) -> None:
"""Sets the object from a provided state.
Parameters
----------
state : dict[str, Any]
State
"""
for child in state.pop("children").values():
self._add_child(child, check=False)
for port in (state.get("inputs") | state.get("outputs")).values():
self._add_port(port, check=False)
self.name2variable = state.pop("name2variable")
self.__child_connectors = state["child_connectors"]
self.__pulling_connectors = state["pulling_connectors"]
self._math = state["problem"]
self.exec_order = state.get("exec_order", [])
self.__properties = properties = state.get("properties", {})
self.__events = events = state.get("events", {})
self.__readonly = {
**properties,
**events,
}
for driver in state.get("drivers", {}).values():
self.add_driver(driver)
# this should only affect object serialized inside a `set_master`
# context manager (such as drivers relying on multiprocessing)
System.__master_set = state["__master_set"]
def __getattr__(self, name: str) -> Any:
try: # Faster than testing `if name in self`
variable_ref = self.name2variable[name]
except KeyError:
try:
return super().__getattribute__(name)
except AttributeError as error:
# Last try: checkout read-only properties
# Note: checked last, as access is less likely than
# `name2variable` and `super().__getattribute__()`.
try:
return self.__readonly[name]
except KeyError:
raise error
else:
return variable_ref.value
def __setattr__(self, name: str, value: Any) -> None:
try: # Faster than testing `if name in self`
# Faster to duplicate __setitem__ call than calling it
variable_ref: VariableReference = super().__getattribute__("name2variable")[name]
except KeyError:
if name in self.__readonly:
raise AttributeError(f"can't set attribute {self.name}.{name}")
elif hasattr(self, name):
super().__setattr__(name, value)
else:
raise AttributeError(
f"System {self.__class__.__qualname__!r} has no attribute {name!r}."
)
except AttributeError:
# Exception catcher to create all initial variables defined prior to name2variable
# Then KeyError will be raised and it won't be allowed to create new attributes
super().__setattr__(name, value)
else:
variable_ref.value = value
def __contains__(self, item: str) -> bool:
try:
self.__find(item)
except AttributeError:
return False
else:
return True
def __find(self, name: str) -> Any:
for collection in (
self.name2variable,
self.__readonly,
):
try:
return collection[name]
except KeyError:
continue
raise AttributeError(name)
def __getitem__(self, name: str) -> Any:
try:
return getattr(self, name)
except AttributeError:
raise KeyError(
f"Variable {name!r} not found in the context of System {self.name!r}"
)
def __setitem__(self, name: str, value: Any) -> None:
try:
variable_ref = self.name2variable[name]
except KeyError:
if name in self.__readonly:
raise AttributeError(f"Can't set read-only attribute {self.name}.{name}")
raise KeyError(
f"Variable {name!r} not found in the context of System {self.name!r}"
)
variable_ref.value = value
def __repr__(self) -> str:
return f"{self.name} - {type(self).__name__}"
[docs]
def get_variable(self, varname: str) -> Variable | ModeVariable:
try:
var = self.name2variable[varname]
except KeyError:
if varname in self.__readonly:
port = None
else:
raise AttributeError(f"{varname!r} is not a variable of {self.name}")
else:
port = var.mapping
if not isinstance(port, BasePort):
raise ValueError(f"{varname!r} is not a variable of {self.name}")
return port.get_variable(var.key)
@property
def input_mapping(self) -> dict[str, VariableReference]:
"""dict[str, VariableReference]: free input mapping"""
if self.__input_mapping is None:
self._update_input_mapping()
return MappingProxyType(self.__input_mapping)
def _update_input_mapping(self) -> None:
"""Run graph analysis to identify free inputs,
and store data in self.__input_mapping dict."""
from cosapp.utils.graph_analysis import get_free_inputs
self.__input_mapping = get_free_inputs(self)
def __reset_input_mapping(self) -> None:
self.__input_mapping = None
if (parent := self.parent):
parent.__reset_input_mapping()
[docs]
def append_name2variable(
self, additional_mapping: Iterable[tuple[str, VariableReference]]
) -> None:
"""Append the `Iterable` of (`str`, `VariableReference`) `tuple` to the lookup variables mapping.
The additional mapping is also transfer upward to the parent `System`.
Parameters
----------
additional_mapping: Iterable[tuple[str, VariableReference]]
Additional list of (str, VariableReference) tuple to append
"""
# TODO raise error if name already exists (example a inwards variable name == component name)
additional_mapping = list(additional_mapping)
self.name2variable.update(additional_mapping)
for key, _ in additional_mapping:
self._add_member(key)
if (parent := self.parent):
name = self.name
rel2absname = lambda item: (f"{name}.{item[0]}", item[1])
parent.append_name2variable(map(rel2absname, iter(additional_mapping)))
[docs]
def add_property(self, name: str, value: Any) -> None:
"""Create new read-only property `name`, set to `value`"""
self.__lock_check("add_property")
name = Variable.name_check(name)
self.__check_attr(name, f"cannot add read-only property {name!r}")
self.__register_property(name, value)
self._add_member(name)
def __register_property(self, name: str, value: Any):
"""Register attribute `name` as a read-only property."""
self.__properties[name] = self.__readonly[name] = value
def __unregister(self, names: Iterable[str]):
"""Unregister `names` as attributes of the system and its parent tree."""
names = list(names)
for name in names:
self.name2variable.pop(name, None)
self.__readonly.pop(name, None)
self.__properties.pop(name, None)
self.__events.pop(name, None)
if (parent := self.parent):
child_name = self._name
rel2absname = lambda name: f"{child_name}.{name}"
parent.__unregister(map(rel2absname, names))
@property
def properties(self) -> dict[str, Any]:
"""dict[str, Any]: list of read-only properties and associated values"""
return MappingProxyType(self.__properties)
[docs]
def add_output(self,
port_class: type[AnyPort],
name: str,
variables: Optional[dict[str, Any]] = None,
desc = "",
) -> AnyPort:
"""Add an output `Port` to the `System`.
This function cannot be called outside `System.setup`.
Parameters
----------
- port_class [type[Port]]
Class of the `Port` to create
- name [str]:
`Port` name
- desc [str, optional]:
`Port` description
- variables [dict[str, Any], optional]:
Dictionary of initial values (default: None)
Returns
-------
The created port
Examples
--------
>>> class MyModule(System):
>>> def setup(self):
>>> self.add_output(MyPort, 'p_out1')
>>> self.add_output(MyPort, 'p_out2', {'y': 1.5})
"""
self.__lock_check("add_output")
# type validation
check_arg(port_class, 'port_class', type)
if not issubclass(port_class, Port):
raise TypeError(
f"port_class should be a subclass of Port; got {type(port_class).__name__}."
)
new_port = port_class(name, PortType.OUT, variables=variables)
self._add_port(new_port, desc)
return new_port
def _add_port(self, port: BasePort, desc="", check=True) -> None:
"""Add a port to the system
Parameters
----------
port : `BasePort`
instance of a port class
"""
if check:
portname = self._name_check(port.name)
self.__check_attr(portname, f"cannot add {type(port).__qualname__} {portname!r}")
port.owner = self
port.description = desc
if port.is_input:
inputs = self.inputs
if check and port.name in inputs:
raise ValueError(f"Port name {port.name!r} already exists as input")
inputs[port.name] = port
port_key = (port.name, VariableReference(context=self, mapping=inputs, key=port.name))
elif port.is_output:
outputs = self.outputs
if check and port.name in outputs:
raise ValueError(f"Port name {port.name!r} already exists as output")
outputs[port.name] = port
port_key = (port.name, VariableReference(context=self, mapping=outputs, key=port.name))
else:
raise ValueError(f"Unknown `PortType` {port.direction}")
keys = [port_key]
rel2absname = lambda relname: (
f"{port.name}.{relname}",
VariableReference(context=self, mapping=port, key=relname),
)
keys.extend(map(rel2absname, iter(port)))
if isinstance(port, Port):
attributes = get_attributes(port) - get_attributes(Port) - set(port._variables)
for attr in attributes:
keys.append((f"{port.name}.{attr}", VariableReference(context=self, mapping=port, key=attr)))
self.append_name2variable(keys)
def __lock_check(self, method: str) -> None:
"""Raises AttributeError if system is locked"""
if self._locked:
raise AttributeError(f"`{method}` cannot be called outside `setup`")
def __check_attr(self, name: str, prefix="") -> None:
"""Raises ValueError if attribute `name` already exists in system"""
try:
obj = self.__find(name)
except AttributeError:
return # attribute does not exist - OK
suffix = ""
if isinstance(obj, VariableReference):
if obj.context is self:
value = obj.value
if isinstance(value, Port):
pdir = "input" if value.is_input else "output"
suffix = f"as an {pdir} {type(value).__name__}"
elif isinstance(value, System):
suffix = f"as a sub-system {type(value).__name__}"
else:
port = obj.mapping
if port is self.inputs[System.INWARDS]:
suffix = "as an inward"
elif port is self.outputs[System.OUTWARDS]:
suffix = "as an outward"
else:
suffix = ""
elif isinstance(obj, Event):
suffix = f"as an event"
else:
suffix = "as a read-only property"
message = f"{self.name}.{name} already exists"
prefix = prefix.strip()
if prefix:
message = f"{prefix}; {message}"
if suffix:
message = f"{message} {suffix}"
raise ValueError(f"{message}.")
[docs]
def add_inward(self,
name: str,
value: Any = 1,
unit = "",
dtype: Optional[Types] = None,
valid_range: Optional[RangeValue] = None,
invalid_comment = "",
limits: Optional[RangeValue] = None,
out_of_limits_comment = "",
desc = "",
distribution: Optional[Distribution] = None,
scope = Scope.PRIVATE,
) -> None:
"""Add a inward variable to the `System`.
A inward variable is calculated by the `System`. But its value is not mandatory in any
variables fluxes between this `System` and another one.
An unique inward variable can be defined by providing directly all arguments. And multiple
inward variables can be defined by passing a `dict` of pair (`str`, `Any`) with an entry for
each variable.
This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.
Parameters
----------
name : str
Name of the inward variable
value : Any, optional
Variable value at construction; default 1
unit : str, optional
Variable unit; default empty string (i.e. dimensionless)
dtype : type or iterable of types, optional
Variable type; default None (i.e. type of initial value)
valid_range : tuple[Any, Any], optional
Validity range of the variable; default None (i.e. all values are valid)
invalid_comment : str, optional
Comment to show in case the value is not valid; default ''
limits : tuple[Any, Any], optional
Limits over which the use of the model is wrong; default valid_range
out_of_limits_comment : str, optional
Comment to show in case the value is not valid; default ''
desc : str, optional
Variable description; default ''
distribution : Distribution, optional
Probability distribution of the variable; default None (no distribution)
scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional
Variable visibility; default PRIVATE
Examples
--------
>>> class MyModel(System):
>>> def setup(self):
>>> self.add_inward('v', 1.0, unit='m/s', desc='Velocity')
"""
self.__lock_check("add_inward")
check_arg(name, "name", str)
self.__reset_input_mapping()
self.__check_attr(name, f"cannot add inward {name!r}")
port = self.inputs[System.INWARDS]
port.add_variable(
name,
value,
unit=unit,
dtype=dtype,
valid_range=valid_range,
invalid_comment=invalid_comment,
limits=limits,
out_of_limits_comment=out_of_limits_comment,
desc=desc,
distribution=distribution,
scope=scope,
)
reference = VariableReference(context=self, mapping=port, key=name)
self.append_name2variable(
[(f"{System.INWARDS}.{name}", reference), (name, reference)]
)
[docs]
def add_outward(self,
name: str,
value: Any = 1,
unit = "",
dtype: Optional[Types] = None,
valid_range: Optional[RangeValue] = None,
invalid_comment = "",
limits: Optional[RangeValue] = None,
out_of_limits_comment = "",
desc = "",
scope = Scope.PUBLIC,
) -> None:
"""Add a outward variable to the `System`.
A outward variable is calculated by the `System`. But its value is not mandatory in any
variables fluxes between this `System` and another one.
An unique outward variable can be defined by providing directly all arguments. And multiple
outward variables can be defined by passing a `dict` of pair (`str`, `Any`) with an entry for
each variable.
This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.
Parameters
----------
name : str
Name of the outward variable
value : Any, optional
Variable value at construction; default 1
unit : str, optional
Variable unit; default empty string (i.e. dimensionless)
dtype : type or iterable of types, optional
Variable type; default None (i.e. type of initial value)
valid_range : tuple[Any, Any], optional
Validity range of the variable; default None (i.e. all values are valid)
invalid_comment : str, optional
Comment to show in case the value is not valid; default ''
limits : tuple[Any, Any], optional
Limits over which the use of the model is wrong; default valid_range
out_of_limits_comment : str, optional
Comment to show in case the value is not valid; default ''
desc : str, optional
Variable description; default ''
scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional
Variable visibility; default PUBLIC
Examples
--------
>>> class MyModel(System):
>>> def setup(self):
>>> self.add_outward('a', 1.0, unit='m/s**2', desc='Acceleration')
"""
self.__lock_check("add_outward")
check_arg(name, "name", str)
self.__check_attr(name, f"cannot add outward {name!r}")
port = self.outputs[System.OUTWARDS]
port.add_variable(
name,
value,
unit=unit,
dtype=dtype,
valid_range=valid_range,
invalid_comment=invalid_comment,
limits=limits,
out_of_limits_comment=out_of_limits_comment,
desc=desc,
scope=scope,
)
reference = VariableReference(context=self, mapping=port, key=name)
self.append_name2variable(
[(f"{System.OUTWARDS}.{name}", reference), (name, reference)]
)
[docs]
def add_transient(self,
name: str,
der: str, # time derivative of `name`
desc: Optional[str] = None,
max_time_step: Number | str = numpy.inf,
max_abs_step: Number | str = numpy.inf,
) -> None:
"""
Declare a transient variable, defined implicitly by the expression of its time derivative, and add
a time-dependent unknown to the mathematical problem.
Transients can be used withinn the system, but their time evolution is computed outside the system, by a time driver.
If `name` already exists, it must refer to an inward.
If `name` does not exist, a new inward `name` is created on the fly.
This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.
Parameters
----------
name : str
Name of the transient variable
der : str
Expression of the time derivative, used for integration during time driver execution.
The type and unit of the transient variable are deduced from `der`.
desc : str, optional
Variable description; default None
max_time_step : Number or evaluable expression (str), optional
Maximum time step admissible for the integration of the variable (for stability, typically).
max_abs_step : Number or evaluable expression compatible with transient variable, optional
Maximum variable step admissible over one time step (for stability, typically).
Examples
--------
>>> system.add_inward('v', numpy.zeros(3), desc='Velocity')
>>> system.add_inward('a', numpy.zeros(3), desc='Acceleration')
>>> system.add_inward('area', 1.0)
>>> system.add_outward('flowrate', desc='Volumetric flowrate')
>>> system.add_transient('x', der='v') # x defined by dx/dt = v
>>> system.add_transient('v', der = 'a',
max_time_step = '0.1 / max(norm(a), 1e-9)',
)
>>> system.add_transient('h',
der = 'flowrate / area',
max_abs_step = 0.1,
)
"""
check_arg(name, "name", str)
check_arg(der, "der", str)
_, value, dtype = TimeUnknown.der_type(der, self)
if name in self:
if not self.is_input_var(name):
raise TypeError(
"Only input variables can be declared as transient variables; got {!r} in {!r}".format(
name, self.name)
)
# Check that existing inward is compatible with derivative expression
declared_value = self[name]
if isinstance(value, numpy.ndarray):
ok = (numpy.shape(declared_value) == value.shape)
else:
ok = is_number(declared_value)
if not ok:
raise TypeError(
f"type incompatibility: {dtype.__name__} {der!r} cannot be "
f"the time derivative of {type(declared_value).__name__} variable {name!r}"
)
else: # Create new inward on the fly
logger.info(
f"Creation of new time-dependent inward {name!r} within system {self.name!r}"
)
if desc is None:
str_der = self.str_der(1)
desc = f"Transient variable defined as {str_der(name)} = {der}"
# Need to copy the value to avoid vector linked by reference between variable and its derivative
# See https://gitlab.safrantech.safran/cosapp/cosapp/issues/179
self.add_inward(name, value=deepcopy(value), desc=desc, dtype=dtype)
self._time_pb.add_transient(name, der, max_time_step, max_abs_step)
[docs]
def add_rate(self,
name: str,
source: Any, # `name` is the time derivative of `source`
initial_value: Number | numpy.ndarray | str = None,
desc: Optional[str] = None,
) -> None:
"""
Add a variable monitoring the rate-of-change of a given quantity (referred to as `source`) as the system evolves.
Rates are defined by their source, and are updated by the time driver computing the time evolution of the system.
This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.
Parameters
----------
name : str
Name of the transient variable
source : Any
Expression of the quantity of interest whose rate will be computed.
The type and unit of the rate are deduced from `source`.
desc : str, optional
Variable description; default None
initial_value : Number/array, or evaluable expression (str), optional.
Initial value of the rate (otherwise unknown, as rates are only updated from the first time step on).
If specified, it must be consistent with the type of source (scalar vs. array).
Examples
--------
>>> system.add_inward('U', 0.1, desc='Input voltage')
>>> system.add_rate('dUdt', source='U') # dUdt defined as dU/dt
"""
check_arg(name, "name", str)
self.__check_attr(name, f"cannot add rate {name!r}")
_, src_value, dtype = TimeDerivative.source_type(source, self)
def cast(expression):
if expression is None:
return None
if isinstance(expression, str):
value = EvalString(expression, self).eval()
elif isinstance(expression, EvalString):
value = expression.eval()
else:
value = expression
if isinstance(src_value, numpy.ndarray):
value = numpy.asarray(value)
return value
if initial_value is not None and isinstance(src_value, numpy.ndarray):
# If provided, `initial_value` must conform to `value.shape`
check_arg(
cast(initial_value), 'initial_value', numpy.ndarray,
value_ok = lambda array: (numpy.shape(array) == src_value.shape),
)
if name in self:
if not self.is_input_var(name):
raise TypeError(
f"Only input variables can be declared as rates; got {self.name}.{name}"
)
# Check that existing inward is compatible with requested rate
declared_value = self[name]
if isinstance(src_value, numpy.ndarray):
ok = (numpy.shape(declared_value) == src_value.shape)
else:
ok = is_number(declared_value)
if not ok:
raise TypeError(
f"type incompatibility: {type(declared_value).__name__} {name!r} "
f"cannot be the time derivative of {dtype.__name__} {source!r}"
)
self[name] = cast(initial_value)
else: # Create new inward on the fly
logger.info(f"Creation of new time-dependent inward {name!r} within system {self.name!r}")
if desc is None:
str_der = self.str_der(1)
desc = str_der(source)
if initial_value is None:
value = None
dtype = (dtype, type(None))
else:
value = cast(initial_value)
self.add_inward(name, value=value, desc=desc, dtype=dtype)
self._time_pb.add_rate(name, source, initial_value)
[docs]
def is_output_var(self, name: str) -> bool:
"""Returns `True` if `name` is the name of an output variable, `False` otherwise"""
return self.__check_var(name, PortType.OUT)
def __check_var(self, name: str, direction: PortType) -> bool:
"""Driver function for `is_input` and `is_output`"""
try:
container = self.name2variable[name].mapping
except KeyError:
return False
else:
return isinstance(container, BasePort) and (container.direction == direction)
[docs]
@staticmethod
def str_der(order: int) -> Callable[[str,], str]:
"""Derivate name factory.
Parameters
----------
order : int
Derivative order
Returns
-------
Callable[[int, ], str]
Function generating the derivate name from the variable name
"""
if order == 1:
return lambda s: f"d({s})/dt"
elif order > 1:
return lambda s: "d^{0}({1})/dt^{0}".format(s, order)
return lambda s: s
def _precompute(self) -> None:
for rate in self._time_pb.rates.values():
rate.touch() # ensured that system is recomputed at first time step
[docs]
def check(self, name: Optional[str]=None) -> dict[str, Validity] | Validity:
"""Get variable value validity for a given variable, port or system or all of them.
If `name` is not provided, returns a dictionary with the validity of all `System` variables;
i.e. all variables in input and output ports including inwards and outwards of this system and
all its children.
Else only, the validity for the given variable will be returned.
Parameters
----------
name : str, optional
Variable name
Returns
-------
Validity | dict[str, Validity]
Variable validity status (if `name` is provided), or dictionary thereof (default).
"""
def plain_check(name) -> dict[str, Validity] | Validity:
"""Internal version with no attribute check"""
obj = getattr(self, name)
if isinstance(obj, (System, BasePort)):
return obj.check()
else:
ref = self.name2variable[name]
return ref.mapping.check(ref.key)
# TODO unit tests
if name is None:
def variable_filter(dict_items: tuple[str, VariableReference]) -> bool:
key, ref = dict_items
if isinstance(ref.value, (System, BasePort)):
return False
elif "." in key:
owner_name, varname = key.rsplit(".", maxsplit=1)
owner = self[owner_name]
return isinstance(owner, BasePort) and varname in owner
else: # Shortcut to inwards or outwards => not taken
return False
return {
name: plain_check(name)
for name, _ in filter(variable_filter, self.name2variable.items())
}
elif name in self:
return plain_check(name)
else:
raise AttributeError(f"Variable {name} does not exist in System {self.name}")
[docs]
def is_running(self) -> bool:
"""Is this System in execution?
Returns
-------
bool
In execution status
"""
return self.__context_lock.is_active
@property
def residues(self):
"""dict[str, Residue] : Get the residues for the current `System`."""
# MappingProxyType forbids external modification
return MappingProxyType(self._math.residues)
@property
def unknowns(self):
"""dict[str, Unknown] : Get the unknowns for the current `System`."""
# MappingProxyType forbids external modification
return MappingProxyType(self._math.unknowns)
@property
def transients(self):
"""Returns a dictionary containing all transient unknowns in current system tree"""
# MappingProxyType forbids external modification
return MappingProxyType(self._time_pb.transients)
@property
def rates(self):
"""Returns a dictionary containing all time derivatives (rates) in current system tree"""
# MappingProxyType forbids external modification
return MappingProxyType(self._time_pb.rates)
[docs]
def events(self) -> Iterator[Event]:
"""Iterator on all events locally defined on system."""
yield from self.__events.values()
[docs]
def all_events(self) -> Iterator[Event]:
"""Recursive iterator on all events in complete system tree."""
for elem in self.tree():
yield from elem.events()
@property
def child_connectors(self) -> dict[str, list[Connector]]:
"""dict[str, list[Connector]] : Connectors between sub-systems, referenced by system names."""
return MappingProxyType(self.__child_connectors)
[docs]
def connectors(self) -> dict[str, Connector]:
"""Constructs a dictionary of all connectors within system,
referenced by connector name.
"""
make_items = lambda connector: (connector.name, connector)
return dict(map(make_items, self.all_connectors()))
[docs]
def all_connectors(self) -> Iterator[Connector]:
"""Iterator yielding all connectors within system."""
yield from self.__pulling_connectors
for connectors in self.__child_connectors.values():
yield from connectors
[docs]
def incoming_connectors(self) -> Iterator[Connector]:
"""Iterator yielding all connectors targetting system."""
yield from self.__pulling_connectors
if (parent := self.parent):
yield from parent.__child_connectors.get(self.name, [])
@property
def problem(self) -> MathematicalProblem:
"""MathematicalProblem: locally defined off-design mathematical problem,
without considering sub-system tree.
Raises:
-------
`AttributeError` unless system is being modified by a driver.
"""
if not self.__free_problem.is_active:
raise AttributeError("Can't access attribute `problem`")
return self._math
[docs]
def assembled_problem(self) -> MathematicalProblem:
"""Returns the consolidated mathematical problem, assembled from
entire system tree, and accounting for cyclic dependency loops.
Returns
-------
MathematicalProblem
The assembled mathematical problem.
"""
problem = self.new_problem('off-design')
# Make shallow copy of `self._math` properties
for name in ('residues', 'deferred_residues', 'unknowns'):
attr: dict = getattr(problem, name)
attr.update(getattr(self._math, name))
def transfer_unknown(unknown: Unknown, new_name: str):
options = {
attr: getattr(unknown, attr)
for attr in ('max_abs_step', 'max_rel_step', 'lower_bound', 'upper_bound')
}
problem.add_unknown(new_name, **options)
children = self.children.values()
popped_targets = []
for child in children:
if child.is_standalone():
continue
child_problem = child.assembled_problem()
connectors = self.__child_connectors.get(child.name, [])
unknowns = child_problem.unknowns
for connector in connectors:
# Transfer unknowns to parent (when necessary)
for name in list(unknowns.keys()):
unknown = unknowns[name]
port = unknown.port
connected = (
port.owner in children
and port is connector.sink
and unknown.variable in connector.sink_variables()
)
if connected:
# Port is connected => remove unknown
unknowns.pop(name)
pulled = connector.source.owner is self
if pulled:
# Transfer unknown to parent level
src = connector.source_variable(unknown.variable)
transfer_unknown(unknown, f"{connector.source.name}.{src}")
# Prune target equations defined as weak if target is connected
origin: System = connector.source.owner
deferred_residues = origin._math.deferred_residues.values()
for residue in filter(lambda target: target.weak, deferred_residues):
targetted = list(residue.variables)[0]
ref = origin.name2variable[targetted]
port = ref.mapping
name = ref.key
connected = (
port.owner in children
and port is connector.source
and name in connector.source_variables()
)
if connected:
# Mark deferred equation for removal
key = MathematicalProblem.target_key(f"{origin.name}.{targetted}")
popped_targets.append(key)
problem.extend(child_problem, copy=False)
# Remove connected weak targets
for key in popped_targets:
problem.deferred_residues.pop(key)
return problem.extend(self.__loop_problem)
[docs]
def assembled_time_problem(self) -> TimeProblem:
"""Returns the consolidated transient problem, assembled from the entire system tree.
Returns
-------
TimeProblem
The assembled time problem.
"""
problem = TimeProblem('off-design', self)
# Make shallow copy of `self._math` properties
for name in ('transients', 'rates'):
attr: dict = getattr(problem, name)
attr.update(getattr(self._time_pb, name))
def transfer_transient(transient: TimeUnknown, varname: str):
ref = transient.context.name2variable[transient.basename]
problem.add_transient(
varname,
der = transient.der,
max_time_step = transient.max_time_step_expr,
pulled_from = transient.pulled_from or ref,
)
def transfer_rate(rate: TimeDerivative, varname: str):
problem.add_rate(
varname,
source = rate.source_expr,
initial_value = rate.initial_value_expr,
)
popped: list[tuple[str, str]] = list()
children = self.children.values()
for child in children:
child_problem = child.assembled_time_problem()
connectors = self.__child_connectors.get(child.name, [])
transfer_items: list[tuple[dict[str, Any], Callable]] = [
(child_problem.transients, transfer_transient),
(child_problem.rates, transfer_rate),
]
for connector in connectors:
# Transfer transients and rates to parent (when necessary)
for unknowns, transfer in transfer_items:
for name in list(unknowns.keys()):
unknown: Unknown = unknowns[name]
port = unknown.port
connected = (
port.owner in children
and port is connector.sink
and unknown.variable in connector.sink_variables()
)
if connected:
# Port is connected => remove unknown
unknowns.pop(name)
if connector.source.owner is self:
# Transfer unknown to parent level
src = connector.source_variable(unknown.variable)
transfer(unknown, f"{connector.source.name}.{src}")
else:
popped.append(unknown.contextual_name())
problem.extend(child_problem, copy=False)
if popped:
if len(popped) == 1:
message = f"variable {popped[0]!r} is connected to an output"
else:
message = f"variables {popped} are connected to outputs"
warnings.warn(
f"In {self.full_name()}, time-dependent {message} and will not be computed."
)
return problem
[docs]
@deprecated(redirect=assembled_problem)
def get_unsolved_problem(self) -> MathematicalProblem:
...
def _add_child(self,
child: AnySystem,
execution_index: Optional[int] = None,
pulling: Optional[str | Collection[str | dict[str, str]] | dict[str, str]] = None,
desc = "",
check = True,
) -> AnySystem:
"""Add a child to the current `System`.
This method is the internal implementation of `add_child` but also offer
the capability to perform the operation without checking, which has a huge
impact on performance (serial/deserial, etc.).
Parameters
----------
- child [System]:
`System` to add to the current `System`
- execution_index [int, optional]:
Index of the execution order list at which the `System` should be inserted;
default latest.
- pulling [str | Collection[str | dict[str, str]] | dict[str, str], optional]:
Map of child ports to pulled ports at the parent system level; default None (no pulling)
- desc [str, optional]:
Sub-system description in the context of its parent.
- check [bool]:
Whether to perform checks (types, pre-existing child, etc.) when adding a new child or not
Returns
-------
`child`
"""
# type validation
if check:
check_arg(child, 'child', System)
check_arg(pulling, 'pulling', (type(None), str, Collection))
child = super()._add_child(child, execution_index, desc, check)
if check and child.name in self.name2variable:
if isinstance(self[child.name], System):
logger.warning(
f"A subsystem named {child.name} already exists within system {self.name}."
f" Child system {child!r} will overwrite it."
)
else:
raise ValueError(f"An object named {child.name!r} already exists in system {self}")
keys = [(child.name, VariableReference(context=self, mapping=self.children, key=child.name))]
# Append child system name mapping to current mapping
rel2absname = lambda item: (f"{child.name}.{item[0]}", item[1])
keys.extend(map(rel2absname, child.name2variable.items()))
self.append_name2variable(keys)
self.__reset_input_mapping()
if pulling:
try:
pull_variables(child, pulling)
except (ConnectorError, UnitError):
self.pop_child(child.name)
raise
# Add read-only constants to parent line
prefix = f"{child.name}."
for system in self.path_to_root():
for name, value in child.__properties.items():
system.__register_property(f"{prefix}{name}", value)
for name, value in child.__events.items():
system.__register_event(f"{prefix}{name}", value)
prefix = f"{system.name}.{prefix}"
# If child is added outside of `setup`, we must force system tree inspection
# to ensure input propagation during the next system execution.
self.touch()
return child
[docs]
def add_child(self,
child: AnySystem,
execution_index: Optional[int] = None,
pulling: Optional[str | Collection[str | dict[str, str]] | dict[str, str]] = None,
desc = "",
) -> AnySystem:
"""Add a child `System` to the current `System`.
When adding a child `System`, it is possible to specified its position in the execution
order.
Child ports or individual `inwards` and `outwards` can also be pulled at the parent level by providing
either the name of the port/inward/outward or a list of them or the name mapping of the child element
(dictionary keys) to the parent element (dictionary values).
If the argument is not a dictionary, the name in the parent system will be the same as in
the child.
Parameters
----------
- child [System]:
`System` to add to the current `System`
- execution_index [int, optional]:
Index of the execution order list at which the `System` should be inserted;
default latest.
- pulling [str | Collection[str | dict[str, str]] | dict[str, str], optional]:
Map of child ports to pulled ports at the parent system level; default None (no pulling)
- desc [str, optional]:
Sub-system description in the context of its parent.
Returns
-------
`child`: the added child system
"""
return self._add_child(child, execution_index, pulling, desc, check=True)
[docs]
def pop_child(self, name: str) -> System:
"""Remove the subsystem called `name`.
Parameters
----------
name: str
Name of the subsystem to be removed
Returns
-------
`System`
The removed subsystem
Raises
------
`AttributeError` if no match is found
"""
popped: System = super().pop_child(name)
# Remove all connections to and from popped child
child_connectors = self.__child_connectors
child_connectors.pop(popped.name, None)
is_valid = lambda connector: connector.source.owner is not popped
for key, connectors in child_connectors.items():
child_connectors[key] = list(
filter(is_valid, connectors)
)
self.__pulling_connectors = list(
filter(is_valid, self.__pulling_connectors)
)
# Remove references to popped child from name mapping
popped_attr = lambda key: key.startswith(f"{name}.")
popped_keys = [name]
popped_keys.extend(filter(popped_attr, self.name2variable.keys()))
popped_keys.extend(filter(popped_attr, self.__readonly.keys()))
self.__unregister(popped_keys)
self.__reset_input_mapping()
return popped
[docs]
def add_driver(self, driver: AnyDriver) -> AnyDriver:
"""Add a driver to this system.
Parameters
----------
driver : Driver
Driver to add
Returns
-------
Driver
The added driver
"""
from cosapp.drivers import Driver
check_arg(driver, 'driver', Driver)
name = driver.name
default_name = f"default_driver_for_{hex(id(self))}"
if default_name in self.drivers and name != default_name:
self.drivers.clear()
if name in self.drivers:
logger.warning(f"Driver {name!r} already exists and will be replaced")
driver.owner = self
driver.parent = None # Clear driver parent in case of driver reused
self.drivers[name] = driver
return driver
[docs]
def add_unknown(self,
name: str | Iterable[dict | str],
max_abs_step: Number = numpy.inf,
max_rel_step: Number = numpy.inf,
lower_bound: Number = -numpy.inf,
upper_bound: Number = numpy.inf
) -> MathematicalProblem:
"""Add unknown variables.
You can set variable one by one or provide a list of dictionary to set multiple variable at once. The
dictionary key are the arguments of this method.
Parameters
----------
- name : str | Iterable[dict | str]
Name of the variable or list of variable to add
- max_rel_step : float, optional
Maximal relative step by which the variable can be modified by the numerical solver; default numpy.inf
- max_abs_step : float, optional
Maximal absolute step by which the variable can be modified by the numerical solver; default numpy.inf
- lower_bound : float, optional
Lower bound on which the solver solution is saturated; default -numpy.inf
- upper_bound : float, optional
Upper bound on which the solver solution is saturated; default numpy.inf
Returns
-------
MathematicalProblem
The modified mathematical problem
"""
inner_problem = self._math
def create_unknown(
name: str,
max_abs_step: Number = numpy.inf,
max_rel_step: Number = numpy.inf,
lower_bound: Number = -numpy.inf,
upper_bound: Number = numpy.inf
):
unknown = Unknown(self, name, max_abs_step, max_rel_step, lower_bound, upper_bound)
# Remove existing unknown if user wants to update the parameters.
if unknown.name in inner_problem.unknowns:
inner_problem.unknowns.pop(unknown.name)
inner_problem.add_unknown([unknown])
if isinstance(name, str):
create_unknown(name, max_abs_step, max_rel_step, lower_bound, upper_bound)
else:
for item in name:
create_unknown(item, max_abs_step, max_rel_step, lower_bound, upper_bound)
return inner_problem
[docs]
def add_equation(self,
equation: str | Iterable[dict | str | tuple[str, str]],
name: Optional[str] = None,
reference: Number | numpy.ndarray | str = 1.0,
) -> MathematicalProblem:
"""Add off-design equation.
Equations may be added one by one, or provided by a list of dictionaries to add multiple equations at once.
The dictionary keys are the arguments of this method.
Parameters
----------
- equation : str or Iterable of str of the kind 'lhs == rhs'
Equation or list of equations to add
- name : str, optional
Name of the equation; default None => 'lhs == rhs'
- reference : Number, numpy.ndarray or "norm", optional
Reference value(s) used to normalize the equation; default is 1.
If value is "norm", actual reference value is estimated from order of magnitude.
Returns
-------
MathematicalProblem
The modified mathematical problem
"""
self.__lock_check("add_equation")
return self._math.add_equation(equation, name, reference)
[docs]
def add_target(self,
name: str | Iterable[dict | str | tuple[str, str]],
reference: Number | numpy.ndarray | str = 1.0,
weak = False,
) -> MathematicalProblem:
"""Add deferred off-design equation on a targetted quantity.
Target equations may be added one by one, or provided by a list of dictionaries to add multiple equations at once.
The dictionary keys are the arguments of this method.
Parameters
----------
- name: str, optional
Name of the equation; default None => 'lhs == rhs'
- reference: Number, numpy.ndarray or "norm", optional
Reference value(s) used to normalize the equation; default is 1.
If value is "norm", actual reference value is estimated from order of magnitude.
- weak: bool, optional
If True, the target is disregarded if the corresponding variable is connected; default is `False`.
Returns
-------
MathematicalProblem
The modified mathematical problem
"""
self.__lock_check("add_target")
return self._math.add_target(name, reference, weak)
[docs]
def add_event(self,
name: str,
desc = "",
trigger: Optional[str | Event | EventState | ZeroCrossing] = None,
final = False,
) -> Event:
"""Add an event to system.
An event occurrence can either be determined by the system itself,
or triggered by an event from another system.
This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.
Parameters
----------
- name [str]:
Name of the event.
- desc [str, optional]:
Event description; defaults to ''.
- trigger [str | Event | EventState | ZeroCrossing, optional]:
String, primary or derived event defining the event trigger; defaults to `None`.
- final [bool, optional]:
Defines whether or not event is final; defaults to `False`.
Returns
-------
- event [Event]:
The newly created event.
Examples
--------
>>> import numpy as np
>>>
>>> class PointMassDynamics(System):
>>> # Free fall of a point mass, with friction
>>> def setup(self):
>>> self.add_inward('mass', 1.2, desc='Mass')
>>> self.add_inward('cf', 0.1, desc='Friction coefficient')
>>> self.add_inward('g', np.r_[0, 0, -9.81], unit='m/s**2', desc='External acceleration field')
>>>
>>> self.add_outward('a', np.zeros(3), unit='m/s**2', desc='Acceleration')
>>> self.add_transient('v', der='a')
>>> self.add_transient('x', der='v')
>>>
>>> def compute(self):
>>> self.a = self.g - (self.cf / self.mass * np.linalg.norm(self.v)) * self.v
>>>
>>> class BouncingPointMass(System):
>>> def setup(self):
>>> self.add_child(PointMassDynamics('dyn'), pulling=['x', 'v', 'a', 'mass', 'cf', 'g'])
>>> self.add_event('rebound', trigger="x[2] <= 0")
>>>
>>> def transition(self):
>>> if self.rebound.present:
>>> self.v[2] *= -1
"""
self.__lock_check("add_event")
# type and name validation
check_arg(name, 'name', str)
name = Variable.name_check(name)
self.__check_attr(name, f"cannot add event {name!r}")
# Event creation
event = Event(name, self, desc, trigger, final)
self.__register_event(name, event)
self._add_member(name)
return event
def __register_event(self, name: str, event: Event) -> None:
"""Register attribute `name` as a system event and a read-only property."""
self.__events[name] = self.__readonly[name] = event
[docs]
def add_inward_modevar(self,
name: str,
value: Any = False,
unit = "",
dtype: Optional[Types] = None,
desc = "",
scope = Scope.PRIVATE,
) -> None:
"""Add an inward mode variable to the `System`.
A mode variable is a piecewise constant variable in each continuous time phase.
Like any input, inward mode variables should not be modified during system transitions.
This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.
Parameters
----------
name : str
Name of the variable
value : Any, optional
Value of the variable; default 1
unit : str, optional
Variable unit; default empty string (i.e. dimensionless)
dtype : type
Variable type; default None (i.e. type of initial value)
desc : str, optional
Variable description; default ''
scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional
Variable visibility; default PRIVATE
Examples
--------
>>> class MultimodeSystem(System):
>>> def setup(self):
>>> self.add_inward_modevar('composite', True)
>>> self.reconfig()
>>>
>>> def reconfig(self):
>>> # Define one or two sub-systems, depending on mode
>>> for name in list(self.children):
>>> self.pop_child(name)
>>> if self.composite:
>>> a = self.add_child(Foo('a'))
>>> b = self.add_child(Bar('b'))
>>> self.connect(a, b, {'x', 'y'})
>>> else:
>>> self.add_child(Bogus('c'))
>>>
>>> def transition(self):
>>> self.reconfig()
"""
self.__lock_check("add_inward_modevar")
# type validation
check_arg(name, 'name', str)
self.__check_attr(name, f"cannot add inward {name!r}")
port = self.inputs[System.MODEVARS_IN]
port.add_mode_variable(name, value, unit, dtype, desc, scope=scope)
ref = VariableReference(context=self, mapping=port, key=name)
self.append_name2variable(
[(f"{System.MODEVARS_IN}.{name}", ref), (name, ref)]
)
[docs]
def add_outward_modevar(self,
name: str,
value: Optional[Any] = None,
unit = "",
dtype: Optional[Types] = None,
desc = "",
init: Optional[Any] = None,
scope = Scope.PRIVATE,
) -> None:
"""Add an outward mode variable to the `System`.
A mode variable is a piecewise constant variable in each continuous time phase.
It may only be modified in the `transition` method of the `System`, and never
inside method `compute`. Notwithstanding, its value may be used in `compute`.
The modification of an output mode variable is often associated with the occurence
of an event in the system.
This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.
Parameters
----------
name : str
Name of the variable
value : Any, optional
Value of the variable; default 1
unit : str, optional
Variable unit; default empty string (i.e. dimensionless)
dtype : type
Variable type; default None (i.e. type of initial value)
desc : str, optional
Variable description; default ''
init : Any, optional
Value imposed at the beginning of time simulations.
If unspecified (default), the variable remains untouched.
scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional
Variable visibility; default PRIVATE
Examples
--------
>>> class ThresholdSystem(System):
>>> def setup(self):
>>> self.add_inward('threshold', 1.0)
>>> self.add_inward('x', 0.0)
>>> self.add_outward('y', 0.0)
>>> self.add_event('activates', trigger='x >= threshold')
>>> self.add_outward_modevar('activated', init='x > threshold')
>>>
>>> def compute(self):
>>> self.y = self.x if self.activated else 0.0
>>>
>>> def transition(self):
>>> if self.activates.present:
>>> self.activated = True
"""
self.__lock_check("add_outward_modevar")
# type validation
check_arg(name, 'name', str)
self.__check_attr(name, f"cannot add outward {name!r}")
port = self.outputs[System.MODEVARS_OUT]
port.add_mode_variable(name, value, unit, dtype, desc, init=init, scope=scope)
ref = VariableReference(context=self, mapping=port, key=name)
self.append_name2variable(
[(f"{System.MODEVARS_OUT}.{name}", ref), (name, ref)]
)
[docs]
def new_problem(self, name="problem") -> MathematicalProblem:
"""Create a new, empty `MathematicalProblem` in the context of system.
Parameters
----------
name: str
Name of the mathematical problem. Defaults to 'problem'.
Returns
-------
MathematicalProblem
The newly created mathematical problem.
"""
return MathematicalProblem(name, self)
[docs]
def add_design_method(self, name: str) -> MathematicalProblem:
"""Add a design method to the `System`
A design method is a set of free variables and equations that defines a way to design the `System`
It is a easy way to pre-define a design of this `System` for users
The returned mathematical system is empty. It should be populated with the needed variables and
equations (see Examples).
Parameters
----------
name: str
The name of the design method
Returns
-------
MathematicalProblem
The newly created mathematical problem.
Examples
--------
>>> system1.add_design_method("method1") \
>>> .add_unknown([
>>> dict(name="x", max_rel_step=0.1),
>>> "y"
>>> ]) \
>>> .add_equation([
>>> "u == 0",
>>> "v == 800"
>>> ])
"""
self.__lock_check("add_design_method")
self.design_methods[name] = mathpb = self.new_problem(name)
return mathpb
[docs]
def pull_design_method(self, child: System | Collection[System], name: str | Collection[str] | dict[str, str]) -> None:
"""Add design methods to the system, by promoting exisintg sub-system design methods.
Parameters
----------
child: System | collection[System]
Sub-system, or list thereof, from which design methods are pulled at system level.
name: str | collection[str] | dict[str, str]
The name of the design method(s), following the same rules as optional argument `pulling` in `System.add_child`.
"""
self.__lock_check("pull_design_method")
if isinstance(child, Collection) and not isinstance(child, str):
child_list = child
else:
child_list = [child]
mapping = BaseConnector.format_mapping(name)
children = self.children.values()
for child in child_list:
if not isinstance(child, System):
raise TypeError(
f"design methods can only be pulled from children of {self.name!r}; got {child!r}"
)
if not child in children:
raise ValueError(
f"{child.name!r} is not a child of {self.name!r}"
)
for name, alias in mapping.items():
try:
child_method = child.design_methods[name]
except KeyError:
logger.error(f"Sub-system {child.name!r} has no design method {name!r} - skipped.")
else:
design_method = self.design_methods.setdefault(alias, self.new_problem())
design_method.extend(child_method)
[docs]
def design(self, method: str) -> MathematicalProblem:
"""Returns the chosen group of design equations.
Parameters
----------
method : str
Name of the group of equations to extract
Returns
-------
MathematicalProblem
Mathematical system to solve for the chosen method
"""
return self.design_methods[method]
[docs]
def log_debug_message(
self,
handler: HandlerWithContextFilters,
record: logging.LogRecord,
format = LogFormat.RAW,
) -> bool:
"""Callback method on the system to log more detailed information.
This method will be called by the log handler when :py:meth:`~cosapp.utils.logging.LoggerContext.log_context`
is active if the logging level is lower or equals to VERBOSE_LEVEL. It allows
the object to send additional log message to help debugging a simulation.
.. note::
logger.log method cannot be used here. Use handler.handle(record)
Parameters
----------
handler : HandlerWithContextFilters
Log handler on which additional message should be published.
record : logging.LogRecord
Log record
format : LogFormat
Format of the message
Returns
-------
bool
Should the provided record be logged?
"""
def print_port(header, ports) -> list[str]:
msg = [f"#### {header}", ]
for name, port in ports.items():
if len(port) == 0:
continue
msg.append(f"- {name}:")
for var in port:
value = port[var]
if is_number(value):
v = f"{value:.5g}"
else:
v = f"{value!s}"
msg.append(f" - {var} = {v}")
return "\n".join(msg)
message = record.getMessage()
activate = getattr(record, "activate", None)
emit_record = super().log_debug_message(handler, record, format)
if message.endswith("call_setup_run") or message.endswith("call_clean_run"):
emit_record = False
elif activate == True:
handler.log(
LogLevel.FULL_DEBUG,
print_port(f"{self!s} - Inputs", self.inputs),
name=logger.name
)
emit_record = False
elif activate == False:
handler.log(
LogLevel.FULL_DEBUG,
print_port(f"{self!s} - Outputs", self.outputs),
name=logger.name
)
emit_record = False
return emit_record
[docs]
def call_setup_run(self, skip_driver=False) -> None:
"""Execute `setup_run` recursively on all modules.
Parameters
----------
skip_drivers : bool
Skip calling :py:meth:`cosapp.drivers.driver.Driver.setup_run`
"""
with self.__free_problem:
super().call_setup_run()
if not skip_driver:
for driver in self.drivers.values():
driver.call_setup_run()
[docs]
def call_clean_run(self, skip_driver=False) -> None:
"""Execute `clean_run` recursively on all modules.
Parameters
----------
skip_drivers : bool
Skip calling :py:meth:`cosapp.drivers.driver.Driver.clean_run`
"""
super().call_clean_run()
if not skip_driver:
for driver in self.drivers.values():
driver.call_clean_run()
def _postcompute(self) -> None:
"""Actions performed after the `System.compute` call."""
if self.is_clean(PortType.IN):
if self._is_tree_clean:
self.set_clean(PortType.OUT)
else:
self.set_clean(PortType.IN)
self.set_dirty(PortType.OUT)
# Evaluate the residues
for residue in self.residues.values():
residue.update()
[docs]
def transition(self) -> None:
"""Method describing system transition upon the occurrence of events (if any).
Does not do anything by default, but should be implemented for systems with events.
Examples
--------
>>> import numpy as np
>>>
>>> class PointMassDynamics(System):
>>> # Free fall of a point mass, with friction
>>> def setup(self):
>>> self.add_inward('mass', 1.2, desc='Mass')
>>> self.add_inward('cf', 0.1, desc='Friction coefficient')
>>> self.add_inward('g', np.r_[0, 0, -9.81], desc='External acceleration field')
>>> self.add_outward('a', np.zeros(3), desc='Acceleration')
>>> self.add_transient('v', der='a')
>>> self.add_transient('x', der='v')
>>>
>>> def compute(self):
>>> v_norm = np.linalg.norm(self.v)
>>> self.a = self.g - (self.cf / self.mass * v_norm) * self.v
>>>
>>> class BouncingPointMass(PointMassDynamics):
>>> def setup(self):
>>> super().setup()
>>> self.add_event('rebound', trigger="x[2] <= 0")
>>>
>>> def transition(self):
>>> if self.rebound.present:
>>> self.v[2] *= -1
"""
pass
[docs]
def retrieve_incoming_data(self) -> None:
"""Transfer data from all incoming connectors"""
for connector in self.incoming_connectors():
connector.transfer()
[docs]
def tree_transition(self) -> bool:
"""Invoke transition in entire system tree.
Returns
-------
bool: `True` if system structure has changed, `False` otherwise.
"""
modified = False
original_pb = self._math.copy()
for child in self.children.values():
# Retrieve data from sibling systems
for connector in self.__child_connectors.get(child.name, []):
connector.transfer()
modified |= child.tree_transition()
# Pull values from sub-systems
for connector in self.__pulling_connectors:
connector.transfer()
with self.__free_problem:
self.transition()
if not modified:
# Check if intrinsic problem or internal structure have changed
modified = (self.__input_mapping is None) or (original_pb != self._math)
return modified
[docs]
def tree_init_mode(self) -> None:
"""Invoke mode initialization in entire system tree"""
for child in self.children.values():
# Retrieve data from sibling systems
for connector in self.__child_connectors.get(child.name, []):
connector.transfer()
child.tree_init_mode()
# Initialize output mode variables
port: ModeVarPort = self[System.MODEVARS_OUT]
if len(port) > 0:
name = self.full_name()
for variable in port.variables():
variable.initialize()
logger.debug(
f"Mode variable {name}.{variable.name} set to {variable.value}"
)
# Initialize system mode
with self.__free_problem:
self.init_mode()
[docs]
def init_mode(self) -> None:
"""Initialization method for multimode systems, called at the beginning of time simulations.
In this context, attribute `problem` is unlocked and may be modified.
"""
pass
[docs]
def run_once(self) -> None:
"""Run the system once.
Execute the model of this `System` and its children in the execution order.
Notes
-----
The drivers are not executed when calling this method; only the physical model.
"""
with System.set_master(repr(self)) as is_master:
if is_master:
logger.debug("Start setup_run recursive calls.")
self.call_setup_run(skip_driver=True)
with self.log_context(" - run_once"):
if self.is_active():
self._precompute()
dirty_inputs = not self.is_clean(PortType.IN)
if dirty_inputs:
logger.debug(f"Call {self.name}.compute_before")
with self.__context_lock:
self.compute_before()
else:
logger.debug(f"Skip {self.name}.compute_before - Clean inputs")
dirty_tree = not self._is_tree_clean
any_dirty_child = False
if dirty_inputs or dirty_tree:
for child in self.children.values():
# Update connectors
for connector in self.__child_connectors.get(child.name, []):
connector.transfer()
# Execute sub-system
logger.debug(f"Call {self.name}.{child.name}.run_once()")
child.run_once()
any_dirty_child |= not child.is_clean()
# Pull values from subsystems
for connector in self.__pulling_connectors:
connector.transfer()
if dirty_inputs or any_dirty_child:
logger.debug(f"Call {self.name}.compute")
self._compute_calls += 1
with self.__context_lock:
self.__runner.compute()
else:
logger.debug(f"Skip {self.name}.compute - Clean inputs")
self._is_tree_clean = not any_dirty_child
self._postcompute()
self.computed.emit()
else:
logger.debug(f"Skip {self.name} execution - Inactive")
if is_master:
logger.debug("Start clean_run recursive calls.")
self.call_clean_run(skip_driver=True)
[docs]
def any_active_driver(self) -> bool:
return any(driver.is_active() for driver in self.drivers.values())
[docs]
def run_drivers(self) -> None:
"""Run the drivers defined on this `System`.
"""
with System.set_master(repr(self), type_checking=False) as is_master:
if is_master:
self.open_loops()
logger.debug("Start setup_run recursive calls.")
self.call_setup_run()
if self.any_active_driver():
if self.is_standalone(): # System not standalone can't set the mathematical problem
logger.debug(f"Exec order for {self.name}: {list(self.exec_order)}")
for driver in self.drivers.values():
logger.debug(f"Call driver {driver.name}.run_once on {self.name}")
driver.run_once()
else:
self.run_children_drivers()
if is_master:
logger.debug("Start clean_run recursive calls.")
self.call_clean_run()
self.close_loops()
[docs]
def run_children_drivers(self) -> None:
"""Solve the children `System` in the execution order.
"""
with System.set_master(repr(self), type_checking=False) as is_master:
if is_master:
logger.debug("Start setup_run recursive calls.")
self.call_setup_run()
with self.log_context(" - run_children_drivers"):
if self.is_active():
self._precompute()
dirty_inputs = not self.is_clean(PortType.IN)
if dirty_inputs:
logger.debug(f"Call {self.name}.compute_before")
with self.__context_lock:
self.compute_before()
else:
logger.debug(f"Skip {self.name}.compute_before - Clean inputs")
dirty_tree = not self._is_tree_clean
any_dirty_child = False
if dirty_inputs or dirty_tree:
for child in self.children.values():
# Update connectors
for connector in self.__child_connectors.get(child.name, []):
connector.transfer()
# Execute sub-system
logger.debug(f"Call {self.name}.{child.name}.run_once()")
child.run_drivers()
any_dirty_child |= not child.is_clean()
# Pull values from subsystems
for connector in self.__pulling_connectors:
connector.transfer()
if dirty_inputs or any_dirty_child:
self._compute_calls += 1
with self.__context_lock:
self.__runner.compute()
else:
logger.debug(f"Skip {self.name}.compute - Clean inputs")
self._is_tree_clean = not any_dirty_child
self._postcompute()
self.computed.emit()
else:
logger.debug(f"Skip {self.name} execution - Inactive")
if is_master:
logger.debug("Start clean_run recursive calls.")
self.call_clean_run()
[docs]
def is_standalone(self) -> bool: # TODO name is confusing as the real answer is I have a non-linear solver
"""Is this System able to solve itself?
Returns
-------
bool
Ability to solve the system or not.
"""
return any(driver.is_standalone() for driver in self.drivers.values())
[docs]
def connect(self,
object1: BasePort | System,
object2: BasePort | System,
mapping: str | list[str] | dict[str, str] | None = None,
cls: Optional[type[BaseConnector]] = None,
**kwargs
) -> None:
"""Connect two systems or two ports.
This method connects `object1` to `object2`. If no mapping is provided, connection
will be made between variables based on their names. If a name mapping is provided as a list,
the name should be present in both objects. If the mapping is specified as a dictionary,
keys are expected to belong to `object1`, and values to `object2`.
If both objects are systems, mapping is mandatory (full system connections are forbidden).
In this case, each (key, value) pair of the mapping leads to a port-to-port connection.
Connections are oriented (i.e. the direction of value transfer is fixed). The decision tree
is as following:
- If one port is of type input and the other one of type output, the connection flows from
the output to the input.
- Else if one port (portA) belong to the parent `System` of the other port (portB), the
connection flows from portA to portB if portA is an input. And it flows from portB to
portA if portA is an output.
- Else if both are inputs, the port is pulled on the parent and connected to the two children.
Connectors can only connect systems at the same level or a system with its parent. In all
cases the connectors will be stored by the parent hosting the two connected systems.
Parameters
----------
object1 : Union[BasePort, System]
First end-point of connector.
object2 : Union[BasePort, System]
Second end-point of connector.
mapping : str or list[str] or dict[str, str], optional
(List of) common name(s) or mapping name dictionary; default None (i.e. no mapping).
cls : type[BaseConnector], optional
Connector type. When specified, `cls` must be a specialization of `BaseConnector`.
If `cls` is `None` (default), the actual connector type is either `Connector`,
or a port-specific connector, if any. Port bound connectors are automatically selected
when both ports are of same type, and when port class contains inner class `Connector`,
derived from `BaseConnector`.
**kwargs :
Additional keyword arguments forwarded to `cls`.
Examples
--------
Here is an example, in which we assume a port `p_in` has inputs of a system `s1`
that come from some output port `p_out` of another system `s2`. In the last example,
`connect` is called with two child systems, rather than ports. The container system
being called `top`:
>>> class DemoPort(Port):
>>> def setup(self):
>>> self.add_variable('a', 1.0)
>>> self.add_variable('b', 2.0)
>>> self.add_variable('c', 3.0)
>>>
>>> class DummySystem(Port):
>>> def setup(self):
>>> self.add_inward('x', 1.0)
>>> self.add_outward('y', 0.0)
>>> self.add_input(DemoPort, 'p_in')
>>> self.add_output(DemoPort, 'p_out')
>>>
>>> top = System('top')
>>> top.add(DummySystem('s1'))
>>> top.add(DummySystem('s2'))
>>>
>>> top.connect(top.s1.p_in, top.s2.p_out, 'a')
>>> top.s1.p_in.a = 4
>>> top.run_once()
>>> assert top.s1.p_in.a == top.s2.p_out.a
>>>
>>> top.connect(top.s1.p_in, top.s2.p_out, {'b': 'c'})
>>> top.run_once()
>>> assert top.s1.p_in.b == top.s2.p_out.c
>>>
>>> top.connect(top.s1, top.s2, {'x': 'y', 'p_in.c': 'p_out.b'})
>>> top.run_once()
>>> assert top.s1.x == top.s2.y
>>> assert top.s1.p_in.c == top.s2.p_out.b
"""
same_kind = lambda base: isinstance(object1, base) and isinstance(object2, base)
if same_kind(BasePort):
self.__connect_ports(object1, object2, mapping, cls, **kwargs)
elif same_kind(System):
self.__connect_systems(object1, object2, mapping, cls, **kwargs)
else:
raise TypeError(
f"Connected objects must be either two ports or two systems"
f"; got {type(object1).__name__!r} and {type(object2).__name__!r}"
)
def __connect_systems(self,
system1: System,
system2: System,
mapping: Union[str, list[str], dict[str, str]],
cls: Optional[type[BaseConnector]] = None,
**kwargs
) -> None:
"""Connect two sub-systems together.
This method connects `system1` to `system2`. Full system connections are forbidden, as they
are potentially ambiguous. As a consequence, name mapping is mandatory.
If name mapping is provided as a collection, the name should be present in both systems.
If it is specified as a dictionary, keys pertain to `system1`, values to `system2`.
Each (key, value) pair of the mapping gives rise to a port-to-port connector.
Connections are oriented (i.e. the direction of value transfer is fixed). The decision tree
is as following:
- If one port is of type input and the other one of type output, the connection flows from
the output to the input.
- Else if one port (portA) belong to the parent `System` of the other port (portB), the
connection flows from portA to portB if portA is an input. And it flows from portB to
portA if portA is an output.
- Else if both are inputs, the port is pulled on the parent and connected to the two children.
Connectors can only connect systems at the same level or a system with its parent. In all
cases the connectors will be stored by the parent hosting the two connected systems.
Parameters
----------
port1 : BasePort
First end-point of connector.
port2 : BasePort
Second end-point of connector.
mapping : str or list[str] or dict[str, str], optional
(List of) common name(s) or mapping name dictionary; default None (i.e. no mapping).
cls : type[BaseConnector], optional
Connector type. When specified, `cls` must be a specialization of `BaseConnector`.
If `cls` is `None` (default), the actual connector type is either `Connector`,
or a port-specific connector, if any. Port bound connectors are automatically selected
when both ports are of same type, and when port class contains inner class `Connector`,
derived from `BaseConnector`.
**kwargs :
Additional keyword arguments forwarded to `cls`.
"""
if mapping is None:
raise ConnectorError(
"Full system connections are forbidden; please provide port or variable mapping."
)
def get_variable(system: System, varname: str):
key = varname if system is self else f"{system.name}.{varname}"
try:
return self.name2variable[key]
except KeyError:
raise AttributeError(
f"{key!r} not found in {self.name!r}"
)
mapping = BaseConnector.format_mapping(mapping)
for name1, name2 in mapping.items():
obj1 = getattr(system1, name1)
obj2 = getattr(system2, name2)
same_kind = lambda base: isinstance(obj1, base) and isinstance(obj2, base)
if same_kind(BasePort):
self.__connect_ports(obj1, obj2, cls=cls, **kwargs)
elif same_kind(System):
self.__connect_systems(obj1, obj2, mapping=None) # raises ValueError
else:
# Mapping between variables
obj1 = get_variable(system1, name1)
obj2 = get_variable(system2, name2)
self.__connect_ports(
obj1.mapping,
obj2.mapping,
{obj1.key: obj2.key},
cls, **kwargs,
)
def __connect_ports(self,
port1: BasePort,
port2: BasePort,
mapping: Union[str, list[str], dict[str, str], None] = None,
cls: Optional[type[BaseConnector]] = None,
**kwargs,
) -> None:
"""Connect two ports together.
This method connects `port1` to `port2`. If no mapping is provided, connection
will be made between variables based on their names. If a name mapping is provided as a list,
the name should be present in both port. And if the mapping is specified as a dictionary,
the keys belong to `port1` and the values to `port2`.
An connection is oriented (i.e. the direction of value transfer is fixed). The decision tree
is as following:
- If one port is of type input and the other one of type output, the connection flows from
the output to the input.
- Else if one port (portA) belong to the parent `System` of the other port (portB), the
connection flows from portA to portB if portA is an input. And it flows from portB to
portA if portA is an output.
- Else if both are inputs, the port is pulled on the parent and connected to the two children.
Connectors can only connect systems at the same level or a system with its parent. In all
cases the connectors will be stored by the parent hosting the two connected systems.
Parameters
----------
port1 : BasePort
First end-point of connector.
port2 : BasePort
Second end-point of connector.
mapping : str or list[str] or dict[str, str], optional
(List of) common name(s) or mapping name dictionary; default None (i.e. no mapping).
cls : type[BaseConnector], optional
Connector type. When specified, `cls` must be a specialization of `BaseConnector`.
If `cls` is `None` (default), the actual connector type is either `Connector`,
or a port-specific connector, if any. Port bound connectors are automatically selected
when both ports are of same type, and when port class contains inner class `Connector`,
derived from `BaseConnector`.
**kwargs :
Additional keyword arguments forwarded to `cls`.
"""
# type validation
def check_port(port: BasePort, argname: str):
check_arg(port, argname, BasePort, stack_shift=1)
if port.owner is None:
raise ValueError(f"Cannot connect orphan port {port.contextual_name!r}")
check_port(port1, 'port1')
check_port(port2, 'port2')
check_arg(mapping, 'mapping', (str, list, dict, MappingProxyType, type(None)))
if cls is None:
ptype = type(port1)
if ptype is type(port2):
# Check if there exists a port-specific connector
try:
cls = getattr(ptype, "Connector")
except AttributeError:
cls = Connector
else:
pname = ptype.__name__
logger.info(
f"{port1.contextual_name!r} and {port2.contextual_name!r}"
f" connected by port-specific connector `{pname}.Connector`."
)
else:
cls = Connector
if cls is not Connector:
try:
SystemConnector.check(cls)
except (TypeError, ValueError) as error:
error.args = (
f"`cls` must be a concrete implementation of `BaseConnector`; got {cls!r}",
)
raise
# Generate mapping dictionary
if mapping is None:
mapping = dict((v, v) for v in port1 if v in port2)
else:
mapping = BaseConnector.format_mapping(mapping)
if len(mapping) == 0:
warnings.warn(
f"Skipped empty connector between {port1.full_name()} and {port2.full_name()}"
)
return
children = self.children.values()
child_connectors = self.__child_connectors
def contextual_name(port: BasePort) -> str:
return port.name if port.owner is self else port.contextual_name
def create_connector(sink: BasePort, source: BasePort, mapping: dict[str, str]) -> None:
# Additional validation for sink Port
# - Source should be Port (the opposite case is possible == connect sensor)
# - If mapping does not cover all variables, print a log message
source_name = contextual_name(source)
sink_name = contextual_name(sink)
if isinstance(sink, ModeVarPort) and sink.is_input and not isinstance(source, ModeVarPort):
raise ConnectorError(
f"Input mode variables cannot be connected to continuous time variables"
f" ({source_name} -> {sink.owner.name})."
)
if isinstance(sink, Port):
if not (len(sink) == len(source) == len(mapping)):
absent = [f"{sink.name}.{v}" for v in sink if v not in mapping.keys()]
absent.extend(f"{source.name}.{v}" for v in source if v not in mapping.values())
logger.debug(
"Partial connection between {!r} and {!r}. "
"Variables ({}) are not part of the mapping.".format(
sink_name, source_name, ", ".join(absent)
)
)
name = f"{source_name} -> {sink_name}"
new_connector = SystemConnector(
cls(name, sink, source, mapping, **kwargs)
)
connectors = self.connectors()
# Check that variables are set only once
for connector in connectors.values():
if connector.sink is sink:
shared = set(connector.sink_variables()).intersection(new_connector.sink_variables())
if shared:
if len(shared) == 1:
variables = f"Variable {shared.pop()} is"
else:
variables = ", ".join(sorted(shared))
variables = f"Variables {sink_name}.{{{variables}}} are"
raise ConnectorError(f"{variables} already set by {connector}")
try:
# Check if connector already exists
connector = connectors[new_connector.name]
except KeyError:
# New connector
if sink.owner is self:
self.__pulling_connectors.append(new_connector)
else:
child_name = new_connector.sink.owner.name
child_connectors.setdefault(child_name, []).append(new_connector)
else:
# Sink and source are already connected: update connector
connector.update_mapping(new_connector.mapping)
if sink.owner is source.owner.parent:
lower = source
elif source.owner is sink.owner.parent:
lower = sink
else:
lower = sink
lower.owner.__reset_input_mapping()
err_msg = f"Ports {port1.contextual_name!r} and {port2.contextual_name!r} cannot be connected"
if port1.owner is port2.owner:
raise ConnectorError(
f"{err_msg}. Connecting ports of the same system is forbidden."
)
if port1.direction != port2.direction: # one port is IN and the other is OUT
if port1.owner is port2.owner.parent or port2.owner is port1.owner.parent:
raise ConnectorError(
f"{err_msg}; parent/child connections are only allowed between same-direction ports."
)
elif not (port1.owner.parent is port2.owner.parent is self):
raise ConnectorError(
f"{err_msg}. Only ports belonging to direct children of {self.name!r} can be connected."
)
elif not (
self is port1.owner.parent is port2.owner
or (self is port1.owner is port2.owner.parent)
or (port1.owner in children and port2.owner in children)
):
raise ConnectorError(
f"{err_msg}. Same-direction ports can only be connected between a child and its parent, "
"or between sibling sub-systems."
)
reciprocal = lambda d: dict((v, k) for k, v in d.items())
if port1.is_input and port2.is_output:
create_connector(port1, port2, mapping)
elif port1.is_output and port2.is_input:
create_connector(port2, port1, reciprocal(mapping))
else:
if port1.owner is port2.owner.parent:
if port1.is_input:
create_connector(port2, port1, reciprocal(mapping))
else: # port1.is_output
create_connector(port1, port2, mapping)
elif port2.owner is port1.owner.parent:
if port2.is_input:
create_connector(port1, port2, mapping)
else: # port2.is_output
create_connector(port2, port1, reciprocal(mapping))
elif port1.is_input: # Both port are inputs
# In this case the port is transfer on the parent to serve as
# true source for all subsystems.
# For Port, the port is duplicated in the parent. For ExtensiblePort,
# the variable is added in the INWARDS port of the parent.
if isinstance(port1, ModeVarPort) != isinstance(port2, ModeVarPort):
for p1_var, p2_var in mapping.items():
# One variable is an inward, the other an input mode variable
raise ConnectorError(
f"Input mode variables cannot be connected to continuous time variables"
f" ({port1.owner.name}.{p1_var} <-> {port2.owner.name}.{p2_var})."
)
for p1_var, p2_var in mapping.items():
connected = False
# Check if the variable is connected
for c1 in filter(lambda c: c.sink is port1, child_connectors.get(port1.owner.name, [])):
if p1_var in c1.sink_variables(): # Already connected
# Check that the other variable is not connected
for c2 in filter(lambda c: c.sink is port2, child_connectors.get(port2.owner.name, [])):
if p2_var in c2.source_variables():
raise ConnectorError("{}.{} is already connected to {}.{}".format(
port1.contextual_name, p1_var,
port2.contextual_name, p2_var,
)
)
# Connect the other variable
self.connect(port2, c1.source, {p2_var: c1.source_variable(p1_var)})
connected = True
break
if connected:
continue
# Check if the other variable is connected
for c2 in filter(lambda c: c.sink is port2, child_connectors.get(port2.owner.name, [])):
if p2_var in c2.sink_variables():
# We know that p1_var is not connected
# Connect the variable
self.connect(port1, c2.source, {p1_var: c2.source_variable(p2_var)})
connected = True
break
if connected:
continue
# General case
# create the pulled port
if isinstance(port2, Port):
pulled_port_name = f"{port2.owner.name}_{port2.name}"
source_name = p2_var
# Is the full port pulled or only a portion of it?
if all(v in mapping.values() for v in port2):
pull_variables(port2.owner, {port2.name: pulled_port_name})
else:
pulled_port = port2.copy(pulled_port_name)
self._add_port(pulled_port)
self.connect(port2, pulled_port, p1_var)
else:
# Variables are both inwards, or input mode variables
pulled_port_name = port2.name
source_name = f"{port2.owner.name}_{p2_var}"
pull_variables(port2.owner, {p2_var: source_name})
# create the other connection
self.connect(port1, self.inputs[pulled_port_name], {p1_var: source_name})
else:
raise ConnectorError(f"{err_msg} as they are both outputs.")
[docs]
def open_loops(self):
"""Open closed loops in children relations."""
logger.debug(f"Call {self.name}.open_loops")
connectors_to_open = list() # type: list[SystemConnector]
child_connectors = self.__child_connectors
self.__input_mapping = None
name2var = self.name2variable
loop = self.__loop_problem
loop.clear()
# Add top system in the execution loop so connection to it won't be opened
execution_loop = [self] # type: list[System]
for child in self.children.values():
execution_loop.append(child)
child.open_loops()
for connector in child_connectors.get(child.name, []):
if connector.source.owner not in execution_loop:
connectors_to_open.append(connector)
for connector in connectors_to_open:
logger.debug(f"Connector {connector!r} to be opened.")
sink, source = connector.sink, connector.source
if sink.is_output:
raise ValueError(
f"Connector {connector.name!r} cannot be opened, as its sink is an output."
)
# Check that all variables are of numerical types
for name in connector.sink_variables():
value = sink[name]
if not is_numerical(value):
raise TypeError(
f"Cannot open connector {connector.name!r} to resolve system"
f", as variable {name!r} is non-numerical (type is {type(value).__name__})."
)
connector.deactivate()
# Set mathematical problem
sink_name = sink.contextual_name
source_name = source.contextual_name
owner_unknowns = dict(
(unknown.variable_reference, unknown)
for unknown in sink.owner.unknowns.values()
)
for target, origin in connector.mapping.items():
unknown_name = natural_varname(f"{sink_name}.{target}")
var = name2var[unknown_name]
options = {}
try:
unknown = owner_unknowns[var]
except KeyError:
pass
else:
options = {
attr: getattr(unknown, attr)
for attr in (
'max_abs_step',
'max_rel_step',
'upper_bound',
'lower_bound',
)
}
equation = natural_varname(f"{unknown_name} == {source_name}.{origin}")
loop.add_unknown(unknown_name, **options)
loop.add_equation(
equation,
name=f"{equation} (loop)",
reference=1.0,
)
[docs]
def close_loops(self):
"""Close loops opened to allow system resolution."""
logger.debug(f"Call {self.name}.close_loops")
for elem in self.tree():
elem.__loop_problem.clear()
for connector in elem.all_connectors():
connector.activate()
[docs]
def convert_to(self, *args, **kwargs) -> None:
"""Convert system into another `System`.
Note: not implemented for class `System`.
Raises
------
TypeError
If the current `System` cannot be converted.
"""
# TODO: this method should be removed altogether
raise TypeError(
f"Cannot convert {self.name!r}: system is not part of a family"
)
[docs]
@classmethod
def check_config_dict(cls, params: dict) -> None:
"""Check if the provided dictionary respects the convention of a model file.
Parameters
----------
params : dict
Dictionary to be tested
Raises
------
jsonschema.exceptions.ValidationError
If the provided dictionary does not conform to the JSON schema.
"""
# Check that the file input file respects the JSON schema
path = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(path, 'system.schema.json')) as fp:
config_schema = json.load(fp)
jsonschema.validate(params, config_schema)
[docs]
@classmethod
def load(cls, filepath: Union[str, Path, TextIOBase], name: Optional[str] = None) -> System:
"""Load configuration from file-like object.
Parameters
----------
filepath : str or Path or file-like
Filepath or file-like object (i.e. has a .read() method)
name : str, optional
Name of the newly created system.
If unspecified (default), uses the name contained in `filepath`.
Returns
-------
System
The loaded system.
"""
if isinstance(filepath, (str, Path)):
with open(filepath) as fp:
system = load_json(fp)
elif hasattr(filepath, 'read'):
system = load_json(filepath)
else:
raise TypeError("Input parameter should be a filepath or file-like object.")
# # Convert variables if needed
# params = decode_cosapp_dict(params)
# system = cls.load_from_dict(*params.popitem())
if name:
system.name = name
return system
@classmethod
def _create_new(cls, class_name: str, ctor_kwargs: dict) -> System:
"""Instantiate a `System` from its name and its parameters.
The construction of the `System` is done recursively from the lower level. Therefore this
function is also called recursively returning to the upper level, the lower `System` and
its connections to the upper `System` via a dictionary. If no connections are to forwarded
the second returned argument is `None`.
Parameters
----------
name : str
Identifier of the system
parameters : dict
The dictionary containing the system definition
Returns
-------
System
The generated system
"""
def get_system_class(class_name: str):
if class_name == "System":
return System
check_arg(class_name, 'class_name', str, stack_shift=1)
try:
component_module, class_name = class_name.rsplit('.', maxsplit=1)
except ValueError:
component_module = ""
system_class = None
found_module = ""
component_libs = System._components_librairies
for lib_name in component_libs:
try:
if component_module:
mod_name = f"{lib_name}.{component_module}"
if mod_name.startswith('.'):
mod_name = mod_name[1:]
lib_module = importlib.import_module(mod_name)
elif lib_name:
lib_module = importlib.import_module(lib_name)
else:
raise ImportError
except ImportError:
# Try next possible Python package to find module
continue
else:
# Found an existing module
found_module = lib_module.__file__
try:
system_class = getattr(lib_module, class_name)
except AttributeError:
# Class not in module
continue
else:
break
if system_class is None:
raise AttributeError(
f"Class {class_name!r} was not found in module: \n{found_module}"
if found_module else
f"Class {class_name!r} is not in one of the listed component librairies:\n{component_libs}"
)
elif not issubclass(system_class, System):
raise AttributeError(
f"Class {system_class.__name__!r} does not inherit from base class 'System'"
)
return system_class
ty = get_system_class(class_name)
name = ctor_kwargs.pop("name")
return ty(name, **ctor_kwargs)
[docs]
@classmethod
def from_json(j: dict[str, Any]) -> System:
return from_json(j)
[docs]
@classmethod
def load_from_dict(cls, state: dict, decoding_metadata=EncodingMetadata()) -> System:
"""Instantiate a `System` from its name and its parameters.
The construction of the `System` is done recursively from the lower level. Therefore this
function is also called recursively returning to the upper level, the lower `System` and
its connections to the upper `System` via a dictionary. If no connections are to forwarded
the second returned argument is `None`.
Parameters
----------
name : str
Identifier of the system
parameters : dict
The dictionary containing the system definition
Returns
-------
System
The generated system
"""
# TODO The all process of saving and reading from a file should be overhaul to take into account missing attr
# - setup kwargs
# - design methods (?)
# Moreover what should be the source of truth JSON or Python? Currently reading from JSON duplicates Python
# work as first the object is instantiated. Then its children are removed to create them (and the associated
# connections from the JSON file.
if "__encoding_metadata__" in state:
decoding_metadata = EncodingMetadata(**state["__encoding_metadata__"])
_, _, _, value_only = decoding_metadata
class_name = state.get("__class__", "System")
ctor_kwargs = state.get("setup_args", {})
ctor_kwargs["name"] = state["name"]
top_system = cls._create_new(class_name, ctor_kwargs)
for name in top_system.children:
top_system.name2variable.pop(name)
# Remove children and connectors --> the source of truth is the json file
top_system.children.clear()
top_system.__child_connectors.clear()
top_system.__pulling_connectors.clear()
for child_state in state.get('subsystems', {}).values():
subsystem = top_system.load_from_dict(child_state, decoding_metadata=decoding_metadata)
top_system.add_child(subsystem)
for connection in state.get('connections', []):
if len(connection) == 2:
sink, source = connection
mapping = None
else:
sink, source, mapping = connection
top_system.connect(top_system[sink], top_system[source], mapping)
for name, value in state.get('properties', {}).items():
if isinstance(value, dict) and "__class__" in value:
value.pop("__class__")
try:
top_system[name].__setstate__(value)
except AttributeError:
for key, vardata in value.items():
setattr(top_system[name], key, vardata)
else:
top_system.__register_property(name, value)
for name, port_data in (state.get("inputs", {}) | state.get("outputs", {})).items():
variables: dict[str, Any] = port_data["variables"]
port_data.pop("__class__", None)
port: BasePort = top_system[name]
if value_only:
port.set_values(**{var: val for var, val in variables.items()})
else:
port.set_values(**{var: val["value"] for var, val in variables.items()})
for varname, vardata in variables.items():
port[varname] = vardata.pop("value")
var = port.get_variable(varname)
for metadata_name, metadata_value in vardata.items():
setattr(var, f"_{metadata_name}", metadata_value)
for driver in state.get("drivers", {}).values():
top_system.add_driver(driver)
try:
exec_order = state["exec_order"]
except KeyError:
pass
else:
if exec_order != list(top_system.exec_order):
top_system.exec_order = exec_order
return top_system
[docs]
def save(
self,
fp: Union[str, Path, TextIOBase],
*,
indent = 2,
sort_keys = True,
encoding_metadata = EncodingMetadata(),
) -> None:
"""Serialize the `System` as a JSON formatted stream to fp.
Parameters
----------
fp : str or Path or file-like
A .write()-supporting file-like object or the filename
indent : int, optional
Indentation in the file (default: 2)
sort_keys : bool, optional
Sort the keys in alphabetic order (default: False)
"""
options = dict(
indent=indent,
sort_keys=sort_keys,
encoding_metadata=encoding_metadata,
)
if isinstance(fp, (str, Path)):
with open(fp, "w") as filepath:
filepath.write(self.to_json(**options))
else:
fp.write(self.to_json(**options))
[docs]
def to_dict(self, *, encoding_metadata=EncodingMetadata()) -> dict[str, Any]:
"""Exports the `System` as a dictionary.
Parameters
----------
encoding_metadata: EncodingMetadata, optional
Specify the exported level of details (default: EncodingMetadata())
Returns
-------
dict[str, Any]
The dictionary
"""
return self.__to_dict(encoding_metadata=encoding_metadata)
def __json__(self, *, encoding_metadata=EncodingMetadata()) -> dict[str, Any]:
"""Creates a JSONable dictionary representation of the object.
Parameters
----------
encoding_metadata: EncodingMetadata, optional
Specify the exported level of details (default: EncodingMetadata())
Returns
-------
dict[str, Any]
The dictionary
"""
return jsonify(self.to_dict(encoding_metadata=encoding_metadata))
def __to_dict(
self,
*,
encoding_metadata = EncodingMetadata(),
export_encoding_metadata = True,
) -> dict[str, Any]:
"""Convert the `System` to a dictionary.
In default mode, `with_types` flag is `False`, only export input ports
and its variable values.
In other case where `with_types` flag is `True`, export all ports with
values and class name. `port_cls_data` can be provided to recover the
definitions of all ports inside system.
Parameters
----------
with_types : bool, optional
Flag to export output ports and the class name of ports (default: False).
port_cls_data : dict, optional
A dictionary to hold the definition of all ports inside system.
Returns
-------
dict
A dictionary defining fully the `System`
"""
# TODO Fred - BUG Information missing from dictionary conversion
# - kwargs of setup - e.g. Shaft system of librairies_sae has its ports functions of kwargs
# - design equations - are not saved (should they be?)
data = {}
if export_encoding_metadata and encoding_metadata:
data["__encoding_metadata__"] = encoding_metadata.__json__()
with_types, inputs_only, with_drivers, value_only = encoding_metadata
data["__class__"] = f"{self.__module__}.{self.__class__.__qualname__}"
data["name"] = self.name
if (ctor_kwargs := self.__ctor_kwargs):
data["setup_args"] = ctor_kwargs.copy()
if (properties := self.__properties):
data["properties"] = properties
directions = ["inputs"]
if not inputs_only:
directions.append("outputs")
for direction in directions:
ports: dict[str, BasePort] = getattr(self, direction)
temp_dict = OrderedDict()
for name, port in ports.items():
port_dict = port.to_dict(with_types=with_types, value_only=value_only)
if port_dict:
port_dict.pop("name")
vars = port_dict.get("variables", {})
if vars:
temp_dict[name] = port_dict
if temp_dict:
data[direction] = temp_dict
connections = [connector.info() for connector in self.all_connectors()]
if connections:
data["connections"] = connections
if self.children:
data["subsystems"] = {
name: child.__to_dict(
encoding_metadata=encoding_metadata,
export_encoding_metadata=False,
)
for name, child in self.children.items()
}
data["exec_order"] = list(self.exec_order)
if with_drivers and self.drivers:
data["drivers"] = list(self.drivers.values())
return data
[docs]
def to_json(
self,
*,
indent: Optional[int] = None,
sort_keys = True,
encoding_metadata = EncodingMetadata(),
) -> str:
"""Return a string in JSON format representing the `System`.
Parameters
----------
indent : int, optional
Indentation of the JSON string (default: 2)
sort_keys : bool, optional
Sort keys in alphabetic order (default: True)
encoding_metadata: EncodingMetadata, optional
Specify the exported level of details (default: EncodingMetadata())
Returns
-------
str
String in JSON format
"""
dict_repr = self.__json__(encoding_metadata=encoding_metadata)
# If this is updated => workspace template should be updated too.
dict_repr['$schema'] = (
"0-4-0/system.schema.json" # Add self referencing to system JSON version - provision
)
return json.dumps(dict_repr, indent=indent, sort_keys=sort_keys)
[docs]
def to_d3(self, show=True, size=435) -> "IPython.display.IFrame":
"""Returns the hierarchical representation of this system in HTML format.
Returns
-------
IPython.display.IFrame
IFrame to the HTML formatted representation
"""
from cosapp.tools.views.d3js import to_d3
return to_d3(self, show, size)
[docs]
def to_html(self, filename: str, embeddable=False) -> None:
"""Save the `System` as HTML using vis.JS library.
Parameters
----------
filename : str
Filename to write to
embeddable: bool, optional
Is the HTML to be embedded in an existing page? Default: False
"""
from cosapp.tools.views.visjs import VisJsRenderer
renderer = VisJsRenderer(self, embeddable)
renderer.to_file(filename)
def _repr_html_(self) -> str:
"""Returns the representation of this system in HTML format.
Returns
-------
str
HTML formatted representation
"""
# TODO unit tests
filename = f"{self.name}.html"
self.to_html(filename)
from IPython.display import IFrame
return IFrame(filename, "810px", "650px")._repr_html_()
def _repr_markdown_(self) -> str:
"""Returns the representation of this system attributes in Markdown format.
Returns
-------
str
Markdown formatted representation
"""
from cosapp.tools.views.markdown import system_to_md
return system_to_md(self)
[docs]
def make_surrogate(
self,
data_in: Union[pandas.DataFrame, dict[str, list[float]]],
model: type[SurrogateModel] = FloatKrigingSurrogate,
activate = True,
data_out: Optional[Union[pandas.DataFrame, dict[str, Any]]] = None,
postsynch: Union[str, list[str]] = "*",
execution_policy: Optional[ExecutionPolicy] = None,
*args, **kwargs,
) -> SystemSurrogate:
"""
Creates a surrogate model superseding the normal behaviour of `compute()`.
The surrogate model is trained from datasets `data_in` and `data_out`,
given as `pandas.DataFrame` objects, or dictionnaries interpretable as so.
If no output data are provided (default), they are computed as the response
of system to design-of-experiment `data_in`.
Parameters
----------
- data_in: pandas.DataFrame or dict[str, list[float]]
Design of experiments for variables in dataframe columns or dict keys.
Column/key names must match variable contextual names, as in 'wing.geom.length', e.g.
- model: type
Model class, implementing methods `train(x, y)` and `predict(x)`, where x and y are 1D arrays.
Default is `cosapp.utils.surrogate_models.FloatKrigingSurrogate`.
- activate: bool, optional
Boolean determining whether or not surrogate model should be activated once created.
Default is `True`.
- data_out: pandas.DataFrame or dict[str, list[float]], optional
Y-dataset used for surrogate model training. Default is None (dataset is computed).
- postsynch: str or list[str]
List of output variable names to synchronize after each surrogate model execution.
Default is '*', meaning all outputs are post-synchronized.
- *args, **kwargs: Any
Additional parameters passed to `model` constructor.
Returns:
--------
SystemSurrogate
System surrogate attached to the system.
"""
check_arg(data_in, 'data_in', (pandas.DataFrame, dict))
surrogate = SystemSurrogate(self, data_in, model, data_out, postsynch, execution_policy, *args, **kwargs)
self.__set_surrogate(surrogate, activate)
return surrogate
def __set_surrogate(self, surrogate: SystemSurrogate, activate=True) -> None:
"""Private setter for surrogate model"""
if self.has_surrogate:
warnings.warn(f"Existing surrogate model of {self.name} has been overwritten by a new one.")
self._meta = surrogate
self.active_surrogate = activate
[docs]
def load_surrogate(self, filename: str, activate=True) -> None:
"""
Loads a surrogate model from a binary file created by `dump_surrogate`.
Parameters
----------
- filename: str
Destination file name.
- activate: bool, optional
Boolean determining whether or not surrogate model should be activated once loaded.
Default is `True`.
"""
surrogate = SystemSurrogate.load(self, filename)
self.__set_surrogate(surrogate, activate)
[docs]
def dump_surrogate(self, filename: str) -> None:
"""Dumps system surrogate model (if any) into a binary file."""
if not self.has_surrogate:
raise AttributeError(f"{self.name!r} has no surrogate model")
self._meta.dump(filename)
@property
def active_surrogate(self) -> bool:
"""bool: True if surrogate model is activated, False otherwise."""
return self.__runner is self._meta
@active_surrogate.setter
def active_surrogate(self, activate: bool) -> None:
"""Activation boolean setter for surrogate model."""
check_arg(activate, "active_surrogate", bool)
if activate == self.active_surrogate:
return # no change - quit
if activate:
if self._meta is None:
raise AttributeError(
"Can't set `active_surrogate` if no surrogate model has been created"
" by either `make_surrogate` or `load_surrogate`."
)
# Deactivate owner, children, and all drivers
self._set_recursive_active_status(False)
# Reactivate owner after upper recursion
self._active = True
self.__runner = self._meta
else:
self._set_recursive_active_status(True)
self.__runner = self
self.touch()
for port in self.inputs.values():
port.touch()
@property
def has_surrogate(self) -> bool:
"""bool: True if system has a surrogate model (even if inactive), False otherwise."""
return self._meta is not None
def _set_recursive_active_status(self, active_status: bool) -> None:
#TODO save and recover drivers original status
logger.debug(f"Starting recursive active status modifying of {self.name}, status to be set is {active_status}")
self._active = active_status
for driver in self.drivers.values():
logger.debug(f"Targeted driver for driver recursive deactivate is {driver}")
driver._set_children_active_status(active_status)
for child in self.children.values():
logger.debug(f"Targeted child for system recursive deactivate is {child}")
child._set_recursive_active_status(active_status)