Time simulations - Advanced Features¶
Find the initial condition optimizing the outcome of a time simulation¶
In this example, we combine an optimizer and a time driver, to compute the initial condition maximizing a performance measured at the end of a time simulation.
[1]:
from cosapp.base import System
import numpy as np
class PointMass(System):
"""Free fall of a point mass, with friction"""
def setup(self):
self.add_inward('mass', 1.2, desc='Mass')
self.add_inward('k', 0.1, desc='Friction coefficient')
self.add_inward('g', np.r_[0, 0, -9.81], desc='Uniform acceleration field')
self.add_outward('a', np.zeros(3))
self.add_transient('v', der='a')
self.add_transient('x', der='v')
def compute(self):
self.a = self.g - (self.k / self.mass) * self.v
class Ballistics(PointMass):
"""System containing an initial condition, to be used as unknown"""
def setup(self):
super().setup()
# Add inward `v0`, to be used as an unknown in a solver
self.add_inward('v0', np.zeros(3), desc='Initial condition for v')
self.add_event('rebound', trigger="x[2] <= 0")
self.add_inward('cr', 1.0, desc="Rebound restitution coefficient", limits=(0, 1))
self.add_outward_modevar("n_rebounds", init=0, dtype=int)
def transition(self):
if self.rebound.present:
self.n_rebounds += 1
v = self.v
if abs(v[2]) < 1e-6:
v[2] = 0
else:
v[2] *= -self.cr
[2]:
from cosapp.drivers import RungeKutta, Optimizer
from cosapp.recorders import DataFrameRecorder
point = Ballistics('point') # head system
# Add drivers
optim = point.add_driver(Optimizer('optim', method='SLSQP', verbose=1))
driver = optim.add_child(RungeKutta("RungeKutta", order=3)) # subdriver of `optim`
# Define the optimization problem:
# Compute `v0`, so that the final value of `x[2]` is maximized
# Note:
# For driver `optim`, variable 'x' represents the position at the *end* of
# each time simulation, since it is the parent of the `RungeKutta` time driver.
optim.add_unknown('v0')
optim.set_maximum('x[0]')
optim.add_constraints('norm(v0) == 50')
# Define the time simulation scenario
driver.time_interval = (0, 20)
driver.dt = 0.1
x0 = np.zeros(3)
driver.set_scenario(
init = {
'x': np.array(x0),
'v': 'v0', # quotes around 'v0' are required to make value an evaluable expression
},
values = {'mass': 1.5, 'k': 0.92},
stop = point.rebound,
# stop = point.rebound.filter("n_rebounds == 2"), # try it!
)
# Add a recorder to capture time evolution in a dataframe
driver.add_recorder(DataFrameRecorder(includes=['x', 'v', 'a']), period=0.1)
# Set the initial guess for the optimizer
point.v0 = np.ones(3)
# Solve problem
point.run_drivers()
print(
"",
f"Flight time: {driver.time:.3f} s",
f"x[0] = {point.x[0].round(3)} m",
f"v0 = {point.v0.round(3)} m/s",
sep="\n",
)
vz = point.v0[2]
vh = np.linalg.norm(point.v0[:2])
angle = np.arctan2(vz, vh)
print(
f"norm = {np.linalg.norm(point.v0):.2f} m/s",
f"angle = {np.degrees(angle):.1f} deg",
sep="; ",
)
Optimization terminated successfully (Exit mode 0)
Current function value: -64.04486287819479
Iterations: 15
Function evaluations: 61
Gradient evaluations: 15
Flight time: 3.404 s
x[0] = 64.045 m
v0 = [44.84 0. 22.121] m/s
norm = 50.00 m/s; angle = 26.3 deg
Note: it is faster to remove the unnecessary time recorder during the optimization phase, and re-run a standalone time simulation on the optimized system, using a recorder. This way, only the optimized trajectory is recorded.
Something like:
# Purge existing drivers, and add a new one
point.drivers.clear()
driver = point.add_driver(RungeKutta("RungeKutta", order=3, time_interval=(0, 20), dt=0.1))
# Reuse previous simulation scenario
driver.set_scenario(...)
# Add a recorder to capture the time evolution in a dataframe
driver.add_recorder(DataFrameRecorder(includes=['x', 'v', 'a']), period=0.1)
point.run_drivers()
In this tutorial, we have left the recorder during the optimization phase, for simplicity.
[3]:
# Retrieve recorder data and plot results
from time_solutions import PointMassSolution
import plotly.graph_objs as go
solution = PointMassSolution(point, point.v0, x0)
data = driver.recorder.export_data()
time = np.asarray(data['time'])
traj = {
'exact': solution.x(time),
'num': np.asarray(data['x'].tolist()),
}
options = {
'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
'num': dict(mode='markers', name='numerical', line=dict(color='red')),
}
traces = [
go.Scatter(
x = data[:, 0],
y = data[:, 2],
**options[name],
) for name, data in traj.items()
if name in options
]
layout = go.Layout(
title = "Optimized trajectory",
xaxis = dict(title="x[0]"),
yaxis = dict(
title = "x[2]",
scaleanchor = "x",
scaleratio = 1,
)
)
go.Figure(data=traces, layout=layout)
Dynamic time step¶
Transient variables may enforce a time step limitation, through the use of two optional arguments max_time_step and max_abs_step. For example:
def setup(self):
self.add_inward('v', numpy.zeros(3))
self.add_outward('omega', 1.0)
self.add_transient('x', der='v', max_time_step='0.1 / omega', max_abs_step=0.2)
As the name suggests, max_time_step indicates a maximum admissible time step at each iteration. It is an evaluable expression, so its value may change in time. Option max_abs_step, also an evaluable expression, limits the time step as to limit the evolution of the transient variable within one iteration, by linear extrapolation, based on the current time derivative.
Driver time step may or may not be specified. The actual time step used in simulations will be the minimum of:
the nominal time step of the driver (if any);
the maximum admissible time steps of all transient variables (infinity by default);
the maximum transient variable steps (infinity by default);
the time interval to the next recording time (if a recorder is set, with a specified period).
If the time step is ever found to be non-positive or infinite, a ValueError exception is raised.
Variable time step and recorders¶
If a recorder is created with a given recording period, data will be recorder at the prescribed period. If no recording period is specified (period=None, by default), the recorder will collect data at each time step.
Finally, time step values may be recorded, by specifying record_dt=True at driver creation, or setting attribute record_dt to True before driver execution. The list of time steps are ultimately given as a numpy array by property recorded_dt.
[4]:
from cosapp.base import System
class ExpOde(System):
"""System describing function y(t) = cst * exp(a * t), via ODE dy/dt = a * y"""
def setup(self):
self.add_inward('a', 1.0)
self.add_inward('y', 1.0)
self.add_transient('y', der='a * y', max_abs_step=1)
[5]:
from cosapp.drivers import RungeKutta
from cosapp.recorders import DataFrameRecorder
import numpy as np
ode = ExpOde('ode')
driver = ode.add_driver(RungeKutta(order=4, record_dt=True))
driver.time_interval = (0, 8)
driver.dt = 0.5
# Define a simulation scenario
y0 = 0.5
driver.set_scenario(
init = {'y': y0},
values = {'a': 0.4},
)
# Add a recorder to capture time evolution in a dataframe, at every time step (period=None)
driver.add_recorder(DataFrameRecorder(includes=['y']), period=None)
ode.run_drivers()
data = driver.recorder.export_data() # recorded DataFrame
time = np.asarray(data['time'])
solution = {
'exact': y0 * np.exp(ode.a * time),
'num': np.asarray(data['y']),
}
error = solution['num'] / solution['exact'] - 1
print(
f"order = {driver.order}",
f"max relative error = {np.linalg.norm(error, np.inf):6.2e}",
sep="; ",
)
# Plot results
import plotly.graph_objs as go
options = {
'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
'num': dict(mode='markers', name='numerical', line=dict(color='red')),
}
traces = [
go.Scatter(
x = time,
y = data,
**options[name]
) for name, data in solution.items()
]
layout = go.Layout(
title = "Solution",
xaxis = dict(title="time"),
yaxis = dict(
title = "y",
scaleanchor = "x",
scaleratio = 1,
)
)
go.Figure(data=traces, layout=layout)
order = 4; max relative error = 2.97e-05
In the next two cells, we display the recorded time and y steps, respectively. At the beginning of the simulation, dy/dt is sufficiently small that the evolution of ode.y within time interval driver.dt (0.5) does not exceed the specified max_abs_step. As time goes on, dy/dt increases and eventually becomes too large to limit the y step within driver.dt; time step limitation kicks in, and the actual dt appears to be smaller than driver.dt.
Note that the last time step is imposed by the simulation end time driver.time_interval[1].
Despite time step limitation, the actual recorded y steps may appear slightly larger than max_abs_step. This is because at every time iteration, the new time step is calculated by linear extrapolation, based on the current time derivative of y. In this particular case, however, the second- and all higher-order time derivatives of y are positive, such that the actual step in y is always larger than the first-order approximation.
[6]:
# show recorded time steps
driver.recorded_dt
[6]:
array([0.5 , 0.5 , 0.5 , 0.5 , 0.5 ,
0.5 , 0.5 , 0.5 , 0.5 , 0.5 ,
0.5 , 0.5 , 0.45360206, 0.37833531, 0.32520226,
0.2855356 , 0.25471687, 0.23004293, 0.07256497])
[7]:
# show ode.y steps from recorded values
np.diff(solution['num'])
[7]:
array([0.1107 , 0.13520898, 0.16514425, 0.20170718, 0.24636516,
0.3009104 , 0.36753196, 0.44890354, 0.54829078, 0.66968236,
0.81795004, 0.99904418, 1.09645609, 1.07962848, 1.06795234,
1.05934335, 1.0527176 , 1.04745224, 0.35091246])
Time boundary condition from discrete tabulated data¶
Time boundary conditions are specified by argument values in set_scenario(). They can be constant values, evaluable expressions, or explicit time boundary conditions:
driver.set_scenario(
init = {
# initial conditions for transient variables
'x': [0, 0, 0],
'v': [1, 0, 2],
},
values = {
'epsilon': 0.23, # constant value
'foo.bar': '2 * b', # implicit time dependency, as `b` may change in time
'z': 'cos(omega * t)', # explicit time dependency
},
)
It is also possible to specify boundary conditions interpolated from tabulated data, using class cosapp.drivers.time.scenario.Interpolator:
from cosapp.drivers.time.scenario import Interpolator
driver.set_scenario(
values = {
'gamma': Interpolator(data, kind=Interpolator.Kind.Linear),
}
)
where data is an array-like 2D collection of the kind [[t0, y0], .., [tn, yn]], or [[t0, .., tn], [y0, .., yn]]. Optional argument kind must be of type Interpolator.Kind, which is an enum with possible values
Linear (default)
CubicSpline
Pchip (Piecewise Cubic Hermite Interpolating Polynomial)
[8]:
from cosapp.base import System
class ScalarOde(System):
"""Simple scalar ODE dy/dt = f(t)"""
def setup(self):
self.add_inward('dy', 0.0)
self.add_transient('y', der='dy')
[9]:
from cosapp.drivers import RungeKutta
from cosapp.drivers.time.scenario import Interpolator
from cosapp.recorders import DataFrameRecorder
import numpy as np
ode = ScalarOde('ode')
driver = ode.add_driver(RungeKutta(order=2))
driver.time_interval = (0, 2)
driver.dt = 0.05
# Define a simulation scenario
y0 = -0.5
table = [[0, 0], [1, 1], [10, -17]] # F(t) = t if t < 1 else 3 - 2 * t
driver.set_scenario(
init = {'y': y0},
values = {
'dy': Interpolator(table)
# 'dy': 't if t < 1 else 3 - 2 * t', # equivalent explicit time dependency
},
)
# Add a recorder to capture time evolution in a dataframe
driver.add_recorder(DataFrameRecorder(includes=['y', 'dy']), period=None)
ode.run_drivers()
data = driver.recorder.export_data()
time = np.asarray(data['time'])
t1, y1 = 1, y0 + 0.5 # constants for exact solution
solution = {
'exact': np.where(
time < t1,
y0 + 0.5 * time**2,
y1 + t1**2 + 3 * (time - t1) - time**2,
),
'num': np.asarray(data['y']),
}
[10]:
# Plot results
import plotly.graph_objs as go
options = {
'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
'num': dict(mode='markers', name='numerical', line=dict(color='red')),
}
traces = [
go.Scatter(
x = time,
y = values,
**options.get(name, dict(name=name))
) for name, values in solution.items()
]
traces.append(
go.Scatter(
x = time,
y = np.asarray(data['dy']),
name = 'dy/dt',
mode = 'lines',
line = dict(color='black', width=1, dash='dot'),
)
)
layout = go.Layout(
title = "Solution",
xaxis = dict(title="time"),
hovermode = "x",
)
go.Figure(data=traces, layout=layout)