"""
Module provides dispatch to different non-linear algorithms.
"""
from __future__ import annotations
import abc
import logging
import inspect
from numbers import Number
from typing import (
Callable, Optional, Union,
Sequence, Tuple, Dict, Any,
TypeVar, Type,
)
import numpy
from scipy.linalg import lu_factor, lu_solve, LinAlgWarning
from scipy import optimize
from cosapp.core.numerics.enum import NonLinearMethods
from cosapp.core.numerics.basics import SolverResults
from cosapp.utils.logging import LogLevel
logger = logging.getLogger(__name__)
ConcreteSolver = TypeVar("ConcreteSolver", bound="BaseNumericSolver")
RootFunction = Callable[[Sequence[float], Any], numpy.ndarray]
[docs]
class BaseNumericalSolver(abc.ABC):
def __init__(self, options={}) -> None:
self.__options = dict(options)
@property
def options(self) -> Dict[str, Any]:
return self.__options
[docs]
@classmethod
def from_options(cls: Type[ConcreteSolver], options: Dict[str, Any]) -> ConcreteSolver:
solver = cls()
solver.transfer_options(options)
return solver
[docs]
def transfer_options(self, options: Dict[str, Any]) -> None:
settings = self.options
for key in settings.keys():
try:
settings[key] = options.pop(key)
except KeyError:
continue
[docs]
@abc.abstractmethod
def solve(
self,
fun: RootFunction,
x0: Sequence[float],
args: Tuple[Any] = tuple(),
*other_args, **kwargs,
) -> Union[SolverResults, optimize.OptimizeResult]:
pass
[docs]
class NumpySolver(BaseNumericalSolver):
"""Encapsulation of `scipy.optimize.root`
"""
def __init__(self, method: str, options={}) -> None:
"""
Parameters
----------
method: str
Method forwarded to `scipy.optimize.root`
options: Dict[str, Any], optional
A dictionary of solver options. E.g. `tol` or `max_iter`
"""
self._method = method
super().__init__(options)
try:
self.options.pop('verbose')
except KeyError:
pass
[docs]
def solve(self,
fun: RootFunction,
x0: Sequence[float],
args: Tuple[Union[float, str]] = (),
jac=None,
callback=None,
) -> optimize.OptimizeResult:
"""Cancel residues produced by `fun` starting with `x0` as initial guess.
Parameters
----------
fun : Callable[[Sequence[float], Any], numpy.ndarray[float]]
Callable residues function
x0 : Sequence[float]
Initial solution guess
args : Tuple[Any, ...]
Additional arguments passed to `fun`
jac : bool or callable, optional
If `jac` is a Boolean and is `True`, `fun` is assumed to return
the value of Jacobian along with the objective function.
If `False`, the Jacobian will be estimated numerically.
`jac` can also be a callable returning the Jacobian of `fun`.
In this case, it must accept the same arguments as `fun`.
(default: `None`)
callback : Callable, optional
Optional callback function. It is called on every iteration as `callback(x, r)`,
where x is the current solution and r the corresponding residual.
Returns
-------
scipy.optimize.OptimizeResult
The solution represented as a OptimizeResult object. Important attributes are: x the solution array,
success a Boolean flag indicating if the algorithm exited successfully and message which describes the
cause of the termination. See :py:class:`cosapp.core.numerics.basics.SolverResults` for a description
of other attributes.
"""
x0 = numpy.atleast_1d(x0)
results = optimize.root(
fun, x0, args=args,
method=self._method,
jac=jac,
tol=self.options.get('tol', None),
callback=callback,
options=self.options,
)
results.jac_lup = (None, None)
if 'jac' in results:
try:
results.jac_lup = lu_factor(results.jac, check_finite=False)
except (ValueError, LinAlgWarning) as err: # Silent LU decomposition failure
logger.debug(f"Silent error: {err}")
return results
[docs]
def get_kwargs():
"""Returns kwargs as a dictionary, using `inspect.currentframe`
"""
frame = inspect.currentframe().f_back
keys, _, _, values = inspect.getargvalues(frame)
kwargs = {}
for key in keys:
if key != 'self':
kwargs[key] = values[key]
return kwargs
[docs]
class CustomSolver(BaseNumericalSolver):
"""Custom Newton-Raphson solver.
"""
def __init__(
self,
tol='auto',
factor=1.0,
max_iter=100,
verbose=False,
eps=2**(-23),
jac_update_tol=0.01,
history=False,
partial_jac=True,
partial_jac_tries=5,
tol_update_period=4,
tol_to_noise_ratio=16,
):
options = get_kwargs()
super().__init__(options)
[docs]
def solve(
self,
fresidues: RootFunction,
x0: Sequence[float],
args: Tuple[Union[float, str]] = (),
jac=None,
recorder=None,
**options,
) -> SolverResults:
"""Customized Newton-Raphson algorithm to solve `fresidues` starting with `x0`.
Parameters
----------
fresidues : Callable[[Sequence[float], Any], numpy.ndarray[float]]
Callable residues function
x0 : Sequence[float]
Initial solution guess
args : Tuple[Any, ...]
Additional arguments for `fun`
options : dict, optional
A dictionary of problem-dependent solver options.
Options
-------
jac_lup : (ndarray[float], ndarray[int]), optional
LU decomposition of Jacobian given as tuple (LU, perm) to reuse as initial direction
jac : ndarray, optional
Jacobian to reuse for partial update
compute_jacobian : bool
Force to update the Jacobian matrix even if the provided one is useful
lower_bound : numpy.ndarray
Min values for parameters iterated by solver.
upper_bound : numpy.ndarray
Max values for parameters iterated by solver.
abs_step : numpy.ndarray
Max absolute step for parameters iterated by solver.
rel_step : numpy.ndarray
Max relative step for parameters iterated by solver.
Returns
-------
SolverResults
The solution represented as a `SolverResults` object.
See :py:class:`cosapp.core.numerics.basics.SolverResults` for details.
"""
solver_options = self.options
verbose = solver_options.get('verbose', False)
log_level = LogLevel.INFO if verbose else LogLevel.DEBUG
jac_update_tol = solver_options.get('jac_update_tol', 0.01)
jac_rel_perturbation = solver_options['eps']
factor_ref = factor = solver_options['factor']
history = solver_options.get('history', False)
if jac is not None:
jac = numpy.asarray(jac)
jac_lu, piv = options.get('jac_lup', (None, None))
calc_jac = options.get('compute_jacobian', True) or jac is None or jac.shape[1] != x0.shape[0]
def calc_jacobian(
x: numpy.ndarray,
residues: numpy.ndarray,
it_jac: int,
it_p_jac: int,
jac: numpy.ndarray=None,
r_indices_to_update=set(),
) -> Tuple[numpy.ndarray, int, int]:
"""Estimate function gradient with respect to x"""
n = x.size
if jac is None or jac.shape != (n, n):
new_jac = numpy.zeros((n, n), dtype=float)
x_indices_to_update = list(range(n))
else:
new_jac = jac.copy()
ncol = new_jac.shape[1]
x_indices_to_update = set()
for i in r_indices_to_update:
x_indices_to_update.update(j for j in range(ncol) if new_jac[i, j] != 0)
x_copy = x.copy()
for j in x_indices_to_update:
delta = jac_rel_perturbation
if abs(x_copy[j]) >= abs(jac_rel_perturbation):
delta = x_copy[j] * jac_rel_perturbation
x_copy[j] += delta
logger.debug(f"Perturb unknown {j}")
perturbation = fresidues(x_copy, *args)
new_jac[:, j] = (perturbation - residues) * (1 / delta)
x_copy[j] = x[j]
n_partial = len(x_indices_to_update)
if n_partial == n:
logger.log(log_level, f'Jacobian matrix: full update')
it_jac += 1
elif x_indices_to_update:
it_p_jac += 1
logger.log(log_level,
f'Jacobian matrix: {n_partial} over {n} derivative(s) updated')
return new_jac, it_jac, it_p_jac
logger.debug("NR - Reference call")
x = numpy.array(x0, dtype=numpy.float64).flatten()
r = fresidues(x, *args)
r_norm = numpy.linalg.norm(r, numpy.inf)
logger.log(log_level, f"Initial residue: {r_norm}")
logger.log(LogLevel.FULL_DEBUG, f"Initial relaxation factor {factor}.")
results = SolverResults()
def check_numerical_features(name, default_value):
if name not in options or options[name] is None or numpy.shape(options[name]) != x.shape:
options[name] = numpy.full_like(x, default_value)
check_numerical_features('lower_bound', -numpy.inf)
check_numerical_features('upper_bound', numpy.inf)
check_numerical_features('abs_step', numpy.inf)
check_numerical_features('rel_step', numpy.inf)
it_solver = 0
if recorder is not None:
record_state = lambda iter: recorder.record_state(str(iter), 'ok',)
else:
record_state = lambda iter: None
if not calc_jac:
logger.debug('Reuse of previous Jacobian matrix')
tol = solver_options['tol']
if tol is None:
tol = 'auto'
elif isinstance(tol, str):
if tol != 'auto':
raise ValueError(f"Tolerance must be a float, None, or 'auto'.")
elif not isinstance(tol, Number):
raise TypeError(f"Tolerance must be a number, None, or 'auto'; got {tol!r}")
auto_tol = (tol == 'auto')
if auto_tol:
tol = 0.0
iter_period = solver_options['tol_update_period']
tol_to_noise = solver_options['tol_to_noise_ratio']
must_update_tol = lambda it, tol: it % iter_period == 0 or tol <= 0.0
elif tol < 0:
raise ValueError(f"Tolerance must be non-negative (got {tol})")
else:
tol_to_noise = None
must_update_tol = lambda *args: False
results.trace = trace = list()
if history:
record = {
"x": x.copy(),
"residues": r.copy(),
"tol": tol,
}
trace.append(record)
max_iter = solver_options['max_iter']
p_jac = solver_options['partial_jac']
p_jac_tries = solver_options['partial_jac_tries']
it_jac, it_p_jac = 0, 0 # number of full and partial evalutions of the Jacobian
n_broyden_update = 0
dr = numpy.zeros_like(r)
dx = numpy.full_like(r, numpy.nan)
rtol_x = 1e-14
epsilon = numpy.finfo(numpy.float64).eps
logger.log(
LogLevel.FULL_DEBUG,
"\t".join([
f"iter #{it_solver}",
f"{tol = :.2e}",
f"|R| = {r_norm}",
f"{x = }",
])
)
try:
res_index_to_update = set()
consecutive_p_jac = 0
prev_tol = -1.0
while r_norm > tol and it_solver < max_iter:
logger.log(log_level, f'Iteration {it_solver}')
if calc_jac:
if consecutive_p_jac > p_jac_tries or not p_jac:
res_index_to_update = range(x.size)
consecutive_p_jac = 0
else:
consecutive_p_jac += 1
jac, it_jac, it_p_jac = calc_jacobian(
x, r, it_jac, it_p_jac,
jac, res_index_to_update,
)
jac_lu, piv = self.lu_factor(jac) # may raise an exception
elif it_solver > 0:
# Good Broyden update - source: https://nickcdryan.com/2017/09/16/broydens-method-in-python/
logger.log(log_level, f'Broyden update')
n_broyden_update += 1
corr = numpy.outer(dr - jac.dot(dx), dx) / dx.dot(dx)
jac += corr
jac_lu, piv = self.lu_factor(jac)
if must_update_tol(it_solver, tol):
norm_J = numpy.linalg.norm(jac, numpy.inf)
norm_x = numpy.linalg.norm(x, numpy.inf)
noise = epsilon * norm_J * norm_x
tol = tol_to_noise * noise
logger.log(
LogLevel.FULL_DEBUG,
f"iter #{it_solver}; {noise = }; {tol = }; |J| = {norm_J}; |x| = {norm_x}"
)
if tol == prev_tol:
logger.log(log_level, f"Numerical saturation detected at iteration {it_solver}; {x = }")
break
prev_tol = tol
dx = -lu_solve((jac_lu, piv), r)
it_solver += 1
# Compute relaxation factors from max absolute and relative steps
with numpy.errstate(invalid='ignore', divide='ignore'):
abs_x = numpy.abs(x)
abs_dx = numpy.abs(dx)
dx_max_abs = options['abs_step']
dx_max_rel = numpy.where(abs_x > 0, abs_x * options['rel_step'], numpy.inf)
factor_abs = numpy.where(abs_dx > dx_max_abs, dx_max_abs / abs_dx, 1).min()
factor_rel = numpy.where(abs_dx > dx_max_rel, dx_max_rel / abs_dx, 1).min()
factor = min(factor_rel, factor_abs, factor, 1)
if factor < factor_ref:
logger.debug(f"\trelaxation factor lowered to {factor}")
r_norm_prev = r_norm
x_prev = x.copy()
dx *= factor
x += dx
# Force solution within admissible bounds
# Note:
# This is clearly wrong, and does NOT belong in a nonlinear solver.
# The correct way to deal with such constraints is to use an optimizer.
# This part was only kept for non-regression purposes, but should be
# removed in a future revision.
x = numpy.maximum(x, options['lower_bound'])
x = numpy.minimum(x, options['upper_bound'])
new_r = fresidues(x, *args)
r_norm = numpy.linalg.norm(new_r, numpy.inf)
# logger.log(log_level, f'Residue: {r_norm:.5g}')
logger.log(
LogLevel.DEBUG,
"\t".join([
f"iter #{it_solver}",
f"{tol = :.2e}",
f"|R| = {r_norm}",
f"{x = }",
])
)
if numpy.allclose(x, x_prev, rtol=rtol_x, atol=0):
logger.log(log_level, f"Fixed point detected: {x = }, dx = {x - x_prev}")
break
# Estimate non-linearity by comparing extrapolated and actual residues
abs_r = numpy.abs(r)
extrapolated_r = r + jac.dot(dx)
delta_vs_linear = numpy.abs(extrapolated_r - new_r)
res_index_to_update = set(
numpy.argwhere(delta_vs_linear > jac_update_tol * abs_r).ravel()
)
res_index_to_update -= set(numpy.argwhere(abs_r < tol).ravel())
r, dr = new_r, new_r - r
if history:
record = {
"x": x.copy(),
"residues": r.copy(),
"tol": tol,
}
if calc_jac:
record["jac"] = jac.copy()
trace.append(record)
record_state(it_solver)
if res_index_to_update:
factor = factor_ref
calc_jac = True
elif r_norm != 0:
growth_rate = 1 + 0.1 * (1 - factor) * r_norm_prev / r_norm
factor = min(growth_rate * factor, 1)
calc_jac = False
logger.log(LogLevel.FULL_DEBUG, f"New relaxation factor {factor}.")
except (ValueError, LinAlgWarning):
is_nil = (jac == 0)
zero_rows = numpy.argwhere(is_nil.all(axis=1)).flatten()
zero_cols = numpy.argwhere(is_nil.all(axis=0)).flatten()
results.success = False
results.message = 'Singular {}x{} Jacobian matrix'.format(*jac.shape)
results.jac_errors = {'unknowns': zero_cols, 'residues': zero_rows}
else:
results.x = x
results.jac_lup = (jac_lu, piv)
results.jac_calls = it_jac + it_p_jac
results.fres_calls = it_solver
results.success = (r_norm <= tol)
status = f'Converged' if results.success else 'Not converged'
message = (
f"{status} ({r_norm:.4e}) in {it_solver} iterations,"
f" {it_jac} complete, {it_p_jac} partial Jacobian"
f" and {n_broyden_update} Broyden evaluation(s)"
f" ({tol = :.1e})"
)
results.message = f" -> {message}"
finally:
results.jac = jac
results.tol = tol
return results
[docs]
@staticmethod
def lu_factor(matrix: numpy.ndarray):
lu, piv = lu_factor(matrix, check_finite=True)
min_diag = numpy.abs(lu.diagonal()).min()
if min_diag < 1e-14:
raise LinAlgWarning(
f"Quasi-singular Jacobian matrix; min diag element of U matrix is {min_diag}"
)
return lu, piv
[docs]
def root(
fun: RootFunction,
x0: Sequence[float],
args: Tuple[Any] = tuple(),
method: NonLinearMethods = NonLinearMethods.POWELL,
options: Dict[str, Any] = {},
) -> Union[SolverResults, optimize.OptimizeResult]:
if method == NonLinearMethods.NR:
solver = CustomSolver.from_options(options)
return solver.solve(fun, x0, args, **options)
else:
solver = NumpySolver(method.value, options)
return solver.solve(fun, x0, args)