Source code for cosapp.drivers.montecarlo

from __future__ import annotations

import numpy
import logging
from collections import OrderedDict
from typing import Any, Iterable, Dict, List, Optional, Union, NamedTuple

from scipy.stats import qmc

from cosapp.core.variableref import VariableReference
from cosapp.ports.port import BasePort
from cosapp.drivers.abstractsetofcases import AbstractSetOfCases, System
from cosapp.drivers.abstractsolver import AbstractSolver
from cosapp.utils.distributions import Distribution
from cosapp.utils.helpers import check_arg
from cosapp.systems.system import SystemConnector

logger = logging.getLogger(__name__)


[docs] class RandomVariable(NamedTuple): variable: VariableReference distribution: Distribution connector: Optional[SystemConnector] = None
[docs] def add_noise(self, quantile=None) -> float: delta = self.draw(quantile) self.set_perturbation(delta) return delta
[docs] def draw(self, quantile=None) -> float: return self.distribution.draw(quantile)
[docs] def set_perturbation(self, value) -> None: connector = self.connector if connector is None: self.variable.value += value else: connector.set_perturbation(self.variable.key, value)
# TODO linearization does not support multipoint cases # TODO Does not work for vector variables (at least partially connected one) # TODO Does it work if a subsystem mutates to higher/lower fidelity # TODO We don't forbid using an unknown
[docs] class MonteCarlo(AbstractSetOfCases): """ This driver execute a MonteCarlo simulation on its system. """ __slots__ = ( 'draws', 'linear', 'random_variables', 'responses', 'solver', 'reference_case_solution', 'X0', 'Y0', 'A', 'perturbations' ) def __init__(self, name: str, owner: Optional[System] = None, **options ) -> None: """Initialize driver Parameters ---------- name: str, optional Name of the `Driver`. owner: System, optional :py:class:`~cosapp.systems.system.System` to which this driver belong; defaults to `None`. **kwargs: Additional keywords arguments forwarded to base class. """ super().__init__(name, owner, **options) self.draws = 200 # type: int # desc="Number of cases performed for Montecarlo calculations." self.linear = False # type: bool # desc="True for linearisation of system before Montecarlo calculation. Default False." self.random_variables: Dict[str, RandomVariable] = OrderedDict() # desc="Random variables in the system." self.responses = list() # type: List[str] # We need a list as set is not ordered # desc="Variable names to study through Monte Carlo calculations." self.solver = None # type: Optional[AbstractSolver] # desc="Solver acting. Used for re-init of case." self.reference_case_solution = dict() # type: Dict[str, float] self.X0 = None # type: Optional[numpy.ndarray] # desc="Vector of imposed disturbed values" self.Y0 = None # type: Optional[numpy.ndarray] # desc="Vector of output evaluated disturbed values." self.A = None # type: Optional[numpy.ndarray] # desc="Matrice of influence of imposed disturbed values on results." self.perturbations = None # type: Optional[numpy.ndarray] # desc="Array of perturbations applied on the system." @classmethod def _slots_not_jsonified(cls) -> tuple[str]: """Returns slots that must not be JSONified.""" return (*super()._slots_not_jsonified(), "random_variables")
[docs] def add_random_variable(self, names: Union[str, Iterable[str]]) -> None: """Add variable to be perturbated. The perturbation distribution is defined by the variable distribution details. .. from cosapp.core.numerics.distribution import Normal port.get_details('my_variable').distribution = Normal(worst=0.0, best=5.0) Parameters ---------- names : Union[str, Iterable[str]] List of variables to be perturbated """ # TODO it should be possible to set the distribution directly name2variable = self.owner.name2variable def add_unique_input_var(name: str): self.check_owner_attr(name) ref = name2variable[name] port = ref.mapping if not isinstance(port, BasePort): raise TypeError(f"{name!r} is not a variable.") if not port.is_input: raise TypeError(f"{name!r} is not an input variable.") distribution = port.get_details(ref.key).distribution if distribution is None: raise ValueError( f"No distribution specified for {name!r}" ) # Test if the variable is connected connection = None if port.owner.parent is not None: connectors = port.owner.parent.all_connectors() for connector in filter(lambda c: c.sink is port, connectors): if ref.key in connector.sink_variables(): connection = connector break self.random_variables[name] = RandomVariable(ref, distribution, connection) check_arg(names, 'names', (str, set, list)) if isinstance(names, str): add_unique_input_var(names) else: for name in names: check_arg(name, f"{name} in 'names'", str) add_unique_input_var(name)
[docs] def add_response(self, name: Union[str, Iterable[str]]) -> None: """Add a variable for which the statistical response will be calculated. Parameters ---------- name : Union[str, Iterable[str]] List of variable names to add """ def add_unique_response_var(name: str): self.check_owner_attr(name) if name not in self.responses: self.responses.append(name) check_arg(name, 'name', (str, set, list)) if isinstance(name, str): add_unique_response_var(name) else: for n in name: check_arg(n, f"{n} in 'name'", str) add_unique_response_var(n)
def _build_cases(self) -> None: """Build the list of cases to run during execution """ sobol = qmc.Sobol(d=len(self.random_variables), scramble=False) sobol.random() self.cases = sobol.random(self.draws) def _reset_transients(self): """Reattribute initial transient values.""" for variable, value in self._transients_variables.items(): self._owner[variable] = value def _precompute(self): """Save reference and build cases.""" super()._precompute() self.run_children() self.solver = None for child in self.children.values(): if isinstance(child, AbstractSolver): self.solver = child self.reference_case_solution = child.save_solution() break if self.linear: # precompute linear system n_input = len(self.random_variables) n_output = len(self.responses) if n_output == 0: raise ValueError("You need to define response variables to use MonteCarlo linear mode.") self.X0 = numpy.zeros(n_input) self.Y0 = numpy.zeros(n_output) self.A = numpy.zeros((n_output, n_input)) # reference for influence matrix computation through center differentiation scheme for i, name in enumerate(self.random_variables): self.X0[i] = self.owner[name] for j, name in enumerate(self.responses): self.Y0[j] = self.owner[name] variation = 0.5 * (numpy.max(self.cases, axis=0) - numpy.min(self.cases, axis=0)) for i, input_name in enumerate(self.random_variables): self.owner[input_name] = self.X0[i] + variation[i] self.run_children() for j, response_name in enumerate(self.responses): self.A[j, i] = 0.5 * (self.owner[response_name] - self.Y0[j]) / variation[i] self.owner[input_name] = self.X0[i] - variation[i] self.run_children() for j, response_name in enumerate(self.responses): self.A[j, i] -= 0.5 * (self.owner[response_name] - self.Y0[j]) / variation[i] # Restore system value self.owner[input_name] = self.X0[i] for j, name in enumerate(self.responses): self.Y0[j] = self.owner[name] self._reset_transients() def _precase(self, case_idx, case): """Hook to be called before running each case. Parameters ---------- case_idx : int Index of the case case : Any Parameters for this case """ super()._precase(case_idx, case) # Set perturbation self.perturbations = numpy.zeros(len(self.random_variables)) for i, variable in enumerate(self.random_variables.values()): perturbation = variable.add_noise(case[i]) self.perturbations[i] = perturbation if len(self.reference_case_solution) > 0: self.solver.load_solution(self.reference_case_solution) @staticmethod def _compute_sequential(mc: MonteCarlo) -> None: """Contains the customized `Module` calculation, to execute after children.""" for case_idx, case in enumerate(mc.cases): if len(case) > 0: mc._precase(case_idx, case) if mc.linear: mc.__run_linear() else: mc.run_children() mc._postcase(case_idx, case) def __run_linear(self) -> None: """Approximate MonteCarlo simulation using partial derivatives matrix.""" # TODO this is not great as we set variables in the system breaking its consistency. if len(self.responses) > 0: X = numpy.zeros(len(self.random_variables)) for i, name in enumerate(self.random_variables): self.X0[i] = self.owner[name] Y = self.Y0 + numpy.matmul(self.A, X - self.X0) for j, name in enumerate(self.responses): self.owner[name] = Y[j] def _postcase(self, case_idx: int, case: Any): """Hook to be called before running each case. Parameters ---------- case_idx : int Index of the case case : Any Parameters for this case """ # Store the results super()._postcase(case_idx, case) # Remove the perturbation for variable, delta in zip(self.random_variables.values(), self.perturbations): if variable.connector is None: variable.set_perturbation(-delta) else: variable.connector.clear_noise() self._reset_transients()