"""
Surrogate model based on Kriging.
"""
import numpy as np
import scipy.linalg as linalg
from scipy.optimize import minimize
from .base import SurrogateModel
MACHINE_EPSILON = np.finfo(np.double).eps
[docs]
class KrigingSurrogate(SurrogateModel):
"""
Surrogate Modeling method based on the simple Kriging interpolation.
Predictions are returned as a tuple of mean and RMSE. Based on Gaussian Processes
for Machine Learning (GPML) by Rasmussen and Williams. (see also: scikit-learn).
Attributes
----------
alpha : ndarray
Reduced likelihood parameter: alpha
eval_rmse : bool
When true, calculate the root mean square prediction error.
L : ndarray
Reduced likelihood parameter: L
n_dims : int
Number of independents in the surrogate
n_samples : int
Number of training points.
nugget : double or ndarray, optional
Nugget smoothing parameter for smoothing noisy data. Represents the variance
of the input values. If nugget is an ndarray, it must be of the same length
as the number of training points. Default: 10. * Machine Epsilon
sigma2 : ndarray
Reduced likelihood parameter: sigma squared
thetas : ndarray
Kriging hyperparameters.
X : ndarray
Training input values, normalized.
X_mean : ndarray
Mean of training input values, normalized.
X_std : ndarray
Standard deviation of training input values, normalized.
Y : ndarray
Training model response values, normalized.
Y_mean : ndarray
Mean of training model response values, normalized.
Y_std : ndarray
Standard deviation of training model response values, normalized.
"""
def __init__(self, nugget=10. * MACHINE_EPSILON, eval_rmse=False):
"""
Initialize all attributes.
Parameters
----------
nugget : double or ndarray, optional
Nugget smoothing parameter for smoothing noisy data. Represents the variance
of the input values. If nugget is an ndarray, it must be of the same length
as the number of training points. Default: 10. * Machine Epsilon
eval_rmse : bool
Flag indicating whether the Root Mean Squared Error (RMSE) should be computed.
Set to False by default.
"""
self.n_dims = 0 # number of independent
self.n_samples = 0 # number of training points
self.thetas = np.zeros(0)
# nugget smoothing parameter from [Sasena, 2002]
self.nugget = nugget
self.alpha = np.zeros(0)
self.L = np.zeros(0)
self.sigma2 = np.zeros(0)
# Normalized Training Values
self.X = np.zeros(0)
self.Y = np.zeros(0)
self.X_mean = np.zeros(0)
self.X_std = np.zeros(0)
self.Y_mean = np.zeros(0)
self.Y_std = np.zeros(0)
self.eval_rmse = eval_rmse
[docs]
def train(self, x, y):
"""
Train the surrogate model with the given set of inputs and outputs.
Parameters
----------
x : array-like
Training input locations
y : array-like
Model responses at given inputs.
"""
x, y = np.atleast_2d(x, y)
self.n_samples, self.n_dims = x.shape
if self.n_samples <= 1:
raise ValueError("KrigingSurrogate require at least 2 training points.")
# Normalize the data
X_mean = np.mean(x, axis=0)
X_std = np.std(x, axis=0)
Y_mean = np.mean(y, axis=0)
Y_std = np.std(y, axis=0)
X_std[X_std == 0.] = 1.
Y_std[Y_std == 0.] = 1.
self.X = X = (x - X_mean) / X_std
self.Y = Y = (y - Y_mean) / Y_std
self.X_mean, self.X_std = X_mean, X_std
self.Y_mean, self.Y_std = Y_mean, Y_std
def _calcll(thetas):
"""Calculate loglike (callback function)."""
loglike = self._calculate_reduced_likelihood_params(np.exp(thetas))[0]
return -loglike
bounds = [(np.log(1e-5), np.log(1e5))] * self.n_dims
optResult = minimize(
_calcll, np.full(self.n_dims, 1e-1),
method = 'slsqp',
options = {'eps': 1e-3},
bounds = bounds,
)
if not optResult.success:
raise RuntimeError(
f"Kriging Hyper-parameter optimization failed: {optResult.message}"
)
self.thetas = np.exp(optResult.x)
_, params = self._calculate_reduced_likelihood_params()
self.alpha = params['alpha']
self.U = params['U']
self.S_inv = params['S_inv']
self.Vh = params['Vh']
self.sigma2 = params['sigma2']
def _calculate_reduced_likelihood_params(self, thetas=None):
"""
Calculate quantity with same maximum location as the log-likelihood for a given theta.
Parameters
----------
thetas : ndarray, optional
Given input correlation coefficients. If none given, uses self.thetas
from training.
Returns
-------
ndarray
Calculated reduced_likelihood
dict
Dictionary containing the parameters.
"""
if thetas is None:
thetas = self.thetas
X, Y = self.X, self.Y
# Correlation Matrix
distances = np.zeros((self.n_samples, self.n_dims, self.n_samples))
for i in range(self.n_samples):
distances[i, :, i + 1:] = np.abs(X[i, ...] - X[i + 1:, ...]).T
distances[i + 1:, :, i] = distances[i, :, i + 1:].T
R = np.exp(-thetas.dot(np.square(distances)))
R[np.diag_indices_from(R)] = 1. + self.nugget
[U, S, Vh] = linalg.svd(R)
# Penrose-Moore Pseudo-Inverse:
# Given A = USV^* and Ax=b, the least-squares solution is
# x = V S^-1 U^* b.
# Tikhonov regularization is used to make the solution significantly
# more robust.
h = 1e-8 * S[0]
inv_factors = S / (S ** 2. + h ** 2.)
alpha = Vh.T.dot(np.einsum('j,kj,kl->jl', inv_factors, U, Y))
logdet = -np.sum(np.log(inv_factors))
sigma2 = np.dot(Y.T, alpha).sum(axis=0) / self.n_samples
reduced_likelihood = -(np.log(np.sum(sigma2)) + logdet / self.n_samples)
params = dict(
alpha = alpha,
sigma2 = sigma2 * np.square(self.Y_std),
S_inv = inv_factors,
U = U,
Vh = Vh,
)
return reduced_likelihood, params
[docs]
def predict(self, x):
"""
Calculate predicted value of the response based on the current trained model.
Parameters
----------
x : array-like
Point at which the surrogate is evaluated.
Returns
-------
ndarray
Kriging prediction.
ndarray, optional (if eval_rmse is True)
Root mean square of the prediction error.
"""
thetas = self.thetas
x = np.atleast_2d(x)
n_eval = x.shape[0]
# Normalize input
x_n = (x - self.X_mean) / self.X_std
r = np.zeros((n_eval, self.n_samples), dtype=x.dtype)
for r_i, x_i in zip(r, x_n):
r_i[:] = np.exp(-thetas.dot(np.square((x_i - self.X).T)))
# Scaled Predictor
y_t = np.dot(r, self.alpha)
# Predictor
y = self.Y_mean + self.Y_std * y_t
if self.eval_rmse:
mse = (
1. - np.dot(
np.dot(r, self.Vh.T),
np.einsum('j,kj,lk->jl', self.S_inv, self.U, r)
)
) * self.sigma2
# Forcing negative RMSE to zero if negative due to machine precision
mse[mse < 0.] = 0.
return y, np.sqrt(mse)
return y
[docs]
def linearize(self, x):
"""
Calculate the jacobian of the Kriging surface at the requested point.
Parameters
----------
x : array-like
Point at which the surrogate Jacobian is evaluated.
Returns
-------
ndarray
Jacobian of surrogate output wrt inputs.
"""
thetas = self.thetas
# Normalize Input
x_n = (x - self.X_mean) / self.X_std
r = np.exp(-thetas.dot(np.square((x_n - self.X).T)))
# Z = einsum('i,ij->ij', X, Y) is equivalent to, but much faster and
# memory efficient than, diag(X).dot(Y) for vector X and 2D array Y.
# i.e., Z[i, j] = X[i] * Y[i, j]
gradr = r * -2 * np.einsum('i,ij->ij', thetas, (x_n - self.X).T)
jac = np.einsum(
'i,j,ij->ij',
self.Y_std,
1. / self.X_std,
gradr.dot(self.alpha).T
)
return jac
[docs]
class FloatKrigingSurrogate(KrigingSurrogate):
"""
Surrogate model based on the simple Kriging interpolation.
Predictions are returned as floats, which are the mean of the model's prediction.
"""
[docs]
def predict(self, x):
"""
Calculate predicted value of response based on the current trained model.
Parameters
----------
x : array-like
Point at which the surrogate is evaluated.
Returns
-------
float
Mean value of kriging prediction.
"""
dist = super(FloatKrigingSurrogate, self).predict(x)
return dist[0] # mean value