Source code for cosapp.drivers.fixedpoint

import numpy
import copy
from typing import Any, Dict, List, Optional, Union, Optional
from dataclasses import dataclass, field

from cosapp.core import MathematicalProblem
from cosapp.core.numerics.boundary import Boundary
from cosapp.drivers.driver import Driver
from cosapp.systems.system import System, SystemConnector
from cosapp.ports.port import BasePort
from cosapp.utils.state_io import object__getstate__

import logging
logger = logging.getLogger(__name__)


[docs] def default_array_factory(shape=0, dtype=float): """Default factory for dataclass fields.""" def factory(): return numpy.empty(shape, dtype=dtype) return factory
[docs] @dataclass class SolverResults: """Data class for solver solution Attributes ---------- - x [numpy.ndarray[float]]: Solution vector. - r [numpy.ndarray[float]]: Residue vector. - success [bool]: Whether or not the solver exited successfully. - message [str]: Description of the cause of the termination. - tol [float, optional]: Tolerance level; `NaN` if not available. - n_iter [int]: Number of iterations. """ x: numpy.ndarray = field(default_factory=default_array_factory()) r: numpy.ndarray = field(default_factory=default_array_factory()) success: bool = False message: str = "" tol: float = numpy.nan n_iter: int = 0 def __getstate__(self) -> Union[Dict[str, Any], tuple[Optional[Dict[str, Any]], Dict[str, Any]]]: """Creates a state of the object. The state type depend on the object, see https://docs.python.org/3/library/pickle.html#object.__getstate__ for further details. Returns ------- Union[Dict[str, Any], tuple[Optional[Dict[str, Any]], Dict[str, Any]]]: state """ return object__getstate__(self) def __json__(self) -> Dict[str, Any]: """Creates a JSONable dictionary representation of the object. Returns ------- Dict[str, Any] The dictionary """ return self.__getstate__().copy()
[docs] class FixedPointSolver(Driver): """Driver attempting to solve the internal cyclic dependencies of its owner `System` by iteratively running it. Parameters ---------- name : str Name of the driver owner : System, :py:class:`~cosapp.systems.system.System` to which driver belongs. **options : Any Keyword arguments used to set driver options """ __slots__ = ('initial_values', 'problem', 'results', '_loop_connectors') def __init__( self, name='solver', owner: Optional[System]=None, **options, ) -> None: """Initialize driver Parameters ---------- name: str, optional Name of the `Driver` (default: 'solver'). owner: `System`. optional :py:class:`~cosapp.systems.system.System` to which the driver belongs (defaults: `None`). **options: Additional keywords arguments forwarded to base class. """ super().__init__(name, owner, **options) self.problem: MathematicalProblem = None self.results = SolverResults() self.initial_values: Dict[str, Boundary] = dict() # Initial guess for the iteratives self._loop_connectors: List[SystemConnector] = [] def _declare_options(self) -> None: super()._declare_options() self.options.declare( 'tol', 1e-6, dtype=float, allow_none=False, desc='Absolute tolerance (in max-norm) for the residual.' ) self.options.declare( 'max_iter', 50, dtype=int, lower=1, desc='The maximum number of iterations.', ) self.options.declare( 'factor', 1.0, dtype=float, lower=1e-3, desc='Relaxation factor; applies: next value = factor * current + (1 - factor) * previous.' ) self.options.declare( 'history', False, dtype=bool, desc='Should the recorder (if any) capture all iterations, or just the last one?' ) self.options.declare( 'force_init', False, dtype=bool, desc='Whether or not initial conditions should be applied at the beginning of the resolution.' )
[docs] def set_init(self, modifications: Dict[str, Any]) -> None: """Define initial values for one or more variables. The variable can be contextual `child1.port2.var`. The only rule is that it should belong to the owner `System` of this driver or any of its descendants. Parameters ---------- modifications : dict[str, Any] Dictionary of (variable name, value) Examples -------- >>> driver.set_init({'myvar': 42, 'foo.dummy': 'banana'}) """ if self.owner is None: raise AttributeError( f"Driver {self.name!r} must be attached to a System to be assigned initial values." ) if not isinstance(modifications, dict): raise TypeError( "Initial values must be specified through a dictionary of the kind {varname: value}." ) for variable, value in modifications.items(): boundary = Boundary(self.owner, variable, default=value, inputs_only=False) # Check if boundary.name already exists actual = self.initial_values.setdefault(boundary.name, boundary) if actual is not boundary: # Update already existing boundary with new default value and mask actual.update_default_value(boundary.default_value, boundary.mask)
[docs] def get_init(self) -> Dict[str, Any]: """Get the initial values used by the solver. Returns ------- dict[str, Any] Dictionary of initial values, referenced by variable names. """ init = { varname: boundary.value for varname, boundary in self.initial_values.items() } return init
[docs] def apply_init(self) -> None: """Apply initial values in owner system.""" for boundary in self.initial_values.values(): boundary.set_to_default()
[docs] def setup_run(self): """Method called once before starting any simulation.""" super().setup_run() owner = self.owner owner.open_loops() self.problem = owner.assembled_problem() self._loop_connectors = loop_connectors = [] is_open = lambda connector: not connector.is_active for system in owner.tree(): loop_connectors.extend(filter(is_open, system.all_connectors())) if self.options['force_init']: self.apply_init()
[docs] def run_once(self) -> None: """Run solver once, assuming driver has already been initialized. """ with self.log_context(" - run_once"): if self.is_active(): self._precompute() logger.debug(f"Call {self.name}.compute_before()") self.compute_before() # Sub-drivers are executed at each iteration in `compute`, # so the child loop before `self.compute()` is omitted. logger.debug(f"Call {self.name}.compute()") self._compute_calls += 1 self.compute() self._postcompute() self.computed.emit() else: logger.debug(f"Skip {self.name} execution - Inactive")
[docs] def compute(self) -> None: """Execute drivers on all child `System` belonging to the driver `System` owner. """ owner: System = self.owner subdrivers = self.children.values() if subdrivers: def update_system() -> None: for subdriver in subdrivers: logger.debug(f"Call {subdriver.name}.run_once()") subdriver.run_once() else: def update_system() -> None: owner.run_children_drivers() if (recorder := self._recorder): def record_state(ref: str, *args, **kwargs): recorder.record_state(ref, *args, **kwargs) else: record_state = lambda *args, **kwargs: None problem = self.problem results = self.results if problem.is_empty(): record_history = False update_system() results.message = "No iterations were necessary" results.success = True results.n_iter = 0 else: loop_connectors = self._loop_connectors # Deactivate all loop connectors for the first iteration for connector in loop_connectors: connector.deactivate() tol = self.options['tol'] factor = self.options['factor'] max_iter = self.options['max_iter'] record_history = self.options['history'] residues = problem.residues.values() converged = False if factor == 1.0: def update_unknowns(): for connector in loop_connectors: connector.activate() connector.transfer() else: def update_unknowns(): for connector in loop_connectors: sink: BasePort = connector.sink previous_values = { varname: copy.copy(getattr(sink, varname)) for varname in connector.sink_variables() } connector.activate() connector.transfer() # Apply relaxation for varname, previous in previous_values.items(): current = getattr(sink, varname) setattr(sink, varname, factor * current + (1.0 - factor) * previous) connector.deactivate() try: for i in range(max_iter): update_system() for residue in residues: residue.update() r = problem.residue_vector() r_norm = numpy.linalg.norm(r, numpy.inf) logger.debug(f"iteration #{i},\t{r = }") if record_history: record_state(f"{self.name} (iter #{i})") update_unknowns() converged = bool(r_norm < tol) if converged: break except: pass # silent fail, for now finally: if converged: self.status = "Converged" self.error_code = "0" message = f"Fixed-point resolution has converged in {i} iterations ({r_norm = :.2e})" logger.info(message) else: self.status = "Not converged" self.error_code = "1" message = f"Fixed-point resolution has not converged in {max_iter} iterations" logger.warn(message) # Store solver results results.success = converged results.message = message results.n_iter = i results.tol = tol results.x = problem.unknown_vector() results.r = r if not record_history: record_state(self.name, self.status, self.error_code)
def _postcompute(self) -> None: self.owner.close_loops() super()._postcompute()
[docs] def is_standalone(self) -> bool: """Is this Driver able to solve a system? Returns ------- bool Ability to solve a system or not. """ return True