Source code for cosapp.drivers.time.bdf

import numpy
from typing import Optional

from cosapp.drivers.time.implicit import ImplicitTimeDriver, System


[docs] def bdf_weights_1(steps: numpy.ndarray) -> list[float]: """BDF weights for y_{n}, y_{n+1}.""" dt_inv = 1.0 / steps[0] return [-dt_inv, dt_inv]
[docs] def bdf_weights_2(steps: numpy.ndarray) -> list[float]: """BDF weights for y_{n}, y_{n+1}, y_{n+2}.""" h1, h2 = steps[:2] hs = h1 + h2 return [ +h2 / (h1 * hs), -hs / (h1 * h2), (hs + h2) / (h2 * hs), ]
[docs] def bdf_weights_3(steps: numpy.ndarray) -> list[float]: """BDF weights for y_{n}, .., y_{n+3}.""" h1, h2, h3 = steps[:3] hs = h1 + h2 + h3 return [ -h3 * (h2 + h3) / (h1 * hs * (h1 + h2)), +h3 * hs / (h1 * h2 * (h2 + h3)), -hs * (h2 + h3) / (h2 * h3 * (h1 + h2)), (hs * (h2 + h3 + h3) + h3 * (h2 + h3)) / (hs * h3 * (h2 + h3)), ]
[docs] def bdf_weights_4(steps: numpy.ndarray) -> list[float]: """BDF weights for y_{n}, .., y_{n+4}.""" h1, h2, h3, h4 = steps[:4] s3 = h2 + h3 + h4 hs = h1 + s3 return [ +h4 * s3 * (h3 + h4) / (h1 * hs * (h1 + h2) * (h1 + h2 + h3)), -h4 * hs * (h3 + h4) / (h1 * h2 * s3 * (h2 + h3)), +h4 * hs * s3 / (h2 * h3 * (h1 + h2) * (h3 + h4)), -hs * s3 * (h3 + h4) / (h3 * h4 * (h2 + h3) * (h1 + h2 + h3)), (hs * s3 * (h3 + h4 + h4) + h4 * (h3 + h4) * (s3 + hs)) / (hs * s3 * h4 * (h3 + h4)), ]
[docs] class BdfWeights: bdf_weight_funcs = { 1: bdf_weights_1, 2: bdf_weights_2, 3: bdf_weights_3, 4: bdf_weights_4, } __slots__ = ("order", "steps", "weights", "counter", "_iso_steps") def __init__(self, order: int): try: self.bdf_weight_funcs[order] except KeyError: raise ValueError( f"Invalid {order=}; must be in {sorted(self.bdf_weight_funcs)}" ) self.order = order self.steps = numpy.zeros(order) self.weights = numpy.zeros(order + 1) self.reset()
[docs] def reset(self) -> None: """Reset all steps and weights to zero.""" self.counter = 0 self.steps.fill(0.0) self.weights.fill(0.0) self._iso_steps = True
[docs] def push_step(self, step: float) -> None: if step <= 0.0: raise ValueError("steps must be positive") steps = self.steps if not self._iso_steps or step != steps[-1]: self.counter = min(self.counter + 1, self.order) # Shift steps and take new value as last step steps[:-1] = steps[1:] self.__set_last_step(step)
[docs] def replace_last_step(self, step: float) -> None: if step <= 0.0: raise ValueError("steps must be positive") if step != self.steps[-1]: self.__set_last_step(step)
def __set_last_step(self, step: float): steps = self.steps steps[-1] = step self._iso_steps = all(steps[1:] == steps[0]) self.__update_weights() def __update_weights(self) -> None: """Update weights from steps""" w = self.weights c = self.counter calc_weights = self.bdf_weight_funcs[c] n = c + 1 w[:-n] = 0.0 w[-n:] = calc_weights(self.steps[-c:])
[docs] class BdfIntegrator(ImplicitTimeDriver): """Second-order Crank-Nicolson implicit integrator.""" __slots__ = ( "_bdf", "_prev_yn", "_y_size", "_pre_transition_y", "_pre_transition_dy", ) def __init__(self, name = "BDF time driver", order: int = 2, owner: Optional[System] = None, time_interval: Optional[tuple[float, float]] = None, dt: Optional[float] = None, record_dt: bool = False, **options, ): self._bdf = BdfWeights(order) self._y_size = 0 self._prev_yn = numpy.empty(0) self._pre_transition_y = numpy.empty(0) self._pre_transition_dy = numpy.empty(0) super().__init__(name, owner, time_interval, dt, record_dt, **options) @property def order(self) -> int: """int: order of the BDF scheme""" return self._bdf.order @order.setter def order(self, order: int): if order != self._bdf.order: self._bdf = BdfWeights(order) def _initialize(self): """Initialization at time t=t0""" super()._initialize() y, dy = [], [] time_problem = self._var_manager.problem for transient in time_problem.transients.values(): y.extend(numpy.ravel(transient.value)) dy.extend(numpy.ravel(transient.d_dt)) y = numpy.array(y, dtype=float) self._prev_yn = numpy.tile(y, (self.order, 1)) self._pre_transition_dy = numpy.array(dy, dtype=float) self._y_size = y.size def _update_transients(self, dt: float): """Integrate transient variable over time step `dt`.""" self._bdf.push_step(dt) super()._update_transients(dt) # Push latest transients for next iteration prev_yn = self._prev_yn prev_yn[:-1] = prev_yn[1:] prev_yn[-1] = self._prev_x[-self._y_size:] def _pre_transition(self) -> None: """Pre-transition actions""" self._pre_transition_y, self._pre_transition_dy = self._transient_derivatives() super()._pre_transition() def _post_transition(self, dt: float) -> None: """Post-transition actions: check for discontinuities in transient values or their derivatives.""" super()._post_transition(dt) y, dy = self._transient_derivatives() prev_y = self._pre_transition_y prev_dy = self._pre_transition_dy identical = numpy.array_equal(y, prev_y) and numpy.array_equal(dy, prev_dy) if identical: self._bdf.replace_last_step(dt) else: self._bdf.reset() self._prev_yn.fill(0.0) self._prev_yn[-1] = y self._pre_transition_dy = dy def _transient_derivatives(self): """Computes the transient unknown vector and its time derivative. """ y, dy = [], [] time_problem = self._var_manager.problem for transient in time_problem.transients.values(): y.extend(numpy.ravel(transient.value)) dy.extend(numpy.ravel(transient.d_dt)) y = numpy.asarray(y, dtype=float) dy = numpy.asarray(dy, dtype=float) return y, dy def _time_residues(self, dt: float, current: bool): """Computes and returns the current- or next-time component of the transient problem residue vector. Parameters: ----------- - dt [float]: Time step - current [bool]: If `True`, compute the current time (n) part of the residues. If `False`, compute the time (n + 1) part of the residues. """ bdf = self._bdf if bdf.counter < 2 and bdf.order > 1: # Use Crank-Nicolson scheme for the first time step, when order > 1 coeff = (0.5 if current else -0.5) * dt time_problem = self._var_manager.problem residues = [] for transient in time_problem.transients.values(): r = transient.value + coeff * transient.d_dt residues.extend(numpy.ravel(r)) elif not current: # time (n + 1) coeff = bdf.weights[-1] * dt time_problem = self._var_manager.problem residues = [] for transient in time_problem.transients.values(): r = coeff * transient.value - dt * transient.d_dt residues.extend(numpy.ravel(r)) else: # past times n, n - 1, ... n = bdf.counter prev_yn = self._prev_yn residues = numpy.zeros_like(prev_yn[0]) for weight, prev_y in zip(bdf.weights[-n-1:-1], prev_yn[-n:]): residues -= (weight * dt) * prev_y return numpy.asarray(residues)