Monte-Carlo simulations

The propagation of uncertainties can be investigated using driver MonteCarlo. This driver will execute its owner system by varying variables of interest, assuming they follow specific distributions.

Generalities

Declare random variables

Random variables are declared with method add_random_variable, with different possible signatures.

from cosapp.drivers import MonteCarlo
from cosapp.utils.distributions import Normal, Uniform

montecarlo = system.add_driver(MonteCarlo("montecarlo"))

# Add variables one by one, with specific distributions
montecarlo.add_random_variable("x", distribution=Uniform(...))
montecarlo.add_random_variable("port.v", distribution=Normal(...))

# Add several variables with the same distribution
montecarlo.add_random_variable(["x", "port.v"], distribution=Normal(...))

# Add distribution specifications as a dictionary
montecarlo.add_random_variable({
    "x": Uniform(...),
    "port.v": Normal(...),
})

Except for the last case (the dictionary syntax), argument distribution is optional. If it is not provided, the driver will use the distribution attached to the variable (if any), either at system setup, or interactively, using either method System.get_variable or Port.get_variable:

from cosapp.base import System, Port
from cosapp.drivers import MonteCarlo
from cosapp.recorders import DataFrameRecorder
from cosapp.utils.distributions import Normal, Uniform

class UvPort(Port):
    def setup(self):
        self.add_variable("u", 0.0)
        self.add_variable("v", 0.0)

class MyModel(System):
    def setup(self):
        self.add_input(UvPort, "p_in")
        self.add_inward("x", 0.5, distribution=Uniform(...))

        self.add_outward("y", 0.0)

    def compute(self):
        self.y = ...


model = MyModel("model")

# Get variable object behind `model.p_in.v`
v = model.get_variable("p_in.v")
v.distribution = Normal(...)

mc = model.add_driver(MonteCarlo("mc"))

mc.add_random_variable("x")       # will assume a uniform distribution
mc.add_random_variable("port.v")  # will assume a normal distribution

mc.add_recorder(DataFrameRecorder())
mc.draws = 100  # number of trial runs

model.run_drivers()

Perturbation of random variables

At each Monte-Carlo execution, a perturbation is applied to the specified random variables.

For free (unconnected) input variables, the perturbation is added to the nominal values. For connected input variables, the perturbation is added onto the value passed by the connector.

Example

For this example, we will reuse the circuit defined in the multi-points design tutorial, and will investigate how uncertainties on resistance R3 affect resistances R1 and R2, computed as the solution of a multi-point design problem.

simple-circuit

[1]:
import logging

logging.getLogger().setLevel(logging.INFO)
[2]:
from __future__ import annotations
from cosapp.base import Port, System
from abc import abstractmethod


class Voltage(Port):
    def setup(self):
        self.add_variable("V", unit="V")


class Intensity(Port):
    def setup(self):
        self.add_variable("I", unit="A")


class Dipole(System):
    """Abstract directional dipole model computing
    current from end values of electric potential.
    """
    def setup(self):
        self.add_input(Voltage, "V_in")
        self.add_input(Voltage, "V_out")

        self.add_output(Intensity, "I")
        self.add_outward("deltaV", unit="V")

    def compute(self):
        self.deltaV = self.V_in.V - self.V_out.V
        self.compute_I()

    @abstractmethod
    def compute_I(self) -> None:
        pass


class Resistor(Dipole):

    def setup(self, R=1.0):
        super().setup()
        self.add_inward("R", R, desc="Resistance in Ohms")

    def compute_I(self):
        self.I.I = self.deltaV / self.R


class Node(System):
    """Electric node model with `n_in` incoming and `n_out` outgoing currents.
    """
    def setup(self, n_in=1, n_out=1):
        self.add_property("n_in", max(1, int(n_in)))
        self.add_property("n_out", max(1, int(n_out)))

        incoming = tuple(
            self.add_input(Intensity, f"I_in{i}")
            for i in range(self.n_in)
        )
        outgoing = tuple(
            self.add_input(Intensity, f"I_out{i}")
            for i in range(self.n_out)
        )
        self.add_property("incoming_currents", incoming)
        self.add_property("outgoing_currents", outgoing)

        self.add_inward("V", 1.0, unit="V")
        self.add_outward("sum_I_in", 0., unit="A", desc="Sum of all incoming currents")
        self.add_outward("sum_I_out", 0., unit="A", desc="Sum of all outgoing currents")

        self.add_unknown("V")
        self.add_equation("sum_I_in == sum_I_out", name="current balance")

    def compute(self):
        self.sum_I_in = sum(current.I for current in self.incoming_currents)
        self.sum_I_out = sum(current.I for current in self.outgoing_currents)

    @classmethod
    def make(cls, name, parent, incoming: list[Dipole]=[], outgoing: list[Dipole]=[], pulling=None) -> Node:
        """Factory creating new node within `parent`, with
        appropriate connections with incoming and outgoing dipoles.
        """
        node = cls(name, n_in=len(incoming), n_out=len(outgoing))
        parent.add_child(node, pulling=pulling)

        for dipole, current in zip(incoming, node.incoming_currents):
            # print(dipole.name, type(dipole), dipole.I)
            parent.connect(dipole.I, current)
            parent.connect(dipole.V_out, node.inwards, "V")

        for dipole, current in zip(outgoing, node.outgoing_currents):
            parent.connect(dipole.I, current)
            parent.connect(dipole.V_in, node.inwards, "V")

        return node


class Source(System):

    def setup(self, I=0.1):
        self.add_inward("I", I, unit="A")
        self.add_output(Intensity, "I_out", {"I": I})

    def compute(self):
        self.I_out.I = self.I


class Ground(System):

    def setup(self, V=0.):
        self.add_inward("V", V, unit="V")
        self.add_output(Voltage, "V_out", {"V": V})

    def compute(self):
        self.V_out.V = self.V


class Circuit(System):

    def setup(self):
        R1 = self.add_child(Resistor("R1", R=1.00e3), pulling={"V_out": "Vg"})
        R2 = self.add_child(Resistor("R2", R=0.50e3))
        R3 = self.add_child(Resistor("R3", R=0.25e3), pulling={"V_out": "Vg"})

        # Define nodes
        Node.make("n1",
            parent=self,
            pulling={"I_in0": "I_in"},
            outgoing=[R1, R2],
        )
        Node.make("n2",
            parent=self,
            incoming=[R2],
            outgoing=[R3],
        )
[3]:
class AssembledModel(System):

    def setup(self, I=0.1):
        self.add_child(Source("source", I=I))
        self.add_child(Ground("ground", V=0.0))
        self.add_child(Circuit("circuit"))

        self.connect(self.source.I_out, self.circuit.I_in)
        self.connect(self.ground.V_out, self.circuit.Vg)

In the following cell, function set_design_problem sets up a multi-point design problem. Function run_analysis will carry out the execution of the Monte-Carlo simulation, and return recorded data as a pandas dataframe.

[4]:
from cosapp.drivers import NonLinearSolver, RunSingleCase, MonteCarlo
from cosapp.recorders import DataFrameRecorder
from cosapp.utils.distributions import Normal
from cosapp.utils import set_state


def set_design_problem(solver: NonLinearSolver):
    """Add multi-point design problem to `solver`.
    """
    model = solver.owner
    if not isinstance(model, AssembledModel):
        raise TypeError(
            "Argument `solver` must be defined on a system of type `AssembledModel`."
        )
    solver.add_unknown(
        ["circuit.R1.R", "circuit.R2.R"],
        max_rel_step=0.5,
    )
    # Add design points 1 & 2
    point1 = solver.add_child(RunSingleCase("point1"))
    point2 = solver.add_child(RunSingleCase("point2"))
    init = {
        "circuit.R1.R": model.circuit.R1.R,
        "circuit.R2.R": model.circuit.R2.R,
    }
    point1.set_init(init)
    point2.set_init(init)

    point1.set_values({
        "source.I": 0.08,
        "ground.V": 0,
    })
    point1.design.add_equation("circuit.n2.V == 8")

    point2.set_values({
        "source.I": 0.15,
        "ground.V": 0,
    })
    point2.design.add_equation("circuit.n1.V == 50")


def run_analysis(
    model: AssembledModel,
    distribution=Normal(best=100, worst=-50),
    draws=1000,
    linear=False,
    ref_state=None,
):
    """Add MonteCarlo driver and investigate the effect of R3 on R1 and R2.
    """
    if ref_state:
        set_state(model, ref_state)

    # Montecarlo
    model.drivers.clear()
    montecarlo = model.add_driver(MonteCarlo("montecarlo"))
    montecarlo.draws = draws
    montecarlo.linear = linear
    montecarlo.add_recorder(
        DataFrameRecorder(includes="circuit.R?.R"),
    )
    # Solve design problem at each draw
    montecarlo.add_child(NonLinearSolver("solver"))
    set_design_problem(montecarlo.solver)

    # Define uncertainties on R3 around current value
    montecarlo.add_random_variable("circuit.R3.R", distribution)
    montecarlo.add_response(["circuit.R1.R", "circuit.R2.R"])

    # Computation
    try:
        model.run_drivers()
    except:
        pass  # silent fail

    return montecarlo.recorder.export_data()

Solve multi-point design problem at nominal value R3 = 300 Ohm.

[5]:
from cosapp.drivers import NonLinearSolver
from cosapp.utils import get_state

model = AssembledModel("model")
model.source.I = 0.1
model.circuit.R1.R = 1.0e3
model.circuit.R2.R = 0.5e3
model.circuit.R3.R = 0.3e3

# Solve off-design problem
solver = model.add_driver(NonLinearSolver("solver"))
model.run_drivers()
print(
    f"{model.circuit.R1.R = }",
    f"{model.circuit.R2.R = }",
    f"{model.circuit.R3.R = }",
    "\nIntrinsic problem:",
    solver.problem,
    sep="\n",
)

# Solve design problem
set_design_problem(solver)
model.run_drivers()

# Store nominal state
nominal = get_state(model)

print(
    "\nDesign problem:",
    solver.problem,
    sep="\n",
)
model.circuit.R1.R = 1000.0
model.circuit.R2.R = 500.0
model.circuit.R3.R = 300.0

Intrinsic problem:
Unknowns [2]
  circuit.n1_V = 44.44444444444444
  circuit.n2_V = 16.666666666666668
Equations [2]
  circuit.n1: current balance := 0.0
  circuit.n2: current balance := -6.938893903907228e-18

Design problem:
Unknowns [6]
  circuit.R1.R = 499.99999999635264
  circuit.R2.R = 700.0000000003398
  point1[circuit.n1_V] = 26.66666666647135
  point1[circuit.n2_V] = 8.0
  point2[circuit.n1_V] = 50.0
  point2[circuit.n2_V] = 14.999999999745006
Equations [6]
  point1[circuit.n2.V == 8] := 0.0
  point1[circuit.n1: current balance] := 2.935568454986992e-13
  point1[circuit.n2: current balance] := -2.919713082416564e-13
  point2[circuit.n1.V == 50] := 0.0
  point2[circuit.n1: current balance] := -1.0694778396214133e-12
  point2[circuit.n2: current balance] := 1.1899856100505701e-12

Investigate the effect of uncertainties on R3

[6]:
results = run_analysis(model, ref_state=nominal, draws=1000)

print(f"{len(results) = }")
len(results) = 1000
[7]:
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# List of fields to plot
fields = [
    col for col in results.columns
    if col.startswith("circuit.R")
]
# Create figure object
fig = make_subplots(
    rows=len(fields),
    cols=1,
    shared_xaxes="all",
    shared_yaxes="all",
    horizontal_spacing=0.01,
    vertical_spacing=0.02,
)
bin_size = 20

for i, field in enumerate(fields):
    row = i + 1
    col = 1
    # Add plot
    fig.add_trace(
        go.Histogram(
            x=results[field],
            histnorm="percent",
            name=field,
            xbins=dict(size=bin_size),
        ),
        row=row,
        col=col,
    )
    fig.get_subplot(row, col).xaxis.title = None
    fig.get_subplot(row, col).yaxis.title = "Probability [%]"

fig.get_subplot(3, 1).xaxis.title = "Resistance [Ohm]"
fig.update_layout(
    title=f"Probability distributions ({bin_size} Ohm bins)",
    showlegend=True,
    hovermode="x",
    height=800,
)
fig.show()