Graph-based Jacobian analysis

The graph analysis option analyses the variable dependency structure of a CoSApp system to assemble a sparse Jacobian more efficiently, instead of defaulting to a dense finite-difference matrix.

graph_analysis option

How to use it

This option is proposed for the NonLinearSolver and the implicit time drivers (EulerImplicit, CrankNicolson, BdfIntegrator). To activate this feature, pass a boolean argument to the keyword graph_analysis when the driver is created: NonLinearSolver("solver", graph_analysis=True).

When to use it

Systems with large array unknowns where most partial derivatives are structurally zero. Graph analysis can dramatically reduce the number of Jacobian evaluations by exploiting sparsity.

Example 1 — Array system with index-derived sparsity

In the example below, the system ArraySys computes each output element from two neighbouring input elements. The Jacobian of the residue with respect to the unknown vector i is therefore tridiagonal — graph analysis detects this automatically from the index expressions in compute, without any explicit annotation.

[1]:
import numpy as np
from cosapp.base import System
from cosapp.drivers import NonLinearSolver

N = 100


class ArraySys(System):
    def setup(self):
        self.add_inward("i", np.ones((N,)))

        self.add_outward("intermediate", np.zeros((N,)))
        self.add_outward("o", np.zeros((N,)))

        self.add_equation("o == 1.").add_unknown("i")

    def compute(self):
        # Each intermediate[i] depends only on i[i+1] and i[i-1]
        # → the dependency graph is tridiagonal
        for i in range(1, N - 1):
            self.intermediate[i] = self.i[i + 1] + self.i[i - 1]
        self.intermediate[N - 1] = self.i[0]  # wrap-around
        self.intermediate[0] = self.i[1]
        self.o = self.intermediate**2
[2]:
import time

# time without/with graph analysis
for analysis in [False, True]:
    sys = ArraySys("sys")
    solver = sys.add_driver(NonLinearSolver("solver", graph_analysis=analysis))
    t0 = time.time()
    sys.run_drivers()
    print("Graph analysis:", analysis)
    print("\tTime:", np.round((time.time() - t0) * 1e3, 2), "ms")
Graph analysis: False
        Time: 124.15 ms
Graph analysis: True
        Time: 21.47 ms

What happened: the solver inferred a tridiagonal (sparse) Jacobian structure instead of the 10×10 dense matrix it would use by default. This reduces the finite-difference perturbations from 10 to 3 per Newton step.

Add @relationship method

The graph of variable dependencies is built from relations between two variables encountered in the compute. In some cases, it is not possible to guess this relation:

  • as arguments of callable: y = f(x, w, z)

  • system variables related by multiple assignements on local variables:

    a = self.g
    h = a + self.i * 10
    self.j = h[:3]
    
  • non-numerical variables, especially custom python objects: y = CustomObj() + 50.0 in the case where CustomObj() may overload the __add__ operator

At least, for these untreated cases, an API is available to explicitly give the relation between two variables, from a @relationship decorated method.

Writing a @relationship method

If the analysis encounters difficulties to retrieve relationships between two variables for an identified reason, a warning is popped explaining it, but the analysis continues by taking into account a full dense matrix as relation between these two variables.

In the case whether anything goes wrong during analysis, the entire graph dependency analysis falls back to a normal dense Jacobian evaluation.

Therefore, provide a @relationship method on any related variable couple whose analytical sparsity is known, helps the analyser avoiding expensive full perturbations.

Template of the @relationship method

@relationship("input_variable", "output_variable")
def my_dependency(self) -> np.ndarray:
    ...
    return pattern # shape (len(output), len(input))
  • The decorated method must be defined on a System subclass.

  • It must return a binary ndarray of shape (len(output_variable), len(input_variable)) relating the both array dependency.

Examples:

  • np.ones((1,)) between two floats

  • np.identity(n) between two non-slicing np.ndarrays

Example 2 — Free-fall with friction (implicit time integration)

This example models a point mass subject to gravity and quadratic air friction.

[3]:
from cosapp.core.ir.passes.causal_model_graph import relationship
from cosapp.drivers import CrankNicolson


class PointDynamics(System):
    """Point mass dynamics:  F = m*a"""

    def setup(self):
        self.add_inward("mass", 1.0)
        self.add_inward("acc_ext", np.zeros(3))
        self.add_inward("force_ext", np.zeros(3))

        self.add_outward("force", np.zeros(3))
        self.add_outward("acc", np.zeros(3))

    def compute(self):
        mass = np.expand_dims(self.mass, axis=-1)
        self.force = self.force_ext + mass * self.acc_ext
        self.acc = self.force / mass


class PointFriction(System):
    """Point mass v² friction model."""

    def setup(self):
        self.add_inward("v", np.zeros(3), desc="Velocity")
        self.add_inward("cf", 0.1, desc="Friction coefficient")
        self.add_outward("force", np.zeros(3))

    def compute(self):
        v = self.v
        k = self.cf * np.linalg.norm(v, axis=-1)
        self.force = -np.expand_dims(k, axis=-1) * v

    @relationship("v", "force")
    def dependency_v_force(self):
        """
        Sparsity dependency pattern of v --> force.

        `force` (n×m shape) depends on:
          - `v` by an identity matrix `np.eye(n)`
          - `np.expand_dims(k, axis=-1)` by a full block `np.ones((m, m))`

        It results by a n×n block-diagonal of full m×m blocks:
        np.kron(np.eye(n), np.ones((m, m))).
        """
        n, m = self.v.shape
        unit_block = np.ones((m, m))
        diag = np.eye(n)
        return np.kron(diag, unit_block)


class PointMass(System):
    """Free fall model with quadratic friction."""

    def setup(self):
        self.add_child(PointFriction("friction"), pulling=["cf", "v"])
        self.add_child(
            PointDynamics("dynamics"),
            pulling=["mass", "force", {"acc_ext": "g", "acc": "a"}],
        )

        # friction.force  →  dynamics.force_ext
        self.connect(self.friction, self.dynamics, {"force": "force_ext"})

        # Transient states
        self.add_transient("v", der="a")
        self.add_transient("x", der="v")

        # Gravity (pulled as 'g' from dynamics.acc_ext)
        self.g = np.r_[0.0, 0.0, -9.81]

        self.exec_order = ["friction", "dynamics"]


def set_point_mass(analysis):
    N = 10
    points = PointMass("points")
    points.add_driver(
        CrankNicolson(time_interval=[0, 1], dt=0.01, graph_analysis=analysis)
    )

    # reshape (N x 3)
    points.x = np.random.uniform(0.0, 5.0, (N, 3))
    points.v = np.random.uniform(0.0, 10.0, (N, 3))
    points.mass = np.random.uniform(0.1, 2.0, (N,))
    points.run_once()

    return points
[4]:
# time without/with graph analysis
for analysis in [False, True]:
    points = set_point_mass(analysis)
    t0 = time.time()
    points.run_drivers()
    print("Graph analysis:", analysis)
    print("\tTime:", np.round((time.time() - t0) * 1e3, 2), "ms")
Graph analysis: False
        Time: 576.96 ms
Graph analysis: True
        Time: 198.71 ms
[ ]: