"""
Basic classes handling model resolution, system connections and conversion between level of
modelings.
"""
from __future__ import annotations
import collections
import importlib
import json
import logging
import os
import warnings
from contextlib import contextmanager
from copy import deepcopy
from enum import Enum
from io import StringIO
from numbers import Number
from pathlib import Path
from typing import (
Any, Callable, ClassVar, Dict, FrozenSet,
Iterable, Iterator, List, Type,
Optional, Tuple, Union, TypeVar,
Collection,
)
from types import MappingProxyType
import jsonschema
import numpy
import pandas
from cosapp.patterns.visitor import Visitor
from cosapp.core.module import Module, CommonPorts
from cosapp.core.eval_str import EvalString
from cosapp.core.variableref import VariableReference
from cosapp.core.numerics.basics import MathematicalProblem, TimeProblem
from cosapp.core.numerics.boundary import TimeDerivative, TimeUnknown, Unknown
from cosapp.core.time import TimeObserver
from cosapp.ports.enum import PortType, Scope, Validity
from cosapp.ports.port import BasePort, ExtensiblePort, ModeVarPort, Port
from cosapp.ports.units import UnitError
from cosapp.ports.variable import RangeValue, Types, Variable
from cosapp.ports.connectors import BaseConnector, Connector, ConnectorError
from cosapp.utils.distributions import Distribution
from cosapp.utils.context import ContextLock
from cosapp.utils.helpers import check_arg, is_number, is_numerical
from cosapp.utils.json import JSONEncoder, decode_cosapp_dict
from cosapp.utils.logging import LogFormat, LogLevel, rollover_logfile
from cosapp.utils.naming import natural_varname
from cosapp.utils.pull_variables import pull_variables
from cosapp.utils.find_variables import get_attributes
from cosapp.utils.surrogate_models import FloatKrigingSurrogate
from cosapp.systems.systemConnector import SystemConnector
from cosapp.systems.systemSurrogate import SystemSurrogate
from cosapp.multimode.event import Event, EventState, ZeroCrossing
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# Generic type variables used in signatures below:
# any subclass of `Port`, of `System`, and of `Driver`, respectively
AnyPort = TypeVar("AnyPort", bound=Port)
AnySystem = TypeVar("AnySystem", bound="System")
AnyDriver = TypeVar("AnyDriver", bound="Driver")
[docs]
class ExecutionOrdering(Enum):
    """Algorithms available to a `System` for defining the execution order of its components."""

    MANUAL = "manual"
[docs]
class ConversionType(Enum):
    """Conversion types supported by `System`."""

    manual = "manual"
    best_fidelity_to_cost = "best_fidelity_to_cost_ratio"
    high_fidelity = "highest_fidelity"
    low_fidelity = "lowest_fidelity"
    high_cost = "highest_cost"
    low_cost = "lowest_cost"
# Sentinel default value used by the `list_inputs` method, to tell whether
# the caller explicitly set a value for `out_stream`
_DEFAULT_OUT_STREAM = object()
[docs]
class System(Module, TimeObserver):
# TODO check and complete documentation
"""
A class to describe generic properties and functions of a component that can be single or
made of child `System`.
Parameters
----------
name: str
Name of the `System`
Attributes
----------
name : str
`System` name
inputs : :obj:`collections.OrderedDict` of :obj:`BasePort`
Dictionary of `BasePort` containing the values needed to compute the `System`
outputs : Dict[BasePort]
Dictionary of `BasePort` containing the values computed by the `System`
residues : :obj:`collections.OrderedDict` of :obj:float
Dictionary of residues generated by the `System`
children : Dict[System]
Child `System` of this `System`
parent : System
Parent `System` of this `System`; None if there is no parent.
exec_order : MappingView[str]
Execution order in which sub-systems are computed.
properties : Dict[str, Any]
Dictionary of immutable parameters of the system.
design_methods : Dict[Equations]
`System` pre-defined design methods
name2variable : Dict[str, VariableReference]
Variable name mapping to its value by reference
_locked : bool
if True, `add_input`, `add_output`, `add_inward` and `add_outward` are deactivated. This is
the default behavior outside the `setup` function.
_active : bool
if False, the `System` will not execute its `compute` method
_is_tree_clean : bool
Reflects the status of the inputs and outputs. `clean` status means the group of ports was not updated since
last computation of the `System`
_compute_calls: bool
Store if the `System` was computed at last call or not (due to inhibition of clean status)
Examples
--------
To create your own `System`, you should inherit from this class and define your own `setup` and
`compute` methods.
>>> import numpy as np
>>>
>>> class RealPort(Port):
>>>
>>> def setup(self):
>>> self.add_variable('x',0.)
>>>
>>> class InvertedParabola(System):
>>>
>>> def setup(self):
>>> self.add_input(RealPort, 'iterative')
>>> self.add_input(RealPort, 'target_y')
>>> self.add_output(RealPort, 'max_root')
>>> self.add_inward({'a': 0.01,
>>> 'b': 2.,
>>> 'c': 4.})
>>> self.add_outward('roots', None)
>>> self.add_equation("a * iterative.x**2 + b * iterative.x + c == target_y.x", "y")
>>>
>>> def compute(self):
>>>
>>> discriminant = self.b**2 - 4. * self.a * self.c
>>> if discriminant >= 0.:
>>> self.roots = ((-self.b + np.sqrt(discriminant)) / (2. * self.a),
>>> (-self.b - np.sqrt(discriminant)) / (2. * self.a))
>>> self.max_root.x = max(self.roots)
>>> else:
>>> self.roots = None
>>> self.max_root.x = np.nan
"""
__slots__ = (
'__context_lock', '_is_tree_clean', '_locked', '_math', '_time_pb',
'design_methods', 'drivers', 'inputs', 'outputs', 'name2variable',
'__readonly', '__events', '_meta', '__runner', '__input_mapping',
'__loop_problem', '__child_connectors', '__pulling_connectors',
'__free_problem',
)
INWARDS = CommonPorts.INWARDS.value # type: ClassVar[str]
OUTWARDS = CommonPorts.OUTWARDS.value # type: ClassVar[str]
MODEVARS_IN = CommonPorts.MODEVARS_IN.value # type: ClassVar[str]
MODEVARS_OUT = CommonPorts.MODEVARS_OUT.value # type: ClassVar[str]
COMMON_PORTS = list(CommonPorts.__members__) # type: List[str]
tags = frozenset() # type: ClassVar[FrozenSet[str]]
_user_context = None # type: ClassVar[Optional[Scope]]
# Python packages containing components definition
_components_librairies = ['', ] # type: ClassVar[List[str]]
# Is the master system known (i.e. the one from which a simulation has started)
__master_set: ClassVar[bool] = False
[docs]
@classmethod
@contextmanager
def set_master(cls, name: str, type_checking: bool=True) -> Iterator[bool]:
    """Context manager electing the master System, if none is currently set.

    On entry, if no master is set, the caller is elected master: the log file
    is rolled over, and port type checking is set to `type_checking`.
    On exit, if the caller was elected master, the master flag is released,
    port type checking is reactivated, and `OptionalDriver` inhibition is switched off.

    Parameters
    ----------
    name : str
        Name of the System calling this context manager
    type_checking : bool, optional
        Whether to activate the type checking in the ports or not (default: True)

    Yields
    ------
    bool
        Is the caller the master System?
    """
    is_master = False
    if not cls.__master_set:
        logger.debug(f"System <{name}> is the execution master.")
        rollover_logfile()  # Create a new logfile
        BasePort.set_type_checking(type_checking)  # May deactivate type checking
        is_master = True
        cls.__master_set = True
    try:
        yield is_master
    finally:
        # Only the system that acquired the master flag is allowed to release it
        if is_master:
            cls.__master_set = False
            BasePort.set_type_checking(True)  # Reactivate type checking
            from cosapp.drivers.optionaldriver import OptionalDriver
            OptionalDriver.set_inhibited(False)
def __init__(self, name: str, **kwargs):
    """Initialize a System

    Parameters
    ----------
    - name [str]:
        System name
    - **kwargs:
        Optional keyword arguments; they are first passed to `_initialize`,
        and the arguments returned by `_initialize` are then passed to `setup`.
    """
    Module.__init__(self, name)
    TimeObserver.__init__(self, sign_in=False)
    # Function-scope import; `Driver` is only used in the type comment below
    from cosapp.drivers import Driver
    self._math = self.new_problem(name)
    self._time_pb = TimeProblem(name, self)
    self.__loop_problem = self.new_problem('loop')
    self.drivers = collections.OrderedDict()  # type: Dict[str, Driver]
    self.design_methods = dict()  # type: Dict[str, MathematicalProblem]
    self.__readonly = dict()  # type: Dict[str, Any]
    # Keep a copy of the original constructor arguments
    self.__ctor_kwargs = kwargs.copy()
    self.__context_lock = ContextLock()
    self.__free_problem = ContextLock()
    self.inputs = dict()  # type: Dict[str, BasePort]
    self.outputs = dict()  # type: Dict[str, BasePort]
    self.__events = dict()  # type: Dict[str, Event]
    # Connectors are grouped in a dictionary where the key is the sink system i.e. the receiving system
    self.__child_connectors = dict()  # type: Dict[str, List[SystemConnector]]
    self.__pulling_connectors = list()  # type: List[SystemConnector]
    self._locked = False  # type: bool
    self._is_tree_clean = False
    self._meta = None
    self.__runner = self  # type: Union[System, SystemSurrogate]
    self.__input_mapping = None  # type: Dict[str, VariableReference]
    # For efficiency purpose, links to objects are stored as reference
    # !! name2variable must be the latest attribute set
    #    => lock __setattr__ on previously defined attributes
    self.name2variable = dict()  # type: Dict[str, VariableReference]
    # Create extensible ports for orphan variables
    self._add_port(ExtensiblePort(System.INWARDS, PortType.IN))
    self._add_port(ExtensiblePort(System.OUTWARDS, PortType.OUT))
    self._add_port(ModeVarPort(System.MODEVARS_IN, PortType.IN))
    self._add_port(ModeVarPort(System.MODEVARS_OUT, PortType.OUT))
    # Customized subclass `System` before applying user wishes
    kwargs = self._initialize(**kwargs)
    # Customized the `System` according to user wishes
    with self.__free_problem:
        self.setup(**kwargs)
    self.update()
    self.__enforce_scope()
    # From now on, methods guarded by `__lock_check` raise AttributeError
    self._locked = True
def _update(self, dt) -> None:
    """No-op implementation of the hook required by the `TimeObserver` base class."""
    return None
[docs]
def accept(self, visitor: Visitor) -> None:
    """Visitor pattern entry point: ask `visitor` to visit this system."""
    visit = visitor.visit_system
    visit(self)
[docs]
def ports(self) -> Iterator[BasePort]:
    """Iterate over all ports of the system: inputs first, then outputs."""
    for in_port in self.inputs.values():
        yield in_port
    for out_port in self.outputs.values():
        yield out_port
[docs]
def is_clean(self, direction: Optional[PortType] = None) -> bool:
    """Tell whether the ports of the `System` are clean.

    Parameters
    ----------
    direction : PortType, optional
        Direction of interest. If it is neither `PortType.IN` nor `PortType.OUT`
        (e.g. left unspecified), ports of both directions are checked.

    Returns
    -------
    bool
        True if all the ports considered are clean, False otherwise.
    """
    if direction == PortType.IN:
        candidates = self.inputs.values()
    elif direction == PortType.OUT:
        candidates = self.outputs.values()
    else:
        candidates = self.ports()
    for port in candidates:
        if not port.is_clean:
            return False
    return True
[docs]
def set_clean(self, direction: PortType) -> None:
    """Mark as clean all ports of a given direction.

    Parameters
    ----------
    direction : PortType
        `PortType.IN` selects input ports; any other value selects output ports.
    """
    if direction == PortType.IN:
        group = self.inputs
    else:
        group = self.outputs
    for port in group.values():
        port.set_clean()
[docs]
def set_dirty(self, direction: PortType) -> None:
    """Mark as dirty the ports of a given direction.

    Parameters
    ----------
    direction : PortType
        For `PortType.IN`, the system itself is touched (see `touch`);
        for any other value, every output port is touched.
    """
    if direction == PortType.IN:
        self.touch()
        return
    for port in self.outputs.values():
        port.touch()
[docs]
def touch(self):
    """Flag the system tree as not clean, from this system up towards the root.

    The upward walk stops at the first system already flagged as not clean.
    """
    for node in self.path_to_root():
        if node._is_tree_clean:
            node._is_tree_clean = False
        else:
            break
def __enforce_scope(self) -> None:
    """Encapsulate input ports for which some variables are out of scope.

    If the user context is not known yet, it is determined by matching the
    system tags against the roles declared in the user configuration.
    Then, unless the resulting scope is `Scope.PRIVATE`, the scope is set
    as `scope_clearance` of every input port.
    """
    if self._user_context is None:
        # Ensure tags is a frozenset
        self.tags = tags = frozenset(self.tags)
        def get_scope(role_sets: FrozenSet[FrozenSet[str]]) -> Scope:
            """Get context depending on the matching between user roles and system `tags`
            (captured from the enclosing method):

            - No tags, or all tags contained in one role set => `PRIVATE`
            - Partial match, at best, between tags and one role set => `PROTECTED`
            - Else `PUBLIC`

            Parameters
            ----------
            role_sets : FrozenSet[FrozenSet[str]]
                Collection of role sets of the user

            Returns
            -------
            Scope

            Examples
            --------
            In the illustrations below, `get_scope(role_sets)` is evaluated
            for different values of the enclosing variable `tags`.

            >>> tags = frozenset()
            >>> role_sets = frozenset([frozenset(['Aerodynamics', 'Compressor'])])
            >>> assert get_scope(role_sets) == Scope.PRIVATE
            >>>
            >>> tags = frozenset(['Aerodynamics', 'Compressor'])
            >>> role_sets = frozenset([frozenset(['Aerodynamics', 'Compressor'])])
            >>> assert get_scope(role_sets) == Scope.PRIVATE
            >>>
            >>> tags = frozenset(['Aerodynamics', 'Compressor'])
            >>> role_sets = frozenset([frozenset(['Mechanics', 'Compressor'])])
            >>> assert get_scope(role_sets) == Scope.PROTECTED
            >>>
            >>> tags = frozenset(['Aerodynamics', 'Compressor'])
            >>> role_sets = frozenset([frozenset(['Heat transfert', 'Turbine'])])
            >>> assert get_scope(role_sets) == Scope.PUBLIC
            """
            if not tags or tags in role_sets:
                return Scope.PRIVATE
            else:
                best = Scope.PUBLIC
                for roles in role_sets:
                    # Full inclusion in one role set grants private access at once
                    if tags.issubset(roles):
                        return Scope.PRIVATE
                    # A partial overlap upgrades the best scope found so far
                    if tags.intersection(roles):
                        best = Scope.PROTECTED
                return best
        from cosapp.core.config import CoSAppConfiguration
        user_config = CoSAppConfiguration()
        self._user_context = get_scope(user_config.roles)
    scope = self._user_context
    if scope != Scope.PRIVATE:  # Some ports may be restrained
        for port in self.inputs.values():
            port.scope_clearance = scope
def _initialize(self, **kwargs) -> Dict[str, Any]:
    """Hook called by `__init__` just before `setup`, to add `System` members.

    This default implementation consumes no argument.

    Parameters
    ----------
    **kwargs : Dict[str, Any]
        Optional keyword arguments of `__init__`

    Returns
    -------
    Dict[str, Any]
        Keyword arguments not consumed by `_initialize`
    """
    remaining = kwargs
    return remaining
[docs]
def setup(self, **kwargs) -> None:
"""`System` port and/or child `System` are defined in this function.
This function allows to populate a customized `System` class. The helper functions for the
user are:
- `add_input` : add an input port
- `add_output` : add an output port
- `add_inward` : add a inward variable
- `add_outward` : add a outward variable
- `add_child` : add a child `System`
See Also
--------
add_input, add_output, add_inward, add_outward, add_child
Examples
--------
Here is an example of `System` subclassing:
>>> class AdvancedDuct(System):
>>>
>>> def setup(self):
>>> self.add_input(FlowPort, 'in')
>>> self.add_output(FlowPort, 'out')
>>> self.add_inward({'heat_source': 1.0,
>>> 'pressure_loss': 0.01})
>>> self.add_outward('wall_temperature', 400.)
>>>
>>> self.add_child(AnotherSystem('system2'))
"""
pass # pragma: no cover
[docs]
def update(self):
    """Perform complex tasks triggered by a data change.

    Some parameters may be a file used to initialize a complex object. Changing such
    a parameter may require the complex object to be rebuilt; this is the place to do so.
    The default implementation does nothing.

    Examples
    --------
    >>> class TableModel(System):
    >>>
    >>>     def setup(self):
    >>>         # Add an inward for the file containing the table values
    >>>         self.add_inward('table_file', '{myProject}/ressources/my_table.csv')
    >>>         # Set an object able to interpolate the table
    >>>         self.add_outward('table', Table(self.table_file))
    >>>
    >>>     def update(self):
    >>>         # Update the table object, as the source file may have been updated.
    >>>         self.table = Table(self.table_file)

    Notes
    -----
    This method is called systematically after the :py:meth:`~cosapp.systems.system.System.setup` call.
    Otherwise the user is responsible to explicitly call it when needed.
    """
    return None
def __getattr__(self, name: str) -> Any:
    """Fallback attribute lookup, invoked when regular attribute access fails.

    Lookup order: variable references in `name2variable` (the referenced
    value is returned), then regular attribute access, and finally read-only
    properties.

    Parameters
    ----------
    name : str
        Name of the requested attribute

    Returns
    -------
    Any
        Value associated with `name`

    Raises
    ------
    AttributeError
        If `name` cannot be resolved.
    """
    getattribute = super().__getattribute__
    try:
        # Bypass `__getattr__` when fetching the mapping: evaluating
        # `self.name2variable` here would recurse forever if the attribute
        # has not been created yet (e.g. instance built with `__new__` only).
        name2variable = getattribute("name2variable")
    except AttributeError:
        # System not fully constructed: neither variables nor read-only
        # properties exist yet, so regular lookup is the only option
        # (raises the standard AttributeError if `name` is unknown).
        return getattribute(name)
    try:  # Faster than testing `if name in self`
        variable_ref = name2variable[name]
    except KeyError:
        try:
            return getattribute(name)
        except AttributeError as error:
            # Last try: checkout read-only properties
            # Note: checked last, as access is less likely than
            # `name2variable` and `super().__getattribute__()`.
            try:
                return self.__readonly[name]
            except KeyError:
                raise error
    else:
        return variable_ref.value
def __setattr__(self, name: str, value: Any) -> None:
    """Set attribute `name` to `value`.

    If `name` is a key of `name2variable`, the value is assigned through the
    associated variable reference. Otherwise, only existing attributes which
    are not read-only properties may be set.

    Raises
    ------
    AttributeError
        If `name` is a read-only property, or if it does not refer to
        an existing attribute (once `name2variable` has been created).
    """
    try:  # Faster than testing `if name in self`
        # Faster to duplicate __setitem__ call than calling it
        variable_ref = super().__getattribute__("name2variable")[name]
    except KeyError:
        # `name2variable` exists, but `name` is not a variable
        if name in self.__readonly:
            raise AttributeError(f"can't set attribute {self.name}.{name}")
        elif hasattr(self, name):
            super().__setattr__(name, value)
        else:
            raise AttributeError(
                f"System {self.__class__.__qualname__!r} has no attribute {name!r}."
            )
    except AttributeError:
        # Exception catcher to create all initial variables defined prior to name2variable
        # Then KeyError will be raised and it won't be allowed to create new attributes
        super().__setattr__(name, value)
    else:
        variable_ref.value = value
def __contains__(self, item: str) -> bool:
    """Tell whether `item` is a variable, a read-only property or an event of the system."""
    try:
        self.__find(item)
    except AttributeError:
        return False
    return True
def __find(self, name: str) -> Any:
    """Look up `name` in variables, then read-only properties, then events.

    Returns the first entry found (a variable reference, a property value
    or an event); raises `AttributeError(name)` if `name` is in none of them.
    """
    lookups = (self.name2variable, self.__readonly, self.__events)
    for lookup in lookups:
        if name in lookup:
            return lookup[name]
    raise AttributeError(name)
def __getitem__(self, name: str) -> Any:
    """Dictionary-style access to attribute `name`; raises `KeyError` if not found."""
    try:
        found = getattr(self, name)
    except AttributeError:
        raise KeyError(
            f"Variable {name!r} not found in the context of System {self.name!r}"
        )
    else:
        return found
def __setitem__(self, name: str, value: Any) -> None:
    """Dictionary-style assignment of variable `name`.

    Raises `AttributeError` if `name` is a read-only property,
    and `KeyError` if `name` is not a known variable.
    """
    try:
        target = self.name2variable[name]
    except KeyError:
        is_readonly = name in self.__readonly
        if not is_readonly:
            raise KeyError(
                f"Variable {name!r} not found in the context of System {self.name!r}"
            )
        raise AttributeError(f"Can't set read-only attribute {self.name}.{name}")
    else:
        target.value = value
def __repr__(self) -> str:
    """Representation of the form '<name> - <class name>'."""
    class_name = type(self).__name__
    return "{} - {}".format(self.name, class_name)
@property
def input_mapping(self) -> Dict[str, VariableReference]:
    """Dict[str, VariableReference]: free input mapping (read-only view),
    computed on first access if not available."""
    mapping = self.__input_mapping
    if mapping is None:
        self._update_input_mapping()
        mapping = self.__input_mapping
    return MappingProxyType(mapping)
def _update_input_mapping(self) -> None:
    """Identify free inputs by graph analysis (`get_free_inputs`),
    and cache the result in the private input mapping."""
    from cosapp.utils.graph_analysis import get_free_inputs

    free_inputs = get_free_inputs(self)
    self.__input_mapping = free_inputs
def __reset_input_mapping(self) -> None:
    """Invalidate the cached input mapping of this system and of all its ancestors."""
    node = self
    while node is not None:
        node.__input_mapping = None
        node = node.parent
[docs]
def append_name2variable(
    self, additional_mapping: Iterable[Tuple[str, VariableReference]]
) -> None:
    """Register additional (name, reference) pairs in the variable lookup mapping.

    Each name is also declared as a member of the system. The pairs are then
    passed upward to the parent `System`, if any, with names prefixed by
    the name of this system.

    Parameters
    ----------
    additional_mapping: Iterable[Tuple[str, VariableReference]]
        Collection of (str, VariableReference) tuples to append
    """
    # TODO raise error if name already exists (example a inwards variable name == component name)
    entries = list(additional_mapping)
    self.name2variable.update(entries)
    for key, _ in entries:
        self._add_member(key)
    parent = self.parent
    if parent is not None:
        prefix = self.name
        parent.append_name2variable(
            (f"{prefix}.{key}", reference) for key, reference in entries
        )
[docs]
def pop_name2variable(self, keys: Iterable[str]) -> None:
    """Remove the given keys from the variable lookup mapping.

    Keys are first removed from the local mapping (a missing key raises
    `KeyError`). They are then passed upward to the parent `System`, if any,
    prefixed by the name of this system.

    Parameters
    ----------
    keys: Iterable[str]
        Keys to remove
    """
    names = list(keys)
    registry = self.name2variable
    for name in names:
        del registry[name]
    parent = self.parent
    if parent is not None:
        prefix = self.name
        parent.pop_name2variable(f"{prefix}.{name}" for name in names)
[docs]
def add_property(self, name: str, value: Any) -> None:
    """Create new read-only property `name`, set to `value`.

    Parameters
    ----------
    name : str
        Property name; validated by `Variable.name_check`
    value : Any
        Property value

    Raises
    ------
    AttributeError
        If the system is locked (raised by the lock check).
    ValueError
        If `name` already exists in the system (raised by the attribute check).
    """
    self.__lock_check("add_property")
    name = Variable.name_check(name)
    self.__check_attr(name, f"cannot add read-only property {name!r}")
    self.__readonly[name] = value
    self._add_member(name)
    cls = self.__class__
    # The property object is installed on the class, not on the instance: the getter
    # reads the value from each instance's own read-only dictionary, and raises
    # AttributeError for instances on which `name` was not declared.
    def getter(self):
        try:
            return self.__readonly[name]
        except KeyError:
            raise AttributeError(f"{cls.__name__} object {self.name!r} has no attribute {name!r}")
    setattr(cls, name, property(getter))
@property
def properties(self) -> Dict[str, Any]:
"""Dict[str, Any]: list of read-only properties and associated values"""
return MappingProxyType(self.__readonly)
[docs]
def add_output(self,
    port_class: Type[AnyPort],
    name: str,
    variables: Optional[Dict[str, Any]] = None,
    desc: str = "",
) -> AnyPort:
    """Add an output `Port` to the `System`.

    This function cannot be called outside `System.setup`.

    Parameters
    ----------
    - port_class [type[Port]]
        Class of the `Port` to create
    - name [str]:
        `Port` name
    - variables [dict[str, Any], optional]:
        Dictionary of initial values (default: None)
    - desc [str, optional]:
        `Port` description

    Returns
    -------
    The created port

    Raises
    ------
    AttributeError
        If the system is locked (raised by the lock check).
    TypeError
        If `port_class` is not a subclass of `Port`.

    Examples
    --------
    >>> class MyModule(System):
    >>>     def setup(self):
    >>>         self.add_output(MyPort, 'p_out1')
    >>>         self.add_output(MyPort, 'p_out2', {'y': 1.5})
    """
    self.__lock_check("add_output")
    # Type validation
    check_arg(port_class, 'port_class', type)
    if not issubclass(port_class, Port):
        # `port_class` is a class at this point: report its own name,
        # rather than the name of its metaclass (usually 'type')
        raise TypeError(
            f"port_class should be a subclass of Port; got {port_class.__name__}."
        )
    new_port = port_class(name, PortType.OUT, variables=variables)
    self._add_port(new_port, desc)
    return new_port
def _add_port(self, port: BasePort, desc: str = "") -> None:
    """Add a port to the system

    The port is stored in `inputs` or `outputs`, depending on its direction.
    The port itself, its variables, and possibly additional port attributes
    are then registered in the variable lookup mapping.

    Parameters
    ----------
    port : `BasePort`
        instance of a port class
    desc : str, optional
        Port description; default ''

    Raises
    ------
    ValueError
        If the port name already exists in the system,
        or if the port is neither an input nor an output.
    """
    self.__check_attr(port.name, f"cannot add {type(port).__qualname__} {port.name!r}")
    port.owner = self
    port.description = desc
    if port.is_input:
        inputs = self.inputs
        if port.name in inputs:
            raise ValueError(f"Port name {port.name!r} already exists as input")
        inputs[port.name] = port
        port_key = (port.name, VariableReference(context=self, mapping=inputs, key=port.name))
    elif port.is_output:
        outputs = self.outputs
        if port.name in outputs:
            raise ValueError(f"Port name {port.name!r} already exists as output")
        outputs[port.name] = port
        port_key = (port.name, VariableReference(context=self, mapping=outputs, key=port.name))
    else:
        raise ValueError(f"Unknown `PortType` {port.direction}")
    # First entry refers to the port itself
    keys = [port_key]
    # Names yielded by iterating over the port are registered as '<port name>.<name>'
    rel2absname = lambda relname: (
        f"{port.name}.{relname}",
        VariableReference(context=self, mapping=port, key=relname),
    )
    keys.extend(map(rel2absname, iter(port)))
    if isinstance(port, Port):
        # Attributes of `port` which are neither attributes of base class `Port`,
        # nor port variables, are registered as well
        attributes = get_attributes(port) - get_attributes(Port) - set(port._variables)
        for attr in attributes:
            keys.append((f"{port.name}.{attr}", VariableReference(context=self, mapping=port, key=attr)))
    self.append_name2variable(keys)
def __lock_check(self, method: str) -> None:
    """Guard for setup-only methods: raises AttributeError naming `method` if system is locked."""
    if not self._locked:
        return
    raise AttributeError(f"`{method}` cannot be called outside `setup`")
def __check_attr(self, name: str, prefix: str = "") -> None:
    """Raises ValueError if attribute `name` already exists in system.

    Parameters
    ----------
    name : str
        Attribute name to check
    prefix : str, optional
        Text prepended to the error message, followed by '; ' (default: '')

    Raises
    ------
    ValueError
        If `name` is already a variable, a read-only property or an event.
        When it can be determined, the message tells the nature of the existing attribute.
    """
    try:
        obj = self.__find(name)
    except AttributeError:
        return  # attribute does not exist - OK
    # Default: no qualifier. Ensures `suffix` is bound on every path below,
    # including variable references whose nature cannot be determined.
    suffix = ""
    if isinstance(obj, VariableReference):
        if obj.context is self:
            value = obj.value
            if isinstance(value, Port):
                pdir = "input" if value.is_input else "output"
                suffix = f"as an {pdir} {type(value).__name__}"
            elif isinstance(value, System):
                suffix = f"as a sub-system {type(value).__name__}"
            else:
                # Orphan variable: tell inwards from outwards by the port holding it
                port = obj.mapping
                if port is self.inputs[System.INWARDS]:
                    suffix = "as an inward"
                elif port is self.outputs[System.OUTWARDS]:
                    suffix = "as an outward"
    elif isinstance(obj, Event):
        suffix = "as an event"
    else:
        suffix = "as a read-only property"
    message = f"{self.name}.{name} already exists"
    prefix = prefix.strip()
    if prefix:
        message = f"{prefix}; {message}"
    if suffix:
        message = f"{message} {suffix}"
    raise ValueError(f"{message}.")
[docs]
def add_inward(self,
    definition: Union[str, Dict[str, Any]],
    value: Any = 1,
    unit: str = "",
    dtype: Types = None,
    valid_range: RangeValue = None,
    invalid_comment: str = "",
    limits: RangeValue = None,
    out_of_limits_comment: str = "",
    desc: str = "",
    distribution: Optional[Distribution] = None,
    scope: Scope = Scope.PRIVATE,
) -> None:
    """Add an inward variable to the `System`.

    An inward is a variable stored in the dedicated `inwards` input port of the system,
    rather than in a regular port. It is accessible both as `name` and as `inwards.name`.

    A unique inward variable can be defined by providing directly all arguments. And multiple
    inward variables can be defined by passing a `dict` of pair (`str`, `Any`) with an entry for
    each variable. In the latter case, if an entry is itself a `dict`, it is first interpreted
    as keyword arguments of the variable definition (`value`, `unit`, etc.); if this attempt
    raises `TypeError`, the dictionary is used as the value of the variable.

    This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.

    Parameters
    ----------
    definition : str or Dict[str, Any]
        Name of the unique variable or a dictionary for multiple variables at once
    value : Any, optional
        Value of the variable if `definition` is a `str`; default 1
    unit : str, optional
        Variable unit; default empty string (i.e. dimensionless)
    dtype : type or iterable of types, optional
        Variable type; default None (i.e. type of initial value)
    valid_range : Tuple[Any, Any], optional
        Validity range of the variable; default None (i.e. all values are valid)
    invalid_comment : str, optional
        Comment to show in case the value is not valid; default ''
    limits : Tuple[Any, Any], optional
        Limits over which the use of the model is wrong; default valid_range
    out_of_limits_comment : str, optional
        Comment to show in case the value is out of limits; default ''
    desc : str, optional
        Variable description; default ''
    distribution : Distribution, optional
        Probability distribution of the variable; default None (no distribution)
    scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional
        Variable visibility; default PRIVATE

    Raises
    ------
    AttributeError
        If the system is locked (raised by the lock check).
    ValueError
        If a variable name already exists in the system (raised by the attribute check).

    Examples
    --------
    To add one inward variable, arguments must be directly specified.

    >>> system.add_inward('data', 2.)

    To add multiple inward variables, a dictionary with one key per data should be provided.

    >>> system.add_inward({
    >>>     'data1': 42.,
    >>>     'data2': False
    >>> })
    """
    self.__lock_check("add_inward")
    # Type validation
    check_arg(definition, 'definition', (str, dict))
    def add_unique_data(
        name: str,
        value: Any,
        unit: str = "",
        dtype: Types = None,
        valid_range: RangeValue = None,
        invalid_comment: str = "",
        limits: RangeValue = None,
        out_of_limits_comment: str = "",
        distribution: Optional[Distribution] = None,
        desc: str = "",
        scope: Scope = Scope.PRIVATE,
    ) -> None:
        # Add a single variable to the `inwards` port, and register its names
        self.__check_attr(name, f"cannot add inward {name!r}")
        inputs = self.inputs
        inputs[System.INWARDS].add_variable(
            name,
            value,
            unit=unit,
            dtype=dtype,
            valid_range=valid_range,
            invalid_comment=invalid_comment,
            limits=limits,
            out_of_limits_comment=out_of_limits_comment,
            desc=desc,
            distribution=distribution,
            scope=scope,
        )
        # The same reference is registered under the full and the short names
        reference = VariableReference(context=self, mapping=inputs[System.INWARDS], key=name)
        self.append_name2variable(
            [(f"{System.INWARDS}.{name}", reference), (name, reference)]
        )
        # A new input was created: the cached input mapping is no longer valid
        self.__reset_input_mapping()
    if isinstance(definition, dict):
        for key, value in definition.items():
            if isinstance(value, dict):
                # First interpret the dictionary as variable definition arguments;
                # if this fails with TypeError, use the dictionary as variable value
                try:
                    add_unique_data(key, **value)
                except TypeError:
                    add_unique_data(key, value)
            else:
                add_unique_data(key, value)
    else:
        add_unique_data(
            definition,
            value,
            unit=unit,
            dtype=dtype,
            valid_range=valid_range,
            invalid_comment=invalid_comment,
            limits=limits,
            out_of_limits_comment=out_of_limits_comment,
            desc=desc,
            distribution=distribution,
            scope=scope,
        )
[docs]
def add_outward(self,
    definition: Union[str, Dict[str, Any]],
    value: Any = 1,
    unit: str = "",
    dtype: Types = None,
    valid_range: RangeValue = None,
    invalid_comment: str = "",
    limits: RangeValue = None,
    out_of_limits_comment: str = "",
    desc: str = "",
    scope: Scope = Scope.PUBLIC,
) -> None:
    """Add an outward variable to the `System`.

    An outward variable is calculated by the `System`. But its value is not mandatory in any
    variables fluxes between this `System` and another one. It is stored in the dedicated
    `outwards` output port of the system, and is accessible both as `name` and as `outwards.name`.

    A unique outward variable can be defined by providing directly all arguments. And multiple
    outward variables can be defined by passing a `dict` of pair (`str`, `Any`) with an entry for
    each variable. In the latter case, if an entry is itself a `dict`, it is first interpreted
    as keyword arguments of the variable definition (`value`, `unit`, etc.); if this attempt
    raises `TypeError`, the dictionary is used as the value of the variable.

    This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.

    Parameters
    ----------
    definition : str or Dict[str, Any]
        Name of the unique variable or a dictionary for multiple variables at once
    value : Any, optional
        Value of the variable if `definition` is a `str`; default 1
    unit : str, optional
        Variable unit; default empty string (i.e. dimensionless)
    dtype : type or iterable of types, optional
        Variable type; default None (i.e. type of initial value)
    valid_range : Tuple[Any, Any], optional
        Validity range of the variable; default None (i.e. all values are valid)
    invalid_comment : str, optional
        Comment to show in case the value is not valid; default ''
    limits : Tuple[Any, Any], optional
        Limits over which the use of the model is wrong; default valid_range
    out_of_limits_comment : str, optional
        Comment to show in case the value is out of limits; default ''
    desc : str, optional
        Variable description; default ''
    scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional
        Variable visibility; default PUBLIC

    Raises
    ------
    AttributeError
        If the system is locked (raised by the lock check).
    ValueError
        If a variable name already exists in the system (raised by the attribute check).

    Examples
    --------
    To add a unique variable, arguments must be directly specified.

    >>> system.add_outward('info', 2.)

    To add multiple variables, a dictionary with one key per outward variable should be provided.

    >>> system.add_outward({
    >>>     'info1': 42.,
    >>>     'info2': False
    >>> })
    """
    self.__lock_check("add_outward")
    # Type validation
    check_arg(definition, "definition", (str, dict))
    def add_local(
        name: str,
        value: Any,
        unit: str = "",
        dtype: Types = None,
        valid_range: RangeValue = None,
        invalid_comment: str = "",
        limits: RangeValue = None,
        out_of_limits_comment: str = "",
        desc: str = "",
        scope: Scope = Scope.PUBLIC,
    ) -> None:
        # Add a single variable to the `outwards` port, and register its names
        self.__check_attr(name, f"cannot add outward {name!r}")
        outputs = self.outputs
        outputs[System.OUTWARDS].add_variable(
            name,
            value,
            unit=unit,
            dtype=dtype,
            valid_range=valid_range,
            invalid_comment=invalid_comment,
            limits=limits,
            out_of_limits_comment=out_of_limits_comment,
            desc=desc,
            scope=scope,
        )
        # The same reference is registered under the full and the short names
        reference = VariableReference(context=self, mapping=outputs[System.OUTWARDS], key=name)
        self.append_name2variable(
            [(f"{System.OUTWARDS}.{name}", reference), (name, reference)]
        )
    if isinstance(definition, dict):
        for key, value in definition.items():
            if isinstance(value, dict):
                # First interpret the dictionary as variable definition arguments;
                # if this fails with TypeError, use the dictionary as variable value
                try:
                    add_local(key, **value)
                except TypeError:
                    add_local(key, value)
            else:
                add_local(key, value)
    else:
        add_local(
            definition,
            value,
            unit=unit,
            dtype=dtype,
            valid_range=valid_range,
            invalid_comment=invalid_comment,
            limits=limits,
            out_of_limits_comment=out_of_limits_comment,
            desc=desc,
            scope=scope,
        )
[docs]
def add_transient(self,
    name: str,
    der: str,  # time derivative of `name`
    desc: Optional[str] = None,
    max_time_step: Union[Number, str] = numpy.inf,
    max_abs_step: Union[Number, str] = numpy.inf,
) -> None:
    """
    Declare a transient variable, defined implicitly by the expression of its time derivative, and add
    a time-dependent unknown to the mathematical problem.
    Transients can be used within the system, but their time evolution is computed outside the system, by a time driver.

    If `name` already exists, it must refer to an input variable, compatible with the value of `der`.
    If `name` does not exist, a new inward `name` is created on the fly.

    This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.

    Parameters
    ----------
    name : str
        Name of the transient variable
    der : str
        Expression of the time derivative, used for integration during time driver execution.
        The type and unit of the transient variable are deduced from `der`.
    desc : str, optional
        Variable description; default None. Only used when inward `name` is created on the fly,
        in which case a description is generated if none is given.
    max_time_step : Number or evaluable expression (str), optional
        Maximum time step admissible for the integration of the variable (for stability, typically).
    max_abs_step : Number or evaluable expression compatible with transient variable, optional
        Maximum variable step admissible over one time step (for stability, typically).

    Raises
    ------
    TypeError
        If `name` exists but is not an input variable, or if the existing variable
        is not compatible with the value of expression `der`.

    Examples
    --------
    >>> system.add_inward('v', numpy.zeros(3), desc='Velocity')
    >>> system.add_inward('a', numpy.zeros(3), desc='Acceleration')
    >>> system.add_inward('area', 1.0)
    >>> system.add_outward('flowrate', desc='Volumetric flowrate')
    >>> system.add_transient('x', der='v')  # x defined by dx/dt = v
    >>> system.add_transient('v', der = 'a',
    ...     max_time_step = '0.1 / max(norm(a), 1e-9)',
    ... )
    >>> system.add_transient('h',
    ...     der = 'flowrate / area',
    ...     max_abs_step = 0.1,
    ... )
    """
    check_arg(name, "name", str)
    check_arg(der, "der", str)
    # Value and type of the derivative expression, evaluated in the context of the system
    _, value, dtype = TimeUnknown.der_type(der, self)
    if name in self:
        if not self.is_input_var(name):
            raise TypeError(
                "Only input variables can be declared as transient variables; got {!r} in {!r}".format(
                    name, self.name)
            )
        # Check that existing inward is compatible with derivative expression
        declared_value = self[name]
        if isinstance(value, numpy.ndarray):
            # Array derivative: shapes must match
            ok = (numpy.shape(declared_value) == value.shape)
        else:
            # Otherwise, the declared variable must be a number
            ok = is_number(declared_value)
        if not ok:
            raise TypeError(
                "Type incompatibility: {} {!r} cannot be the time derivative of {} variable {!r}".format(
                    dtype.__name__, der, type(declared_value).__name__, name))
    else:  # Create new inward on the fly
        logger.info(f"Creation of new time-dependent inward {name!r} within system {self.name!r}")
        if desc is None:
            str_der = self.str_der(1)
            desc = f"Transient variable defined as {str_der(name)} = {der}"
        # Need to copy the value to avoid vector linked by reference between variable and its derivative
        # See https://gitlab.safrantech.safran/cosapp/cosapp/issues/179
        self.add_inward(name, value=deepcopy(value), desc=desc, dtype=dtype)
    self._time_pb.add_transient(name, der, max_time_step, max_abs_step)
[docs]
def add_rate(self,
    name: str,
    source: Any,  # `name` is the time derivative of `source`
    initial_value: Union[Number, numpy.ndarray, str] = None,
    desc: str = None,
) -> None:
    """
    Declare a variable tracking the rate-of-change of a quantity (its `source`) during time evolution.

    Rates are defined by their source, and are updated by the time driver computing the time
    evolution of the system.
    This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.

    Parameters
    ----------
    name : str
        Name of the rate variable. If it does not exist yet, a new inward is created.
    source : Any
        Expression of the quantity of interest whose rate will be computed.
        The type of the rate is deduced from `source`.
    initial_value : Number/array, or evaluable expression (str), optional.
        Initial value of the rate (otherwise unknown, as rates are only updated from the first time step on).
        If specified and `source` is an array, it must have the same shape as `source`.
    desc : str, optional
        Variable description; default None (description generated from `source`).

    Raises
    ------
    TypeError
        If `name` exists but is not an input variable, or if its current value
        is not compatible with the type of `source` (scalar vs. array shape).

    Examples
    --------
    >>> system.add_inward('U', 0.1, desc='Input voltage')
    >>> system.add_rate('dUdt', source='U')  # dUdt defined as dU/dt
    """
    check_arg(name, "name", str)
    self.__check_attr(name, f"cannot add rate {name!r}")
    _, src_value, dtype = TimeDerivative.source_type(source, self)
    array_source = isinstance(src_value, numpy.ndarray)

    def evaluate(expression):
        """Evaluate `expression` in the system context, as an array if source is an array."""
        if expression is None:
            return None
        if isinstance(expression, str):
            result = EvalString(expression, self).eval()
        elif isinstance(expression, EvalString):
            result = expression.eval()
        else:
            result = expression
        return numpy.asarray(result) if array_source else result

    def same_shape_as_source(candidate) -> bool:
        return numpy.shape(candidate) == src_value.shape

    if array_source and initial_value is not None:
        # If provided, `initial_value` must conform to `src_value.shape`
        check_arg(
            evaluate(initial_value), 'initial_value', numpy.ndarray,
            value_ok = same_shape_as_source,
        )

    if name not in self:
        # Create new inward on the fly
        logger.info(f"Creation of new time-dependent inward {name!r} within system {self.name!r}")
        if desc is None:
            make_description = self.str_der(1)
            desc = make_description(source)
        if initial_value is None:
            # No initial value: the inward must accept `None` until the first time step
            start_value = None
            dtype = (dtype, type(None))
        else:
            start_value = evaluate(initial_value)
        self.add_inward(name, value=start_value, desc=desc, dtype=dtype)
    else:
        if not self.is_input_var(name):
            raise TypeError(
                f"Only input variables can be declared as rates; got {self.name}.{name}"
            )
        # Check that existing inward is compatible with requested rate
        current_value = self[name]
        if array_source:
            compatible = same_shape_as_source(current_value)
        else:
            compatible = is_number(current_value)
        if not compatible:
            raise TypeError(
                "Type incompatibility: {} {!r} cannot be the time derivative of {} {}".format(
                    type(current_value).__name__, name, dtype.__name__, source)
            )
        self[name] = evaluate(initial_value)

    self._time_pb.add_rate(name, source, initial_value)
[docs]
def is_output_var(self, name: str) -> bool:
    """Tell whether `name` refers to an output variable.

    Parameters
    ----------
    name : str
        Variable name

    Returns
    -------
    bool
        `True` if `name` is the name of an output variable, `False` otherwise
    """
    found_as_output = self.__check_var(name, PortType.OUT)
    return found_as_output
def __check_var(self, name: str, direction: PortType) -> bool:
    """Shared implementation of the input/output variable tests.

    Returns `True` when `name` is found in the name mapping, and is held
    by a port whose direction is `direction`; `False` otherwise
    (including when `name` is unknown).
    """
    try:
        reference = self.name2variable[name]
        holder = reference.mapping
    except KeyError:
        return False
    if not isinstance(holder, BasePort):
        return False
    return holder.direction == direction
[docs]
@staticmethod
def str_der(order: int) -> Callable[[str,], str]:
    """Derivative name factory.

    Parameters
    ----------
    order : int
        Derivative order

    Returns
    -------
    Callable[[str], str]
        Function generating the derivative name from the variable name:
        `d(x)/dt` for order 1, `d^n(x)/dt^n` for order n > 1,
        and the variable name itself for any order below 1.
    """
    if order == 1:
        return lambda s: f"d({s})/dt"
    elif order > 1:
        # The order is the exponent, and the variable name goes between parentheses
        return lambda s: f"d^{order}({s})/dt^{order}"
    return lambda s: s
def _precompute(self) -> None:
    """Touch all rates of the local time problem before computation."""
    local_rates = self._time_pb.rates
    for rate in local_rates.values():
        # Ensures that system is recomputed at first time step
        rate.touch()
[docs]
def check(self, name: Optional[str] = None) -> Union[Dict[str, Validity], Validity]:
    """Get value validity of a given variable, port or system, or of all variables.

    If `name` is not provided, returns a dictionary with the validity of all variables
    referenced in the system name mapping under a port-qualified name ('port.variable').
    Shortcut names (with no dot) to inwards or outwards are disregarded.
    Otherwise, only the validity for the requested object is returned.

    Parameters
    ----------
    name : str, optional
        Variable name

    Returns
    -------
    Dict[str, Validity] or Validity
        (Dictionary of) the variable(s) value validity

    Raises
    ------
    AttributeError
        If `name` is provided, but does not exist in the system.
    """
    def unchecked(key) -> Union[Dict[str, Validity], Validity]:
        """Validity of attribute `key`, with no prior existence check"""
        target = getattr(self, key)
        if isinstance(target, (System, BasePort)):
            return target.check()
        reference = self.name2variable[key]
        return reference.mapping.check(reference.key)

    def is_port_variable(key, reference) -> bool:
        """Keep only variables referenced through the name of their port"""
        if isinstance(reference.value, (System, BasePort)):
            return False
        if "." not in key:
            # Shortcut to inwards or outwards => not taken
            return False
        owner_name = key.rsplit(".", maxsplit=1)[0]
        return isinstance(self[owner_name], BasePort)

    # TODO unit tests
    if name is not None:
        if name in self:
            return unchecked(name)
        raise AttributeError(f"Variable {name} does not exist in System {self.name}")

    return {
        key: unchecked(key)
        for key, reference in self.name2variable.items()
        if is_port_variable(key, reference)
    }
[docs]
def is_running(self) -> bool:
    """Is this System in execution?

    Returns
    -------
    bool
        `True` while the execution context lock of the system is active.
    """
    lock = self.__context_lock
    return lock.is_active
@property
def residues(self):
    """Dict[str, Residue] : Read-only view on the residues of the current `System`."""
    local_residues = self._math.residues
    # Wrapped in a `MappingProxyType` to forbid external modification
    return MappingProxyType(local_residues)
@property
def unknowns(self):
    """Dict[str, Unknown] : Read-only view on the unknowns of the current `System`."""
    local_unknowns = self._math.unknowns
    # Wrapped in a `MappingProxyType` to forbid external modification
    return MappingProxyType(local_unknowns)
@property
def transients(self):
    """Read-only view on the transient unknowns held by the time problem of this system."""
    registered = self._time_pb.transients
    # Wrapped in a `MappingProxyType` to forbid external modification
    return MappingProxyType(registered)
@property
def rates(self):
    """Read-only view on the time derivatives (rates) held by the time problem of this system."""
    registered = self._time_pb.rates
    # Wrapped in a `MappingProxyType` to forbid external modification
    return MappingProxyType(registered)
[docs]
def events(self) -> Iterator[Event]:
    """Iterate over the events defined locally on this system."""
    for event in self.__events.values():
        yield event
[docs]
def all_events(self) -> Iterator[Event]:
    """Iterate recursively over the events of every system yielded by `self.tree()`."""
    for system in self.tree():
        for event in system.events():
            yield event
@property
def child_connectors(self) -> Dict[str, List[Connector]]:
    """Dict[str, List[Connector]] : Read-only view on connectors between sub-systems,
    referenced by system names."""
    connector_map = self.__child_connectors
    return MappingProxyType(connector_map)
[docs]
def connectors(self) -> Dict[str, Connector]:
    """Build a dictionary of all connectors within system, referenced by connector name.

    Connectors are collected from `all_connectors`; if several connectors
    share the same name, the last one encountered is retained.
    """
    return {
        connector.name: connector
        for connector in self.all_connectors()
    }
[docs]
def all_connectors(self) -> Iterator[Connector]:
    """Iterate over all connectors within system: pulling connectors first,
    then connectors between sub-systems."""
    for connector in self.__pulling_connectors:
        yield connector
    for group in self.__child_connectors.values():
        for connector in group:
            yield connector
[docs]
def incoming_connectors(self) -> Iterator[Connector]:
    """Iterate over all connectors targetting system.

    Yields the pulling connectors of the system, followed (if the system has a parent)
    by the connectors registered under the system name at parent level.
    """
    yield from self.__pulling_connectors
    parent = self.parent
    if not parent:
        return
    yield from parent.__child_connectors.get(self.name, [])
@property
def problem(self) -> MathematicalProblem:
    """MathematicalProblem: locally defined off-design mathematical problem,
    without considering sub-system tree.

    Raises
    ------
    `AttributeError` unless the free-problem context of the system is active
    (i.e. when the system is being modified by a driver).
    """
    if self.__free_problem.is_active:
        return self._math
    raise AttributeError("Can't access attribute `problem`")
[docs]
def assembled_problem(self) -> MathematicalProblem:
    """Returns the consolidated mathematical problem, assembled from
    entire system tree, and accounting for cyclic dependency loops.

    The problem is built from the local problem `self._math`, extended with
    the assembled problem of each non-standalone child (computed recursively),
    and finally with the loop problem of the system.
    Child unknowns connected to another port are removed; if the connection
    originates from this system (pulled variable), the unknown is re-declared
    at parent level. Weak targets set on connected variables are discarded.

    Returns
    -------
    MathematicalProblem
        The assembled mathematical problem.
    """
    problem = self.new_problem('off-design')
    # Make shallow copy of `self._math` properties
    for name in ('residues', 'deferred_residues', 'unknowns'):
        attr = getattr(problem, name)
        attr.update(getattr(self._math, name))

    def transfer_unknown(unknown: Unknown, new_name: str):
        # Declare a new unknown `new_name` in `problem`, with the numerical options of `unknown`
        options = {
            attr: getattr(unknown, attr)
            for attr in ('max_abs_step', 'max_rel_step', 'lower_bound', 'upper_bound')
        }
        problem.add_unknown(new_name, **options)

    children: dict[str, System] = self.children
    for child in children.values():
        if child.is_standalone():
            # Standalone children are disregarded in the assembled problem
            continue
        child_problem = child.assembled_problem()
        # Connectors targetting `child` (empty list if none)
        connectors = self.__child_connectors.get(child.name, [])
        unknowns = child_problem.unknowns
        for connector in connectors:
            # Transfer unknowns to parent (when necessary)
            # Iterate over a copy of the keys, as `unknowns` is modified within the loop
            for name in list(unknowns.keys()):
                unknown = unknowns[name]
                port = unknown.port
                connected = (
                    port.owner in self.children.values()
                    and port is connector.sink
                    and unknown.variable in connector.sink_variables()
                )
                if connected:
                    # Port is connected => remove unknown
                    unknowns.pop(name)
                    pulled = connector.source.owner is self
                    if pulled:
                        # Transfer unknown to parent level
                        src = connector.source_variable(unknown.variable)
                        transfer_unknown(unknown, f"{connector.source.name}.{src}")
            # Prune target equations defined as weak if target is connected
            origin = connector.source.owner
            deferred_residues = origin._math.deferred_residues.values()
            for deferred in filter(lambda target: target.weak, deferred_residues):
                # Only the first variable of the deferred residue is considered
                targetted = list(deferred.variables)[0]
                ref = origin.name2variable[targetted]
                port = ref.mapping
                name = ref.key
                connected = (
                    port.owner in self.children.values()
                    and port is connector.source
                    and name in connector.source_variables()
                )
                if connected:
                    # Remove deferred equation
                    # NOTE(review): `pop` has no default, and presumably raises `KeyError`
                    # if `origin` has not been merged into `problem` yet - confirm
                    key = MathematicalProblem.target_key(f"{origin.name}.{targetted}")
                    problem.deferred_residues.pop(key)
        # Merge what remains of the child problem into the assembled problem
        problem.extend(child_problem, copy=False)
    return problem.extend(self.__loop_problem)
[docs]
def assembled_time_problem(self) -> TimeProblem:
    """Returns the consolidated transient problem, assembled from the entire system tree.

    The problem is built from the local time problem `self._time_pb`, extended
    with the assembled time problem of each child (computed recursively).
    Child transients and rates connected to another port are removed; if the
    connection originates from this system (pulled variable), they are re-declared
    at parent level. Otherwise, a warning is emitted, as they will not be computed.

    Returns
    -------
    TimeProblem
        The assembled time problem.
    """
    problem = TimeProblem('off-design', self)
    # Make shallow copy of `self._time_pb` properties
    for name in ('transients', 'rates'):
        attr: dict = getattr(problem, name)
        attr.update(getattr(self._time_pb, name))

    def transfer_transient(transient: TimeUnknown, name: str):
        # Re-declare `transient` in `problem` under `name`, keeping track of its origin
        # NOTE(review): `max_abs_step` is not forwarded here - confirm this is intended
        ref = transient.context.name2variable[transient.basename]
        problem.add_transient(name,
            der = transient.der,
            max_time_step = transient.max_time_step_expr,
            pulled_from = transient.pulled_from or ref,
        )

    def transfer_rate(rate: TimeDerivative, name: str):
        # Re-declare `rate` in `problem` under `name`
        problem.add_rate(name,
            source = rate.source_expr,
            initial_value = rate.initial_value_expr,
        )

    # Contextual names of time-dependent variables discarded without transfer
    popped: list[tuple[str, str]] = list()
    for child in self.children.values():
        child_problem = child.assembled_time_problem()
        # Connectors targetting `child` (empty list if none)
        connectors = self.__child_connectors.get(child.name, [])
        # Each collection of time-dependent variables is paired with its transfer function
        transfer_items = [
            (child_problem.transients, transfer_transient),
            (child_problem.rates, transfer_rate),
        ]
        for connector in connectors:
            # Transfer transients and rates to parent (when necessary)
            for unknowns, transfer in transfer_items:
                # Iterate over a copy of the keys, as `unknowns` is modified within the loop
                for name in list(unknowns.keys()):
                    unknown: Unknown = unknowns[name]
                    port = unknown.port
                    connected = (
                        port.owner in self.children.values()
                        and port is connector.sink
                        and unknown.variable in connector.sink_variables()
                    )
                    if connected:
                        # Port is connected => remove unknown
                        unknowns.pop(name)
                        if connector.source.owner is self:
                            # Transfer unknown to parent level
                            src = connector.source_variable(unknown.variable)
                            transfer(unknown, f"{connector.source.name}.{src}")
                        else:
                            # Connected to a port not owned by this system: variable is dropped
                            popped.append(unknown.contextual_name())
        # Merge what remains of the child problem into the assembled problem
        problem.extend(child_problem, copy=False)
    if popped:
        # Warn user that dropped variables will not be integrated in time
        if len(popped) == 1:
            message = f"variable {popped[0]!r} is connected to an output"
        else:
            message = f"variables {popped} are connected to outputs"
        warnings.warn(
            f"In {self.full_name()}, time-dependent {message} and will not be computed."
        )
    return problem
[docs]
def get_unsolved_problem(self) -> MathematicalProblem:
    """Deprecated alias of `assembled_problem`.

    Emits a warning, and returns the consolidated mathematical problem,
    assembled from entire system tree, and accounting for cyclic dependency loops.

    Returns
    -------
    MathematicalProblem
        The assembled mathematical problem.
    """
    deprecation_notice = (
        "Method `get_unsolved_problem` is deprecated; use `assembled_problem` instead."
    )
    warnings.warn(deprecation_notice)
    assembled = self.assembled_problem()
    return assembled
[docs]
def add_child(self,
    child: AnySystem,
    execution_index: Optional[int] = None,
    pulling: Optional[Union[str, Collection[str], Dict[str, str]]] = None,
    desc: str = "",
) -> AnySystem:
    """Add a child `System` to the current `System`.

    When adding a child `System`, it is possible to specify its position in the execution
    order.

    Child ports or individual `inwards` and `outwards` can also be pulled at the parent level by providing
    either the name of the port/inward/outward or a list of them or the name mapping of the child element
    (dictionary keys) to the parent element (dictionary values).
    If the argument is not a dictionary, the name in the parent system will be the same as in
    the child.

    Parameters
    ----------
    - child [System]:
        `System` to add to the current `System`
    - execution_index [int, optional]:
        Index of the execution order list at which the `System` should be inserted;
        default latest.
    - pulling [str or list[str] or dict[str, str], optional]:
        Map of child ports to pulled ports at the parent system level; default None (no pulling)
    - desc [str, optional]:
        Sub-system description in the context of its parent.

    Returns
    -------
    `child`

    Raises
    ------
    `ValueError` if an object named like `child`, other than a sub-system, already exists in the system.
    `ConnectorError` or `UnitError` if pulling fails; in this case, the child is removed before the
    exception is propagated.
    """
    # Type validation
    check_arg(child, 'child', System)
    check_arg(pulling, 'pulling', (type(None), str, Collection))

    child = super().add_child(child, execution_index, desc)

    # A name clash is tolerated (with a warning) only if the existing object is a sub-system
    if child.name in self.name2variable:
        if isinstance(self[child.name], System):
            logger.warning(
                f"A subsystem named {child.name} already exists within system {self.name}."
                f" Child system {child!r} will overwrite it."
            )
        else:
            raise ValueError(f"An object named {child.name!r} already exists in system {self}")

    keys = [(child.name, VariableReference(context=self, mapping=self.children, key=child.name))]
    # Append child system name mapping to current mapping
    rel2absname = lambda item: (f"{child.name}.{item[0]}", item[1])
    keys.extend(map(rel2absname, child.name2variable.items()))
    self.append_name2variable(keys)
    self.__reset_input_mapping()

    # Add read-only constants to parent line
    # `prefix` is extended at each level, so that names are relative to each ancestor
    prefix = f"{child.name}."
    for system in self.path_to_root():
        for name, value in child.__readonly.items():
            system.__readonly[f"{prefix}{name}"] = value
        prefix = f"{system.name}.{prefix}"

    if pulling is not None:
        try:
            pull_variables(child, pulling)
        except (ConnectorError, UnitError):
            # Roll back child insertion before propagating the error
            self.pop_child(child.name)
            raise

    # If child is added outside of `setup`, we must force system tree inspection
    # to ensure input propagation during the next system execution.
    self.touch()
    return child
[docs]
def pop_child(self, name: str) -> System:
    """Remove the subsystem called `name`, together with its connections and name references.

    Parameters
    ----------
    name: str
        Name of the subsystem to be removed

    Returns
    -------
    `System`
        The removed subsystem

    Raises
    ------
    `AttributeError` if no match is found
    """
    removed: System = super().pop_child(name)

    def survives(connector) -> bool:
        """A connector is kept unless its source is owned by the removed child"""
        return connector.source.owner is not removed

    # Remove all connections to and from removed child
    connector_map = self.__child_connectors
    connector_map.pop(removed.name, None)
    for target in connector_map:
        connector_map[target] = [
            connector for connector in connector_map[target] if survives(connector)
        ]
    self.__pulling_connectors = [
        connector for connector in self.__pulling_connectors if survives(connector)
    ]

    # Remove references to removed child from name mapping
    prefix = f"{name}."
    obsolete_keys = [name]
    obsolete_keys.extend(
        key for key in self.name2variable.keys() if key.startswith(prefix)
    )
    self.pop_name2variable(obsolete_keys)
    self.__reset_input_mapping()
    return removed
[docs]
def add_driver(self, driver: AnyDriver) -> AnyDriver:
    """Add a driver to this system.

    If the system holds its default driver, all existing drivers are cleared
    before adding a driver with a different name. A driver bearing an
    already existing name replaces the previous one (with a warning).

    Parameters
    ----------
    driver : Driver
        Driver to add

    Returns
    -------
    Driver
        The added driver
    """
    from cosapp.drivers import Driver

    check_arg(driver, 'driver', Driver)

    drivers = self.drivers
    name = driver.name
    default_name = f"default_driver_for_{hex(id(self))}"

    if name != default_name and default_name in drivers:
        drivers.clear()
    if name in drivers:
        logger.warning(f"Driver {name!r} already exists and will be replaced")

    driver.owner = self
    driver.parent = None  # Clear driver parent in case of driver reused
    drivers[name] = driver
    return driver
[docs]
def add_unknown(self,
    name: Union[str, Iterable[Union[dict, str]]],
    max_abs_step: Number = numpy.inf,
    max_rel_step: Number = numpy.inf,
    lower_bound: Number = -numpy.inf,
    upper_bound: Number = numpy.inf
) -> MathematicalProblem:
    """Add unknown variables.

    Variables may be set one by one, or several at once by providing an iterable of names
    and/or dictionaries. Dictionary keys are the arguments of this method ('name' is mandatory);
    options omitted in a dictionary take the values passed to this method.

    Parameters
    ----------
    - name : str or Iterable of dictionary or str
        Name of the variable or list of variable to add
    - max_abs_step : float, optional
        Maximal absolute step by which the variable can be modified by the numerical solver; default numpy.inf
    - max_rel_step : float, optional
        Maximal relative step by which the variable can be modified by the numerical solver; default numpy.inf
    - lower_bound : float, optional
        Lower bound on which the solver solution is saturated; default -numpy.inf
    - upper_bound : float, optional
        Upper bound on which the solver solution is saturated; default numpy.inf

    Returns
    -------
    MathematicalProblem
        The modified mathematical problem

    Raises
    ------
    TypeError
        If a dictionary item has no 'name' key, or contains a key which is not an argument of this method.
    """
    # Options applied to all unknowns, unless overridden in a dictionary item
    common_options = dict(
        max_abs_step=max_abs_step,
        max_rel_step=max_rel_step,
        lower_bound=lower_bound,
        upper_bound=upper_bound,
    )

    def create_unknown(name: str, max_abs_step, max_rel_step, lower_bound, upper_bound):
        unknown = Unknown(self, name, max_abs_step, max_rel_step, lower_bound, upper_bound)
        # Remove existing unknown if user wants to update the parameters.
        if unknown.name in self._math.unknowns:
            self._math.unknowns.pop(unknown.name)
        self._math.add_unknown([unknown])

    if isinstance(name, str):
        create_unknown(name, **common_options)
    else:
        for item in name:
            if isinstance(item, dict):
                # Dictionary item: keys are arguments of this method, and supersede common options
                create_unknown(**{**common_options, **item})
            else:
                create_unknown(item, **common_options)
    return self._math
[docs]
def add_equation(self,
    equation: Union[str, Iterable[Union[dict, str, Tuple[str, str]]]],
    name: Optional[str] = None,
    reference: Union[Number, numpy.ndarray, str] = 1,
) -> MathematicalProblem:
    """Add off-design equation(s) to the local mathematical problem.

    Equations may be added one by one, or provided by a list of dictionaries to add multiple equations at once.
    The dictionary keys are the arguments of this method.
    The call is forwarded to the local problem, after the system lock check.

    Parameters
    ----------
    - equation : str or Iterable of str of the kind 'lhs == rhs'
        Equation or list of equations to add
    - name : str, optional
        Name of the equation; default None => 'lhs == rhs'
    - reference : Number, numpy.ndarray or "norm", optional
        Reference value(s) used to normalize the equation; default is 1.
        If value is "norm", actual reference value is estimated from order of magnitude.

    Returns
    -------
    MathematicalProblem
        The modified mathematical problem
    """
    self.__lock_check("add_equation")
    local_problem = self._math
    return local_problem.add_equation(equation, name, reference)
[docs]
def add_target(self,
    name: Union[str, Iterable[Union[dict, str, Tuple[str, str]]]],
    reference: Union[Number, numpy.ndarray, str] = 1,
    weak = False,
) -> MathematicalProblem:
    """Add deferred off-design equation(s) on targetted quantities.

    Target equations may be added one by one, or provided by a list of dictionaries to add multiple equations at once.
    The dictionary keys are the arguments of this method.
    The call is forwarded to the local problem, after the system lock check.

    Parameters
    ----------
    - name: str, or Iterable of str or dictionaries
        Targetted quantity, or list of targetted quantities.
    - reference: Number, numpy.ndarray or "norm", optional
        Reference value(s) used to normalize the equation; default is 1.
        If value is "norm", actual reference value is estimated from order of magnitude.
    - weak: bool, optional
        If True, the target is disregarded if the corresponding variable is connected; default is `False`.

    Returns
    -------
    MathematicalProblem
        The modified mathematical problem
    """
    self.__lock_check("add_target")
    local_problem = self._math
    return local_problem.add_target(name, reference, weak)
[docs]
def add_event(self,
    name: str,
    desc: str = "",
    trigger: Optional[Union[str, Event, EventState, ZeroCrossing]] = None,
    final: bool = False,
) -> Event:
    """Create a new event in the system.

    An event occurrence can either be determined by the system itself,
    or triggered by an event from another system.
    The event is stored in the system, and exposed as a read-only property
    defined on the class of the system.

    This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.

    Parameters
    ----------
    - name [str]:
        Name of the event.
    - desc [str, optional]:
        Event description; defaults to ''.
    - trigger [Union[str, Event, EventState, ZeroCrossing], optional]:
        String, primary or derived event defining the event trigger; defaults to `None`.
    - final [bool, optional]:
        Defines whether or not event is final; defaults to `False`.

    Returns
    -------
    - event [Event]:
        The newly created event.

    Examples
    --------
    >>> import numpy as np
    >>>
    >>> class PointMassDynamics(System):
    >>>     # Free fall of a point mass, with friction
    >>>     def setup(self):
    >>>         self.add_inward('mass', 1.2, desc='Mass')
    >>>         self.add_inward('cf', 0.1, desc='Friction coefficient')
    >>>         self.add_inward('g', np.r_[0, 0, -9.81], unit='m/s**2', desc='External acceleration field')
    >>>
    >>>         self.add_outward('a', np.zeros(3), unit='m/s**2', desc='Acceleration')
    >>>         self.add_transient('v', der='a')
    >>>         self.add_transient('x', der='v')
    >>>
    >>>     def compute(self):
    >>>         self.a = self.g - (self.cf / self.mass * np.linalg.norm(self.v)) * self.v
    >>>
    >>> class BouncingPointMass(System):
    >>>     def setup(self):
    >>>         self.add_child(PointMassDynamics('dyn'), pulling=['x', 'v', 'a', 'mass', 'cf', 'g'])
    >>>         self.add_event('rebound', trigger="x[2] <= 0")
    >>>
    >>>     def transition(self):
    >>>         if self.rebound.present:
    >>>             self.v[2] *= -1
    """
    self.__lock_check("add_event")

    # Type and name validation
    check_arg(name, 'name', str)
    name = Variable.name_check(name)
    self.__check_attr(name, f"cannot add event {name!r}")

    # Event creation and storage
    event = Event(name, self, desc, trigger, final)
    self.__events[name] = event

    # Read-only access to the event, as a class-level property
    cls = self.__class__

    def fetch_event(instance):
        try:
            return instance.__events[name]
        except KeyError:
            raise AttributeError(f"{cls.__name__} object {instance.name!r} has no attribute {name!r}")

    setattr(cls, name, property(fetch_event))
    self._add_member(name)
    return event
[docs]
def add_inward_modevar(self,
    name: str,
    value: Any = False,
    unit: str = "",
    dtype: Types = None,
    desc: str = "",
    scope: Scope = Scope.PRIVATE,
) -> None:
    """Add an inward mode variable to the `System`.

    A mode variable is a piecewise constant variable in each continuous time phase.
    Like any input, inward mode variables should not be modified during system transitions.
    The variable is added to input port `System.MODEVARS_IN`, and referenced in the
    system name mapping both under its short name and its port-qualified name.

    This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.

    Parameters
    ----------
    name : str
        Name of the variable
    value : Any, optional
        Value of the variable; default False
    unit : str, optional
        Variable unit; default empty string (i.e. dimensionless)
    dtype : type
        Variable type; default None (i.e. type of initial value)
    desc : str, optional
        Variable description; default ''
    scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional
        Variable visibility; default PRIVATE

    Examples
    --------
    >>> class MultimodeSystem(System):
    >>>     def setup(self):
    >>>         self.add_inward_modevar('composite', True)
    >>>         self.reconfig()
    >>>
    >>>     def reconfig(self):
    >>>         # Define one or two sub-systems, depending on mode
    >>>         for name in list(self.children):
    >>>             self.pop_child(name)
    >>>         if self.composite:
    >>>             a = self.add_child(Foo('a'))
    >>>             b = self.add_child(Bar('b'))
    >>>             self.connect(a, b, {'x', 'y'})
    >>>         else:
    >>>             self.add_child(Bogus('c'))
    >>>
    >>>     def transition(self):
    >>>         self.reconfig()
    """
    self.__lock_check("add_inward_modevar")

    # Type and name validation
    check_arg(name, 'name', str)
    self.__check_attr(name, f"cannot add inward {name!r}")

    modevars = self.inputs[System.MODEVARS_IN]
    modevars.add_mode_variable(name, value, unit, dtype, desc, scope=scope)

    # Same reference registered under port-qualified and short names
    reference = VariableReference(context=self, mapping=modevars, key=name)
    aliases = (f"{System.MODEVARS_IN}.{name}", name)
    self.append_name2variable(
        [(alias, reference) for alias in aliases]
    )
[docs]
def add_outward_modevar(self,
    name: str,
    value: Optional[Any] = None,
    unit: str = "",
    dtype: Types = None,
    desc: str = "",
    init: Optional[Any] = None,
    scope: Scope = Scope.PRIVATE,
) -> None:
    """Add an outward mode variable to the `System`.

    A mode variable is a piecewise constant variable in each continuous time phase.
    It may only be modified in the `transition` method of the `System`, and never
    inside method `compute`. Notwithstanding, its value may be used in `compute`.
    The modification of an output mode variable is often associated with the occurrence
    of an event in the system.
    The variable is added to output port `System.MODEVARS_OUT`, and referenced in the
    system name mapping both under its short name and its port-qualified name.

    This function cannot be called outside :py:meth:`~cosapp.systems.system.System.setup`.

    Parameters
    ----------
    name : str
        Name of the variable
    value : Any, optional
        Value of the variable; default None
    unit : str, optional
        Variable unit; default empty string (i.e. dimensionless)
    dtype : type
        Variable type; default None (i.e. type of initial value)
    desc : str, optional
        Variable description; default ''
    init : Any, optional
        Value imposed at the beginning of time simulations.
        If unspecified (default), the variable remains untouched.
    scope : Scope {PRIVATE, PROTECTED, PUBLIC}, optional
        Variable visibility; default PRIVATE

    Examples
    --------
    >>> class ThresholdSystem(System):
    >>>     def setup(self):
    >>>         self.add_inward('threshold', 1.0)
    >>>         self.add_inward('x', 0.0)
    >>>         self.add_outward('y', 0.0)
    >>>         self.add_event('activates', trigger='x >= threshold')
    >>>         self.add_outward_modevar('activated', init='x > threshold')
    >>>
    >>>     def compute(self):
    >>>         self.y = self.x if self.activated else 0.0
    >>>
    >>>     def transition(self):
    >>>         if self.activates.present:
    >>>             self.activated = True
    """
    self.__lock_check("add_outward_modevar")

    # Type and name validation
    check_arg(name, 'name', str)
    self.__check_attr(name, f"cannot add outward {name!r}")

    modevars = self.outputs[System.MODEVARS_OUT]
    modevars.add_mode_variable(name, value, unit, dtype, desc, init=init, scope=scope)

    # Same reference registered under port-qualified and short names
    reference = VariableReference(context=self, mapping=modevars, key=name)
    aliases = (f"{System.MODEVARS_OUT}.{name}", name)
    self.append_name2variable(
        [(alias, reference) for alias in aliases]
    )
[docs]
def new_problem(self, name='problem') -> MathematicalProblem:
    """Create a new `MathematicalProblem` in the context of system.

    Parameters
    ----------
    name: str
        Name of the mathematical problem. Defaults to 'problem'.

    Returns
    -------
    MathematicalProblem
        The newly created mathematical problem.
    """
    fresh_problem = MathematicalProblem(name, self)
    return fresh_problem
[docs]
def add_design_method(self, name: str) -> MathematicalProblem:
    """Add a design method to the `System`.

    A design method is a set of free variables and equations that defines a way to design the `System`.
    It is an easy way to pre-define a design of this `System` for users.

    The returned mathematical problem is created by `new_problem`, and stored in `design_methods`
    under `name`. It should be populated with the needed variables and equations (see Examples).

    Parameters
    ----------
    name: str
        The name of the design method

    Returns
    -------
    MathematicalProblem
        The newly created mathematical problem.

    Examples
    --------
    >>> method = system1.add_design_method("method1")
    >>> method.add_unknown([
    >>>     dict(name="x", max_rel_step=0.1),
    >>>     "y",
    >>> ])
    >>> method.add_equation([
    >>>     "u == 0",
    >>>     "v == 800",
    >>> ])
    """
    self.__lock_check("add_design_method")
    mathpb = self.new_problem(name)
    self.design_methods[name] = mathpb
    return mathpb
[docs]
def design(self, method: str) -> MathematicalProblem:
    """Returns the chosen group of design equations.

    Parameters
    ----------
    method : str
        Name of the group of equations to extract; must be a key of `design_methods`.

    Returns
    -------
    MathematicalProblem
        Mathematical system to solve for the chosen method
    """
    available_methods = self.design_methods
    return available_methods[method]
[docs]
def log_debug_message(
    self,
    handler: "HandlerWithContextFilters",
    record: logging.LogRecord,
    format: LogFormat = LogFormat.RAW
) -> bool:
    """Callback method on the system to log more detailed information.

    This method will be called by the log handler when :py:meth:`~cosapp.utils.logging.LoggerContext.log_context`
    is active if the logging level is lower or equals to VERBOSE_LEVEL. It allows
    the object to send additional log message to help debugging a simulation.

    Depending on attribute `activate` of the record (if any), the content of either
    input ports (`True`) or output ports (`False`) is published on the handler.

    .. note::
        logger.log method cannot be used here. Use handler.handle(record)

    Parameters
    ----------
    handler : HandlerWithContextFilters
        Log handler on which additional message should be published.
    record : logging.LogRecord
        Log record
    format : LogFormat
        Format of the message

    Returns
    -------
    bool
        Should the provided record be logged?
    """
    def describe_ports(title, ports) -> str:
        """Multi-line description of all non-empty ports and their variables"""
        lines = [f"#### {title}"]
        for port_name, port in ports.items():
            if len(port) == 0:
                continue
            lines.append(f"- {port_name}:")
            for varname in port:
                value = port[varname]
                text = f"{value:.5g}" if is_number(value) else f"{value!s}"
                lines.append(f" - {varname} = {text}")
        return "\n".join(lines)

    def publish(title, ports) -> None:
        handler.log(
            LogLevel.FULL_DEBUG,
            describe_ports(title, ports),
            name=logger.name
        )

    message = record.getMessage()
    activate = getattr(record, "activate", None)
    emit_record = super().log_debug_message(handler, record, format)

    if message.endswith(("call_setup_run", "call_clean_run")):
        return False
    if activate == True:
        publish(f"{self!s} - Inputs", self.inputs)
        return False
    if activate == False:
        publish(f"{self!s} - Outputs", self.outputs)
        return False
    return emit_record
[docs]
def call_setup_run(self, skip_driver: bool = False) -> None:
    """Execute `setup_run` recursively on all modules.

    Drivers of the system (if not skipped) are handled first,
    before delegating to the parent class implementation.

    Parameters
    ----------
    skip_driver : bool
        Skip calling :py:meth:`cosapp.drivers.driver.Driver.setup_run`
    """
    drivers = () if skip_driver else self.drivers.values()
    for driver in drivers:
        driver.call_setup_run()
    super().call_setup_run()
[docs]
def call_clean_run(self, skip_driver: bool = False) -> None:
    """Execute `clean_run` recursively on all modules.

    The parent class implementation is invoked first;
    drivers of the system (if not skipped) are handled afterwards.

    Parameters
    ----------
    skip_driver : bool
        Skip calling :py:meth:`cosapp.drivers.driver.Driver.clean_run`
    """
    super().call_clean_run()
    drivers = () if skip_driver else self.drivers.values()
    for driver in drivers:
        driver.call_clean_run()
def _postcompute(self) -> None:
    """Actions performed after the `System.compute` call.

    Updates the clean/dirty status of input and output ports,
    and re-evaluates all residues of the system.
    """
    if self.is_clean(PortType.IN):
        # Inputs untouched: outputs are declared clean only if the sub-system tree is clean
        if self._is_tree_clean:
            self.set_clean(PortType.OUT)
    else:
        # Inputs were modified: they are now accounted for, and outputs are marked dirty
        self.set_clean(PortType.IN)
        self.set_dirty(PortType.OUT)
    # Evaluate the residues
    for residue in self.residues.values():
        residue.update()
[docs]
def transition(self) -> None:
    """Hook describing system transition upon the occurrence of events (if any).

    The default implementation does nothing; it should be overridden in systems with events.

    Examples
    --------
    >>> import numpy as np
    >>>
    >>> class PointMassDynamics(System):
    >>>     # Free fall of a point mass, with friction
    >>>     def setup(self):
    >>>         self.add_inward('mass', 1.2, desc='Mass')
    >>>         self.add_inward('cf', 0.1, desc='Friction coefficient')
    >>>         self.add_inward('g', np.r_[0, 0, -9.81], desc='External acceleration field')
    >>>         self.add_outward('a', np.zeros(3), desc='Acceleration')
    >>>         self.add_transient('v', der='a')
    >>>         self.add_transient('x', der='v')
    >>>
    >>>     def compute(self):
    >>>         v_norm = np.linalg.norm(self.v)
    >>>         self.a = self.g - (self.cf / self.mass * v_norm) * self.v
    >>>
    >>> class BouncingPointMass(PointMassDynamics):
    >>>     def setup(self):
    >>>         super().setup()
    >>>         self.add_event('rebound', trigger="x[2] <= 0")
    >>>
    >>>     def transition(self):
    >>>         if self.rebound.present:
    >>>             self.v[2] *= -1
    """
    return None
[docs]
def retrieve_incoming_data(self) -> None:
    """Transfer data through every connector yielded by `incoming_connectors`."""
    incoming = self.incoming_connectors()
    for link in incoming:
        link.transfer()
[docs]
def tree_transition(self) -> None:
    """Invoke transition in entire system tree.

    For each system yielded by `self.tree()`, incoming data are retrieved first;
    `transition` is then called within the free-problem context of the system.
    """
    for node in self.tree():
        node.retrieve_incoming_data()
        free_problem = node.__free_problem
        with free_problem:
            node.transition()
[docs]
def run_once(self) -> None:
    """Run the system once.

    Execute the model of this `System` and its children in the execution order.

    The sequence, for an active system, is:

    1. `_precompute()`;
    2. `compute_before()`, only if input ports are dirty;
    3. data transfer to, and `run_once()` of, each child (only if inputs or sub-tree are dirty),
       followed by the transfer of pulling connectors;
    4. the runner's `compute()`, only if inputs or at least one child are dirty;
    5. `_postcompute()` and emission of signal `computed`.

    Notes
    -----
    The drivers are not executed when calling this method; only the physical model.
    """
    # `is_master` is provided by `System.set_master`; setup and clean-up hooks
    # are only triggered when it is true (presumably for the outermost call - TODO confirm).
    with System.set_master(repr(self)) as is_master:
        if is_master:
            logger.debug("Start setup_run recursive calls.")
            self.call_setup_run(skip_driver=True)

        with self.log_context(" - run_once"):
            if self.is_active():
                self._precompute()

                # Status of inputs is stored before any computation,
                # as it conditions the remainder of the sequence
                dirty_inputs = not self.is_clean(PortType.IN)
                if dirty_inputs:
                    logger.debug(f"Call {self.name}.compute_before")
                    with self.__context_lock:
                        self.compute_before()
                else:
                    logger.debug(f"Skip {self.name}.compute_before - Clean inputs")

                dirty_tree = not self._is_tree_clean
                any_dirty_child = False
                if dirty_inputs or dirty_tree:
                    for child in self.children.values():
                        # Update connectors
                        for connector in self.__child_connectors.get(child.name, []):
                            connector.transfer()
                        # Execute sub-system
                        logger.debug(f"Call {self.name}.{child.name}.run_once()")
                        child.run_once()
                        any_dirty_child |= not child.is_clean()
                    # Pull values from subsystems
                    # NOTE(review): assumed to be skipped when no child was executed - confirm nesting
                    for connector in self.__pulling_connectors:
                        connector.transfer()

                if dirty_inputs or any_dirty_child:
                    logger.debug(f"Call {self.name}.compute")
                    self._compute_calls += 1  # counts actual calls to `compute`
                    with self.__context_lock:
                        self.__runner.compute()
                else:
                    logger.debug(f"Skip {self.name}.compute - Clean inputs")

                # The sub-tree is considered clean if no child was left dirty
                self._is_tree_clean = not any_dirty_child
                self._postcompute()
                self.computed.emit()
            else:
                logger.debug(f"Skip {self.name} execution - Inactive")

        if is_master:
            logger.debug("Start clean_run recursive calls.")
            self.call_clean_run(skip_driver=True)
[docs]
def any_active_driver(self) -> bool:
    """Tell whether at least one driver of this system is active.

    Returns
    -------
    bool
        `True` if `is_active()` is true for any driver in `self.drivers`;
        `False` otherwise (in particular, when the system has no driver).
    """
    for candidate in self.drivers.values():
        if candidate.is_active():
            return True
    return False
[docs]
def run_drivers(self) -> None:
    """Run the drivers defined on this `System`.

    If at least one driver is active, `run_once()` is called on every driver
    stored in `self.drivers`. Otherwise, execution is delegated to
    `run_children_drivers()`.

    When the call is flagged as master by `System.set_master`, loops are opened
    and `call_setup_run()` is invoked beforehand; `call_clean_run()` and
    `close_loops()` are invoked afterwards.
    """
    with System.set_master(repr(self), type_checking=False) as is_master:
        if is_master:
            # Loops are opened before the recursive setup
            self.open_loops()
            logger.debug("Start setup_run recursive calls.")
            self.call_setup_run()

        if self.any_active_driver():
            if self.is_standalone():  # System not standalone can't set the mathematical problem
                logger.debug(f"Exec order for {self.name}: {list(self.exec_order)}")

            # NOTE(review): driver loop assumed to run whether or not the system
            # is standalone (the test above only guards the log) - confirm nesting
            for driver in self.drivers.values():
                logger.debug(f"Call driver {driver.name}.run_once on {self.name}")
                driver.run_once()
        else:
            # No active driver at this level: let sub-systems run their own drivers
            self.run_children_drivers()

        if is_master:
            logger.debug("Start clean_run recursive calls.")
            self.call_clean_run()
            # Reactivate the connectors deactivated by `open_loops`
            self.close_loops()
[docs]
def run_children_drivers(self) -> None:
    """Solve the children `System` in the execution order.

    Same sequence as `run_once`, except that children are executed with
    `run_drivers()`, such that their own drivers (if any) are run:

    1. `_precompute()`;
    2. `compute_before()`, only if input ports are dirty;
    3. data transfer to, and `run_drivers()` of, each child (only if inputs or sub-tree are dirty),
       followed by the transfer of pulling connectors;
    4. the runner's `compute()`, only if inputs or at least one child are dirty;
    5. `_postcompute()` and emission of signal `computed`.
    """
    with System.set_master(repr(self), type_checking=False) as is_master:
        if is_master:
            logger.debug("Start setup_run recursive calls.")
            self.call_setup_run()

        with self.log_context(" - run_children_drivers"):
            if self.is_active():
                self._precompute()

                # Status of inputs is stored before any computation,
                # as it conditions the remainder of the sequence
                dirty_inputs = not self.is_clean(PortType.IN)
                if dirty_inputs:
                    logger.debug(f"Call {self.name}.compute_before")
                    with self.__context_lock:
                        self.compute_before()
                else:
                    logger.debug(f"Skip {self.name}.compute_before - Clean inputs")

                dirty_tree = not self._is_tree_clean
                any_dirty_child = False
                if dirty_inputs or dirty_tree:
                    for child in self.children.values():
                        # Update connectors
                        for connector in self.__child_connectors.get(child.name, []):
                            connector.transfer()
                        # Execute sub-system, including its drivers.
                        # The log message names the method actually called.
                        logger.debug(f"Call {self.name}.{child.name}.run_drivers()")
                        child.run_drivers()
                        any_dirty_child |= not child.is_clean()
                    # Pull values from subsystems
                    for connector in self.__pulling_connectors:
                        connector.transfer()

                if dirty_inputs or any_dirty_child:
                    # Logged for parity with `run_once`
                    logger.debug(f"Call {self.name}.compute")
                    self._compute_calls += 1  # counts actual calls to `compute`
                    with self.__context_lock:
                        self.__runner.compute()
                else:
                    logger.debug(f"Skip {self.name}.compute - Clean inputs")

                # The sub-tree is considered clean if no child was left dirty
                self._is_tree_clean = not any_dirty_child
                self._postcompute()
                self.computed.emit()
            else:
                logger.debug(f"Skip {self.name} execution - Inactive")

        if is_master:
            logger.debug("Start clean_run recursive calls.")
            self.call_clean_run()
[docs]
def is_standalone(self) -> bool:  # TODO name is confusing as the real answer is I have a non-linear solver
    """Tell whether this System is able to solve itself.

    Returns
    -------
    bool
        `True` if `is_standalone()` is true for at least one driver
        in `self.drivers`; `False` otherwise.
    """
    for candidate in self.drivers.values():
        if candidate.is_standalone():
            return True
    return False
[docs]
def connect(self,
    object1: Union[BasePort, System],
    object2: Union[BasePort, System],
    mapping: Union[str, List[str], Dict[str, str], None] = None,
    cls: Optional[Type[BaseConnector]] = None,
    **kwargs
) -> None:
    """Connect two systems or two ports.

    This method connects `object1` to `object2`. If no mapping is provided, connection
    will be made between variables based on their names. If a name mapping is provided as a list,
    the name should be present in both objects. If the mapping is specified as a dictionary,
    keys are expected to belong to `object1`, and values to `object2`.

    If both objects are systems, mapping is mandatory (full system connections are forbidden).
    In this case, each (key, value) pair of the mapping leads to a port-to-port connection.

    Connections are oriented (i.e. the direction of value transfer is fixed). The decision tree
    is as follows:

    - If one port is of type input and the other one of type output, the connection flows from
      the output to the input.
    - Else if one port (portA) belongs to the parent `System` of the other port (portB), the
      connection flows from portA to portB if portA is an input. And it flows from portB to
      portA if portA is an output.
    - Else if both are inputs, the port is pulled on the parent and connected to the two children.

    Connectors can only connect systems at the same level or a system with its parent. In all
    cases the connectors will be stored by the parent hosting the two connected systems.

    Parameters
    ----------
    object1 : Union[BasePort, System]
        First end-point of connector.
    object2 : Union[BasePort, System]
        Second end-point of connector.
    mapping : str or List[str] or Dict[str, str], optional
        (List of) common name(s) or mapping name dictionary; default None (i.e. no mapping).
    cls : Type[BaseConnector], optional
        Connector type. When specified, `cls` must be a specialization of `BaseConnector`.
        If `cls` is `None` (default), the actual connector type is either `Connector`,
        or a port-specific connector, if any. Port bound connectors are automatically selected
        when both ports are of same type, and when port class contains inner class `Connector`,
        derived from `BaseConnector`.
    **kwargs :
        Additional keyword arguments forwarded to `cls`.

    Raises
    ------
    TypeError
        If `object1` and `object2` are neither two ports, nor two systems.

    Examples
    --------
    Here is an example, in which we assume a port `p_in` has inputs of a system `s1`
    that come from some output port `p_out` of another system `s2`. In the last example,
    `connect` is called with two child systems, rather than ports. The container system
    being called `top`:

    >>> class DemoPort(Port):
    >>>     def setup(self):
    >>>         self.add_variable('a', 1.0)
    >>>         self.add_variable('b', 2.0)
    >>>         self.add_variable('c', 3.0)
    >>>
    >>> class DummySystem(System):
    >>>     def setup(self):
    >>>         self.add_inward('x', 1.0)
    >>>         self.add_outward('y', 0.0)
    >>>         self.add_input(DemoPort, 'p_in')
    >>>         self.add_output(DemoPort, 'p_out')
    >>>
    >>> top = System('top')
    >>> top.add_child(DummySystem('s1'))
    >>> top.add_child(DummySystem('s2'))
    >>>
    >>> top.connect(top.s1.p_in, top.s2.p_out, 'a')
    >>> top.s1.p_in.a = 4
    >>> top.run_once()
    >>> assert top.s1.p_in.a == top.s2.p_out.a
    >>>
    >>> top.connect(top.s1.p_in, top.s2.p_out, {'b': 'c'})
    >>> top.run_once()
    >>> assert top.s1.p_in.b == top.s2.p_out.c
    >>>
    >>> top.connect(top.s1, top.s2, {'x': 'y', 'p_in.c': 'p_out.b'})
    >>> top.run_once()
    >>> assert top.s1.x == top.s2.y
    >>> assert top.s1.p_in.c == top.s2.p_out.b
    """
    # Dispatch on the common kind of the two end-points;
    # mixed pairs (port/system) and any other type are rejected.
    if isinstance(object1, BasePort) and isinstance(object2, BasePort):
        self.__connect_ports(object1, object2, mapping, cls, **kwargs)

    elif isinstance(object1, System) and isinstance(object2, System):
        self.__connect_systems(object1, object2, mapping, cls, **kwargs)

    else:
        raise TypeError(
            "Connected objects must be either two ports or two systems"
            f"; got {type(object1).__name__!r} and {type(object2).__name__!r}"
        )
def __connect_systems(self,
    system1: System,
    system2: System,
    mapping: Union[str, List[str], Dict[str, str]],
    cls: Optional[Type[BaseConnector]] = None,
    **kwargs
) -> None:
    """Connect two sub-systems together.

    This method connects `system1` to `system2`. Full system connections are forbidden, as they
    are potentially ambiguous. As a consequence, name mapping is mandatory.
    If name mapping is provided as a collection, the name should be present in both systems.
    If it is specified as a dictionary, keys pertain to `system1`, values to `system2`.
    Each (key, value) pair of the mapping gives rise to a port-to-port connector.

    Connections are oriented (i.e. the direction of value transfer is fixed). The decision tree
    is as follows:

    - If one port is of type input and the other one of type output, the connection flows from
      the output to the input.
    - Else if one port (portA) belongs to the parent `System` of the other port (portB), the
      connection flows from portA to portB if portA is an input. And it flows from portB to
      portA if portA is an output.
    - Else if both are inputs, the port is pulled on the parent and connected to the two children.

    Connectors can only connect systems at the same level or a system with its parent. In all
    cases the connectors will be stored by the parent hosting the two connected systems.

    Parameters
    ----------
    system1 : System
        First end-point of connector.
    system2 : System
        Second end-point of connector.
    mapping : str or List[str] or Dict[str, str]
        (List of) common name(s) or mapping name dictionary. Mandatory: `None` is rejected.
    cls : Type[BaseConnector], optional
        Connector type. When specified, `cls` must be a specialization of `BaseConnector`.
        If `cls` is `None` (default), the actual connector type is either `Connector`,
        or a port-specific connector, if any. Port bound connectors are automatically selected
        when both ports are of same type, and when port class contains inner class `Connector`,
        derived from `BaseConnector`.
    **kwargs :
        Additional keyword arguments forwarded to `cls`.

    Raises
    ------
    ConnectorError
        If `mapping` is `None`, or if a mapping entry designates two sub-systems.
    AttributeError
        If a mapped variable name is not found in `self.name2variable`.
    """
    if mapping is None:
        raise ConnectorError(
            "Full system connections are forbidden; please provide port or variable mapping."
        )

    def get_variable(system: System, varname: str):
        """Look up variable `varname` of `system` in `self.name2variable`."""
        # Keys of `name2variable` are relative to `self`
        key = varname if system is self else f"{system.name}.{varname}"
        try:
            return self.name2variable[key]
        except KeyError:
            raise AttributeError(
                f"{key!r} not found in {self.name!r}"
            )

    mapping = BaseConnector.format_mapping(mapping)

    for name1, name2 in mapping.items():
        obj1 = getattr(system1, name1)
        obj2 = getattr(system2, name2)

        if isinstance(obj1, BasePort) and isinstance(obj2, BasePort):
            # Full port-to-port connection
            self.__connect_ports(obj1, obj2, cls=cls, **kwargs)

        elif isinstance(obj1, System) and isinstance(obj2, System):
            # Always raises `ConnectorError`, since no mapping is provided
            self.__connect_systems(obj1, obj2, mapping=None)

        else:
            # Mapping between variables
            var1 = get_variable(system1, name1)
            var2 = get_variable(system2, name2)
            self.__connect_ports(
                var1.mapping,
                var2.mapping,
                {var1.key: var2.key},
                cls, **kwargs,
            )
def __connect_ports(self,
    port1: BasePort,
    port2: BasePort,
    mapping: Union[str, List[str], Dict[str, str], None] = None,
    cls: Optional[Type[BaseConnector]] = None,
    **kwargs
) -> None:
    """Connect two ports together.

    This method connects `port1` to `port2`. If no mapping is provided, connection
    will be made between variables based on their names. If a name mapping is provided as a list,
    the name should be present in both ports. And if the mapping is specified as a dictionary,
    the keys belong to `port1` and the values to `port2`.

    A connection is oriented (i.e. the direction of value transfer is fixed). The decision tree
    is as follows:

    - If one port is of type input and the other one of type output, the connection flows from
      the output to the input.
    - Else if one port (portA) belongs to the parent `System` of the other port (portB), the
      connection flows from portA to portB if portA is an input. And it flows from portB to
      portA if portA is an output.
    - Else if both are inputs, the port is pulled on the parent and connected to the two children.

    Connectors can only connect systems at the same level or a system with its parent. In all
    cases the connectors will be stored by the parent hosting the two connected systems.

    Parameters
    ----------
    port1 : BasePort
        First end-point of connector.
    port2 : BasePort
        Second end-point of connector.
    mapping : str or List[str] or Dict[str, str], optional
        (List of) common name(s) or mapping name dictionary; default None (i.e. no mapping).
    cls : Type[BaseConnector], optional
        Connector type. When specified, `cls` must be a specialization of `BaseConnector`.
        If `cls` is `None` (default), the actual connector type is either `Connector`,
        or a port-specific connector, if any. Port bound connectors are automatically selected
        when both ports are of same type, and when port class contains inner class `Connector`,
        derived from `BaseConnector`.
    **kwargs :
        Additional keyword arguments forwarded to `cls`.

    Raises
    ------
    ValueError
        If one of the ports has no owner.
    ConnectorError
        If the ports cannot be connected (same owner, forbidden hierarchy,
        two outputs, mode variable mismatch, or variable already set by another connector).

    Warns
    -----
    A warning is issued, and no connector is created, if the mapping is empty.
    """
    # Type validation
    def check_port(port: BasePort, argname: str):
        check_arg(port, argname, BasePort, stack_shift=1)
        if port.owner is None:
            raise ValueError(f"Cannot connect orphan port {port.contextual_name!r}")

    check_port(port1, 'port1')
    check_port(port2, 'port2')
    check_arg(mapping, 'mapping', (str, list, dict, MappingProxyType, type(None)))

    # Connector type selection, when unspecified
    if cls is None:
        ptype = type(port1)
        if ptype is type(port2):
            # Check if there exists a port-specific connector
            try:
                cls = getattr(ptype, "Connector")
            except AttributeError:
                cls = Connector
            else:
                pname = ptype.__name__
                logger.info(
                    f"{port1.contextual_name!r} and {port2.contextual_name!r}"
                    f" connected by port-specific connector `{pname}.Connector`."
                )
        else:
            cls = Connector

    if cls is not Connector:
        # Custom connector types are checked; the message of the
        # original exception is replaced, but its type is preserved.
        try:
            SystemConnector.check(cls)
        except (TypeError, ValueError) as error:
            error.args = (
                f"`cls` must be a concrete implementation of `BaseConnector`; got {cls!r}",
            )
            raise

    # Generate mapping dictionary
    if mapping is None:
        # Default mapping: all variable names common to both ports
        mapping = dict((v, v) for v in port1 if v in port2)
    else:
        mapping = BaseConnector.format_mapping(mapping)

    if len(mapping) == 0:
        warnings.warn(
            f"Skipped empty connector between {port1.full_name()} and {port2.full_name()}"
        )
        return

    child_connectors = self.__child_connectors

    def contextual_name(port: BasePort) -> str:
        # Ports owned by `self` are designated by their bare name
        return port.name if port.owner is self else port.contextual_name

    def create_connector(sink: BasePort, source: BasePort, mapping: Dict[str, str]) -> None:
        # Create (or update) the connector transferring data from `source` to `sink`;
        # keys of `mapping` are sink variable names, values are source variable names.
        # Additional validation for sink Port
        # - Source should be Port (the opposite case is possible == connect sensor)
        # - If mapping does not cover all variables, print a log message
        source_name = contextual_name(source)
        sink_name = contextual_name(sink)

        if isinstance(sink, ModeVarPort) and sink.is_input and not isinstance(source, ModeVarPort):
            raise ConnectorError(
                f"Input mode variables cannot be connected to continuous time variables"
                f" ({source_name} -> {sink.owner.name})."
            )

        if isinstance(sink, Port):
            if not (len(sink) == len(source) == len(mapping)):
                # Variables of either port left out of the mapping
                absent = [f"{sink.name}.{v}" for v in sink if v not in mapping.keys()]
                absent.extend(f"{source.name}.{v}" for v in source if v not in mapping.values())
                logger.debug(
                    "Partial connection between {!r} and {!r}. "
                    "Variables ({}) are not part of the mapping.".format(
                        sink_name, source_name, ", ".join(absent)
                    )
                )

        name = f"{source_name} -> {sink_name}"
        new_connector = SystemConnector(
            cls(name, sink, source, mapping, **kwargs)
        )
        connectors = self.connectors()

        # Check that variables are set only once
        for connector in connectors.values():
            if connector.sink is sink:
                shared = set(connector.sink_variables()).intersection(new_connector.sink_variables())
                if shared:
                    if len(shared) == 1:
                        variables = f"Variable {shared.pop()} is"
                    else:
                        variables = ", ".join(sorted(shared))
                        variables = f"Variables {sink_name}.{{{variables}}} are"
                    raise ConnectorError(f"{variables} already set by {connector}")

        try:
            # Check if connector already exists
            connector = connectors[new_connector.name]
        except KeyError:
            # New connector
            if sink.owner is self:
                # Sink belongs to `self`: data are pulled from a sub-system
                self.__pulling_connectors.append(new_connector)
            else:
                # Connectors feeding a child are stored under the name of the child
                target = new_connector.sink.owner.name
                child_connectors.setdefault(target, [])
                child_connectors[target].append(new_connector)
        else:
            # Sink and source are already connected: update connector
            connector.update_mapping(new_connector.mapping)

        # Determine the port located at the lower level of the hierarchy;
        # the input mapping of its owner is reset.
        if sink.owner is source.owner.parent:
            lower = source
        elif source.owner is sink.owner.parent:
            lower = sink
        else:
            lower = sink
        lower.owner.__reset_input_mapping()

    err_msg = f"Ports {port1.contextual_name!r} and {port2.contextual_name!r} cannot be connected"

    if port1.owner is port2.owner:
        raise ConnectorError(
            f"{err_msg}. Connecting ports of the same system is forbidden."
        )

    # Check that port owners are consistent with the system hierarchy
    if port1.direction != port2.direction:  # one port is IN and the other is OUT
        if port1.owner is port2.owner.parent or port2.owner is port1.owner.parent:
            raise ConnectorError(
                f"{err_msg}; parent/child connections are only allowed between same-direction ports."
            )
        elif not (port1.owner.parent is port2.owner.parent is self):
            raise ConnectorError(
                f"{err_msg}. Only ports belonging to direct children of {self.name!r} can be connected."
            )
    elif not (
        self is port1.owner.parent is port2.owner
        or (self is port1.owner is port2.owner.parent)
        or (port1.owner in self.children.values() and port2.owner in self.children.values())
    ):
        raise ConnectorError(
            f"{err_msg}. Same-direction ports can only be connected between a child and its parent, "
            "or between sibling sub-systems."
        )

    # Swap keys and values, when `port2` is the sink of the connector
    reciprocal = lambda d: dict((v, k) for k, v in d.items())

    if port1.is_input and port2.is_output:
        create_connector(port1, port2, mapping)

    elif port1.is_output and port2.is_input:
        create_connector(port2, port1, reciprocal(mapping))

    else:
        # Ports have the same direction
        if port1.owner is port2.owner.parent:
            if port1.is_input:
                create_connector(port2, port1, reciprocal(mapping))
            else:  # port1.is_output
                create_connector(port1, port2, mapping)

        elif port2.owner is port1.owner.parent:
            if port2.is_input:
                create_connector(port1, port2, mapping)
            else:  # port2.is_output
                create_connector(port2, port1, reciprocal(mapping))

        elif port1.is_input:  # Both port are inputs
            # In this case the port is transferred on the parent to serve as
            # true source for all subsystems.
            # For Port, the port is duplicated in the parent. For ExtensiblePort,
            # the variable is added in the INWARDS port of the parent.
            if isinstance(port1, ModeVarPort) != isinstance(port2, ModeVarPort):
                # Mapping is not empty at this point, so the loop
                # raises at its first iteration.
                for p1_var, p2_var in mapping.items():
                    # One variable is an inward, the other an input mode variable
                    raise ConnectorError(
                        f"Input mode variables cannot be connected to continuous time variables"
                        f" ({port1.owner.name}.{p1_var} <-> {port2.owner.name}.{p2_var})."
                    )

            for p1_var, p2_var in mapping.items():
                connected = False
                # Check if the variable is connected
                for c1 in filter(lambda c: c.sink is port1, child_connectors.get(port1.owner.name, [])):
                    if p1_var in c1.sink_variables():  # Already connected
                        # Check that the other variable is not connected
                        # NOTE(review): test bears on source variables of `c2`, whereas `port2`
                        # is its sink - confirm that `sink_variables` is not intended.
                        for c2 in filter(lambda c: c.sink is port2, child_connectors.get(port2.owner.name, [])):
                            if p2_var in c2.source_variables():
                                raise ConnectorError("{}.{} is already connected to {}.{}".format(
                                    port1.contextual_name, p1_var,
                                    port2.contextual_name, p2_var,
                                    )
                                )
                        # Connect the other variable
                        self.connect(port2, c1.source, {p2_var: c1.source_variable(p1_var)})
                        connected = True
                        break

                if connected:
                    continue

                # Check if the other variable is connected
                for c2 in filter(lambda c: c.sink is port2, child_connectors.get(port2.owner.name, [])):
                    if p2_var in c2.sink_variables():
                        # We know that p1_var is not connected
                        # Connect the variable
                        self.connect(port1, c2.source, {p1_var: c2.source_variable(p2_var)})
                        connected = True
                        break

                if connected:
                    continue

                # General case
                # create the pulled port
                if isinstance(port2, Port):
                    pulled_port_name = f"{port2.owner.name}_{port2.name}"
                    source_name = p2_var
                    # Is the full port pulled or only a portion of it?
                    if all(v in mapping.values() for v in port2):
                        pull_variables(port2.owner, {port2.name: pulled_port_name})
                    else:
                        pulled_port = port2.copy(pulled_port_name)
                        self._add_port(pulled_port)
                        self.connect(port2, pulled_port, p1_var)
                else:
                    # Variables are both inwards, or input mode variables
                    pulled_port_name = port2.name
                    source_name = f"{port2.owner.name}_{p2_var}"
                    pull_variables(port2.owner, {p2_var: source_name})

                # create the other connection
                self.connect(port1, self.inputs[pulled_port_name], {p1_var: source_name})

        else:
            raise ConnectorError(f"{err_msg} as they are both outputs.")
[docs]
def open_loops(self):
    """Open closed loops in children relations.

    Children are visited in the iteration order of `self.children`. Any connector
    feeding a child from a system which has not been visited yet is deactivated,
    and replaced by an unknown and an equation in the loop problem of this system.

    Raises
    ------
    ValueError
        If the sink of a connector to be opened is an output.
    TypeError
        If a sink variable of a connector to be opened is not numerical.
    """
    logger.debug(f"Call {self.name}.open_loops")

    to_open = []  # type: List[SystemConnector]
    connectors_by_child = self.__child_connectors
    self.__input_mapping = None
    variables = self.name2variable
    loop_problem = self.__loop_problem
    loop_problem.clear()

    # Add top system in the execution loop so connection to it won't be opened
    visited = [self]  # type: List[System]

    for child in self.children.values():
        visited.append(child)
        child.open_loops()
        to_open.extend(
            connector
            for connector in connectors_by_child.get(child.name, [])
            if connector.source.owner not in visited
        )

    # Attributes transferred from an existing unknown to the loop unknown
    transferred_attributes = (
        'max_abs_step',
        'max_rel_step',
        'upper_bound',
        'lower_bound',
    )

    for connector in to_open:
        logger.debug(f"Connector {connector!r} to be opened.")
        sink = connector.sink
        source = connector.source

        if sink.is_output:
            raise ValueError(
                f"Connector {connector.name!r} cannot be opened, as its sink is an output."
            )

        # Check that all variables are of numerical types
        for varname in connector.sink_variables():
            current = sink[varname]
            if is_numerical(current):
                continue
            raise TypeError(
                f"Cannot open connector {connector.name!r} to resolve system"
                f", as variable {varname!r} is non-numerical (type is {type(current).__name__})."
            )

        connector.deactivate()

        # Set mathematical problem
        sink_name = sink.contextual_name
        source_name = source.contextual_name
        known_unknowns = {
            unknown.ref: unknown
            for unknown in sink.owner.unknowns.values()
        }

        for target, origin in connector.mapping.items():
            unknown_name = natural_varname(f"{sink_name}.{target}")
            var = variables[unknown_name]
            try:
                existing = known_unknowns[var]
            except KeyError:
                options = {}
            else:
                options = {
                    attr: getattr(existing, attr)
                    for attr in transferred_attributes
                }
            equation = natural_varname(f"{unknown_name} == {source_name}.{origin}")
            loop_problem.add_unknown(unknown_name, **options)
            loop_problem.add_equation(
                equation,
                name=f"{equation} (loop)",
                reference=1.0,
            )
[docs]
def close_loops(self):
    """Close loops opened to allow system resolution.

    For each system of the tree, the loop problem is cleared,
    and all connectors are activated.
    """
    logger.debug(f"Call {self.name}.close_loops")

    for node in self.tree():
        node.__loop_problem.clear()
        for link in node.all_connectors():
            link.activate()
[docs]
def convert_to(self, *args, **kwargs) -> None:
    """Convert system into another `System`.

    Note: not implemented for class `System`; arguments are ignored,
    and the call always fails.

    Raises
    ------
    TypeError
        If the current `System` cannot be converted.
    """
    # TODO: this method should be removed altogether
    message = f"Cannot convert {self.name!r}: system is not part of a family"
    raise TypeError(message)
[docs]
@classmethod
def check_config_dict(cls, params: dict) -> None:
    """Check if the provided dictionary respects the convention of a model file.

    The reference is JSON schema file 'system.schema.json',
    read from the directory containing this module.

    Parameters
    ----------
    params : dict
        Dictionary to be tested

    Raises
    ------
    jsonschema.exceptions.ValidationError
        If the provided dictionary does not conform to the JSON schema.
    """
    # Check that the input respects the JSON schema
    module_dir = os.path.dirname(os.path.abspath(__file__))
    schema_path = os.path.join(module_dir, 'system.schema.json')

    with open(schema_path) as schema_file:
        schema = json.load(schema_file)

    jsonschema.validate(params, schema)
[docs]
@classmethod
def load(cls, filepath: Union[str, Path, StringIO], name: Optional[str] = None) -> System:
    """Load configuration from file-like object.

    The JSON content is validated with `check_config_dict`, decoded
    with `decode_cosapp_dict`, and passed on to `load_from_dict`.

    Parameters
    ----------
    filepath : str or Path or file-like
        Filepath or file-like object (i.e. has a .read() method)
    name : str, optional
        Name of the newly created system.
        If unspecified (default), uses the name contained in `filepath`.

    Returns
    -------
    System
        The loaded system.

    Raises
    ------
    TypeError
        If `filepath` is neither a path, nor an object with attribute `read`.
    """
    if isinstance(filepath, (str, Path)):
        with open(filepath) as stream:
            content = json.load(stream)
    elif hasattr(filepath, 'read'):
        content = json.load(filepath)
    else:
        raise TypeError("Input parameter should be a filepath or file-like object.")

    cls.check_config_dict(content)

    # Remove schema reference entry
    content.pop('$schema', None)

    # Convert variables if needed
    content = decode_cosapp_dict(content)

    system_name, system_params = content.popitem()
    loaded = cls.load_from_dict(system_name, system_params)

    if name:
        loaded.name = name

    return loaded
[docs]
@classmethod
def load_from_dict(cls, name: str, parameters: dict) -> System:
    """Instantiate a `System` from its name and its parameters.

    The system is first created from the class specified in `parameters`
    (key 'class', with optional constructor arguments 'setup_args'). Its children
    and connectors are then discarded, and rebuilt from entries 'subsystems' and
    'connections' of `parameters`; sub-systems are created recursively.

    Parameters
    ----------
    name : str
        Identifier of the system
    parameters : dict
        The dictionary containing the system definition

    Returns
    -------
    System
        The generated system

    Raises
    ------
    AttributeError
        If the system class cannot be found, or is not derived from `System`.
    """
    # TODO The all process of saving and reading from a file should be overhaul to take into account missing attr
    #   - setup kwargs
    #   - design methods (?)
    # Moreover what should be the source of truth JSON or Python? Currently reading from JSON duplicates Python
    # work as first the object is instantiated. Then its children are removed to create them (and the associated
    # connections from the JSON file.

    def get_system_class(class_name: str):
        """Find the class designated by `class_name` in component libraries."""
        if class_name == "System":
            return System

        check_arg(class_name, 'class_name', str, stack_shift=1)

        # Split module path from class name, if any
        try:
            component_module, class_name = class_name.rsplit('.', maxsplit=1)
        except ValueError:
            component_module = ""

        system_class = None
        found_module = ""
        component_libs = System._components_librairies

        for lib_name in component_libs:
            if component_module:
                mod_name = f"{lib_name}.{component_module}"
                if mod_name.startswith('.'):
                    mod_name = mod_name[1:]
            elif lib_name:
                mod_name = lib_name
            else:
                # Nothing to import: try next possible Python package
                continue

            try:
                lib_module = importlib.import_module(mod_name)
            except ImportError:
                # Try next possible Python package to find module
                continue

            # Found an existing module
            found_module = lib_module.__file__
            try:
                system_class = getattr(lib_module, class_name)
            except AttributeError:
                # Class not in module
                continue
            break

        if system_class is None:
            if found_module:
                message = f"Class {class_name!r} was not found in module: \n{found_module}"
            else:
                message = f"Class {class_name!r} is not in one of the listed component librairies:\n{component_libs}"
            raise AttributeError(message)

        if not issubclass(system_class, System):
            raise AttributeError(
                f"Class {system_class.__name__!r} does not inherit from base class 'System'"
            )

        return system_class

    system_class = get_system_class(parameters.get('class', 'System'))
    ctor_kwargs = parameters.get('setup_args', {})
    top_system = system_class(name=name, **ctor_kwargs)

    for child_name in top_system.children:
        top_system.name2variable.pop(child_name)

    # Remove children and connectors --> the source of truth is the json file
    top_system.children.clear()
    top_system.__child_connectors.clear()
    top_system.__pulling_connectors.clear()
    top_system.__readonly = parameters.get('properties', {}).copy()

    for varname, value in parameters.get('inputs', {}).items():
        top_system[varname] = value

    for child_name, child_params in parameters.get('subsystems', {}).items():
        top_system.add_child(cls.load_from_dict(child_name, child_params))

    for connection in parameters.get('connections', []):
        # Connections are stored as (sink, source), with an optional mapping
        if len(connection) == 2:
            sink, source = connection
            mapping = None
        else:
            sink, source, mapping = connection
        top_system.connect(top_system[sink], top_system[source], mapping)

    if 'exec_order' in parameters:
        exec_order = parameters['exec_order']
        if exec_order != list(top_system.exec_order):
            top_system.exec_order = exec_order

    return top_system
[docs]
def save(self, fp, indent=2, sort_keys=True) -> None:
    """Serialize the `System` as a JSON formatted stream to fp.

    Parameters
    ----------
    fp : str or Path or file-like
        A .write()-supporting file-like object or the filename
    indent : int, optional
        Indentation in the file (default: 2)
    sort_keys : bool, optional
        Sort the keys in alphabetic order (default: True)
    """
    # Serialize before opening the target: mode 'w' empties the file,
    # so a failure in `to_json` would otherwise destroy existing content.
    content = self.to_json(indent, sort_keys)

    if isinstance(fp, (str, Path)):
        with open(fp, 'w') as stream:
            stream.write(content)
    else:
        fp.write(content)
[docs]
def export_structure(self) -> Dict:
    """
    Export current system structure to a dictionary, which contains
    the definition of all ports, the structure of sub-system and the
    connections between ports.

    Returns
    -------
    dict
        Dictionary with two entries: "Ports" (port definitions collected
        during the export) and "Systems" (structure of the system).
    """
    port_definitions = {}
    structure = self.__to_dict(True, port_definitions)
    return dict(Ports=port_definitions, Systems=structure)
[docs]
def to_dict(self) -> Dict:
    """
    Public API to export system to a dictionary.

    The export is performed without structure information
    (private method `__to_dict` is called with `with_struct=False`).
    """
    exported = self.__to_dict(with_struct=False)
    return exported
def __json__(self) -> Dict[str, Dict[str, Any]]:
    """JSONable dictionary representing the system.

    Returns
    -------
    Dict[str, Any]
        The dictionary returned by `to_dict()`
    """
    jsonable = self.to_dict()
    return jsonable
def __to_dict(self, with_struct: bool, port_cls_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Convert the `System` to a dictionary.

    If `with_struct` is `False`, only input ports and their variable
    values are exported.
    If `with_struct` is `True`, input and output ports are exported, by
    calling `to_dict(with_struct)` on each port. In this case, `port_cls_data`
    can be provided to collect the definition of all `Port` classes found in the system.

    Parameters
    ----------
    with_struct : bool
        Flag to export output ports and the structure of ports.
    port_cls_data : dict, optional
        A dictionary to hold the definition of all ports inside system.
        It is filled in place, and only used when `with_struct` is `True`.

    Returns
    -------
    dict
        A single-entry dictionary, mapping the name of the system to its description.
    """
    # TODO Fred - BUG Information missing from dictionary conversion
    #   - kwargs of setup - e.g. Shaft system of librairies_sae has its ports functions of kwargs
    #   - design equations - are not saved (should they be?)
    data = {}

    class_qualname = self.__class__.__qualname__
    if class_qualname == 'System':
        # Trick to allow container `System` without special type
        data['class'] = 'System'
    else:
        data['class'] = f"{self.__module__}.{class_qualname}"

    setup_args = self.__ctor_kwargs
    if setup_args:
        data["setup_args"] = setup_args

    readonly = self.__readonly
    if readonly:
        data['properties'] = readonly

    if with_struct:
        collect_port_classes = isinstance(port_cls_data, dict)

        for direction in ("inputs", "outputs"):
            exported_ports = {}
            for port in getattr(self, direction).values():
                if collect_port_classes and isinstance(port, Port):
                    # Port classes are described once, by their qualified name
                    key = port.__class__.__qualname__
                    if key not in port_cls_data:
                        port_cls_data[key] = {
                            varname: variable.to_dict()
                            for varname, variable in port._variables.items()
                        }
                exported_ports.update(port.to_dict(with_struct))
            if exported_ports:
                data[direction] = exported_ports
    else:
        exported_inputs = {}
        for port in self.inputs.values():
            exported_inputs.update(port.to_dict())
        if exported_inputs:
            data['inputs'] = exported_inputs

    connections = [link.info() for link in self.all_connectors()]
    if connections:
        data['connections'] = connections

    if len(self.children) > 0:
        subsystems = {}
        for child_name, child in self.children.items():
            subsystems[child_name] = child.__to_dict(with_struct, port_cls_data)[child_name]
        data['subsystems'] = subsystems
        data['exec_order'] = list(self.exec_order)

    return {self.name: data}
[docs]
def to_json(self, indent=2, sort_keys=True) -> str:
    """Return a string in JSON format representing the `System`.

    The string is obtained from `to_dict()`, with an additional
    entry '$schema' referring to the JSON schema of systems.

    Parameters
    ----------
    indent : int, optional
        Indentation of the JSON string (default: 2)
    sort_keys : bool, optional
        Sort keys in alphabetic order (default: True)

    Returns
    -------
    str
        String in JSON format
    """
    payload = self.to_dict()
    # If this is updated => workspace template should be updated too.
    # Add self referencing to system JSON version - provision
    payload['$schema'] = "0-3-0/system.schema.json"

    return json.dumps(
        payload,
        cls=JSONEncoder,
        indent=indent,
        sort_keys=sort_keys,
    )
[docs]
def to_d3(self, show=True, size=435) -> "IPython.display.IFrame":
    """Returns the hierarchical representation of this system in HTML format.

    Thin wrapper around `cosapp.tools.views.d3js.to_d3`,
    to which `show` and `size` are forwarded unchanged.

    Returns
    -------
    IPython.display.IFrame
        IFrame to the HTML formatted representation
    """
    # Local import: the view module is only loaded on demand
    from cosapp.tools.views.d3js import to_d3 as render_d3

    return render_d3(self, show, size)
[docs]
def to_html(self, filename: str, embeddable=False) -> None:
    """Save the `System` as HTML using vis.JS library.

    Rendering and file output are delegated to `VisJsRenderer`.

    Parameters
    ----------
    filename : str
        Filename to write to
    embeddable : bool, optional
        Is the HTML to be embedded in an existing page? Default: False
    """
    # Renderer is only imported when the method is actually called
    from cosapp.tools.views.visjs import VisJsRenderer as HtmlRenderer

    HtmlRenderer(self, embeddable).to_file(filename)
def _repr_html_(self) -> str:
    """Returns the representation of this system in HTML format.

    The system is first exported with `to_html` into a file named after
    the system ("<name>.html"); the returned string is the HTML
    representation of an `IPython.display.IFrame` targeting that file.

    Returns
    -------
    str
        HTML formatted representation
    """
    # TODO unit tests
    html_file = f"{self.name}.html"
    self.to_html(html_file)

    # Imported after the export, so the file is written in any case
    from IPython.display import IFrame

    frame = IFrame(html_file, "810px", "650px")
    return frame._repr_html_()
def _repr_markdown_(self) -> str:
    """Returns the representation of this system attributes in Markdown format.

    Formatting is delegated to `cosapp.tools.views.markdown.system_to_md`.

    Returns
    -------
    str
        Markdown formatted representation
    """
    # Formatter is only imported when the method is actually called
    from cosapp.tools.views.markdown import system_to_md as render_markdown

    return render_markdown(self)
[docs]
def make_surrogate(
    self,
    data_in: Union[pandas.DataFrame, Dict[str, List[float]]],
    model = FloatKrigingSurrogate,
    activate = True,
    data_out: Optional[Union[pandas.DataFrame, Dict[str, Any]]] = None,
    postsynch: Union[str, List[str]] = '*',
    *args, **kwargs
) -> SystemSurrogate:
    """
    Creates a surrogate model superseding the normal behaviour of `compute()`.

    The surrogate model is trained from datasets `data_in` and `data_out`,
    given as `pandas.DataFrame` objects, or dictionaries interpretable as such.
    If no output data are provided (default), they are computed as the response
    of the system to the design of experiments `data_in`.

    The new surrogate is attached to the system; a warning is emitted
    if it replaces an existing one.

    Parameters
    ----------
    data_in : pandas.DataFrame or Dict[str, List[float]]
        Design of experiments for variables in dataframe columns or dict keys.
        Column/key names must match variable contextual names, as in 'wing.geom.length', e.g.
    model : type
        Model class, implementing methods `train(x, y)` and `predict(x)`, where x and y are 1D arrays.
        Default is `cosapp.utils.surrogate_models.FloatKrigingSurrogate`.
    activate : bool, optional
        Boolean determining whether or not surrogate model should be activated once created.
        Default is `True`.
    data_out : pandas.DataFrame or Dict[str, List[float]], optional
        Y-dataset used for surrogate model training. Default is None (dataset is computed).
    postsynch : str or List[str]
        List of output variable names to synchronize after each surrogate model execution.
        Default is '*', meaning all outputs are post-synchronized.
    *args, **kwargs : Any
        Additional parameters passed to `model` constructor.

    Returns
    -------
    SystemSurrogate
        System surrogate attached to the system.
    """
    # Only `data_in` is validated here; remaining arguments are handed over as is
    check_arg(data_in, 'data_in', (pandas.DataFrame, dict))
    new_surrogate = SystemSurrogate(
        self, data_in, model, data_out, postsynch,
        *args, **kwargs
    )
    self.__set_surrogate(new_surrogate, activate)
    return new_surrogate
def __set_surrogate(self, surrogate: SystemSurrogate, activate=True) -> None:
    """Private setter for surrogate model.

    Attaches `surrogate` to the system, and sets its activation status.
    If the system already owns a surrogate model, a warning is emitted
    and the former model is deactivated before being replaced.

    Parameters
    ----------
    surrogate : SystemSurrogate
        Surrogate model to attach to the system.
    activate : bool, optional
        Activation status of the new surrogate model (default: True).
    """
    if self.has_surrogate:
        warnings.warn(f"Existing surrogate model of {self.name} has been overwritten by a new one.")
        # Detach the former surrogate *before* swapping `_meta`. Activation
        # status is deduced by comparing the system runner with `_meta`, so
        # replacing an active surrogate with `activate=False` would otherwise
        # be seen as "already inactive", and the runner would stay bound to
        # the discarded model (with sub-systems left deactivated).
        self.active_surrogate = False
    self._meta = surrogate
    self.active_surrogate = activate
[docs]
def load_surrogate(self, filename: str, activate=True) -> None:
    """
    Loads a surrogate model from a binary file created by `dump_surrogate`,
    and attaches it to the system.

    A warning is emitted if the loaded model replaces an existing surrogate.

    Parameters
    ----------
    filename : str
        Name of the file the surrogate model is read from.
    activate : bool, optional
        Boolean determining whether or not surrogate model should be activated once loaded.
        Default is `True`.
    """
    # NOTE(review): the binary format is handled by `SystemSurrogate.load`;
    # if it relies on pickle, only trusted files should be loaded - confirm.
    loaded_model = SystemSurrogate.load(self, filename)
    self.__set_surrogate(loaded_model, activate)
[docs]
def dump_surrogate(self, filename: str) -> None:
    """Dumps system surrogate model (if any) into a binary file.

    Parameters
    ----------
    filename : str
        Destination file name.

    Raises
    ------
    AttributeError
        If the system has no surrogate model.
    """
    if self.has_surrogate:
        # Serialization is delegated to the surrogate object itself
        self._meta.dump(filename)
        return
    raise AttributeError(f"{self.name!r} has no surrogate model")
@property
def active_surrogate(self) -> bool:
    """bool: True if surrogate model is activated, False otherwise.

    The surrogate is regarded as activated when the system runner
    is the surrogate model object itself.
    """
    runner, surrogate = self.__runner, self._meta
    return runner is surrogate
@active_surrogate.setter
def active_surrogate(self, activate: bool) -> None:
    """Activation boolean setter for surrogate model.

    Nothing happens if the requested status is the current one.
    Otherwise, the system runner is switched to either the surrogate
    model (activation) or the system itself (deactivation), the active
    status of the system tree is updated accordingly, and the system
    and its input ports are touched.

    Raises
    ------
    AttributeError
        If activation is requested, while no surrogate model exists.
    """
    check_arg(activate, "active_surrogate", bool)

    if activate == self.active_surrogate:
        return  # no change - quit

    if not activate:
        # Back to normal behaviour: reactivate the whole tree
        self._set_recursive_active_status(True)
        self.__runner = self
    else:
        if self._meta is None:
            raise AttributeError(
                "Can't set `active_surrogate` if no surrogate model has been created"
                " by either `make_surrogate` or `load_surrogate`."
            )
        # Deactivate owner, children, and all drivers
        self._set_recursive_active_status(False)
        # Reactivate owner after upper recursion
        self._active = True
        self.__runner = self._meta

    # Touch the system and all its input ports after the runner switch
    self.touch()
    for input_port in self.inputs.values():
        input_port.touch()
@property
def has_surrogate(self) -> bool:
    """bool: True if system has a surrogate model (even if inactive), False otherwise."""
    surrogate = self._meta
    return surrogate is not None
def _set_recursive_active_status(self, active_status: bool) -> None:
    """Set the active status of this system, and propagate it downwards.

    The status is applied to the system itself, forwarded to each of its
    drivers through `_set_children_active_status`, and recursively to
    all child systems. Each step is traced at debug level.

    Parameters
    ----------
    active_status : bool
        Status to be set.
    """
    #TODO save and recover drivers original status
    trace = logger.debug
    trace(f"Starting recursive active status modifying of {self.name}, status to be set is {active_status}")
    self._active = active_status

    # Drivers first, then sub-systems
    for attached_driver in self.drivers.values():
        trace(f"Targeted driver for driver recursive deactivate is {attached_driver}")
        attached_driver._set_children_active_status(active_status)

    for subsystem in self.children.values():
        trace(f"Targeted child for system recursive deactivate is {subsystem}")
        subsystem._set_recursive_active_status(active_status)