Source code for cosapp.drivers.nonlinearsolver

import abc
import csv
import inspect
from io import StringIO
from typing import (
    Any,
    Optional, Callable, Iterable,
    Sequence, TypeVar,
)
import numpy

from cosapp.core.numerics.basics import MathematicalProblem, SolverResults
from cosapp.core.numerics.enum import NonLinearMethods

from cosapp.core.ir.passes.causal_model_graph import compute_sparsity_from_graph_analysis

from cosapp.drivers.driver import Driver, System
from cosapp.drivers.abstractsolver import AbstractSolver
from cosapp.drivers.runsinglecase import RunSingleCase
from cosapp.drivers.utils import DesignProblemHandler
from cosapp.utils.logging import LogFormat, LogLevel, HandlerWithContextFilters, str_array_format
from cosapp.utils.options_dictionary import HasOptions
from cosapp.utils.deprecation import deprecated

from cosapp.core.numerics.solve import (
    AbstractLinearSolver,
    AbstractNonLinearSolver,
    NewtonRaphsonSolver,
    ScipyRootSolver,
)

import logging

logger = logging.getLogger(__name__)

AnyDriver = TypeVar("AnyDriver", bound=Driver)


[docs] class BaseSolverBuilder(abc.ABC): """Base interface for numerical system building strategy used by `NonLinearSolver`. Implementations will differ for single- or multi-point design. """ def __init__(self, solver: AbstractSolver): self.solver = solver self.problem = MathematicalProblem(solver.name, solver.owner) self.initial_values = numpy.empty(0) self.is_design_unknown: dict[str, bool] = dict()
[docs] @abc.abstractmethod def build_system(self) -> None: pass
[docs] @abc.abstractmethod def update_residues(self) -> None: pass
[docs] def update_unknowns(self, x: numpy.ndarray) -> None: counter = 0 for name, unknown in self.problem.unknowns.items(): n = unknown.size value = x[counter] if unknown.is_scalar else x[counter : counter + n] unknown.update_default_value(value, checks=False) counter += n # Update design unknowns if self.is_design_unknown[name]: unknown.set_to_default()
[docs] def handle_system_problem(self) -> bool: """Should the solver handle the intrinsic problem of the owner system?""" solver = self.solver for driver in solver.tree(downwards=True): if driver is solver: continue if driver.is_standalone(): return False return True
[docs] class BaseSolverRecorder(abc.ABC): """Abstract interface for solver recorder"""
[docs] @abc.abstractmethod def record(self, context: str) -> None: """Record owner system data using solver's recorder"""
[docs] class NonLinearSolver(AbstractSolver): """Solve mathematical problem with algebraic variables. Attributes ---------- compute_jacobian : bool Should the Jacobian matrix be computed? ; default True jac_lup : ndarray, optional LU decomposition of latest Jacobian matrix (if available, None otherwise) jac : ndarray, optional Latest Jacobian matrix computed (if available, None otherwise) """ __slots__ = ( "compute_jacobian", "_backend_solver", "__method", "__option_aliases", "__trace", "__results", "__design_unknowns", "__builder", "_graph_analysis", ) def __init__( self, name: str, owner: Optional[System] = None, method: str | NonLinearMethods | type[AbstractNonLinearSolver] = NonLinearMethods.NR, graph_analysis: bool = False, **options, ) -> None: """Initialize driver Parameters ---------- name: str, optional Name of the `Driver`. owner: System, optional :py:class:`~cosapp.systems.system.System` to which this driver belong; defaults to `None`. method: str | NonLinearMethods | type[AbstractNonLinearSolver] Requested resolution method. If a string is provided, it must be one of the `NonLinearMethods` enum values. If a class is provided, it must be a specialization of `AbstractNonLinearSolver`. Defaults to `NonLinearMethods.NR` (Newton-Raphson method). graph_analysis: bool, default=False If True, allows to generate the dependency graph of the owner system and the dependency sparse matrix of the driver mathematical problem. **options: dict[str, Any] Additional options for the solver. These can include solver-specific parameters such as `tol`, `max_iter`, `verbose`, etc. See the documentation of the specific solver for details. """ if isinstance(method, str): method = NonLinearMethods[method] if isinstance(method, NonLinearMethods): if method == NonLinearMethods.NR: self._backend_solver = NewtonRaphsonSolver() else: self._backend_solver = ScipyRootSolver(method=method) elif inspect.isclass(method) and issubclass(method, AbstractNonLinearSolver): self._backend_solver = method() else: raise TypeError( "Argument 'method' must be either a `NonLinearMethods`" f" or a derived class of `AbstractNonLinearSolver`; got {method!r}." ) if graph_analysis and not isinstance(self._backend_solver, NewtonRaphsonSolver): raise NotImplementedError( "For graph analysis option, non linear solver" " must be a `NewtonRaphsonSolver`." ) self.__method = method self.__trace: list[dict[str, Any]] = list() self.__results: SolverResults = None self.__builder: BaseSolverBuilder = None self.compute_jacobian = True # type: bool # desc='Should the Jacobian matrix be computed?' self._graph_analysis = graph_analysis super().__init__(name, owner, **options) def _get_nested_objects_with_options(self) -> Iterable[HasOptions]: """Gets nested objects having options.""" return (self._backend_solver,) @classmethod def _slots_not_jsonified(cls) -> tuple[str]: """Returns slots that must not be JSONified.""" return ( *super()._slots_not_jsonified(), "_NonLinearSolver__builder", )
[docs] def reset_problem(self) -> None: """Reset mathematical problem""" super().reset_problem() self.__design_unknowns = dict()
@property def non_linear_solver(self) -> AbstractNonLinearSolver | None: return self._backend_solver @property def linear_solver(self) -> AbstractLinearSolver | None: try: return self._backend_solver._linear_solver except AttributeError: return None @property def jac(self) -> numpy.ndarray | None: ls = self.linear_solver try: return ls.jacobian except AttributeError: return None @jac.setter def jac(self, value) -> None: if (ls := self.linear_solver): if ls.need_jacobian: ls.jacobian = value @property def method(self) -> NonLinearMethods: """NonLinearMethods : Selected solver algorithm.""" return self.__method @property def results(self) -> SolverResults: """SolverResults: structure containing solver results, together with additional detail. """ return self.__results
[docs] def add_child( self, child: AnyDriver, execution_index: Optional[int] = None, desc="", ) -> AnyDriver: """Add a child `Driver` to the current `Driver`. When adding a child `Driver`, it is possible to specified its position in the execution order. Child `Port`, `inwards` and `outwards` can also be pulled at the parent level by providing either the name of the port/inward/outward or a list of them or the name mapping of the child element (dictionary keys) to the parent element (dictionary values). If the argument is not a dictionary, the name in the parent system will be the same as in the child. Parameters ---------- - child: Driver `Driver` to add to the current `Driver` - execution_index: int, optional Index of the execution order list at which the `Module` should be inserted; default latest. - desc [str, optional]: Sub-driver description in the context of its parent driver. Notes ----- The added child will have its owner set to match the one of the current driver. """ self.compute_jacobian = True return super().add_child(child, execution_index, desc)
[docs] def is_standalone(self) -> bool: """Is this Driver able to solve a system? Returns ------- bool Ability to solve a system or not. """ return True
[docs] def resolution_method( self, fresidues: Callable[[Sequence[float], float | str, bool], numpy.ndarray], x0: Sequence[float], args: tuple[float | str] = (), options: Optional[dict[str, Any]] = None, callback: Optional[Callable[[], None]] = None, ) -> SolverResults: """Solves the mathematical problem with a non linear method.""" self._backend_solver.set_options() # required if options were changed after instantiation self._backend_solver.log_level = ( LogLevel.INFO if self.options.verbose else LogLevel.DEBUG ) results = self._backend_solver.solve( fresidues, x0, args, callback=callback ) if results.success: self.compute_jacobian = False return results
[docs] def setup_run(self) -> None: super().setup_run() self._init_problem() if self._graph_analysis: self._backend_solver.sparsity = compute_sparsity_from_graph_analysis(self.problem)
def _print_solution(self) -> None: # TODO better returning a string """Print the solution in the log.""" if self.options.verbose > 0: # TODO move it in MathematicalProblem logger.info(f"Parameters [{len(self.solution)}]: ") for k, v in self.solution.items(): logger.info(f" # {k}: {v}") for name, residue in self.problem.residues.items(): value = numpy.asarray(residue.value) if value.ndim > 0: spacing = " " * len(name) msg = f"Residues [{len(self.problem.residues)}]: \n # ({name}" msg += f"\n # {spacing}".join(f", {v:.5g}" for v in value) msg += ")\n" else: logger.info(f" # ({name}, {value:.5g})") tol = numpy.linalg.norm(self.problem.residue_vector(), numpy.inf) try: target = self.options.tol except: target = 0 logger.debug(f" # Current tolerance {tol} for target {target}") def _init_problem(self): """Initialize mathematical problem""" logger.debug( "\n".join( [ "*" * 40, "*", "* Assemble mathematical problem", "*", "*" * 40, ] ) ) run_cases = list( filter( lambda driver: isinstance(driver, RunSingleCase), self.children.values(), ) ) if run_cases: builder = MultipointSolverBuilder(self, run_cases) else: builder = StandaloneSolverBuilder(self) builder.build_system() self.problem = builder.problem self.initial_values = builder.initial_values self.__builder = builder self.touch_unknowns() logger.debug( "\n".join( [ "Mathematical problem:", f"{'<empty>' if self.problem.is_empty() else self.problem}", ] ) ) def _fresidues(self, x: Sequence[float]) -> numpy.ndarray: """ Method used by the solver to take free variables values as input and values of residues as output (after running the System). Parameters ---------- x : Sequence[float] The list of values to set to the free variables of the `System` update_residues_ref : bool Request residues to update their reference Returns ------- numpy.ndarray The list of residues of the `System` """ logger.debug(f"Call fresidues with x = {str_array_format(x)}") self.set_iteratives(x) self._update_system() self.__builder.update_residues() residues = self.problem.residue_vector() logger.debug(f"Residues: {str_array_format(residues)}") return residues
[docs] def set_iteratives(self, x: Sequence[float]) -> None: x = numpy.asarray(x) self.__builder.update_unknowns(x)
[docs] def compute(self) -> None: """Run the resolution method to find free variable values that cancel the residues""" # Reset status self.status = "" self.error_code = "0" try: self.problem.validate() except ArithmeticError: self.status = "ERROR" self.error_code = "9" raise record_history = self.options.get("history", False) must_resolve = len(self.initial_values) > 0 if must_resolve: self.solution = {} if record_history and self._recorder: # Record system data at each solver iteration callback = SolverRecorderCallback(self) else: callback = None # compute first order results = self.resolution_method( self._fresidues, self.initial_values, options=self._get_solver_options(), callback=callback, ) self.__results = results self.__trace = getattr(results, "trace", []) if results.success: self.status = "" self.error_code = "0" logger.info(f"solver : {self.name}{results.message}") else: self.status = "ERROR" self.error_code = "9" error_msg = f"The solver failed: {results.message}" error_desc = getattr(results, "jac_errors", {}) if error_desc: indices = error_desc.get("unknowns", []) if len(indices) > 0: unknown_names = self.problem.unknown_names() error_msg += ( f" \nThe {len(indices)} following parameter(s)" f" have no influence: {[unknown_names[i] for i in indices]}" ) indices = error_desc.get("residues", []) if len(indices) > 0: residue_names = self.problem.residue_names() error_msg += ( f" \nThe {len(indices)} following residue(s)" f" are not influenced: {[residue_names[i] for i in indices]}" ) if self.parent is not None: raise ArithmeticError(error_msg) else: logger.error(error_msg) self.solution = dict( (key, unknown.default_value) for key, unknown in self.problem.unknowns.items() ) self._print_solution() else: logger.debug("No parameters/residues to solve. Fallback to system update.") self._update_system() if not record_history or not must_resolve: # Record system data at the end of the simulation callback = SolverRecorderCallback(self) callback.record()
def _get_solver_options(self) -> dict[str, Any]: options = self._filter_options(self.__option_aliases) if self.method == NonLinearMethods.NR: self.options.update(self._get_solver_limits()) # self.options["compute_jacobian"] = self.compute_jacobian return options
[docs] def log_debug_message( self, handler: HandlerWithContextFilters, record: logging.LogRecord, format: LogFormat = LogFormat.RAW, ) -> bool: """Callback method on the driver to log more detailed information. This method will be called by the log handler when :py:meth:`~cosapp.utils.logging.LoggerContext.log_context` is active if the logging level is lower or equals to VERBOSE_LEVEL. It allows the object to send additional log message to help debugging a simulation. Parameters ---------- handler : HandlerWithContextFilters Log handler on which additional message should be published. record : logging.LogRecord Log record format : LogFormat Format of the message Returns ------- bool Should the provided record be logged? """ message = record.getMessage() activate = getattr(record, "activate", None) emit_record = super().log_debug_message(handler, record, format) try: self.options["history"] = activate except (KeyError, TypeError): pass if message.endswith("call_setup_run") or message.endswith("call_clean_run"): emit_record = False elif activate == True: emit_record = False elif activate == False: emit_record = False message = "" unknown_trace = None residue_trace = None residue_names = self.problem.residue_names() unknown_names = self.problem.unknown_names() for i, record in enumerate(self.__trace): if i == 0: unknown_trace = record["x"] residue_trace = record["residues"] else: unknown_trace = numpy.vstack((unknown_trace, record["x"])) residue_trace = numpy.vstack((residue_trace, record["residues"])) try: jac_record = record["jac"] except KeyError: continue else: message += f"Iteration {i}\n" with StringIO() as container: numpy.savetxt(container, jac_record, delimiter=",") jacobian = container.getvalue() unknowns = ", ".join(unknown_names) message += f"New Jacobian matrix:\n,{unknowns}\n" for residue, line in zip(residue_names, jacobian.splitlines()): message += f"{residue}, {line}\n" def format_record(records: numpy.ndarray, headers: list[str]) -> str: records = numpy.atleast_2d(records) with StringIO() as stream: writer = csv.writer(stream, delimiter=",", lineterminator="\n") writer.writerows([headers, *records]) return stream.getvalue() if unknown_trace is not None: message += "Unknowns\n{}\n".format( format_record(unknown_trace, unknown_names) ) if residue_trace is not None: message += "Residues\n{}\n".format( format_record(residue_trace, residue_names) ) handler.log( LogLevel.FULL_DEBUG, message, name=logger.name, ) return emit_record
def _declare_options(self): """Declare solver options, and options aliases possibly necessary to use numpy functions""" super()._declare_options() self.__option_aliases = dict()
[docs] def add_problem( self, problem: MathematicalProblem, *args, **kwargs ) -> MathematicalProblem: """Merge a mathematical problem into the solver problem. Parameters ---------- - problem [MathematicalProblem]: Source mathematical problem. - *args, **kwargs: Additional arguments forwarded to `MathematicalProblem.extend`. Returns ------- - MathematicalProblem: The extended problem. """ return self._raw_problem.extend(problem, *args, **kwargs)
[docs] @deprecated(redirect=add_problem) def extend(
self, problem: MathematicalProblem, *args, **kwargs ) -> MathematicalProblem: ...
[docs] def add_unknown( self, name: str | Iterable[dict | str], *args, **kwargs, ) -> MathematicalProblem: """Add design unknown(s). More details in `cosapp.core.MathematicalProblem.add_unknown`. Parameters ---------- - name [str or Iterable of dictionary or str]: Name of the variable or collection of variables to be added. - *args, **kwargs: Additional arguments forwarded to `MathematicalProblem.add_unknown`. Returns ------- - MathematicalProblem: The updated problem. """ return self._raw_problem.add_unknown(name, *args, **kwargs)
[docs] def add_equation( self, equation: str | Iterable[dict | str], *args, **kwargs, ) -> MathematicalProblem: """Add off-design equation(s). More details in `cosapp.core.MathematicalProblem.add_equation`. Parameters ---------- - equation [str or Iterable of str of the kind 'lhs == rhs']: Equation or collection of equations to be added. - *args, **kwargs: Additional arguments forwarded to `MathematicalProblem.add_equation`. Returns ------- - MathematicalProblem: The updated problem. """ return self._raw_problem.add_equation(equation, *args, **kwargs)
[docs] def add_target( self, expression: str | Iterable[str], *args, **kwargs, ) -> MathematicalProblem: """Add deferred off-design equation(s). More details in `cosapp.core.MathematicalProblem.add_target`. Parameters ---------- - expression: str Targetted expression - *args, **kwargs : Forwarded to `MathematicalProblem.add_target` Returns ------- MathematicalProblem The modified mathematical problem """ return self._raw_problem.add_target(expression, *args, **kwargs)
[docs] class StandaloneSolverBuilder(BaseSolverBuilder): """System building strategy for solvers with no `RunSingleCase` sub-drivers. In this case, the actual mathematical problem is entirely defined by unknowns and equations declared at solver level. """
[docs] def build_system(self): solver = self.solver system = self.solver.owner problem = self.problem handler = DesignProblemHandler(system) handler.design.extend(solver.raw_problem) if self.handle_system_problem(): handler.offdesign.extend(system.assembled_problem()) handler.prune() problem.clear() problem.extend(handler.merged_problem(), copy=False) self.initial_values = problem.unknown_vector() self.is_design_unknown = dict.fromkeys(problem.unknowns, True)
[docs] def update_residues(self): self.problem.update_residues()
[docs] class MultipointSolverBuilder(BaseSolverBuilder): """System building strategy for solvers with one or more `RunSingleCase` sub-drivers. In this case, the actual mathematical problem is a combination of unknowns and equations declared at solver level, and of local design and off-design problems declared at case level. """ def __init__(self, solver: NonLinearSolver, points: list[RunSingleCase]): super().__init__(solver) self.points = points
[docs] def build_system(self): solver = self.solver system = self.solver.owner problem = self.problem handler = DesignProblemHandler(system) handler.design.extend(solver.raw_problem, equations=False) handler.offdesign.extend(solver.raw_problem, unknowns=False) handler.prune() # resolve aliasing # Create assembled problem problem.clear() problem.extend(handler.design) self.initial_values = handler.design.unknown_vector() if len(self.points) > 1: # If more than one `RunSingleCase` drivers are present, # use a name wrapper to distinguish dict entries for # residues and off-design unknowns in global problem get_key_wrapper = lambda case: lambda key: f"{case.name}[{key}]" else: get_key_wrapper = lambda case: None # Set of unknown names to detect design/off-design conflicts design_unknown_names = set(handler.design.unknowns) for point in self.points: local = point.processed_problems # Check that local problem does not introduce design/off-design conflicts design_unknown_names.update(local.design.unknowns) common = design_unknown_names.intersection(local.offdesign.unknowns) if common: kind = "unknown" if len(common) > 1: names = ", ".join(map(repr, sorted(common))) names = f"({names}) are" kind += "s" else: names = f"{common.pop()!r} is" raise ValueError( f"{names} defined as design and off-design {kind} in {point.name!r}" ) # Enforce solver-level off-design problem to child case point.add_offdesign_problem(handler.offdesign) # Assemble case problems (design & off-design) wrapper = get_key_wrapper(point) problem.extend(local.design, copy=False, residue_wrapper=wrapper) problem.extend( local.offdesign, copy=False, unknown_wrapper=wrapper, residue_wrapper=wrapper, ) self.initial_values = numpy.append( self.initial_values, point.get_init(solver.force_init) ) self.is_design_unknown = dict.fromkeys(problem.unknowns, False) for name in design_unknown_names: self.is_design_unknown[name] = True
[docs] def update_residues(self): # Nothing to do, since residues are updated # by `RunSingleCase` sub-drivers pass
[docs] class EmptySolverRecorder(BaseSolverRecorder): """Specialization for solvers with no recorder."""
[docs] def record(self, context: str) -> None: pass
[docs] class StandaloneSolverRecorder(BaseSolverRecorder): """Solver recorder specialization for solvers with no sub-drivers.""" def __init__(self, solver: NonLinearSolver) -> None: self._solver = solver
[docs] def record(self, context: str) -> None: solver = self._solver if not context: context = solver.name solver.recorder.record_state(context, solver.status, solver.error_code)
[docs] class CompositeSolverRecorder(BaseSolverRecorder): """Solver recorder specialization for solvers with sub-drivers.""" def __init__(self, solver: NonLinearSolver) -> None: self._solver = solver
[docs] def record(self, context: str) -> None: solver = self._solver recorder = solver.recorder if context: context = f" ({context})" for child in solver.children.values(): child.run_once() recorder.record_state( f"{child.name}{context}", solver.status, solver.error_code )
[docs] class SolverRecorderCallback: """Callback functor to monitor solver residues""" def __init__(self, solver: NonLinearSolver) -> None: self.reset_iter() if solver.recorder is None: self.recorder = EmptySolverRecorder() elif solver.children: self.recorder = CompositeSolverRecorder(solver) else: self.recorder = StandaloneSolverRecorder(solver)
[docs] def reset_iter(self) -> None: """Reset inner iteration counter""" self._iter = 0
[docs] def record(self, context="") -> None: """Record owner system data using solver's recorder""" self.recorder.record(context)
def __call__(self, *args, **kwargs) -> None: """Callback function""" self.record(f"iter {self._iter}") self._iter += 1