Source code for cosapp.utils.surrogate_models.nn_interpolators.weighted_interpolator

"""Define the WeightedInterpolator class."""

import numpy as np

from .nn_base import NNBase


[docs] class WeightedInterpolator(NNBase): """ Weighted Neighbor Interpolation. """ @staticmethod def _get_weights(ndist, dist_eff): """ Compute the weights based on the distance and distance effect. Parameters ---------- ndist : ndarray Distances from point to nearest neighbors. dist_eff : float Exponent used for the distance weighting. Returns ------- ndarray Weights for each neighbor based on distance. """ # Find the weighted neighbors per defined formula for distance effect dist = np.power(ndist, dist_eff) # Ignore Division by Zero Warnings with np.errstate(divide='ignore'): weights = 1.0 / dist # Replace any nan's to use the training data rows, cols = np.where(np.isinf(weights)) weights[rows, :] = 0. weights[rows, cols] = 1. return weights def __call__(self, prediction_points, num_neighbors=5, dist_eff=0): """ Do interpolation. Parameters ---------- prediction_points : ndarray Points at which interpolation is done. num_neighbors : int Number of neighbors to use. dist_eff : float Exponent used for the distance weighting. Using dist_eff=0 will default to `self._indep_dims + 1`. Returns ------- ndarray Interpolated values at the prediction points. """ if self._ntpts < num_neighbors: raise ValueError('WeightedInterpolant does not have sufficient ' 'training data to use num_neighbors={0}, only {1} points' ' available.'.format(num_neighbors, self._ntpts)) if dist_eff == 0: # If default, use #dims + 1 dist_eff = self._indep_dims + 1 if len(prediction_points.shape) == 1: # Reshape vector to n x 1 array prediction_points.shape = (1, prediction_points.shape[0]) normalized_pts = (prediction_points - self._tpm) / self._tpr # Find them neigbors # KData query takes (data, #ofneighbors) to determine closest # training points to predicted data ndist, nloc = self._KData.query(normalized_pts.real, num_neighbors) # Setup problem # Reshape ndist for 1D problems. if len(ndist.shape) == 1: ndist.shape = (1, ndist.shape[0]) nloc.shape = (1, nloc.shape[0]) weights = self._get_weights(ndist, dist_eff) weight_sum = np.sum(weights, axis=1) vals = self._tv[nloc] wt = np.einsum('ijk,ij->ik', vals, weights) predz = ((wt / weight_sum[:, np.newaxis]) * self._tvr) + self._tvm self._pt_cache = (normalized_pts, ndist, nloc) return predz
[docs] def gradient(self, prediction_points, num_neighbors=5, dist_eff=0): """ Find the gradient at each location of a set of supplied predicted points. Parameters ---------- prediction_points : ndarray Points at which interpolation is done. num_neighbors : int Number of neighbors to use. dist_eff : float Exponent used for the distance weighting. Using dist_eff=0 will default to `self._indep_dims + 1`. Returns ------- ndarray Gradient values at the prediction points. """ if self._ntpts < num_neighbors: raise ValueError('WeightedInterpolant does not have sufficient ' 'training data to use num_neighbors={0}, only {1} points' ' available.'.format(num_neighbors, self._ntpts)) if dist_eff == 0: # If default, use #dims + 1 dist_eff = self._indep_dims + 1 if len(prediction_points.shape) == 1: # Reshape vector to num_neighbors x 1 array prediction_points.shape = (1, prediction_points.shape[0]) normalized_pts = (prediction_points - self._tpm) / self._tpr if self._pt_cache is not None and \ np.allclose(self._pt_cache[0], normalized_pts): ndist, nloc = self._pt_cache[1:] else: ndist, nloc = self._KData.query(normalized_pts, num_neighbors) # Reshape ndist for 1D problems. if len(ndist.shape) == 1: ndist.shape = (1, ndist.shape[0]) nloc.shape = (1, nloc.shape[0]) dimdiff = normalized_pts - self._tp[nloc] weights = np.power(ndist, -dist_eff) dweights = -dist_eff * \ np.power(ndist[..., np.newaxis], -(dist_eff + 2)) * dimdiff weight_sum = np.sum(weights, axis=1) vals = self._tv[nloc] gradient = (weight_sum * np.einsum('ikj,ikl->ilj', dweights, vals) - (np.einsum('ij,ijk->ik', weights, vals)[..., np.newaxis] * np.sum(dweights, axis=1))) / np.power(weight_sum, 2) grad = gradient * (self._tvr[..., np.newaxis] / self._tpr) return grad