Source code for cosapp.drivers.runonce
import numpy
from copy import deepcopy
from typing import Any, Dict, Optional
from cosapp.core.numerics.basics import MathematicalProblem
from cosapp.core.numerics.boundary import Boundary
from cosapp.drivers.driver import Driver
import logging
logger = logging.getLogger(__name__)
[docs]
class RunOnce(Driver):
"""Driver running the model on its `System` owner.
Parameters
----------
name : str
Name of the driver
owner : System, optional
:py:class:`~cosapp.systems.system.System` to which driver belongs; defaults to `None`
**kwargs : Any
Keyword arguments will be used to set driver options
"""
__slots__ = ('initial_values', 'solution')
def __init__(
self,
name: str,
owner: Optional["cosapp.systems.System"] = None,
**kwargs
) -> None:
"""Initialize driver
Parameters
----------
name: str, optional
Name of the `Driver`.
owner: System, optional
:py:class:`~cosapp.systems.system.System` to which driver belongs; defaults to `None`.
**kwargs:
Additional keywords arguments forwarded to base class.
"""
super().__init__(name, owner, **kwargs)
self.initial_values: Dict[str, Boundary] = dict() # Initial guess for the iteratives
self.solution: Dict[str, float] = dict() # Dictionary (name, value) of the latest solution reached
[docs]
def set_init(self, modifications: Dict[str, Any]) -> None:
"""Define initial values for one or more variables.
The variable can be contextual `child1.port2.var`. The only rule is that it should belong to
the owner `System` of this driver or any of its descendants.
Parameters
----------
modifications : Dict[str, Any]
Dictionary of (variable name, value)
Examples
--------
>>> driver.set_init({'myvar': 42, 'dummy': 'banana'})
"""
if self.owner is None:
raise AttributeError(
f"Driver {self.name!r} must be attached to a System to be assigned initial values."
)
if not isinstance(modifications, dict):
raise TypeError(
"Initial values must be specified through a dictionary of the kind {varname: value}."
)
for variable, value in modifications.items():
boundary = Boundary(self.owner, variable, default=value, inputs_only=False)
# Check if boundary.name already exists
actual = self.initial_values.setdefault(boundary.name, boundary)
if actual is not boundary:
# Update already existing boundary with new default value and mask
actual.set_default_value(boundary.default_value, boundary.mask)
# Set owner system with the init value - useful if this driver is not inside a solver
actual.set_to_default()
# Setting a new initial value implies, we will use the init and so the solution is cleared
self.solution.clear()
[docs]
def get_init(self, force_init: bool = False) -> numpy.ndarray:
"""Get the System iteratives initial values for this driver.
Parameters
----------
force_init : bool, optional
Force the initial values to be used. Default is False.
Returns
-------
numpy.ndarray
List of iteratives initial values,
in the same order as the unknowns in `get_problem()`.
"""
full_init = numpy.empty(0)
problem = self.get_problem()
for name, unknown in problem.unknowns.items():
if not force_init and name in self.solution:
# We ran successfully at least once and are environmental friendly
data = self.solution[name]
else: # User wants the init or first simulation or crash
if name.startswith(self.name):
name = name[len(self.name) + 1 : -1]
if name in self.initial_values:
boundary = self.initial_values[name]
umask = unknown.mask if unknown.mask is not None else numpy.empty(0)
bmask = boundary.mask if boundary.mask is not None else numpy.empty(0)
if not numpy.array_equal(umask, bmask):
raise ValueError(
f"Unknown and initial conditions on {unknown.name!r} are not masked equally"
)
data = deepcopy(boundary.default_value)
self.initial_values[name] = boundary
else:
data = deepcopy(unknown.value)
full_init = numpy.append(full_init, data)
return full_init
[docs]
def get_problem(self) -> MathematicalProblem:
"""Returns the full mathematical for the case.
Returns
-------
MathematicalProblem
The full mathematical problem to solve for the case
"""
return self.owner.assembled_problem()
[docs]
def setup_run(self):
"""Method called once before starting any simulation."""
super().setup_run()
owner = self.owner
if not owner.is_standalone() and owner.parent is None:
owner.open_loops() # Force loops opening to test if the owner needs a solver
if not self.get_problem().is_empty():
logger.warning(
"Required iterations detected, not taken into account in {} driver.".format(
type(self).__qualname__
)
)
owner.close_loops()
def _precompute(self):
"""Set execution order and start the recorder."""
super()._precompute()
# Solution cannot be cleared in setup_run otherwise it won't be available when get_init is called.
self.solution.clear()
[docs]
def compute(self) -> None:
"""Execute drivers on all child `System` belonging to the driver `System` owner.
"""
if len(self.children) == 0:
self.owner.run_children_drivers()
if self._recorder is not None:
self._recorder.record_state(self.name)
def _postcompute(self):
"""Actions performed after the `Module.compute` call."""
# Should be called in _postcompute and not clean_run otherwise it won't work for multi-points cases
self.solution = dict(
(key, deepcopy(unknown.value))
for key, unknown in self.get_problem().unknowns.items()
)
super()._postcompute()