CoSApp tutorials: Multiprocessing
Multiprocessing
¶
As of version 1.0, certain drivers offer multiprocessing capabilities:
LinearDoEandMonteCarloNonLinearSolver
The first two consist in simulating the system of interest on an arbitrary number of independent parameter sets. For the nonlinear solver, multiprocessing consists in computing the columns of the Jacobian matrix in parallel.
Multiprocessing relies on module cosapp.core.execution. In particular, parallel computations require an execution policy, determining the size of the worker pool and the type of computation (multi-processing or multi-threading). Currently, though, multi-threading is not supported.
The syntax is:
from cosapp.core.execution import ExecutionPolicy, ExecutionType
execution_policy = ExecutionPolicy(
workers_count=8, # number of parallel processes
execution_type=ExecutionType.MULTI_PROCESSING,
)
Examples are given below for the two types of drivers that support multiprocessing (LinearDoE/MonteCarlo and NonLinearSolver).
DoE and Monte-Carlo simulations¶
[1]:
from cosapp.base import System
from cosapp.drivers import MonteCarlo, Optimizer
from cosapp.recorders import DataFrameRecorder
from cosapp.core.execution import ExecutionPolicy, ExecutionType
from cosapp.utils.distributions import Normal
import math
class SimpleSystem(System):
def setup(self, n=2):
self.add_property("n", n)
self.add_inward("x", 1.0)
self.add_inward("a", 1.0)
self.add_outward("y", 0.0)
def compute(self):
x, n = self.x, self.n
self.y = x**n * math.exp(-x / self.a)
system = SimpleSystem("system", n=4)
montecarlo = system.add_driver(
MonteCarlo(
"montecarlo",
execution_policy=ExecutionPolicy(
workers_count=2, # number of parallel processes
execution_type=ExecutionType.MULTI_PROCESSING,
),
)
)
# Add optimization problem using a sub-driver
optim = montecarlo.add_child(Optimizer("optim"))
optim.set_maximum("y")
optim.add_unknown("x")
montecarlo.add_recorder(DataFrameRecorder(excludes=["n"]))
# Define `system.a` as random variable
montecarlo.add_random_variable("a", Normal(-0.1, 0.1))
montecarlo.draws = 10
system.a = 0.5 # nominal value
system.x = 1.0 # initialization
system.run_drivers()
df = montecarlo.recorder.export_data()
df = df.drop(["Reference", "Section", "Status", "Error code"], axis=1)
df["x (exact)"] = df["a"] * system.n
df["x (error)"] = df["x"] / df["x (exact)"] - 1.0
df
[1]:
| a | x | y | x (exact) | x (error) | |
|---|---|---|---|---|---|
| 0 | 0.500000 | 2.000000 | 0.293050 | 2.000000 | 5.128209e-09 |
| 1 | 0.565078 | 2.260312 | 0.478074 | 2.260312 | -3.946923e-09 |
| 2 | 0.434922 | 1.739688 | 0.167767 | 1.739688 | 3.908075e-09 |
| 3 | 0.469256 | 1.877025 | 0.227354 | 1.877025 | -1.215091e-08 |
| 4 | 0.610991 | 2.443965 | 0.653434 | 2.443965 | -7.050479e-09 |
| 5 | 0.530744 | 2.122975 | 0.372050 | 2.122975 | -1.765528e-10 |
| 6 | 0.389009 | 1.556035 | 0.107374 | 1.556035 | 1.388482e-08 |
| 7 | 0.414404 | 1.657616 | 0.138279 | 1.657616 | 2.287183e-09 |
| 8 | 0.547159 | 2.188638 | 0.420260 | 2.188638 | -2.478977e-09 |
| 9 | 0.648019 | 2.592077 | 0.826825 | 2.592077 | 1.377789e-08 |
Nonlinear solver¶
Use case¶
We reuse the simple free-fall model introduced in the tutorial on time simulations.
[2]:
from cosapp.base import System
import numpy as np
class PointMass(System):
"""Free fall of a point mass, with simple k * v friction model"""
def setup(self):
self.add_inward("mass", 1.2, desc="Mass")
self.add_inward("k", 0.1, desc="Friction coefficient")
self.add_inward("g", np.r_[0, 0, -9.81], desc="Uniform acceleration field")
self.add_outward("a", np.zeros(3))
self.add_transient("v", der="a")
self.add_transient("x", der="v")
def compute(self):
self.a = self.g - (self.k / self.mass) * self.v
class Ballistics(PointMass):
"""System containing an initial condition, to be used as unknown"""
def setup(self):
super().setup()
# Add inward `v0`, to be used as an unknown in a solver
self.add_inward("v0", np.zeros(3), desc="Initial condition for v")
Design problem¶
We compute the initial velocity that leads to a target point, with an imposed flight time of two seconds.
In this example, we require an evaluation of the Jacobian matrix by forward finite difference, using class FfdJacobianEvaluation, together with an execution policy using 3 processes (as the unknown is a three-dimensional vector).
[3]:
from cosapp.drivers import RungeKutta, NonLinearSolver
from cosapp.recorders import DataFrameRecorder
from cosapp.core.execution import ExecutionPolicy, ExecutionType
from cosapp.core.numerics.solve import FfdJacobianEvaluation
from time_solutions import PointMassSolution
system = Ballistics("point") # head system
# Add drivers
solver = system.add_driver(
NonLinearSolver(
"solver",
jac=FfdJacobianEvaluation(
execution_policy=ExecutionPolicy(
workers_count=3, # number of parallel processes
execution_type=ExecutionType.MULTI_PROCESSING,
),
),
)
)
rk = solver.add_child(RungeKutta("rk", order=3)) # subdriver of `solver`
# Define the design problem:
# Compute `v0` that leads to a target end point `x`
# Note:
# For driver `solver`, variable "x" represents the position at the *end* of
# each time simulation, since it is the parent of the `RungeKutta` time driver.
solver.add_unknown("v0").add_equation("x == [10, 0, 2]")
# Define the time simulation scenario
rk.time_interval = (0, 2)
rk.dt = 0.05
x0 = np.zeros(3)
rk.set_scenario(
init = {
"x": x0,
"v": "v0", # quotes are required to make value an evaluable expression
},
values = {"mass": 1.5, "k": 0.92},
)
# Add a recorder to capture time evolution in a dataframe
rk.add_recorder(DataFrameRecorder(includes=["x", "v", "a"]), period=0.1)
# Set the initial guess for the solver
system.v0 = np.ones(3)
# Solve problem
system.run_drivers()
# Get analytical solution with computed v0
solution = PointMassSolution(system, system.v0, x0)
# Retrieve recorded data
data = rk.recorder.export_data()
time = np.asarray(data["time"])
traj = {
"exact": solution.x(time),
"num": np.asarray(data["x"].tolist()),
}
error = np.abs(traj["num"] - traj["exact"])
print(
f"order = {rk.order}; dt = {rk.dt}",
f"Max error on trajectory = {error.max():.2e}",
f"End point: {traj['num'][-1].round(3)}",
f"v0 = {system.v0.round(3)}",
sep="\n",
)
vz = system.v0[2]
vh = np.linalg.norm(system.v0[:2])
angle = np.arctan2(vz, vh)
print(
f"norm = {np.linalg.norm(system.v0):.2f} m/s",
f"angle = {np.degrees(angle):.1f} deg",
sep="; ",
)
order = 3; dt = 0.05
Max error on trajectory = 2.18e-05
End point: [10. -0. 2.]
v0 = [ 8.678 -0. 13.503]
norm = 16.05 m/s; angle = 57.3 deg
[4]:
# Plot results
import plotly.graph_objs as go
options = {
"num": dict(mode="markers", name="numerical", line=dict(color="red")),
"exact": dict(mode="lines", name="analytical", line=dict(color="blue")),
}
traces = [
go.Scatter(
x = data[:, 0],
y = data[:, 2],
**options[name]
) for name, data in traj.items()
]
layout = go.Layout(
title = "Trajectory",
xaxis = dict(title="x[0]"),
yaxis = dict(
title = "x[2]",
scaleanchor = "x",
scaleratio = 1,
)
)
go.Figure(data=traces, layout=layout)