Time simulations

Declare transient variables in a system

CoSApp allows one to simulate the behaviour of systems with time-dependent variables, through the use of time drivers.

Transient variables, defined by their time derivative, are declared with method add_transient at system setup. The syntax to declare a differential relation of the kind \(\frac{dx}{dt} = v\) is:

self.add_transient('x', der='v')  # in system `setup`

Variable x will be regarded as an unknown by time drivers, much like unknowns declared with add_unknown are picked up by driver NonLinearSolver. This implies that x must be an input variable (either an inward or an input port variable).

If x does not exist when add_transient('x', der=...) is declared, a new inward is created on the fly; its type is deduced from that of v (in particular, if v is an array, x will be an array of the same shape as v). If x already exists in the system, CoSApp checks that x is indeed an input variable, and that its type is compatible with v.

The time derivative of a transient variable can be any evaluable expression, not just another variable. For instance, one can declare:

self.add_transient('x', der='0.5 * omega / y')

as long as omega and y exist in the system.

Time drivers

Two explicit time integrators are currently available in module cosapp.drivers:

  • EulerExplicit;

  • RungeKutta, of order 2, 3 or 4, specified at driver construction with optional argument order (default is 2).

The time interval and time step of simulations can be specified with optional arguments time_interval and dt, both of which are also settable attributes of the driver.

Sub-drivers of a time driver are called at every time step during time simulations. In particular, for systems with loops, it is recommended to add a NonLinearSolver as a child of the time driver, to resolve cyclic dependencies at all times.

Simulation scenarios

Time simulations require initial and boundary conditions. Those can be specified using method set_scenario of time drivers, with arguments init and values for initial and boundary conditions, respectively. Each of these arguments are dictionaries of the kind {varname: value}, where value can be a constant, or any evaluable expression. In particular, explicit time boundary conditions can be specified using expressions involving t, or time. Finally, a stop criterion can be added with optional argument stop, following the same rules as event triggers (see event tutorial for details).

For example:

import numpy
from cosapp.drivers import RungeKutta

s = MyGreatSystem('s')  # with transients `x` and `sub.f`, say

driver = s.add_driver(RungeKutta(order=4, time_interval=[0, 1], dt=0.01))

driver.set_scenario(
    init = {
        'x': numpy.zeros(3),
        'sub.f': 1.0,
    },
    values = {
        'a': 0.25,
        'omega': 'pi / 6',
        'force': 'a * sin(omega * t)',
    },
    stop = 'x[0] > f**2',
)

Examples

[1]:
from cosapp.base import System
import numpy as np

class PointMass(System):
    """Free fall of a point mass, with friction"""
    def setup(self):
        self.add_inward('mass', 1.2, desc='Mass')
        self.add_inward('k', 0.1, desc='Friction coefficient')
        self.add_inward('g', np.r_[0, 0, -9.81], desc='Uniform acceleration field')

        self.add_outward('a', np.zeros(3))

        self.add_transient('v', der='a')
        self.add_transient('x', der='v')

    def compute(self):
        self.a = self.g - (self.k / self.mass) * self.v
[2]:
from cosapp.drivers import RungeKutta
from cosapp.recorders import DataFrameRecorder
from time_solutions import PointMassSolution

point = PointMass('point')
driver = point.add_driver(RungeKutta(order=3))

driver.time_interval = (0, 2)
driver.dt = 0.05

# Add a recorder to capture time evolution in a dataframe
driver.add_recorder(DataFrameRecorder(includes=['x', 'v', 'a']), period=0.1)

# Initial conditions
x0 = [0, 0, 0]
v0 = [8, 0, 9.5]

# Define a simulation scenario
driver.set_scenario(
    init = {'x': np.array(x0), 'v': np.array(v0)},
    stop = 'x[2] == 0',
    values = {'mass': 1.5, 'k': 0.5},
)

point.run_drivers()

solution = PointMassSolution(point, v0, x0)

# Retrieve recorded data
data = driver.recorder.export_data()
time = np.asarray(data['time'])
traj = {
    'exact': np.array([solution.x(t) for t in time]),
    'num': np.asarray(data['x'].tolist()),
}

if point.k > 0:
    fls = PointMass('frictionless')
    fls.k = 0
    frictionless = PointMassSolution(fls, v0, x0)
    traj['k = 0'] = np.array([frictionless.x(t) for t in time])

with np.errstate(invalid='ignore', divide='ignore'):
    error = np.where(
        traj['exact'] == 0,
        np.abs(traj['num']),
        np.abs(traj['num'] / traj['exact'] - 1),
    )

print(
    f"order = {driver.order}; dt = {driver.dt}",
    f"Max error on trajectory = {error.max():.2e}",
    f"End point: {traj['num'][-1].round(2)}",
    sep="\n",
)
order = 3; dt = 0.05
Max error on trajectory = 1.00e+00
End point: [10.67  0.   -0.  ]
[3]:
# Plot results
import plotly.graph_objs as go

options = {
    'num': dict(mode='markers', name='numerical', line=dict(color='red')),
    'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
    'k = 0': dict(
        mode='lines', name='frictionless',
        line=dict(dash='dot', color='black'),
    ),
}

traces = [
    go.Scatter(
        x = data[:, 0],
        y = data[:, 2],
        **options[name]
    ) for name, data in traj.items()
]

layout = go.Layout(
    title = "Trajectory",
    xaxis = dict(title="x"),
    yaxis = dict(
        title = "z",
        scaleanchor = "x",
        scaleratio = 1,
    ),
    height = 450,
    hovermode = "x",
)

go.Figure(data=traces, layout=layout)

In this example, we consider the equilibration of two liquid filled tanks, hydraulically connected by a pipe. For simplicity, we consider the quasi-steady, viscosity driven limit, neglecting inertial effects.

The liquid height in each tank is only known implicitly, through its instantaneous rate-of-change. Note that the time derivative of height is a context-dependent expression, evaluated at runtime.

[4]:
from cosapp.base import System, Port


class FloatPort(Port):
    def setup(self):
        self.add_variable('value', 0.0)


class Tank(System):
    def setup(self):
        self.add_inward('area', 1.0, desc='Cross-section area')
        self.add_inward('rho', 1e3, desc='Fluid density')

        self.add_input(FloatPort, 'flowrate')
        self.add_output(FloatPort, 'p_bottom')

        self.add_transient('height', der='flowrate.value / area')

    def compute(self):
        g = 9.81
        self.p_bottom.value = self.rho * g * self.height


class Pipe(System):
    """Laminar Poiseuille flow in a cylindrical pipe"""
    def setup(self):
        self.add_inward('D', 0.1, desc="Diameter")
        self.add_inward('L', 2.0, desc="Length")
        self.add_inward('mu', 1e-3, desc="Fluid dynamic viscosity")

        self.add_input(FloatPort, 'p1')
        self.add_input(FloatPort, 'p2')

        self.add_output(FloatPort, 'Q1')
        self.add_output(FloatPort, 'Q2')

        self.add_outward('k', desc='Pressure loss coefficient')

    def compute(self):
        """Computes the volumetric flowrate from the pressure drop"""
        self.k = np.pi * self.D**4 / (256 * self.mu * self.L)
        self.Q1.value = self.k * (self.p2.value - self.p1.value)
        self.Q2.value = -self.Q1.value


class CoupledTanks(System):
    """System describing two tanks connected by a pipe (viscous limit)"""
    def setup(self):
        self.add_child(Tank('tank1'), pulling='rho')
        self.add_child(Tank('tank2'), pulling='rho')
        self.add_child(Pipe('pipe'), pulling='mu')

        self.connect(self.tank1.p_bottom, self.pipe.p1)
        self.connect(self.tank2.p_bottom, self.pipe.p2)
        self.connect(self.tank1.flowrate, self.pipe.Q1)
        self.connect(self.tank2.flowrate, self.pipe.Q2)

[5]:
from cosapp.drivers import NonLinearSolver, RungeKutta
from cosapp.recorders import DataFrameRecorder
from time_solutions import CoupledTanksSolution
import numpy as np

system = CoupledTanks('coupledTanks')
driver = system.add_driver(RungeKutta(time_interval=[0, 5], dt=0.1, order=3))
solver = driver.add_child(NonLinearSolver('solver', factor=1.0))

init = (3, 1)

driver.set_scenario(
    init = {
        # initial conditions:
        'tank1.height': init[0],
        'tank2.height': init[1],
    },
    values = {
        # boundary conditions:
        'rho': 921,
        'mu': 1e-3,
        'pipe.D': 0.07,
        'pipe.L': 2.5,
        'tank1.area': 2,
        'tank2.area': 0.8,
    }
)

driver.add_recorder(DataFrameRecorder(includes='tank?.height'), period=0.1)

system.run_drivers()

# Retrieve recorded data
data = driver.recorder.export_data()
time = np.asarray(data['time'])
solution = CoupledTanksSolution(system, init)

heights = {
    'num': np.asarray([data['tank1.height'], data['tank2.height']]),
    'exact': np.transpose([solution(t) for t in time])
}
error = np.abs(heights['num'] - heights['exact'])

print(
    f"dt = {driver.dt}; order = {driver.order}",
    f"Max error = {error.max():2e}",  # try different RK orders!
    sep="\n",
)
dt = 0.1; order = 3
Max error = 1.641403e-04
[6]:
color = dict(tank1='blue', tank2='orange')
exact = heights['exact']

go.Figure(
    data = [
        go.Scatter(
            x = time,
            y = data['tank1.height'],
            mode = 'markers',
            marker_color = color['tank1'],
            name = 'tank1.height',
        ),
        go.Scatter(
            x = time,
            y = data['tank2.height'],
            mode = 'markers',
            marker_color = color['tank2'],
            name = 'tank2.height',
        ),
        go.Scatter(
            x = time,
            y = exact[0],
            line = dict(dash='solid', width=0.5, color=color['tank1']),
            name = 'Exact tank1',
        ),
        go.Scatter(
            x = time,
            y = exact[1],
            line = dict(dash='solid', width=0.5, color=color['tank2']),
            name = 'Exact tank2',
        ),
    ],
    layout = go.Layout(
        title = "Liquid height in tanks",
        xaxis = {'title': 'time'},
        yaxis = {'title': 'height'},
        hovermode = 'x',
    )
)

This example illustrates the use of option max_time_step in add_transient(), indicating a maximum admissible time step for certain transient variables.

[7]:
from cosapp.base import System
import numpy as np

class DampedMassSpring(System):
    def setup(self):
        self.add_inward('mass', 0.25)
        self.add_inward('length', 0.5, desc='Unloaded spring length')
        self.add_inward('c', 0., desc='Friction coefficient')
        self.add_inward('K', 0.1, desc='Spring stiffness')
        self.add_inward('x', 0.0, desc='Linear position')

        self.add_outward('F', 0.0, desc='Force')
        self.add_outward('a', 0.0, desc='Linear acceleration')

        self.add_transient('v', der='a')
        self.add_transient('x', der='v', max_time_step='0.05 * natural_period')

    @property
    def natural_period(self) -> float:
        """float: Natural oscillating period of the system"""
        return 2 * np.pi * np.sqrt(self.mass / self.K)

    def compute(self):
        self.F = self.K * (self.length - self.x) - self.c * self.v
        self.a = self.F / self.mass

[8]:
from cosapp.drivers import RungeKutta
from cosapp.recorders import DataFrameRecorder
from time_solutions import HarmonicOscillatorSolution as ExactSolution
import numpy as np

spring = DampedMassSpring('spring')
driver = spring.add_driver(RungeKutta(time_interval=(0, 8), order=3))
# Time step does not need to be defined, it will be taken from the max time step
# defined in the DampedMassSpring system. But you can overwrite it.
driver.dt = 0.01

x0, v0 = 0.26, 0.0

driver.set_scenario(
    init = {'x': x0, 'v': v0},
    values = {
        'length': 0.2,
        'mass': 0.6,
        'c': 0.47,
        'K': 20,
    },
)

driver.add_recorder(
    DataFrameRecorder(includes=['x', 'v', 'a']),
    period=0.05,
)

spring.run_drivers()

solution = ExactSolution(spring, (x0, v0))

# Retrieve recorded data
data = driver.recorder.export_data()
time = np.asarray(data['time'])
x = {
    'num': np.asarray(data['x']),
    'exact': np.array([solution.x(t) for t in time]),
}

error = np.abs(x['num'] - x['exact'])

print(
    f"dt = {driver.dt}; order = {driver.order}",
    f"Natural period = {spring.natural_period:.3}",
    f"Damping factor = {solution.damping:.3}",
    f"Max error = {error.max():.2e}",
    sep="\n",
)
if solution.over_damped:
    print("Over-damped spring")

# Plot solution
import plotly.graph_objs as go

options = {
    'num': dict(mode='markers', name='numerical', line=dict(color='red')),
    'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
}

traces = [
    go.Scatter(x=time, y=position, **options[name])
    for name, position in x.items()
]
layout = go.Layout(
    xaxis = dict(title="Time"),
    yaxis = dict(title="Position"),
    hovermode = "x",
)
go.Figure(data=traces, layout=layout)
dt = 0.01; order = 3
Natural period = 1.09
Damping factor = 0.0678
Max error = 2.62e-06

More details and features can be found in the Advanced Time Simulations tutorial.