Graph-based Jacobian analysis¶
The graph analysis option analyses the variable dependency structure of a CoSApp system to assemble a sparse Jacobian more efficiently, instead of defaulting to a dense finite-difference matrix.
graph_analysis option¶
How to use it¶
This option is proposed for the NonLinearSolver and the implicit time drivers (EulerImplicit, CrankNicolson, BdfIntegrator). To activate this feature, pass a boolean argument to the keyword graph_analysis when the driver is created: NonLinearSolver("solver", graph_analysis=True).
When to use it¶
Systems with large array unknowns where most partial derivatives are structurally zero. Graph analysis can dramatically reduce the number of Jacobian evaluations by exploiting sparsity.
Example 1 — Array system with index-derived sparsity¶
In the example below, the system ArraySys computes each output element from two neighbouring input elements. The Jacobian of the residue with respect to the unknown vector i is therefore tridiagonal — graph analysis detects this automatically from the index expressions in compute, without any explicit annotation.
[1]:
import numpy as np
from cosapp.base import System
from cosapp.drivers import NonLinearSolver
N = 100
class ArraySys(System):
def setup(self):
self.add_inward("i", np.ones((N,)))
self.add_outward("intermediate", np.zeros((N,)))
self.add_outward("o", np.zeros((N,)))
self.add_equation("o == 1.").add_unknown("i")
def compute(self):
# Each intermediate[i] depends only on i[i+1] and i[i-1]
# → the dependency graph is tridiagonal
for i in range(1, N - 1):
self.intermediate[i] = self.i[i + 1] + self.i[i - 1]
self.intermediate[N - 1] = self.i[0] # wrap-around
self.intermediate[0] = self.i[1]
self.o = self.intermediate**2
[2]:
import time
# time without/with graph analysis
for analysis in [False, True]:
sys = ArraySys("sys")
solver = sys.add_driver(NonLinearSolver("solver", graph_analysis=analysis))
t0 = time.time()
sys.run_drivers()
print("Graph analysis:", analysis)
print("\tTime:", np.round((time.time() - t0) * 1e3, 2), "ms")
Graph analysis: False
Time: 124.15 ms
Graph analysis: True
Time: 21.47 ms
What happened: the solver inferred a tridiagonal (sparse) Jacobian structure instead of the 10×10 dense matrix it would use by default. This reduces the finite-difference perturbations from 10 to 3 per Newton step.
Add @relationship method¶
The graph of variable dependencies is built from relations between two variables encountered in the compute. In some cases, it is not possible to guess this relation:
as arguments of callable:
y = f(x, w, z)system variables related by multiple assignements on local variables:
a = self.g h = a + self.i * 10 self.j = h[:3]
non-numerical variables, especially custom python objects:
y = CustomObj() + 50.0in the case whereCustomObj()may overload the__add__operator
At least, for these untreated cases, an API is available to explicitly give the relation between two variables, from a @relationship decorated method.
Writing a @relationship method¶
If the analysis encounters difficulties to retrieve relationships between two variables for an identified reason, a warning is popped explaining it, but the analysis continues by taking into account a full dense matrix as relation between these two variables.
In the case whether anything goes wrong during analysis, the entire graph dependency analysis falls back to a normal dense Jacobian evaluation.
Therefore, provide a @relationship method on any related variable couple whose analytical sparsity is known, helps the analyser avoiding expensive full perturbations.
Template of the @relationship method
@relationship("input_variable", "output_variable")
def my_dependency(self) -> np.ndarray:
...
return pattern # shape (len(output), len(input))
The decorated method must be defined on a
Systemsubclass.It must return a binary
ndarrayof shape(len(output_variable), len(input_variable))relating the both array dependency.
Examples:
np.ones((1,))between two floatsnp.identity(n)between two non-slicing np.ndarrays
Example 2 — Free-fall with friction (implicit time integration)¶
This example models a point mass subject to gravity and quadratic air friction.
[3]:
from cosapp.core.ir.passes.causal_model_graph import relationship
from cosapp.drivers import CrankNicolson
class PointDynamics(System):
"""Point mass dynamics: F = m*a"""
def setup(self):
self.add_inward("mass", 1.0)
self.add_inward("acc_ext", np.zeros(3))
self.add_inward("force_ext", np.zeros(3))
self.add_outward("force", np.zeros(3))
self.add_outward("acc", np.zeros(3))
def compute(self):
mass = np.expand_dims(self.mass, axis=-1)
self.force = self.force_ext + mass * self.acc_ext
self.acc = self.force / mass
class PointFriction(System):
"""Point mass v² friction model."""
def setup(self):
self.add_inward("v", np.zeros(3), desc="Velocity")
self.add_inward("cf", 0.1, desc="Friction coefficient")
self.add_outward("force", np.zeros(3))
def compute(self):
v = self.v
k = self.cf * np.linalg.norm(v, axis=-1)
self.force = -np.expand_dims(k, axis=-1) * v
@relationship("v", "force")
def dependency_v_force(self):
"""
Sparsity dependency pattern of v --> force.
`force` (n×m shape) depends on:
- `v` by an identity matrix `np.eye(n)`
- `np.expand_dims(k, axis=-1)` by a full block `np.ones((m, m))`
It results by a n×n block-diagonal of full m×m blocks:
np.kron(np.eye(n), np.ones((m, m))).
"""
n, m = self.v.shape
unit_block = np.ones((m, m))
diag = np.eye(n)
return np.kron(diag, unit_block)
class PointMass(System):
"""Free fall model with quadratic friction."""
def setup(self):
self.add_child(PointFriction("friction"), pulling=["cf", "v"])
self.add_child(
PointDynamics("dynamics"),
pulling=["mass", "force", {"acc_ext": "g", "acc": "a"}],
)
# friction.force → dynamics.force_ext
self.connect(self.friction, self.dynamics, {"force": "force_ext"})
# Transient states
self.add_transient("v", der="a")
self.add_transient("x", der="v")
# Gravity (pulled as 'g' from dynamics.acc_ext)
self.g = np.r_[0.0, 0.0, -9.81]
self.exec_order = ["friction", "dynamics"]
def set_point_mass(analysis):
N = 10
points = PointMass("points")
points.add_driver(
CrankNicolson(time_interval=[0, 1], dt=0.01, graph_analysis=analysis)
)
# reshape (N x 3)
points.x = np.random.uniform(0.0, 5.0, (N, 3))
points.v = np.random.uniform(0.0, 10.0, (N, 3))
points.mass = np.random.uniform(0.1, 2.0, (N,))
points.run_once()
return points
[4]:
# time without/with graph analysis
for analysis in [False, True]:
points = set_point_mass(analysis)
t0 = time.time()
points.run_drivers()
print("Graph analysis:", analysis)
print("\tTime:", np.round((time.time() - t0) * 1e3, 2), "ms")
Graph analysis: False
Time: 576.96 ms
Graph analysis: True
Time: 198.71 ms
[ ]: