Source code for cosapp.drivers.nonlinearsolver

import numpy
import abc
import csv
import inspect
from io import StringIO
from typing import (
    Any,
    Callable,
    Optional,
    Sequence,
    Union,
    Iterable,
    TypeVar,
)

from cosapp.core.numerics.basics import MathematicalProblem, SolverResults
from cosapp.core.numerics.enum import NonLinearMethods
from cosapp.drivers.driver import Driver, System
from cosapp.drivers.abstractsolver import AbstractSolver
from cosapp.drivers.runsinglecase import RunSingleCase
from cosapp.drivers.utils import DesignProblemHandler
from cosapp.utils.logging import LogFormat, LogLevel, HandlerWithContextFilters
from cosapp.utils.options_dictionary import HasOptions

from cosapp.core.numerics.solve import (
    AbstractNonLinearSolver,
    NewtonRaphsonSolver,
    ScipyRootSolver,
)

import logging

logger = logging.getLogger(__name__)

AnyDriver = TypeVar("AnyDriver", bound=Driver)


[docs] class BaseSolverBuilder(abc.ABC): """Base interface for numerical system building strategy used by `NonLinearSolver`. Implementations will differ for single- or multi-point design. """ def __init__(self, solver: AbstractSolver): self.solver = solver self.problem = MathematicalProblem(solver.name, solver.owner) self.initial_values = numpy.empty(0) self.is_design_unknown: dict[str, bool] = dict()
[docs] @abc.abstractmethod def build_system(self) -> None: pass
[docs] @abc.abstractmethod def update_residues(self) -> None: pass
[docs] def update_unknowns(self, x: numpy.ndarray) -> None: counter = 0 for name, unknown in self.problem.unknowns.items(): n = unknown.size value = x[counter] if unknown.is_scalar else x[counter: counter + n] unknown.update_default_value(value, checks=False) counter += n # Update design unknowns if self.is_design_unknown[name]: unknown.set_to_default()
[docs] class BaseSolverRecorder(abc.ABC): """Abstract interface for solver recorder"""
[docs] @abc.abstractmethod def record(self, context: str) -> None: """Record owner system data using solver's recorder"""
[docs] class NonLinearSolver(AbstractSolver): """Solve mathematical problem with algebraic variables. Attributes ---------- compute_jacobian : bool Should the Jacobian matrix be computed? ; default True jac_lup : ndarray, optional LU decomposition of latest Jacobian matrix (if available, None otherwise) jac : ndarray, optional Latest Jacobian matrix computed (if available, None otherwise) """ __slots__ = ( "__method", "__option_aliases", "__trace", "__results", "__design_unknowns", "compute_jacobian", "__builder", "_solver", ) def __init__( self, name: str, owner: Optional[System] = None, method: Union[ NonLinearMethods, type[AbstractNonLinearSolver] ] = NonLinearMethods.NR, **options, ) -> None: """Initialize driver Parameters ---------- name: str, optional Name of the `Driver`. owner: System, optional :py:class:`~cosapp.systems.system.System` to which this driver belong; defaults to `None`. method : Union[NonLinearMethods, type[AbstractNonLinearSolver]] Resolution method to use **kwargs: Additional keywords arguments forwarded to base class. """ if isinstance(method, str): method = NonLinearMethods[method] if isinstance(method, NonLinearMethods): if method == NonLinearMethods.NR: self._solver = NewtonRaphsonSolver() else: self._solver = ScipyRootSolver(method=method) elif inspect.isclass(method) and issubclass(method, AbstractNonLinearSolver): self._solver = method() else: raise TypeError( "Argument 'method' must be either a `NonLinearMethods`" f" or a derived class of `AbstractNonLinearSolver`; got {method!r}." ) self.__method = method self.__trace: list[dict[str, Any]] = list() self.__results: SolverResults = None self.__builder: BaseSolverBuilder = None self.compute_jacobian = True # type: bool # desc='Should the Jacobian matrix be computed?' super().__init__(name, owner, **options) def _get_nested_objects_with_options(self) -> Iterable[HasOptions]: """Gets nested objects having options.""" return (self._solver, ) @classmethod def _slots_not_jsonified(cls) -> tuple[str]: """Returns slots that must not be JSONified.""" return ( *super()._slots_not_jsonified(), "_NonLinearSolver__builder", )
[docs] def reset_problem(self) -> None: """Reset mathematical problem""" super().reset_problem() self.__design_unknowns = dict()
@property def non_linear_solver(self) -> Any: return self._solver @property def linear_solver(self) -> Any: return self._solver._linear_solver @property def jac(self) -> Any: ls = self.linear_solver if ls.need_jacobian: return ls.jacobian return None @jac.setter def jac(self, value) -> None: ls = self.linear_solver if ls.need_jacobian: ls.jacobian = value @property def method(self) -> NonLinearMethods: """NonLinearMethods : Selected solver algorithm.""" return self.__method @property def results(self) -> SolverResults: """SolverResults: structure containing solver results, together with additional detail. """ return self.__results
[docs] def add_child( self, child: AnyDriver, execution_index: Optional[int] = None, desc="" ) -> AnyDriver: """Add a child `Driver` to the current `Driver`. When adding a child `Driver`, it is possible to specified its position in the execution order. Child `Port`, `inwards` and `outwards` can also be pulled at the parent level by providing either the name of the port/inward/outward or a list of them or the name mapping of the child element (dictionary keys) to the parent element (dictionary values). If the argument is not a dictionary, the name in the parent system will be the same as in the child. Parameters ---------- - child: Driver `Driver` to add to the current `Driver` - execution_index: int, optional Index of the execution order list at which the `Module` should be inserted; default latest. - desc [str, optional]: Sub-driver description in the context of its parent driver. Notes ----- The added child will have its owner set to match the one of the current driver. """ self.compute_jacobian = True return super().add_child(child, execution_index, desc)
[docs] def is_standalone(self) -> bool: """Is this Driver able to solve a system? Returns ------- bool Ability to solve a system or not. """ return True
[docs] def resolution_method( self, fresidues: Callable[[Sequence[float], Union[float, str], bool], numpy.ndarray], x0: Sequence[float], args: tuple[Union[float, str]] = (), options: Optional[dict[str, Any]] = None, callback: Optional[Callable[[], None]] = None, ) -> SolverResults: """Solves the mathematical problem with a non linear method.""" self._solver.set_options() # required if options were changed after instantiation self._solver.log_level = ( LogLevel.INFO if self.options.verbose else LogLevel.DEBUG ) results = self._solver.solve(fresidues, x0, args, callback=callback) if results.success: self.compute_jacobian = False return results
[docs] def setup_run(self) -> None: super().setup_run() self._init_problem()
def _print_solution(self) -> None: # TODO better returning a string """Print the solution in the log.""" if self.options.verbose > 0: # TODO move it in MathematicalProblem logger.info(f"Parameters [{len(self.solution)}]: ") for k, v in self.solution.items(): logger.info(f" # {k}: {v}") for name, residue in self.problem.residues.items(): value = numpy.asarray(residue.value) if value.ndim > 0: spacing = " " * len(name) msg = f"Residues [{len(self.problem.residues)}]: \n # ({name}" msg += f"\n # {spacing}".join(f", {v:.5g}" for v in value) msg += ")\n" else: logger.info(f" # ({name}, {value:.5g})") tol = numpy.linalg.norm(self.problem.residue_vector(), numpy.inf) try: target = self.options.tol except: target = 0 logger.debug(f" # Current tolerance {tol} for target {target}") def _init_problem(self): """Initialize mathematical problem""" logger.debug( "\n".join([ "*" * 40, "*", "* Assemble mathematical problem", "*", "*" * 40, ]) ) run_cases = list( filter( lambda driver: isinstance(driver, RunSingleCase), self.children.values(), ) ) if run_cases: builder = MultipointSolverBuilder(self, run_cases) else: builder = StandaloneSolverBuilder(self) builder.build_system() self.problem = builder.problem self.initial_values = builder.initial_values self.__builder = builder self.touch_unknowns() logger.debug( "\n".join([ "Mathematical problem:", f"{'<empty>' if self.problem.is_empty() else self.problem}", ]) ) def _fresidues(self, x: Sequence[float]) -> numpy.ndarray: """ Method used by the solver to take free variables values as input and values of residues as output (after running the System). Parameters ---------- x : Sequence[float] The list of values to set to the free variables of the `System` update_residues_ref : bool Request residues to update their reference Returns ------- numpy.ndarray The list of residues of the `System` """ logger.debug(f"Call fresidues with {x = !r}") self.set_iteratives(x) self._update_system() self.__builder.update_residues() residues = self.problem.residue_vector() logger.debug(f"Residues: {residues!r}") return residues
[docs] def set_iteratives(self, x: Sequence[float]) -> None: x = numpy.asarray(x) self.__builder.update_unknowns(x)
[docs] def compute(self) -> None: """Run the resolution method to find free variable values that cancel the residues""" # Reset status self.status = "" self.error_code = "0" try: self.problem.validate() except ArithmeticError: self.status = "ERROR" self.error_code = "9" raise record_history = self.options.get("history", False) must_resolve = len(self.initial_values) > 0 if must_resolve: self.solution = {} if record_history and self._recorder: # Record system data at each solver iteration callback = SolverRecorderCallback(self) else: callback = None # compute first order results = self.resolution_method( self._fresidues, self.initial_values, options=self._get_solver_options(), callback=callback, ) self.__results = results self.__trace = getattr(results, "trace", []) if results.success: self.status = "" self.error_code = "0" logger.info(f"solver : {self.name}{results.message}") else: self.status = "ERROR" self.error_code = "9" error_msg = f"The solver failed: {results.message}" error_desc = getattr(results, "jac_errors", {}) if error_desc: indices = error_desc.get("unknowns", []) if len(indices) > 0: unknown_names = self.problem.unknown_names() error_msg += ( f" \nThe {len(indices)} following parameter(s)" f" have no influence: {[unknown_names[i] for i in indices]}" ) indices = error_desc.get("residues", []) if len(indices) > 0: residue_names = self.problem.residue_names() error_msg += ( f" \nThe {len(indices)} following residue(s)" f" are not influenced: {[residue_names[i] for i in indices]}" ) if self.parent is not None: raise ArithmeticError(error_msg) else: logger.error(error_msg) self.solution = dict( (key, unknown.default_value) for key, unknown in self.problem.unknowns.items() ) self._print_solution() else: logger.debug("No parameters/residues to solve. Fallback to system update.") self._update_system() if not record_history or not must_resolve: # Record system data at the end of the simulation callback = SolverRecorderCallback(self) callback.record()
def _get_solver_options(self) -> dict[str, Any]: options = self._filter_options(self.__option_aliases) if self.method == NonLinearMethods.NR: self.options.update(self._get_solver_limits()) # self.options["compute_jacobian"] = self.compute_jacobian return options
[docs] def log_debug_message( self, handler: HandlerWithContextFilters, record: logging.LogRecord, format: LogFormat = LogFormat.RAW, ) -> bool: """Callback method on the driver to log more detailed information. This method will be called by the log handler when :py:meth:`~cosapp.utils.logging.LoggerContext.log_context` is active if the logging level is lower or equals to VERBOSE_LEVEL. It allows the object to send additional log message to help debugging a simulation. Parameters ---------- handler : HandlerWithContextFilters Log handler on which additional message should be published. record : logging.LogRecord Log record format : LogFormat Format of the message Returns ------- bool Should the provided record be logged? """ message = record.getMessage() activate = getattr(record, "activate", None) emit_record = super().log_debug_message(handler, record, format) try: self.options["history"] = activate except (KeyError, TypeError): pass if message.endswith("call_setup_run") or message.endswith("call_clean_run"): emit_record = False elif activate == True: emit_record = False elif activate == False: emit_record = False message = "" unknown_trace = None residue_trace = None residue_names = self.problem.residue_names() unknown_names = self.problem.unknown_names() for i, record in enumerate(self.__trace): if i == 0: unknown_trace = record["x"] residue_trace = record["residues"] else: unknown_trace = numpy.vstack((unknown_trace, record["x"])) residue_trace = numpy.vstack((residue_trace, record["residues"])) try: jac_record = record["jac"] except KeyError: continue else: message += f"Iteration {i}\n" with StringIO() as container: numpy.savetxt(container, jac_record, delimiter=",") jacobian = container.getvalue() unknowns = ", ".join(unknown_names) message += f"New Jacobian matrix:\n,{unknowns}\n" for residue, line in zip(residue_names, jacobian.splitlines()): message += f"{residue}, {line}\n" def format_record(records: numpy.ndarray, headers: list[str]) -> str: records = numpy.atleast_2d(records) with StringIO() as stream: writer = csv.writer(stream, delimiter=",", lineterminator="\n") writer.writerows([headers, *records]) return stream.getvalue() if unknown_trace is not None: message += "Unknowns\n{}\n".format( format_record(unknown_trace, unknown_names) ) if residue_trace is not None: message += "Residues\n{}\n".format( format_record(residue_trace, residue_names) ) handler.log( LogLevel.FULL_DEBUG, message, name=logger.name, ) return emit_record
def _declare_options(self): """Declare solver options, and options aliases possibly necessary to use numpy functions""" super()._declare_options() self.__option_aliases = dict()
[docs] def extend(self, problem: MathematicalProblem, *args, **kwargs) -> MathematicalProblem: """Extend solver inner problem. Parameters ---------- - problem [MathematicalProblem]: Source mathematical problem. - *args, **kwargs: Additional arguments forwarded to `MathematicalProblem.extend`. Returns ------- - MathematicalProblem: The extended problem. """ return self._raw_problem.extend(problem, *args, **kwargs)
[docs] def add_unknown( self, name: Union[str, Iterable[Union[dict, str]]], *args, **kwargs, ) -> MathematicalProblem: """Add design unknown(s). More details in `cosapp.core.MathematicalProblem.add_unknown`. Parameters ---------- - name [str or Iterable of dictionary or str]: Name of the variable or collection of variables to be added. - *args, **kwargs: Additional arguments forwarded to `MathematicalProblem.add_unknown`. Returns ------- - MathematicalProblem: The updated problem. """ return self._raw_problem.add_unknown(name, *args, **kwargs)
[docs] def add_equation( self, equation: Union[str, Iterable[Union[dict, str]]], *args, **kwargs, ) -> MathematicalProblem: """Add off-design equation(s). More details in `cosapp.core.MathematicalProblem.add_equation`. Parameters ---------- - equation [str or Iterable of str of the kind 'lhs == rhs']: Equation or collection of equations to be added. - *args, **kwargs: Additional arguments forwarded to `MathematicalProblem.add_equation`. Returns ------- - MathematicalProblem: The updated problem. """ return self._raw_problem.add_equation(equation, *args, **kwargs)
[docs] def add_target( self, expression: Union[str, Iterable[str]], *args, **kwargs, ) -> MathematicalProblem: """Add deferred off-design equation(s). More details in `cosapp.core.MathematicalProblem.add_target`. Parameters ---------- - expression: str Targetted expression - *args, **kwargs : Forwarded to `MathematicalProblem.add_target` Returns ------- MathematicalProblem The modified mathematical problem """ return self._raw_problem.add_target(expression, *args, **kwargs)
[docs] class StandaloneSolverBuilder(BaseSolverBuilder): """System building strategy for solvers with no `RunSingleCase` sub-drivers. In this case, the actual mathematical problem is entirely defined by unknowns and equations declared at solver level. """
[docs] def build_system(self): solver = self.solver system = self.solver.owner problem = self.problem handler = DesignProblemHandler(system) handler.design.extend(solver.raw_problem) handler.offdesign.extend(system.assembled_problem()) handler.prune() problem.clear() problem.extend(handler.merged_problem(), copy=False) self.initial_values = problem.unknown_vector() self.is_design_unknown = dict.fromkeys(problem.unknowns, True)
[docs] def update_residues(self): for residue in self.problem.residues.values(): residue.update()
[docs] class MultipointSolverBuilder(BaseSolverBuilder): """System building strategy for solvers with one or more `RunSingleCase` sub-drivers. In this case, the actual mathematical problem is a combination of unknowns and equations declared at solver level, and of local design and off-design problems declared at case level. """ def __init__(self, solver: NonLinearSolver, points: list[RunSingleCase]): super().__init__(solver) self.points = points
[docs] def build_system(self): solver = self.solver system = self.solver.owner problem = self.problem handler = DesignProblemHandler(system) handler.design.extend(solver.raw_problem, equations=False) handler.offdesign.extend(solver.raw_problem, unknowns=False) handler.prune() # resolve aliasing # Create assembled problem problem.clear() problem.extend(handler.design) self.initial_values = handler.design.unknown_vector() if len(self.points) > 1: # If more than one `RunSingleCase` drivers are present, # use a name wrapper to distinguish dict entries for # residues and off-design unknowns in global problem get_key_wrapper = lambda case: (lambda key: f"{case.name}[{key}]") else: get_key_wrapper = lambda case: None # Set of unknown names to detect design/off-design conflicts design_unknown_names = set(handler.design.unknowns) for point in self.points: local = point.processed_problems # Check that local problem does not introduce design/off-design conflicts design_unknown_names.update(local.design.unknowns) common = design_unknown_names.intersection(local.offdesign.unknowns) if common: kind = "unknown" if len(common) > 1: names = ", ".join(map(repr, sorted(common))) names = f"({names}) are" kind += "s" else: names = f"{common.pop()!r} is" raise ValueError( f"{names} defined as design and off-design {kind} in {point.name!r}" ) # Enforce solver-level off-design problem to child case point.add_offdesign_problem(handler.offdesign) # Assemble case problems (design & off-design) wrapper = get_key_wrapper(point) problem.extend(local.design, copy=False, residue_wrapper=wrapper) problem.extend( local.offdesign, copy=False, unknown_wrapper=wrapper, residue_wrapper=wrapper, ) self.initial_values = numpy.append( self.initial_values, point.get_init(solver.force_init) ) self.is_design_unknown = dict.fromkeys(problem.unknowns, False) for name in design_unknown_names: self.is_design_unknown[name] = True
[docs] def update_residues(self): # Nothing to do, since residues are updated # by `RunSingleCase` sub-drivers pass
[docs] class EmptySolverRecorder(BaseSolverRecorder): """Specialization for solvers with no recorder."""
[docs] def record(self, context: str) -> None: pass
[docs] class StandaloneSolverRecorder(BaseSolverRecorder): """Solver recorder specialization for solvers with no sub-drivers.""" def __init__(self, solver: NonLinearSolver) -> None: self._solver = solver
[docs] def record(self, context: str) -> None: solver = self._solver if not context: context = solver.name solver.recorder.record_state(context, solver.status, solver.error_code)
[docs] class CompositeSolverRecorder(BaseSolverRecorder): """Solver recorder specialization for solvers with sub-drivers.""" def __init__(self, solver: NonLinearSolver) -> None: self._solver = solver
[docs] def record(self, context: str) -> None: solver = self._solver recorder = solver.recorder if context: context = f" ({context})" for child in solver.children.values(): child.run_once() recorder.record_state( f"{child.name}{context}", solver.status, solver.error_code )
[docs] class SolverRecorderCallback: """Callback functor to monitor solver residues""" def __init__(self, solver: NonLinearSolver) -> None: self.reset_iter() if solver.recorder is None: self.recorder = EmptySolverRecorder() elif solver.children: self.recorder = CompositeSolverRecorder(solver) else: self.recorder = StandaloneSolverRecorder(solver)
[docs] def reset_iter(self) -> None: """Reset inner iteration counter""" self._iter = 0
[docs] def record(self, context="") -> None: """Record owner system data using solver's recorder""" self.recorder.record(context)
def __call__(self, *args, **kwargs) -> None: """Callback function""" self.record(f"iter {self._iter}") self._iter += 1