Examples

Free fall of a point mass with friction

[1]:
from cosapp.base import System
import numpy as np

class PointMass(System):
    """Free fall of a point mass, with friction"""
    def setup(self):
        self.add_inward('mass', 1.2, desc='Mass')
        self.add_inward('k', 0.1, desc='Friction coefficient')
        self.add_inward('g', np.r_[0, 0, -9.81], desc='Uniform acceleration field')

        self.add_outward('a', np.zeros(3))

        self.add_transient('v', der='a')
        self.add_transient('x', der='v')

    def compute(self):
        self.a = self.g - (self.k / self.mass) * self.v
[2]:
from cosapp.drivers import RungeKutta
from cosapp.recorders import DataFrameRecorder
from time_solutions import PointMassSolution

point = PointMass('point')
driver = point.add_driver(RungeKutta(order=3))

driver.time_interval = (0, 2)
driver.dt = 0.05

# Add a recorder to capture time evolution in a dataframe
driver.add_recorder(
    DataFrameRecorder(includes=['x', 'v', 'a']),
    period=0.1,
)

# Initial conditions
x0 = [0, 0, 0]
v0 = [8, 0, 9.5]

# Define a simulation scenario
driver.set_scenario(
    init = {'x': np.array(x0), 'v': np.array(v0)},
    stop = 'x[2] == 0',
    values = {'mass': 1.5, 'k': 0.5},
)

point.run_drivers()

solution = PointMassSolution(point, v0, x0)

# Retrieve recorded data
data = driver.recorder.export_data()
time = np.asarray(data['time'])
traj = {
    'exact': np.array(list(map(solution.x, time))),
    'num': np.asarray(data['x'].tolist()),
}

if point.k > 0:
    fls = PointMass('frictionless')
    fls.k = 0
    frictionless = PointMassSolution(fls, v0, x0)
    traj['k = 0'] = np.array(list(map(frictionless.x, time)))

error = np.abs(traj['num'] - traj['exact'])

print(
    f"order = {driver.order}; dt = {driver.dt}",
    f"Max error on trajectory = {error.max():.2e}",
    f"End point: {traj['num'][-1].round(2)}",
    sep="\n",
)
order = 3; dt = 0.05
Max error on trajectory = 7.45e-06
End point: [10.67  0.    0.  ]
[3]:
# Plot results
import plotly.graph_objs as go

options = {
    'num': dict(mode='markers', name='numerical', line=dict(color='red')),
    'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
    'k = 0': dict(
        mode='lines', name='frictionless',
        line=dict(dash='dot', color='black'),
    ),
}

traces = [
    go.Scatter(
        x = data[:, 0],
        y = data[:, 2],
        **options[name]
    ) for name, data in traj.items()
]

layout = go.Layout(
    title = "Trajectory",
    xaxis = dict(title="x"),
    yaxis = dict(
        title = "z",
        scaleanchor = "x",
        scaleratio = 1,
    ),
    height = 450,
    hovermode = "x",
)

go.Figure(data=traces, layout=layout)

Equilibration of two fluid tanks hydraulically connected

In this example, we consider the equilibration of two liquid filled tanks, hydraulically connected by a pipe. For simplicity, we consider the quasi-steady, viscosity driven limit, neglecting inertial effects.

The liquid height in each tank is only known implicitly, through its instantaneous rate-of-change. Note that the time derivative of height is a context-dependent expression, evaluated at runtime.

[4]:
from cosapp.base import System


class Tank(System):
    def setup(self):
        self.add_inward('area', 1.0, desc='Cross-section area')
        self.add_inward('rho', 1e3, desc='Fluid density')
        self.add_inward('flowrate', 1.0)
        self.add_outward('p_bottom', 0.0)

        self.add_transient('height', der='flowrate / area')

    def compute(self):
        g = 9.81
        self.p_bottom = self.rho * g * self.height


class Pipe(System):
    """Laminar Poiseuille flow in a cylindrical pipe"""
    def setup(self):
        self.add_inward('D', 0.1, desc="Diameter")
        self.add_inward('L', 2.0, desc="Length")
        self.add_inward('mu', 1e-3, desc="Fluid dynamic viscosity")
        self.add_inward('p1', 1.0e5)
        self.add_inward('p2', 0.5e5)

        self.add_outward('Q1', 0.0)
        self.add_outward('Q2', 0.0)
        self.add_outward('k', desc='Pressure loss coefficient')

    def compute(self):
        """Computes the volumetric flowrate from the pressure drop"""
        self.k = np.pi * self.D**4 / (256 * self.mu * self.L)
        self.Q1 = self.k * (self.p2 - self.p1)
        self.Q2 = -self.Q1


class CoupledTanks(System):
    """System describing two tanks connected by a pipe (viscous limit)"""
    def setup(self):
        self.add_child(Tank('tank1'), pulling='rho')
        self.add_child(Tank('tank2'), pulling='rho')
        self.add_child(Pipe('pipe'), pulling='mu')

        self.connect(self.tank1, self.pipe, {'p_bottom': 'p1', 'flowrate': 'Q1'})
        self.connect(self.tank2, self.pipe, {'p_bottom': 'p2', 'flowrate': 'Q2'})

[5]:
from cosapp.drivers import NonLinearSolver, RungeKutta
from cosapp.recorders import DataFrameRecorder
from time_solutions import CoupledTanksSolution
import numpy as np

system = CoupledTanks('coupledTanks')
driver = system.add_driver(RungeKutta(time_interval=[0, 5], dt=0.1, order=3))
solver = driver.add_child(NonLinearSolver('solver', factor=1.0))

init = (3, 1)

driver.set_scenario(
    init = {
        # initial conditions:
        'tank1.height': init[0],
        'tank2.height': init[1],
    },
    values = {
        # boundary conditions:
        'rho': 921,
        'mu': 1e-3,
        'pipe.D': 0.07,
        'pipe.L': 2.5,
        'tank1.area': 2,
        'tank2.area': 0.8,
    }
)

driver.add_recorder(
    DataFrameRecorder(includes='tank?.height'),
    period=0.1,
)

system.run_drivers()

# Retrieve recorded data
data = driver.recorder.export_data()
time = np.asarray(data['time'])
solution = CoupledTanksSolution(system, init)

heights = {
    'num': np.asarray([data['tank1.height'], data['tank2.height']]),
    'exact': np.transpose(list(map(solution, time)))
}
error = np.abs(heights['num'] - heights['exact'])

print(
    f"dt = {driver.dt}; order = {driver.order}",
    f"Max error = {error.max():.2e}",  # try different RK orders!
    sep="\n",
)
dt = 0.1; order = 3
Max error = 1.64e-04
[6]:
color = dict(tank1='blue', tank2='orange')
exact = heights['exact']

go.Figure(
    data = [
        go.Scatter(
            x = time,
            y = data['tank1.height'],
            mode = 'markers',
            marker_color = color['tank1'],
            name = 'tank1.height',
        ),
        go.Scatter(
            x = time,
            y = data['tank2.height'],
            mode = 'markers',
            marker_color = color['tank2'],
            name = 'tank2.height',
        ),
        go.Scatter(
            x = time,
            y = exact[0],
            line = dict(dash='solid', width=0.5, color=color['tank1']),
            name = 'tank1 (exact)',
        ),
        go.Scatter(
            x = time,
            y = exact[1],
            line = dict(dash='solid', width=0.5, color=color['tank2']),
            name = 'tank2 (exact)',
        ),
    ],
    layout = go.Layout(
        title = "Liquid height in tanks",
        xaxis = {'title': 'time'},
        yaxis = {'title': 'height'},
        hovermode = 'x',
    )
)

Harmonic oscillator

This example illustrates the use of option max_time_step in add_transient(), indicating a maximum admissible time step for certain transient variables.

[7]:
from cosapp.base import System
import numpy as np

class DampedMassSpring(System):
    def setup(self):
        self.add_inward('mass', 0.25)
        self.add_inward('length', 0.5, desc='Unloaded spring length')
        self.add_inward('c', 0., desc='Friction coefficient')
        self.add_inward('K', 0.1, desc='Spring stiffness')
        self.add_inward('x', 0.0, desc='Linear position')

        self.add_outward('F', 0.0, desc='Force')
        self.add_outward('a', 0.0, desc='Linear acceleration')

        self.add_transient('v', der='a')
        self.add_transient('x', der='v', max_time_step='0.05 * natural_period')

    @property
    def natural_period(self) -> float:
        """float: Natural oscillating period of the system"""
        return 2 * np.pi * np.sqrt(self.mass / self.K)

    def compute(self):
        self.F = self.K * (self.length - self.x) - self.c * self.v
        self.a = self.F / self.mass

[8]:
from cosapp.drivers import RungeKutta
from cosapp.recorders import DataFrameRecorder
from time_solutions import HarmonicOscillatorSolution as ExactSolution
import numpy as np

spring = DampedMassSpring('spring')
driver = spring.add_driver(RungeKutta(time_interval=(0, 8), order=3))
# Time step does not need to be defined, it will be taken from the max time step
# defined in the DampedMassSpring system. But you can overwrite it.
driver.dt = 0.01

x0, v0 = 0.26, 0.5

driver.set_scenario(
    init = {'x': x0, 'v': v0},
    values = {
        'length': 0.2,
        'mass': 0.6,
        'c': 0.47,
        'K': 20,
    },
)

driver.add_recorder(
    DataFrameRecorder(includes=['x', 'v', 'a']),
    period=0.05,
)

spring.run_drivers()

solution = ExactSolution(spring, (x0, v0))

# Retrieve recorded data
data = driver.recorder.export_data()
time = np.asarray(data['time'])
x = {
    'num': np.asarray(data['x']),
    'exact': np.array(list(map(solution.x, time))),
}

error = np.abs(x['num'] - x['exact'])

print(
    f"dt = {driver.dt}; order = {driver.order}",
    f"Natural period = {spring.natural_period:.3}",
    f"Damping factor = {solution.damping:.3}",
    f"Max error = {error.max():.2e}",
    sep="\n",
)
if solution.over_damped:
    print("Over-damped spring")
dt = 0.01; order = 3

Natural period = 1.09
Damping factor = 0.0678
Max error = 4.71e-06
[9]:
# Plot solution
import plotly.graph_objs as go

options = {
    'num': dict(mode='markers', name='numerical', line=dict(color='red')),
    'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
}

traces = [
    go.Scatter(x=time, y=position, **options[name])
    for name, position in x.items()
]
layout = go.Layout(
    xaxis = dict(title="Time"),
    yaxis = dict(title="Position"),
    hovermode = "x",
)
go.Figure(data=traces, layout=layout)

More details and features can be found in the Advanced Time Simulations tutorial.