diff --git a/bluemath_tk/core/decorators.py b/bluemath_tk/core/decorators.py index 45c1f53..bf983b8 100644 --- a/bluemath_tk/core/decorators.py +++ b/bluemath_tk/core/decorators.py @@ -274,6 +274,7 @@ def wrapper( normalize_target_data: bool = True, target_custom_scale_factor: dict = {}, num_threads: int = None, + iteratively_update_sigma: bool = False, ): if subset_data is None: raise ValueError("Subset data cannot be None") @@ -306,6 +307,8 @@ def wrapper( if num_threads is not None: if not isinstance(num_threads, int) or num_threads <= 0: raise ValueError("Number of threads must be integer and > 0") + if not isinstance(iteratively_update_sigma, bool): + raise TypeError("Iteratively update sigma must be a boolean") return func( self, subset_data, @@ -316,6 +319,7 @@ def wrapper( normalize_target_data, target_custom_scale_factor, num_threads, + iteratively_update_sigma, ) return wrapper diff --git a/bluemath_tk/interpolation/rbf.py b/bluemath_tk/interpolation/rbf.py index 07a401f..f8e97af 100644 --- a/bluemath_tk/interpolation/rbf.py +++ b/bluemath_tk/interpolation/rbf.py @@ -1,15 +1,54 @@ -from typing import List -import copy import time +from typing import List, Tuple, Callable import numpy as np import pandas as pd -from scipy.optimize import fminbound -from scipy.interpolate import RBFInterpolator -from sklearn.metrics import mean_squared_error +from scipy.optimize import fminbound, fmin from ._base_interpolation import BaseInterpolation from ..core.decorators import validate_data_rbf +def gaussian_kernel(r: float, const: float) -> float: + """ + This function calculates the Gaussian kernel for the given distance and constant. + + Parameters + ---------- + r : float + The distance. + const : float + The constant (default name is usually sigma for gaussian kernel). + + Returns + ------- + float + The Gaussian kernel value. + + Notes + ----- + - The Gaussian kernel is defined as: + K(r) = exp(r^2 / 2 * const^2) (https://en.wikipedia.org/wiki/Gaussian_function) + - Here, we are assuming the mean is 0. + """ + + return np.exp(-0.5 * r * r / (const * const)) + + +def multiquadratic_kernel(r, const): + return np.sqrt(1 + (r / const) ** 2) + + +def inverse_kernel(r, const): + return 1 / np.sqrt(1 + (r / const) ** 2) + + +def cubic_kernel(r, const): + return r**3 + + +def thin_plate_kernel(r, const): + return r**2 * np.log(r / const) + + class RBFError(Exception): """ Custom exception for RBF class. @@ -24,9 +63,6 @@ class RBF(BaseInterpolation): """ Radial Basis Function (RBF) interpolation model. - Here, scipy's RBFInterpolator is used to interpolate the data. - https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.RBFInterpolator.html - Attributes ---------- sigma_min : float @@ -36,43 +72,22 @@ class RBF(BaseInterpolation): The maximum value for the sigma parameter. This value might change in the optimization process. sigma_diff : float - The minimum difference between the optimal sigma and the minimum and maximum - sigma values. + The difference between the optimal bounded sigma and the minimum and maximum sigma values. + If the difference is less than this value, the optimization process continues. kernel : str - Type of RBF. This should be one of - - - 'linear' : ``-r`` - - 'thin_plate_spline' : ``r**2 * log(r)`` - - 'cubic' : ``r**3`` - - 'quintic' : ``-r**5`` - - 'multiquadric' : ``-sqrt(1 + r**2)`` - - 'inverse_multiquadric' : ``1/sqrt(1 + r**2)`` - - 'inverse_quadratic' : ``1/(1 + r**2)`` - - 'gaussian' : ``exp(-r**2)`` - - smoothing : float or (npoints, ) array_like - Smoothing parameter. The interpolant perfectly fits the data when this - is set to 0. For large values, the interpolant approaches a least - squares fit of a polynomial with the specified degree. - degree : int - Degree of the added polynomial. For some RBFs the interpolant may not - be well-posed if the polynomial degree is too small. Those RBFs and - their corresponding minimum degrees are - - - 'multiquadric' : 0 - - 'linear' : 0 - - 'thin_plate_spline' : 1 - - 'cubic' : 1 - - 'quintic' : 2 - - The default value is the minimum degree for `kernel` or 0 if there is - no minimum degree. Set this to -1 for no added polynomial. - neighbors : int - If specified, the value of the interpolant at each evaluation point - will be computed using only this many nearest data points. All the data - points are used by default. - rbfs : dict - Dict with RBFInterpolator instances. + The kernel to use for the RBF model. + The available kernels are: + + - gaussian : ``exp(-1/2 * (r / const)**2)`` + - multiquadratic : ``sqrt(1 + (r / const)**2)`` + - inverse : ``1 / sqrt(1 + (r / const)**2)`` + - cubic : ``r**3`` + - thin_plate : ``r**2 * log(r / const)`` + + kernel_func : function + The kernel function to use for the RBF model. + smooth : float + The smoothness parameter. subset_data : pd.DataFrame The subset data used to fit the model. normalized_subset_data : pd.DataFrame @@ -105,69 +120,95 @@ class RBF(BaseInterpolation): Methods ------- - fit : Fits the model to the data. - predict : Predicts the data for the provided dataset. - fit_predict : Fits the model to the subset and predicts the interpolated dataset. - - See Also - -------- - OtherInterpolationMethod - - References - ---------- - .. [1] Fasshauer, G., 2007. Meshfree Approximation Methods with Matlab. - World Scientific Publishing Co. - - .. [2] http://amadeus.math.iit.edu/~fass/603_ch3.pdf - - .. [3] Wahba, G., 1990. Spline Models for Observational Data. SIAM. - - .. [4] http://pages.stat.wisc.edu/~wahba/stat860public/lect/lect8/lect8.pdf - - Examples - -------- - >>> import numpy as np + fit(...) : + Fits the model to the data. + predict(...) : + Predicts the data for the provided dataset. + fit_predict(...) : + Fits the model to the subset and predicts the interpolated dataset. Notes ----- - .. versionadded:: 1.0.3 TODO: For the moment, this class only supports optimization for one parameter kernels. For this reason, we only have sigma as the parameter to optimize. This sigma refers to the sigma parameter in the Gaussian kernel (but is used for all kernels). + + Main reference for sigma optimization: https://link.springer.com/article/10.1023/A:1018975909870 + + Examples + -------- + >>> import numpy as np + >>> import pandas as pd + >>> from bluemath_tk.interpolation.rbf import RBF + >>> dataset = pd.DataFrame( + ... { + ... "Hs": np.random.rand(1000) * 7, + ... "Tp": np.random.rand(1000) * 20, + ... "Dir": np.random.rand(1000) * 360, + ... } + ... ) + >>> subset = dataset.sample(frac=0.25) + >>> target = pd.DataFrame( + ... { + ... "HsPred": self.subset["Hs"] * 2 + self.subset["Tp"] * 3, + ... "DirPred": - self.subset["Dir"], + ... } + ... ) + >>> rbf = RBF() + >>> predictions = rbf.fit_predict( + ... subset_data=subset, + ... subset_directional_variables=["Dir"], + ... target_data=target, + ... target_directional_variables=["DirPred"], + ... normalize_target_data=True, + ... dataset=dataset, + ... num_threads=4, + ... iteratively_update_sigma=True, + ... ) """ + rbf_kernels = { + "gaussian": gaussian_kernel, + "multiquadratic": multiquadratic_kernel, + "inverse": inverse_kernel, + "cubic": cubic_kernel, + "thin_plate": thin_plate_kernel, + } + def __init__( self, sigma_min: float = 0.001, sigma_max: float = 0.1, sigma_diff: float = 0.0001, + sigma_opt: float = None, kernel: str = "gaussian", - smoothing: float = 0.0, - degree: int = None, - neighbors: int = None, + smooth: float = 1e-5, ): """ - Initializes the RBF model. + Raises + ------ + ValueError + If the sigma_min is not a positive float. + If the sigma_max is not a positive float greater than sigma_min. + If the sigma_diff is not a positive float. + If the kernel is not a string and one of the available kernels. + If the smooth is not a positive float. + """ - Parameters - ---------- - sigma_min : float, optional - The minimum value for the sigma parameter. Default is 0.001. - sigma_max : float, optional - The maximum value for the sigma parameter. Default is 0.1. - sigma_diff : float, optional - The minimum difference between the optimal sigma and the minimum and maximum - sigma values. Default is 0.0001. - kernel : str, optional - Type of RBF. Default is 'gaussian'. - smoothing : float, optional - Smoothing parameter. Default is 0.0. - degree : int, optional - Degree of the added polynomial. Default is None. - neighbors : int, optional - If specified, the value of the interpolant at each evaluation point will be - computed using only this many nearest data points. Default is None. + initial_msg = f""" + --------------------------------------------------------------------------------- + | Initializing RBF interpolation model with the following parameters: + | - sigma_min: {sigma_min} + | - sigma_max: {sigma_max} + | - sigma_diff: {sigma_diff} + | - sigma_opt: {sigma_opt} + | - kernel: {kernel} + | - smooth: {smooth} + | For more information, please refer to the documentation. + | Recommended lecture: https://link.springer.com/article/10.1023/A:1018975909870 + --------------------------------------------------------------------------------- """ + print(initial_msg) super().__init__() self.set_logger_name(name=self.__class__.__name__) @@ -182,19 +223,23 @@ def __init__( if not isinstance(sigma_diff, float) or sigma_diff < 0: raise ValueError("sigma_diff must be a positive float.") self._sigma_diff = sigma_diff - if not isinstance(kernel, str): - raise ValueError("kernel must be a string.") + if not isinstance(kernel, str) or kernel not in self.rbf_kernels.keys(): + raise ValueError( + f"kernel must be a string and one of {list(self.rbf_kernels.keys())}." + ) + if sigma_opt is not None: + if not isinstance(sigma_opt, float) or sigma_opt < 0: + raise ValueError("sigma_opt must be a positive float.") + self._sigma_opt = sigma_opt + if not isinstance(kernel, str) or kernel not in self.rbf_kernels.keys(): + raise ValueError( + f"kernel must be a string and one of {list(self.rbf_kernels.keys())}." + ) self._kernel = kernel - if not isinstance(smoothing, float): - raise ValueError("smoothing must be a float.") - self._smoothing = smoothing - if not isinstance(degree, int) and degree is not None: - raise ValueError("degree must be an integer.") - self._degree = degree - if not isinstance(neighbors, int) and neighbors is not None: - raise ValueError("neighbors must be an integer.") - self._neighbors = neighbors - self._rbfs: dict = {} # Dict with RBFInterpolator instances + self._kernel_func = self.rbf_kernels[self.kernel] + if not isinstance(smooth, float) or smooth < 0: + raise ValueError("smooth must be a positive float.") + self._smooth = smooth # Below, we initialize the attributes that will be set in the fit method self.is_fitted: bool = False self.is_target_normalized: bool = False @@ -226,24 +271,20 @@ def sigma_diff(self) -> float: return self._sigma_diff @property - def kernel(self) -> str: - return self._kernel - - @property - def smoothing(self) -> float: - return self._smoothing + def sigma_opt(self) -> float: + return self._sigma_opt @property - def degree(self) -> int: - return self._degree + def kernel(self) -> str: + return self._kernel @property - def neighbors(self) -> int: - return self._neighbors + def kernel_func(self) -> Callable: + return self._kernel_func @property - def rbfs(self) -> dict: - return self._rbfs + def smooth(self) -> float: + return self._smooth @property def subset_data(self) -> pd.DataFrame: @@ -301,8 +342,6 @@ def rbf_coeffs(self) -> pd.DataFrame: @property def opt_sigmas(self) -> dict: - if not self._opt_sigmas: - raise ValueError("Specified kernel does not require optimization.") return self._opt_sigmas def _preprocess_subset_data( @@ -316,7 +355,7 @@ def _preprocess_subset_data( subset_data : pd.DataFrame The subset data to preprocess (could be a dataset to predict). is_fit : bool, optional - Whether the data is to fit or not. Default is True. + Whether the data is being fit or not. Default is True. Returns ------- @@ -441,6 +480,78 @@ def _preprocess_target_data( self.logger.info("Target data preprocessed successfully") return target_data.copy() + def _rbf_assemble(self, x, sigma): + """ + Assemble the RBF matrix. + + Parameters + ---------- + x : np.ndarray + The data. + sigma : float + The sigma parameter for the kernel. + + Returns + ------- + np.ndarray + The data with all the calculated kernel values. + """ + + # Get the number of rows and columns in x + dim, n = x.shape + + # Compute the pairwise distances + dists = np.linalg.norm(x[:, :, np.newaxis] - x[:, np.newaxis, :], axis=0) + + # Apply the kernel function to the distances + A = self.kernel_func(dists, sigma) + + # Subtract the smoothing parameter from the diagonal elements + np.fill_diagonal(A, A.diagonal() - self.smooth) + + # Add the identity matrix to the matrix (polynomial term) + P = np.hstack((np.ones((n, 1)), x.T)) + A = np.vstack( + (np.hstack((A, P)), np.hstack((P.T, np.zeros((dim + 1, dim + 1))))) + ) + + return A + + def _calc_rbf_coeff( + self, sigma: float, x: np.ndarray, y: np.ndarray + ) -> Tuple[np.ndarray, np.ndarray]: + """ + This function calculates the RBF coefficients for the given data. + + Parameters + ---------- + sigma : float + The sigma parameter for the kernel. + x : np.ndarray + The subset data used to interpolate. + y : np.ndarray + The target data to interpolate. + + Returns + ------- + Tuple[np.ndarray, np.ndarray] + The RBF coefficients and the A matrix. + """ + + # Get the number of rows and columns in x + m, n = x.shape + + # Assemble the A matrix + A = self._rbf_assemble(x=x, sigma=sigma) + + # Concatenate y with zeros and reshape + b = np.concatenate((y, np.zeros((m + 1,)))).reshape(-1, 1) + + # Calculate the RBF coefficients + rbfcoeff, _, _, _ = np.linalg.lstsq(A, b, rcond=None) # inverse + + return rbfcoeff, A + def _cost_sigma(self, sigma: float, x: np.ndarray, y: np.ndarray) -> float: """ This function is called by fminbound to minimize the cost function. @@ -460,26 +571,40 @@ def _cost_sigma(self, sigma: float, x: np.ndarray, y: np.ndarray) -> float: The cost value. """ - # Instantiate the RBFInterpolator - rbf = RBFInterpolator( - y=x, - d=y, - neighbors=self.neighbors, - smoothing=self.smoothing, - kernel=self.kernel, - epsilon=sigma, - degree=self.degree, - ) - predicted_y = rbf(x) - # TODO: Check if this is the correct cost function - # cost = mean_squared_error(y, predicted_y) - cost = np.dot(y - predicted_y, y - predicted_y) + # Calculate RBF coefficients and A matrix + rbf_coeff, A = self._calc_rbf_coeff(sigma=sigma, x=x, y=y) + + # Extract the top-left n x n submatrix from A + m, n = x.shape + A = A[:n, :n] + + # Compute the pseudo-inverse of the submatrix A + invA = np.linalg.pinv(A) - return cost + # Initialize residuals by subtracting the last m elements of rbf_coeff from y + m1, n1 = rbf_coeff.shape + kk = y - rbf_coeff[m1 - m - 1] + + # Adjust residuals by subtracting the product of rbf_coeff and x + # DEPRECATED: The loop is replaced by the vectorized operation below + # for i in range(m): + # kk = kk - rbf_coeff[m1 - m + i] * x[i, :] + kk -= np.dot(rbf_coeff[m1 - m :].T, x).reshape(-1) + + # Calculate the cost by multiplying invA with kk and normalizing by the diagonal elements of invA + ceps = np.dot(invA, kk) / np.diagonal(invA) + + # Return the norm of ceps, representing the cost + yy = np.linalg.norm(ceps) + + return yy def _calc_opt_sigma( - self, target_variable: np.ndarray, subset_variables: np.ndarray - ) -> RBFInterpolator: + self, + target_variable: np.ndarray, + subset_variables: np.ndarray, + iteratively_update_sigma: bool = False, + ) -> float: """ This function calculates the optimal sigma for the given target variable. @@ -489,6 +614,8 @@ def _calc_opt_sigma( The target variable to interpolate. subset_variables : np.ndarray The subset variables used to interpolate. + iteratively_update_sigma : bool, optional + Whether to iteratively update the sigma parameter. Default is False. Returns ------- @@ -500,39 +627,100 @@ def _calc_opt_sigma( # Initialize sigma_min, sigma_max, and d_sigma sigma_min, sigma_max, d_sigma = self.sigma_min, self.sigma_max, 0 - # Loop until sigma_diff is less than the specified sigma_diff - while d_sigma < self.sigma_diff: - opt_sigma = fminbound( + if self.sigma_opt is not None: + # Optimize sigma using the specified sigma_opt + opt_sigma = fmin( func=self._cost_sigma, - x1=sigma_min, - x2=sigma_max, + x0=self.sigma_opt, args=(subset_variables, target_variable), disp=0, ) - lm_min = np.abs(opt_sigma - sigma_min) - lm_max = np.abs(opt_sigma - sigma_max) - if lm_min < self.sigma_diff: - sigma_min = sigma_min - sigma_min / 2 - elif lm_max < self.sigma_min: - sigma_max = sigma_max + sigma_max / 2 - d_sigma = np.nanmin([lm_min, lm_max]) - - # Save the fitted RBF for the optimal sigma - rbf = RBFInterpolator( - y=subset_variables, - d=target_variable, - neighbors=self.neighbors, - smoothing=self.smoothing, - kernel=self.kernel, - epsilon=opt_sigma, - degree=self.degree, - ) + if iteratively_update_sigma: + self._sigma_opt = opt_sigma + else: + # Loop until sigma_diff is less than the specified sigma_diff + while d_sigma < self.sigma_diff: + opt_sigma = fminbound( + func=self._cost_sigma, + x1=sigma_min, + x2=sigma_max, + args=(subset_variables, target_variable), + disp=0, + ) + lm_min = np.abs(opt_sigma - sigma_min) + lm_max = np.abs(opt_sigma - sigma_max) + if lm_min < self.sigma_diff: + sigma_min = sigma_min - sigma_min / 2 + elif lm_max < self.sigma_min: + sigma_max = sigma_max + sigma_max / 2 + d_sigma = np.nanmin([lm_min, lm_max]) + if iteratively_update_sigma: + self._sigma_opt = opt_sigma # Calculate the time taken to optimize sigma t1 = time.time() self.logger.info(f"Optimal sigma: {opt_sigma} - Time: {t1 - t0:.2f} seconds") - return rbf, opt_sigma + # Calculate the RBF coefficients for the optimal sigma + rbf_coeff, _ = self._calc_rbf_coeff( + sigma=opt_sigma, x=subset_variables, y=target_variable + ) + + return rbf_coeff, opt_sigma + + def _rbf_interpolate(self, dataset: pd.DataFrame) -> pd.DataFrame: + """ + This function interpolates the dataset. + + Parameters + ---------- + dataset : pd.DataFrame + The dataset to interpolate (must have same variables as subset). + + Returns + ------- + pd.DataFrame + The interpolated dataset (with all target variables). + """ + + normalized_dataset = self._preprocess_subset_data( + subset_data=dataset, is_fit=False + ) + + # Get the number of rows and columns in subset and dataset + num_vars_subset, num_points_subset = self.normalized_subset_data.T.shape + _, num_points_dataset = normalized_dataset.T.shape + + # Initialize the interpolated dataset + interpolated_array = np.zeros( + (num_points_dataset, len(self.target_processed_variables)) + ) + + # Loop through the target variables + for i_var, target_var in enumerate(self.target_processed_variables): + self.logger.info(f"Interpolating target variable {target_var}") + rbf_coeff = self._rbf_coeffs[target_var].values + opt_sigma = self._opt_sigmas[target_var] + r = np.linalg.norm( + normalized_dataset.values[:, np.newaxis, :] + - self.normalized_subset_data.values[np.newaxis, :, :], + axis=2, + ) + kernel_values = self.kernel_func(r, opt_sigma) + linear_part = np.dot( + normalized_dataset.values, + rbf_coeff[ + num_points_subset + 1 : num_points_subset + 1 + num_vars_subset + ].T, + ) + s = ( + rbf_coeff[num_points_subset] + + np.dot(kernel_values, rbf_coeff[:num_points_subset]) + + linear_part + ) + interpolated_array[:, i_var] = s + + return pd.DataFrame(interpolated_array, columns=self.target_processed_variables) @validate_data_rbf def fit( @@ -545,6 +733,7 @@ def fit( normalize_target_data: bool = True, target_custom_scale_factor: dict = {}, num_threads: int = None, + iteratively_update_sigma: bool = False, ) -> None: """ Fits the model to the data. @@ -567,6 +756,8 @@ def fit( The custom scale factor for the target data. Default is {}. num_threads : int, optional The number of threads to use for the optimization. Default is None. + iteratively_update_sigma : bool, optional + Whether to iteratively update the sigma parameter. Default is False. Notes ----- @@ -595,32 +786,15 @@ def fit( # RBF fitting for all variables rbf_coeffs, opt_sigmas = {}, {} - # Optimize sigma for each target variable for target_var in target_data.columns: self.logger.info(f"Fitting RBF for variable {target_var}") target_var_values = target_data[target_var].values - if ( - self.kernel == "linear" - or self.kernel == "cubic" - or self.kernel == "quintic" - or self.kernel == "thin_plate_spline" - ): - rbf = RBFInterpolator( - y=subset_data.values, - d=target_var_values, - neighbors=self.neighbors, - smoothing=self.smoothing, - kernel=self.kernel, - degree=self.degree, - ) - opt_sigma = None - else: - rbf, opt_sigma = self._calc_opt_sigma( - target_variable=target_var_values, - subset_variables=subset_data.values, - ) - self.rbfs[target_var] = copy.deepcopy(rbf) - rbf_coeffs[target_var] = rbf._coeffs.flatten() + rbf_coeff, opt_sigma = self._calc_opt_sigma( + target_variable=target_var_values, + subset_variables=subset_data.values.T, + iteratively_update_sigma=iteratively_update_sigma, + ) + rbf_coeffs[target_var] = rbf_coeff.flatten() opt_sigmas[target_var] = opt_sigma # Store the RBF coefficients and optimal sigmas @@ -661,40 +835,19 @@ def predict(self, dataset: pd.DataFrame) -> pd.DataFrame: raise RBFError("RBF model must be fitted before predicting.") self.logger.info("Reconstructing data using fitted coefficients.") - normalized_dataset = self._preprocess_subset_data( - subset_data=dataset, is_fit=False - ) - - # Create an empty array to store the interpolated target data - interpolated_target_array = np.zeros( - (normalized_dataset.shape[0], len(self.target_processed_variables)) - ) - for target_var in self.target_processed_variables: - self.logger.info(f"Predicting target variable {target_var}") - rbf = self.rbfs[target_var] - interpolated_target_array[ - :, self.target_processed_variables.index(target_var) - ] = rbf(normalized_dataset.values) - interpolated_target = pd.DataFrame( - data=interpolated_target_array, columns=self.target_processed_variables - ) - - # Denormalize the target data if normalize_target_data is True + interpolated_target = self._rbf_interpolate(dataset=dataset) if self.is_target_normalized: self.logger.info("Denormalizing target data") interpolated_target = self.denormalize( normalized_data=interpolated_target, scale_factor=self.target_scale_factor, ) - - # Calculate the degrees for the target directional variables for directional_variable in self.target_directional_variables: self.logger.info(f"Calculating target degrees for {directional_variable}") interpolated_target[directional_variable] = self.get_degrees_from_uv( xu=interpolated_target[f"{directional_variable}_u"].values, xv=interpolated_target[f"{directional_variable}_v"].values, ) - return interpolated_target def fit_predict( @@ -708,6 +861,7 @@ def fit_predict( normalize_target_data: bool = True, target_custom_scale_factor: dict = {}, num_threads: int = None, + iteratively_update_sigma: bool = False, ) -> pd.DataFrame: """ Fits the model to the subset and predicts the interpolated dataset. @@ -732,6 +886,8 @@ def fit_predict( The custom scale factor for the target data. Default is {}. num_threads : int, optional The number of threads to use for the optimization. Default is None. + iteratively_update_sigma : bool, optional + Whether to iteratively update the sigma parameter. Default is False. Returns ------- @@ -752,6 +908,7 @@ def fit_predict( normalize_target_data=normalize_target_data, target_custom_scale_factor=target_custom_scale_factor, num_threads=num_threads, + iteratively_update_sigma=iteratively_update_sigma, ) return self.predict(dataset=dataset) diff --git a/bluemath_tk/interpolation/rbf_old.py b/bluemath_tk/interpolation/rbf_scipy.py similarity index 64% rename from bluemath_tk/interpolation/rbf_old.py rename to bluemath_tk/interpolation/rbf_scipy.py index 5c24031..7b977ff 100644 --- a/bluemath_tk/interpolation/rbf_old.py +++ b/bluemath_tk/interpolation/rbf_scipy.py @@ -1,54 +1,16 @@ +from typing import List +import copy import time -from typing import List, Tuple, Callable import numpy as np import pandas as pd -from scipy.optimize import fminbound +from scipy.optimize import fmin, fminbound +from scipy.interpolate import RBFInterpolator +from sklearn.model_selection import KFold +from sklearn.metrics import mean_squared_error from ._base_interpolation import BaseInterpolation from ..core.decorators import validate_data_rbf -def gaussian_kernel(r: float, const: float) -> float: - """ - This function calculates the Gaussian kernel for the given distance and constant. - - Parameters - ---------- - r : float - The distance. - const : float - The constant (default name is usually sigma for gaussian kernel). - - Returns - ------- - float - The Gaussian kernel value. - - Notes - ----- - - The Gaussian kernel is defined as: - K(r) = exp(r^2 / 2 * const^2) (https://en.wikipedia.org/wiki/Gaussian_function) - - Here, we are assuming the mean is 0. - """ - - return np.exp(-0.5 * r * r / (const * const)) - - -def multiquadratic_kernel(r, const): - return np.sqrt(1 + (r / const) ** 2) - - -def inverse_kernel(r, const): - return 1 / np.sqrt(1 + (r / const) ** 2) - - -def cubic_kernel(r, const): - return r**3 - - -def thin_plate_kernel(r, const): - return r**2 * np.log(r / const) - - class RBFError(Exception): """ Custom exception for RBF class. @@ -63,6 +25,18 @@ class RBF(BaseInterpolation): """ Radial Basis Function (RBF) interpolation model. + Here, scipy's RBFInterpolator is used to interpolate the data. + https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.RBFInterpolator.html + + Warnings + -------- + - This class is a Beta, results may not be accurate. + + See Also + -------- + bluemath_tk.interpolation.RBF : + The stable version for this model. + Attributes ---------- sigma_min : float @@ -71,14 +45,43 @@ class RBF(BaseInterpolation): sigma_max : float The maximum value for the sigma parameter. This value might change in the optimization process. - sigma_diff : float - The minimum difference between the optimal sigma and the minimum and maximum sigma values. + sigma_opt : float + The optimal value for the sigma parameter. kernel : str - The kernel to use for the RBF model. - kernel_func : function - The kernel function to use for the RBF model. - smooth : float - The smoothness parameter. + Type of RBF. This should be one of + + - 'linear' : ``-r`` + - 'thin_plate_spline' : ``r**2 * log(r)`` + - 'cubic' : ``r**3`` + - 'quintic' : ``-r**5`` + - 'multiquadric' : ``-sqrt(1 + r**2)`` + - 'inverse_multiquadric' : ``1/sqrt(1 + r**2)`` + - 'inverse_quadratic' : ``1/(1 + r**2)`` + - 'gaussian' : ``exp(-r**2)`` + + smoothing : float or (npoints, ) array_like + Smoothing parameter. The interpolant perfectly fits the data when this + is set to 0. For large values, the interpolant approaches a least + squares fit of a polynomial with the specified degree. + degree : int + Degree of the added polynomial. For some RBFs the interpolant may not + be well-posed if the polynomial degree is too small. Those RBFs and + their corresponding minimum degrees are + + - 'multiquadric' : 0 + - 'linear' : 0 + - 'thin_plate_spline' : 1 + - 'cubic' : 1 + - 'quintic' : 2 + + The default value is the minimum degree for `kernel` or 0 if there is + no minimum degree. Set this to -1 for no added polynomial. + neighbors : int + If specified, the value of the interpolant at each evaluation point + will be computed using only this many nearest data points. All the data + points are used by default. + rbfs : dict + Dict with RBFInterpolator instances. subset_data : pd.DataFrame The subset data used to fit the model. normalized_subset_data : pd.DataFrame @@ -111,49 +114,41 @@ class RBF(BaseInterpolation): Methods ------- - fit( - subset_data: pd.DataFrame, - target_data: pd.DataFrame, - subset_directional_variables: List[str] = [], - target_directional_variables: List[str] = [], - subset_custom_scale_factor: dict = {}, - normalize_target_data: bool = True, - target_custom_scale_factor: dict = {}, - ) -> None - predict(dataset: pd.DataFrame) -> pd.DataFrame - fit_predict( - subset_data: pd.DataFrame, - target_data: pd.DataFrame, - dataset: pd.DataFrame, - subset_directional_variables: List[str] = [], - target_directional_variables: List[str] = [], - subset_custom_scale_factor: dict = {}, - normalize_target_data: bool = True, - target_custom_scale_factor: dict = {}, - ) -> pd.DataFrame + fit(...) : + Fits the model to the data. + predict(...) : + Predicts the data for the provided dataset. + fit_predict(...) : + Fits the model to the subset and predicts the interpolated dataset. + + References + ---------- + .. [1] Fasshauer, G., 2007. Meshfree Approximation Methods with Matlab. + World Scientific Publishing Co. + + .. [2] http://amadeus.math.iit.edu/~fass/603_ch3.pdf + + .. [3] Wahba, G., 1990. Spline Models for Observational Data. SIAM. + + .. [4] http://pages.stat.wisc.edu/~wahba/stat860public/lect/lect8/lect8.pdf Notes ----- + .. versionadded:: 1.0.3 TODO: For the moment, this class only supports optimization for one parameter kernels. For this reason, we only have sigma as the parameter to optimize. This sigma refers to the sigma parameter in the Gaussian kernel (but is used for all kernels). """ - rbf_kernels = { - "gaussian": gaussian_kernel, - "multiquadratic": multiquadratic_kernel, - "inverse": inverse_kernel, - "cubic": cubic_kernel, - "thin_plate": thin_plate_kernel, - } - def __init__( self, sigma_min: float = 0.001, - sigma_max: float = 0.1, - sigma_diff: float = 0.0001, - kernel: str = "gaussian", - smooth: float = 1e-5, + sigma_max: float = 1.0, + sigma_opt: float = None, + kernel: str = "thin_plate_spline", + smoothing: float = 0.0, + degree: int = None, + neighbors: int = None, ): """ Initializes the RBF model. @@ -163,29 +158,18 @@ def __init__( sigma_min : float, optional The minimum value for the sigma parameter. Default is 0.001. sigma_max : float, optional - The maximum value for the sigma parameter. Default is 0.1. - sigma_diff : float, optional - The minimum difference between the optimal sigma and the minimum and maximum sigma values. - Default is 0.0001. + The maximum value for the sigma parameter. Default is 1.0. + sigma_opt : float, optional + The optimal value for the sigma parameter. Default is None. kernel : str, optional - The kernel to use for the RBF model. Default is "gaussian". - The available kernels are: - - "gaussian": Gaussian kernel. - - "multiquadratic": Multiquadratic kernel. - - "inverse": Inverse kernel. - - "cubic": Cubic kernel. - - "thin_plate": Thin plate kernel. - smooth : float, optional - The smoothness parameter. Default is 1e-5. - - Raises - ------ - ValueError - If the sigma_min is not a positive float. - If the sigma_max is not a positive float greater than sigma_min. - If the sigma_diff is not a positive float. - If the kernel is not a string and one of the available kernels. - If the smooth is not a positive float. + Type of RBF. Default is 'thin_plate_spline'. + smoothing : float, optional + Smoothing parameter. Default is 0.0. + degree : int, optional + Degree of the added polynomial. Default is None. + neighbors : int, optional + If specified, the value of the interpolant at each evaluation point will be + computed using only this many nearest data points. Default is None. """ super().__init__() @@ -198,18 +182,23 @@ def __init__( "sigma_max must be a positive float greater than sigma_min." ) self._sigma_max = sigma_max - if not isinstance(sigma_diff, float) or sigma_diff < 0: - raise ValueError("sigma_diff must be a positive float.") - self._sigma_diff = sigma_diff - if not isinstance(kernel, str) or kernel not in self.rbf_kernels.keys(): - raise ValueError( - f"kernel must be a string and one of {list(self.rbf_kernels.keys())}." - ) + if sigma_opt is not None: + if not isinstance(sigma_opt, float) or sigma_opt < 0: + raise ValueError("sigma_opt must be a positive float.") + self._sigma_opt = sigma_opt + if not isinstance(kernel, str): + raise ValueError("kernel must be a string.") self._kernel = kernel - self._kernel_func = self.rbf_kernels[self.kernel] - if not isinstance(smooth, float) or smooth < 0: - raise ValueError("smooth must be a positive float.") - self._smooth = smooth + if not isinstance(smoothing, float): + raise ValueError("smoothing must be a float.") + self._smoothing = smoothing + if not isinstance(degree, int) and degree is not None: + raise ValueError("degree must be an integer.") + self._degree = degree + if not isinstance(neighbors, int) and neighbors is not None: + raise ValueError("neighbors must be an integer.") + self._neighbors = neighbors + self._rbfs: dict = {} # Dict with RBFInterpolator instances # Below, we initialize the attributes that will be set in the fit method self.is_fitted: bool = False self.is_target_normalized: bool = False @@ -237,20 +226,28 @@ def sigma_max(self) -> float: return self._sigma_max @property - def sigma_diff(self) -> float: - return self._sigma_diff + def sigma_opt(self) -> float: + return self._sigma_opt @property def kernel(self) -> str: return self._kernel @property - def kernel_func(self) -> Callable: - return self._kernel_func + def smoothing(self) -> float: + return self._smoothing + + @property + def degree(self) -> int: + return self._degree + + @property + def neighbors(self) -> int: + return self._neighbors @property - def smooth(self) -> float: - return self._smooth + def rbfs(self) -> dict: + return self._rbfs @property def subset_data(self) -> pd.DataFrame: @@ -308,6 +305,8 @@ def rbf_coeffs(self) -> pd.DataFrame: @property def opt_sigmas(self) -> dict: + if not self._opt_sigmas: + raise ValueError("Specified kernel does not require optimization.") return self._opt_sigmas def _preprocess_subset_data( @@ -321,7 +320,7 @@ def _preprocess_subset_data( subset_data : pd.DataFrame The subset data to preprocess (could be a dataset to predict). is_fit : bool, optional - Whether the data is being fit or not. Default is True. + Whether the data is to fit or not. Default is True. Returns ------- @@ -446,127 +445,62 @@ def _preprocess_target_data( self.logger.info("Target data preprocessed successfully") return target_data.copy() - def _rbf_assemble(self, x: np.ndarray, sigma: float) -> np.ndarray: - """ - This function assembles the RBF matrix for the given data. - - Parameters - ---------- - x : np.ndarray - The data. - sigma : float - The sigma parameter for the kernel. - - Returns - ------- - np.ndarray - The data with all the calculated kernel values. - """ - - # Get the number of rows and columns in x - dim, n = x.shape - - # Fill the matrix with the kernel values - A = np.zeros((n, n)) - for i in range(n): - for j in range(i + 1): - r = np.linalg.norm(x[:, i] - x[:, j]) - temp = self.kernel_func(r, sigma) - A[i, j] = temp - A[j, i] = temp - A[i, i] = A[i, i] - self.smooth - - # Add the identity matrix to the matrix (polynomial term) - P = np.hstack((np.ones((n, 1)), x.T)) - A = np.vstack( - (np.hstack((A, P)), np.hstack((P.T, np.zeros((dim + 1, dim + 1))))) - ) - - return A - - def _calc_rbf_coeff( - self, sigma: float, x: np.ndarray, y: np.ndarray - ) -> Tuple[np.ndarray, np.ndarray]: - """ - This function calculates the RBF coefficients for the given data. - - Parameters - ---------- - sigma : float - The sigma parameter for the kernel. - x : np.ndarray - The subset data used to interpolate. - y : np.ndarray - The target data to interpolate. - - Returns - ------- - Tuple[np.ndarray, np.ndarray] - The RBF coefficients and the A matrix. - """ - - # Get the number of rows and columns in x - m, n = x.shape - - # Assemble the A matrix - A = self._rbf_assemble(x=x, sigma=sigma) - - # Concatenate y with zeros and reshape - b = np.concatenate((y, np.zeros((m + 1,)))).reshape(-1, 1) - - # Calculate the RBF coefficients - rbfcoeff, _, _, _ = np.linalg.lstsq(A, b, rcond=None) # inverse - - return rbfcoeff, A - - def _cost_sigma(self, sigma: float, x: np.ndarray, y: np.ndarray) -> float: + def _cost_sigma( + self, sigma: float, x: np.ndarray, y: np.ndarray, k: int = 5 + ) -> float: """ - This function is called by fminbound to minimize the cost function. + Calculate the cost for a given sigma using K-Fold cross-validation. Parameters ---------- sigma : float The sigma parameter for the kernel. x : np.ndarray - The subset data used to interpolate. + The input data. y : np.ndarray - The target data to interpolate. + The target data. + k : int, optional + The number of folds for cross-validation. Default is 5. Returns ------- float - The cost value. + The total cost for the RBF interpolation. """ - # Calculate RBF coefficients and A matrix - rbf_coeff, A = self._calc_rbf_coeff(sigma=sigma, x=x, y=y) - - # Extract the top-left n x n submatrix from A - m, n = x.shape - A = A[:n, :n] - - # Compute the pseudo-inverse of the submatrix A - invA = np.linalg.pinv(A) - - # Initialize residuals by subtracting the last m elements of rbf_coeff from y - m1, n1 = rbf_coeff.shape - kk = y - rbf_coeff[m1 - m - 1] - - # Adjust residuals by subtracting the product of rbf_coeff and x - for i in range(m): - kk = kk - rbf_coeff[m1 - m + i] * x[i, :] + kf = KFold(n_splits=k) + total_cost = 0.0 + + for train_index, val_index in kf.split(x): + x_train, x_val = x[train_index], x[val_index] + y_train, y_val = y[train_index], y[val_index] + + # Instantiate the RBFInterpolator + rbf = RBFInterpolator( + y=x_train, + d=y_train, + neighbors=self.neighbors, + smoothing=self.smoothing, + kernel=self.kernel, + epsilon=sigma, + degree=self.degree, + ) - # Calculate the cost by multiplying invA with kk and normalizing by the diagonal elements of invA - ceps = np.dot(invA, kk) / np.diagonal(invA) + # Predict on the validation set + predicted_y = rbf(x_val) - # Return the norm of ceps, representing the cost - yy = np.linalg.norm(ceps) + # Calculate the cost (mean squared error) + cost = mean_squared_error(y_val, predicted_y) + total_cost += cost - return yy + return total_cost / k def _calc_opt_sigma( - self, target_variable: np.ndarray, subset_variables: np.ndarray - ) -> float: + self, + target_variable: np.ndarray, + subset_variables: np.ndarray, + iteratively_update_sigma: bool = False, + ) -> RBFInterpolator: """ This function calculates the optimal sigma for the given target variable. @@ -576,6 +510,8 @@ def _calc_opt_sigma( The target variable to interpolate. subset_variables : np.ndarray The subset variables used to interpolate. + iteratively_update_sigma : bool, optional + Whether to iteratively update the sigma parameter. Default is False. Returns ------- @@ -584,93 +520,42 @@ def _calc_opt_sigma( """ t0 = time.time() - # Initialize sigma_min, sigma_max, and d_sigma - sigma_min, sigma_max, d_sigma = self.sigma_min, self.sigma_max, 0 - # Loop until sigma_diff is less than the specified sigma_diff - while d_sigma < self.sigma_diff: + # Optimize sigma using fminbound or fmin + if self.sigma_opt is not None: + opt_sigma = fmin( + func=self._cost_sigma, + x0=self.sigma_opt, + args=(subset_variables, target_variable), + disp=0, + )[-1] + if iteratively_update_sigma: + self._sigma_opt = opt_sigma + else: opt_sigma = fminbound( func=self._cost_sigma, - x1=sigma_min, - x2=sigma_max, + x1=self.sigma_min, + x2=self.sigma_max, args=(subset_variables, target_variable), disp=0, ) - lm_min = np.abs(opt_sigma - sigma_min) - lm_max = np.abs(opt_sigma - sigma_max) - if lm_min < self.sigma_diff: - sigma_min = sigma_min - sigma_min / 2 - elif lm_max < self.sigma_min: - sigma_max = sigma_max + sigma_max / 2 - d_sigma = np.nanmin([lm_min, lm_max]) + + # Save the fitted RBF for the optimal sigma + rbf = RBFInterpolator( + y=subset_variables, + d=target_variable, + neighbors=self.neighbors, + smoothing=self.smoothing, + kernel=self.kernel, + epsilon=opt_sigma, + degree=self.degree, + ) # Calculate the time taken to optimize sigma t1 = time.time() self.logger.info(f"Optimal sigma: {opt_sigma} - Time: {t1 - t0:.2f} seconds") - # Calculate the RBF coefficients for the optimal sigma - rbf_coeff, _ = self._calc_rbf_coeff( - sigma=opt_sigma, x=subset_variables, y=target_variable - ) - - return rbf_coeff, opt_sigma - - def _rbf_interpolate(self, dataset: pd.DataFrame) -> pd.DataFrame: - """ - This function interpolates the dataset. - - Parameters - ---------- - dataset : pd.DataFrame - The dataset to interpolate (must have same variables as subset). - - Returns - ------- - pd.DataFrame - The interpolated dataset (with all target variables). - """ - - normalized_dataset = self._preprocess_subset_data( - subset_data=dataset, is_fit=False - ) - - # Get the number of rows and columns in subset and dataset - num_vars_subset, num_points_subset = self.normalized_subset_data.T.shape - _, num_points_dataset = normalized_dataset.T.shape - - # Initialize the interpolated dataset - interpolated_array = np.zeros( - (num_points_dataset, len(self.target_processed_variables)) - ) - - # Loop through the target variables - for i_var, target_var in enumerate(self.target_processed_variables): - self.logger.info(f"Interpolating target variable {target_var}") - rbf_coeff = self._rbf_coeffs[target_var].values - opt_sigma = self._opt_sigmas[target_var] - for i in range(num_points_dataset): - r = np.linalg.norm( - np.repeat( - [normalized_dataset.iloc[i].values], num_points_subset, axis=0 - ) - - self.normalized_subset_data.values, - axis=1, - ) - s = rbf_coeff[num_points_subset] + np.sum( - rbf_coeff[:num_points_subset] * self.kernel_func(r, opt_sigma) - ) - - # linear part - for k in range(num_vars_subset): - s = ( - s - + rbf_coeff[k + num_points_subset + 1] - * normalized_dataset.values.T[k, i] - ) - - interpolated_array[i, i_var] = s - - return pd.DataFrame(interpolated_array, columns=self.target_processed_variables) + return rbf, opt_sigma @validate_data_rbf def fit( @@ -683,6 +568,7 @@ def fit( normalize_target_data: bool = True, target_custom_scale_factor: dict = {}, num_threads: int = None, + iteratively_update_sigma: bool = False, ) -> None: """ Fits the model to the data. @@ -705,6 +591,8 @@ def fit( The custom scale factor for the target data. Default is {}. num_threads : int, optional The number of threads to use for the optimization. Default is None. + iteratively_update_sigma : bool, optional + Whether to iteratively update the sigma parameter. Default is False. Notes ----- @@ -733,14 +621,33 @@ def fit( # RBF fitting for all variables rbf_coeffs, opt_sigmas = {}, {} + # Optimize sigma for each target variable for target_var in target_data.columns: self.logger.info(f"Fitting RBF for variable {target_var}") target_var_values = target_data[target_var].values - rbf_coeff, opt_sigma = self._calc_opt_sigma( - target_variable=target_var_values, - subset_variables=subset_data.values.T, - ) - rbf_coeffs[target_var] = rbf_coeff.flatten() + if ( + self.kernel == "linear" + or self.kernel == "cubic" + or self.kernel == "quintic" + or self.kernel == "thin_plate_spline" + ): + rbf = RBFInterpolator( + y=subset_data.values, + d=target_var_values, + neighbors=self.neighbors, + smoothing=self.smoothing, + kernel=self.kernel, + degree=self.degree, + ) + opt_sigma = None + else: + rbf, opt_sigma = self._calc_opt_sigma( + target_variable=target_var_values, + subset_variables=subset_data.values, + iteratively_update_sigma=iteratively_update_sigma, + ) + self.rbfs[target_var] = copy.deepcopy(rbf) + rbf_coeffs[target_var] = rbf._coeffs.flatten() opt_sigmas[target_var] = opt_sigma # Store the RBF coefficients and optimal sigmas @@ -779,20 +686,42 @@ def predict(self, dataset: pd.DataFrame) -> pd.DataFrame: if self.is_fitted is False: raise RBFError("RBF model must be fitted before predicting.") + self.logger.info("Reconstructing data using fitted coefficients.") - interpolated_target = self._rbf_interpolate(dataset=dataset) + normalized_dataset = self._preprocess_subset_data( + subset_data=dataset, is_fit=False + ) + + # Create an empty array to store the interpolated target data + interpolated_target_array = np.zeros( + (normalized_dataset.shape[0], len(self.target_processed_variables)) + ) + for target_var in self.target_processed_variables: + self.logger.info(f"Predicting target variable {target_var}") + rbf = self.rbfs[target_var] + interpolated_target_array[ + :, self.target_processed_variables.index(target_var) + ] = rbf(normalized_dataset.values) + interpolated_target = pd.DataFrame( + data=interpolated_target_array, columns=self.target_processed_variables + ) + + # Denormalize the target data if normalize_target_data is True if self.is_target_normalized: self.logger.info("Denormalizing target data") interpolated_target = self.denormalize( normalized_data=interpolated_target, scale_factor=self.target_scale_factor, ) + + # Calculate the degrees for the target directional variables for directional_variable in self.target_directional_variables: self.logger.info(f"Calculating target degrees for {directional_variable}") interpolated_target[directional_variable] = self.get_degrees_from_uv( xu=interpolated_target[f"{directional_variable}_u"].values, xv=interpolated_target[f"{directional_variable}_v"].values, ) + return interpolated_target def fit_predict( @@ -806,6 +735,7 @@ def fit_predict( normalize_target_data: bool = True, target_custom_scale_factor: dict = {}, num_threads: int = None, + iteratively_update_sigma: bool = False, ) -> pd.DataFrame: """ Fits the model to the subset and predicts the interpolated dataset. @@ -830,6 +760,8 @@ def fit_predict( The custom scale factor for the target data. Default is {}. num_threads : int, optional The number of threads to use for the optimization. Default is None. + iteratively_update_sigma : bool, optional + Whether to iteratively update the sigma parameter. Default is False. Returns ------- @@ -850,6 +782,7 @@ def fit_predict( normalize_target_data=normalize_target_data, target_custom_scale_factor=target_custom_scale_factor, num_threads=num_threads, + iteratively_update_sigma=iteratively_update_sigma, ) return self.predict(dataset=dataset)