Find the initial condition leading to a target end point

In this example, we combine a nonlinear solver and a time driver, to compute the initial condition leading to a target point at the end of the time simulation.

[1]:
from cosapp.base import System
import numpy as np


class PointMass(System):
    """Free fall of a point mass, with friction"""
    def setup(self):
        self.add_inward('mass', 1.2, desc='Mass')
        self.add_inward('k', 0.1, desc='Friction coefficient')
        self.add_inward('g', np.r_[0, 0, -9.81], desc='Uniform acceleration field')

        self.add_outward('a', np.zeros(3))

        self.add_transient('v', der='a')
        self.add_transient('x', der='v')

    def compute(self):
        self.a = self.g - (self.k / self.mass) * self.v


class Ballistics(PointMass):
    """System containing an initial condition, to be used as unknown"""
    def setup(self):
        super().setup()
        # Add inward `v0`, to be used as an unknown in a solver
        self.add_inward('v0', np.zeros(3), desc='Initial condition for v')

[2]:
from cosapp.drivers import RungeKutta, NonLinearSolver
from cosapp.recorders import DataFrameRecorder
from time_solutions import PointMassSolution

point = Ballistics('point')  # head system

# Add drivers
solver = point.add_driver(NonLinearSolver("solver"))
driver = solver.add_child(RungeKutta("RungeKutta", order=3))

# Define `v0` as unknown, so that the final value of `x` is a desired target point
# Note:
#   For driver `solver`, variable 'x' represents the position at the end of
#   each time simulation, since it is the parent of the `RungeKutta` time driver.
solver.add_unknown('v0').add_equation('x == [10, 0, 10]')

# Define a simulation scenario
driver.time_interval = (0, 2)
driver.dt = 0.1

x0 = [0, 0, 10]

driver.set_scenario(
    init = {
        'x': np.array(x0),
        'v': 'v0',  # quotes around 'v0' are required to make value an evaluable expression
    },
    values = {'mass': 1.5, 'k': 0.92},
)

# Add a recorder to capture time evolution in a dataframe
driver.add_recorder(DataFrameRecorder(includes=['x', 'v', 'a']), period=0.1)

point.v0 = np.ones(3)  # initial guess for the solver

point.run_drivers()  # solve

solution = PointMassSolution(point, point.v0, x0)

data = driver.recorder.export_data()
time = np.asarray(data['time'])
traj = {
    'exact': np.array(list(map(solution.x, time))),
    'num': np.asarray(data['x'].tolist()),
}

error = np.abs(traj['num'] - traj['exact'])

print(
    f"order = {driver.order}; dt = {driver.dt}",
    f"Max error on trajectory = {error.max():.2e}",
    f"End point: {traj['num'][-1].round(3)}",
    f"v0 = {point.v0.round(3)}",
    sep="\n",
)
vz = point.v0[2]
vh = np.linalg.norm(point.v0[:2])
angle = np.arctan2(vz, vh)
print(
    f"norm = {np.linalg.norm(point.v0):.2f} m/s",
    f"angle = {np.degrees(angle):.1f} deg",
    sep="; ",
)

order = 3; dt = 0.1
Max error on trajectory = 1.68e-04
End point: [10.  0. 10.]
v0 = [ 8.678  0.    11.767]
norm = 14.62 m/s; angle = 53.6 deg
[3]:
# Plot results
import plotly.graph_objs as go

options = {
    'num': dict(mode='markers', name='numerical', line=dict(color='red')),
    'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
}

traces = [
    go.Scatter(
        x = data[:, 0],
        y = data[:, 2],
        **options[name]
    ) for name, data in traj.items()
]

layout = go.Layout(
    title = "Trajectory",
    xaxis = dict(title="x"),
    yaxis = dict(
        title = "z",
        scaleanchor = "x",
        scaleratio = 1,
    )
)

go.Figure(data=traces, layout=layout)

Dynamic time step

Transient variables may enforce a time step limitation, through the use of two optional arguments max_time_step and max_abs_step. For example:

def setup(self):
    self.add_inward('v', numpy.zeros(3))
    self.add_outward('omega', 1.0)

    self.add_transient('x', der='v', max_time_step='0.1 / omega', max_abs_step=0.2)

As the name suggests, max_time_step indicates a maximum admissible time step at each iteration. It is an evaluable expression, so its value may change in time. Option max_abs_step, also an evaluable expression, limits the time step as to limit the evolution of the transient variable within one iteration, by linear extrapolation, based on the current time derivative.

Driver time step may or may not be specified. The actual time step used in simulations will be the minimum of:

  • the nominal time step of the driver (if any);

  • the maximum admissible time steps of all transient variables (infinity by default);

  • the maximum transient variable steps (infinity by default);

  • the time interval to the next recording time (if a recorder is set, with a specified period).

If the time step is ever found to be non-positive or infinite, a ValueError exception is raised.

Variable time step and recorders

If a recorder is created with a given recording period, data will be recorder at the prescribed period. If no recording period is specified (period=None, by default), the recorder will collect data at each time step.

Finally, time step values may be recorded, by specifying record_dt=True at driver creation, or setting attribute record_dt to True before driver execution. The list of time steps are ultimately given as a numpy array by property recorded_dt.

[4]:
from cosapp.base import System

class ExpOde(System):
    """System describing function y(t) = cst * exp(a * t), via ODE dy/dt = a * y"""
    def setup(self):
        self.add_inward('a', 1.0)
        self.add_inward('y', 1.0)
        self.add_transient('y', der='a * y', max_abs_step=1)

[5]:
from cosapp.drivers import RungeKutta
from cosapp.recorders import DataFrameRecorder
import numpy as np

ode = ExpOde('ode')
driver = ode.add_driver(RungeKutta(order=4, record_dt=True))
driver.time_interval = (0, 8)
driver.dt = 0.5

# Define a simulation scenario
y0 = 0.5

driver.set_scenario(
    init = {'y': y0},
    values = {'a': 0.4},
)

# Add a recorder to capture time evolution in a dataframe, at every time step (period=None)
driver.add_recorder(DataFrameRecorder(includes=['y']), period=None)

ode.run_drivers()

data = driver.recorder.export_data()  # recorded DataFrame
time = np.asarray(data['time'])

solution = {
    'exact': y0 * np.exp(ode.a * time),
    'num': np.asarray(data['y']),
}
error = solution['num'] / solution['exact'] - 1

print(
    f"order = {driver.order}",
    f"max relative error = {np.linalg.norm(error, np.inf):6.2e}",
    sep="; ",
)

# Plot results
import plotly.graph_objs as go

options = {
    'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
    'num': dict(mode='markers', name='numerical', line=dict(color='red')),
}

traces = [
    go.Scatter(
        x = time,
        y = data,
        **options[name]
    ) for name, data in solution.items()
]

layout = go.Layout(
    title = "Solution",
    xaxis = dict(title="time"),
    yaxis = dict(
        title = "y",
        scaleanchor = "x",
        scaleratio = 1,
    )
)

go.Figure(data=traces, layout=layout)
order = 4; max relative error = 2.97e-05

In the next two cells, we display the recorded time and y steps, respectively. At the beginning of the simulation, dy/dt is sufficiently small that the evolution of ode.y within time interval driver.dt (0.5) does not exceed the specified max_abs_step. As time goes on, dy/dt increases and eventually becomes too large to limit the y step within driver.dt; time step limitation kicks in, and the actual dt appears to be smaller than driver.dt.

Note that the last time step is imposed by the simulation end time driver.time_interval[1].

Despite time step limitation, the actual recorded y steps may appear slightly larger than max_abs_step. This is because at every time iteration, the new time step is calculated by linear extrapolation, based on the current time derivative of y. In this particular case, however, the second- and all higher-order time derivatives of y are positive, such that the actual step in y is always larger than the first-order approximation.

[6]:
# show recorded time steps
driver.recorded_dt
[6]:
array([0.5       , 0.5       , 0.5       , 0.5       , 0.5       ,
       0.5       , 0.5       , 0.5       , 0.5       , 0.5       ,
       0.5       , 0.5       , 0.45360206, 0.37833531, 0.32520226,
       0.2855356 , 0.25471687, 0.23004293, 0.07256497])
[7]:
# show ode.y steps from recorded values
np.diff(solution['num'])
[7]:
array([0.1107    , 0.13520898, 0.16514425, 0.20170718, 0.24636516,
       0.3009104 , 0.36753196, 0.44890354, 0.54829078, 0.66968236,
       0.81795004, 0.99904418, 1.09645609, 1.07962848, 1.06795234,
       1.05934335, 1.0527176 , 1.04745224, 0.35091246])

Time boundary condition from discrete tabulated data

Time boundary conditions are specified by argument values in set_scenario(). They can be constant values, evaluable expressions, or explicit time boundary conditions:

driver.set_scenario(
    init = {
        # initial conditions for transient variables
        'x': [0, 0, 0],
        'v': [1, 0, 2],
    },
    values = {
        'epsilon': 0.23,       # constant value
        'foo.bar': '2 * b',    # implicit time dependency, as `b` may change in time
        'z': 'cos(omega * t)', # explicit time dependency
    },
)

It is also possible to specify boundary conditions interpolated from tabulated data, using class cosapp.drivers.time.scenario.Interpolator:

from cosapp.drivers.time.scenario import Interpolator

driver.set_scenario(
    values = {
        'gamma': Interpolator(data, kind=Interpolator.Kind.Linear),
    }
)

where data is an array-like 2D collection of the kind [[t0, y0], .., [tn, yn]], or [[t0, .., tn], [y0, .., yn]]. Optional argument kind must be of type Interpolator.Kind, which is an enum with possible values

  • Linear (default)

  • CubicSpline

  • Pchip (Piecewise Cubic Hermite Interpolating Polynomial)

[8]:
from cosapp.base import System

class ScalarOde(System):
    """Simple scalar ODE dy/dt = f(t)"""
    def setup(self):
        self.add_inward('dy', 0.0)
        self.add_transient('y', der='dy')

[9]:
from cosapp.drivers import RungeKutta
from cosapp.drivers.time.scenario import Interpolator
from cosapp.recorders import DataFrameRecorder
import numpy as np

ode = ScalarOde('ode')
driver = ode.add_driver(RungeKutta(order=2))
driver.time_interval = (0, 2)
driver.dt = 0.05

# Define a simulation scenario
y0 = -0.5

table = [[0, 0], [1, 1], [10, -17]]   # F(t) = t if t < 1 else 3 - 2 * t

driver.set_scenario(
    init = {'y': y0},
    values = {
        'dy': Interpolator(table)
#       'dy': 't if t < 1 else 3 - 2 * t',  # equivalent explicit time dependency
    },
)

# Add a recorder to capture time evolution in a dataframe
driver.add_recorder(DataFrameRecorder(includes=['y', 'dy']), period=None)

ode.run_drivers()

data = driver.recorder.export_data()
time = np.asarray(data['time'])

t1, y1 = 1, y0 + 0.5  # constants for exact solution

solution = {
    'exact': np.where(
        time < t1,
        y0 + 0.5 * time**2,
        y1 + t1**2 + 3 * (time - t1) - time**2,
    ),
    'num': np.asarray(data['y']),
}

[10]:
# Plot results
import plotly.graph_objs as go

options = {
    'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
    'num': dict(mode='markers', name='numerical', line=dict(color='red')),
}

traces = [
    go.Scatter(
        x = time,
        y = values,
        **options.get(name, dict(name=name))
    ) for name, values in solution.items()
]

traces.append(
    go.Scatter(
        x = time,
        y = np.asarray(data['dy']),
        name = 'dy/dt',
        mode = 'lines',
        line = dict(color='black', width=1, dash='dot'),
    )
)

layout = go.Layout(
    title = "Solution",
    xaxis = dict(title="time"),
    hovermode = "x",
)

go.Figure(data=traces, layout=layout)