:mod:`operalib.ridge` implements Operator-Valued Kernel ridge
# Author: Romain Brault <romain.brault@telecom-paristech.fr> with help from
# the scikit-learn community.
# Maxime Sangnier <maxime.sangnier@gmail.com>
# License: MIT
from scipy.optimize import minimize
from scipy.sparse.linalg import LinearOperator
from numpy import (reshape, eye, zeros, empty, dot, all, isnan, diag, number,
from scipy.linalg import solve
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from sklearn.metrics.pairwise import rbf_kernel
from control import dlyap
from .metrics import first_periodic_kernel
from .kernels import DecomposableKernel, RBFCurlFreeKernel
from .risk import OVKRidgeRisk
from .signal import get_period
# When adding a new kernel, update this table and the _get_kernel_map method
'DGauss': DecomposableKernel,
'DPeriodic': DecomposableKernel,
'CurlF': RBFCurlFreeKernel}
def _graph_Laplacian(similarities):
return diag(similarities.sum(axis=1)) - similarities
class _SemisupLinop:
def __init__(self, lbda2, is_sup, L, p):
self.lbda2 = lbda2
self.is_sup = is_sup.ravel()
self.L = L
self.p = p
self.ns = is_sup.shape[0] - L.shape[0]
self.ls = L.shape[0]
def _dot_U(self, vec):
mat = vec.reshape((self.ns + self.ls, self.p))
res = empty((self.ns + self.ls, self.p))
res[self.is_sup, :] = mat[self.is_sup, :]
res[~self.is_sup, :] = self.lbda2 * dot(self.L, mat[~self.is_sup, :])
return res.ravel()
def _dot_JT(self, vec):
mat = vec.reshape((self.ns + self.ls, self.p))
res = empty((self.ns + self.ls, self.p))
res[self.is_sup, :] = mat[self.is_sup, :]
res[~self.is_sup, :] = 0
return res.ravel()
def gen(self):
shape_U = ((self.ns + self.ls) * self.p, (self.ns + self.ls) * self.p)
shape_JT = ((self.ns + self.ls) * self.p, (self.ns + self.ls) * self.p)
# return U, JT
return (LinearOperator(shape_U,
matvec=lambda b: self._dot_U(b),
rmatvec=lambda b: self._dot_U(b)),
matvec=lambda b: self._dot_JT(b),
rmatvec=lambda b: self._dot_JT(b)))
[docs]class OVKDecomposableRidge(BaseEstimator, RegressorMixin):
"""Operator-Valued kernel ridge regression.
Operator-Valued kernel ridge regression (OVKRR) combines ridge regression
(linear least squares with l2-norm regularization) with the (OV)kernel
trick. It learns a linear function in the space induced by the
respective kernel and the data. For non-linear kernels, this corresponds to
a non-linear function in the original space.
This is a simplyfied version of OVKRidge Handelling only the decomposable
kernel. This allows to rewrite the optimality condition as a sylvester
system of equation and reduce the complexity to
.. math::
\mathcal{O}(n^3) + \mathcal{O}(p^3)
where n is the number of training points and p the number of outputs.
Optimization problem solved for learning:
.. math::
\min_{h \in \mathcal H}~ \frac{\lambda}{2} \|h\|_{\mathcal H}^2 +
\frac{1}{np} \sum_{i=1}^n \|y_j - h(x_i)\|_{\mathcal Y}^2 +
\frac{\lambda_m}{2} \sum_{i=1}^n W_{ij}
\|h(x_i) - h(x_j)\|_{\mathcal Y}^2
dual_coef_ : array, shape = [n_samples x n_targest]
Weight vector(s) in kernel space
linop_ : callable
Callable which associate to the training points X the Gram matrix (the
Gram matrix being a LinearOperator)
A_ : array, shape = [n_targets, n_targets]
Set when Linear operator used by the decomposable kernel is default or
period_ : float
Set when period used by the First periodic kernel is 'autocorr'.
See also
Linear ridge regression.
Kernel ridge regression.
Support Vector Regression implemented using libsvm.
>>> import operalib as ovk
>>> import numpy as np
>>> n_samples, n_features, n_targets = 10, 5, 5
>>> rng = np.random.RandomState(0)
>>> y = rng.randn(n_samples, n_targets)
>>> X = rng.randn(n_samples, n_features)
>>> clf = ovk.OVKRidge('DGauss', lbda=1.0)
>>> clf.fit(X, y)
def __init__(self, input_kernel='Gauss', A=None, lbda=1e-5, gamma=None,
theta=0.7, period='autocorr', autocorr_params=None):
"""Initialize OVK ridge regression model.
ovkernel : {string, callable}, default='DGauss'
Kernel mapping used internally. A callable should accept two
arguments, and should return a LinearOperator.
lbda : {float}, default=1e-5
Small positive values of lbda improve the conditioning of the
problem and reduce the variance of the estimates. Lbda corresponds
to ``(2*C)^-1`` in other linear models such as LogisticRegression
or LinearSVC.
A : {LinearOperator, array-like, sparse matrix}, default=None
Linear operator used by the decomposable kernel. If default is
None, wich is set to identity matrix of size y.shape[1] when
gamma : {float}, default=None.
Gamma parameter for the Decomposable Gaussian kernel.
Ignored by other kernels.
theta : {float}, default=.7
Theta parameter for the Decomposable First Periodic kernel.
Ignored by other kernels.
period : {float}, default=default_period
Period parameter for the First periodic kernel. If optional modules
have been imported then default_period is 2 * pi. Otherwise it uses
autocorrelation methods to determine the period.
autocorr_params : {mapping of string to any}
Additional parameters (keyword arguments) for the period detection
for periodic kernels. If None, parameter choice is left to the
period detection method.
self.input_kernel = input_kernel
self.A = A
self.lbda = lbda
self.theta = theta
self.period = period
self.autocorr_params = autocorr_params
self.gamma = gamma
def _validate_params(self):
# check on self.ovkernel is performed in method __get_kernel
if self.lbda < 0:
raise ValueError('lbda must be positive')
if self.gamma is not None:
if self.gamma < 0:
raise ValueError('sigma must be positive or default (None)')
if self.theta < 0:
raise ValueError('theta must be positive')
if isinstance(self.period, (int, float)):
if self.period < 0:
raise ValueError('period must be positive')
def _default_decomposable_op(self, y):
if self.A is not None:
return self.A
elif y.ndim == 2:
return eye(y.shape[1])
return eye(1)
def _default_period(self, X, y):
if self.period is 'autocorr':
autocorr_params = self.autocorr_params or {}
return get_period(X, y, **autocorr_params)
elif isinstance(self.period, (int, float)):
return self.period
raise ValueError('period must be a positive number or a valid '
def _get_kernel_map(self, X, y):
# When adding a new kernel, update this table and the _get_kernel_map
# method
if callable(self.input_kernel):
ovkernel = self.input_kernel
elif type(self.input_kernel) is str:
# 1) check string and assign the right parameters
if self.input_kernel == 'Gauss':
self.A_ = self._default_decomposable_op(y)
kernel_params = {'A': self.A_, 'scalar_kernel': rbf_kernel,
'scalar_kernel_params': {'gamma': self.gamma}}
elif self.input_kernel == 'Periodic':
self.A_ = self._default_decomposable_op(y)
self.period_ = self._default_period(X, y)
kernel_params = {'A': self.A_,
'scalar_kernel': first_periodic_kernel,
'scalar_kernel_params': {'gamma': self.theta,
self.period_}, }
raise NotImplementedError('unsupported kernel')
# 2) Uses lookup table to select the right kernel from string
ovkernel = PAIRWISE_KERNEL_FUNCTIONS['D' + self.input_kernel](
raise NotImplementedError('unsupported kernel')
return ovkernel(X)
[docs] def fit(self, X, y):
"""Fit OVK ridge regression model.
X : {array-like, sparse matrix}, shape = [n_samples, n_features]
Training data.
y : {array-like}, shape = [n_samples] or [n_samples, n_targets]
Target values. numpy.NaN for missing targets (semi-supervised
self : returns an instance of self.
X = check_array(X, force_all_finite=True, accept_sparse=False,
y = check_array(y, force_all_finite=False, accept_sparse=False,
if y.ndim == 1:
y = check_array(y, force_all_finite=True, accept_sparse=False,
self.linop_ = self._get_kernel_map(X, y)
Gram = self.linop_._Gram(X)
if self.lbda > 0:
self.dual_coefs_ = dlyap(-Gram / self.lbda, self.linop_.A,
y / self.lbda)
# TODO: Check A is invertible!!
self.dual_coefs_ = solve(Gram, y)
return self
def _decision_function(self, X):
pred = dot(dot(self.linop_._Gram(X), self.dual_coefs_), self.linop_.A)
return reshape(pred, (X.shape[0], self.linop_.p)) \
if self.linop_.p > 1 else pred
[docs] def predict(self, X):
"""Predict using the OVK ridge model.
X : {array-like, sparse matrix}, shape = [n_samples, n_features]
C : {array}, shape = [n_samples] or [n_samples, n_targets]
Returns predicted values.
check_is_fitted(self, ['dual_coefs_', 'linop_'], all_or_any=all)
X = check_array(X, force_all_finite=True, accept_sparse=False,
return self._decision_function(X)
[docs]class OVKRidge(BaseEstimator, RegressorMixin):
"""Operator-Valued kernel ridge regression.
Operator-Valued kernel ridge regression (OVKRR) combines ridge regression
(linear least squares with l2-norm regularization) with the (OV)kernel
trick. It learns a linear function in the space induced by the
respective kernel and the data. For non-linear kernels, this corresponds to
a non-linear function in the original space.
Let n be the number of training points and p the number of outputs. This
algorithm has a complexity per iterations of
... math::
\mathcal{O}(pn^2) + \mathcal{O}(p^2n)
for the decomposable kernel. Hence, when the number of outputs is large,
the solver OVKDecomposableRidge should be faster.
The form of the model learned by OVKRR is identical to support vector
regression (SVR). However, different loss functions are used: OVKRR uses
squared error loss while support vector regression uses epsilon-insensitive
loss, both combined with l2 regularization. In contrast to SVR, fitting a
OVKRR model can be done in closed-form and is typically faster for
medium-sized datasets. On the other hand, the learned model is non-sparse
and thus slower than SVR, which learns a sparse model for epsilon > 0, at
Optimization problem solved for learning:
.. math::
\min_{h \in \mathcal H}~ \frac{\lambda}{2} \|h\|_{\mathcal H}^2 +
\frac{1}{np} \sum_{i=1}^n \|y_j - h(x_i)\|_{\mathcal Y}^2 +
\frac{\lambda_m}{2} \sum_{i=1}^n W_{ij}
\|h(x_i) - h(x_j)\|_{\mathcal Y}^2
dual_coef_ : array, shape = [n_samples x n_targest]
Weight vector(s) in kernel space
linop_ : callable
Callable which associate to the training points X the Gram matrix (the
Gram matrix being a LinearOperator)
A_ : array, shape = [n_targets, n_targets]
Set when Linear operator used by the decomposable kernel is default or
L_ : array, shape = [n_samples_miss, n_samples_miss]
Graph Laplacian of data with missing targets (semi-supervised
period_ : float
Set when period used by the First periodic kernel is 'autocorr'.
solver_res_ : any
Raw results returned by the solver.
* Micchelli, Charles A., and Massimiliano Pontil.
"On learning vector-valued functions." Neural computation
17.1 (2005): 177-204.
* Alvarez, Mauricio A., Lorenzo Rosasco, and Neil D. Lawrence.
"Kernels for vector-valued functions: A review." arXiv preprint
arXiv:1106.6251 (2011). APA
* Brouard Celine, d'Alche-Buc Florence and Szafranski Marie.
"Input Output Kernel Regression," Hal preprint
hal-01216708 (2015).
See also
Linear ridge regression.
Kernel ridge regression.
Support Vector Regression implemented using libsvm.
>>> import operalib as ovk
>>> import numpy as np
>>> n_samples, n_features, n_targets = 10, 5, 5
>>> rng = np.random.RandomState(0)
>>> y = rng.randn(n_samples, n_targets)
>>> X = rng.randn(n_samples, n_features)
>>> clf = ovk.OVKRidge('DGauss', lbda=1.0)
>>> clf.fit(X, y)
def __init__(self,
ovkernel='DGauss', lbda=1e-5, lbda_m=0.,
A=None, gamma=None, gamma_m=None, theta=0.7,
period='autocorr', autocorr_params=None,
solver='L-BFGS-B', solver_params=None):
"""Initialize OVK ridge regression model.
ovkernel : {string, callable}, default='DGauss'
Kernel mapping used internally. A callable should accept two
arguments, and should return a LinearOperator.
lbda : {float}, default=1e-5
Small positive values of lbda improve the conditioning of the
problem and reduce the variance of the estimates. Lbda corresponds
to ``(2*C)^-1`` in other linear models such as LogisticRegression
or LinearSVC.
lbda_m : {float}, default=0.
Regularization parameter for quadratic penalty on data with missing
A : {LinearOperator, array-like, sparse matrix}, default=None
Linear operator used by the decomposable kernel. If default is
None, wich is set to identity matrix of size y.shape[1] when
gamma : {float}, default=None.
Gamma parameter for the Decomposable Gaussian kernel.
Ignored by other kernels.
gamma_m : {float}, default=None.
Gamma parameter for the graph Laplacian inducing penalty on data
with missing targets.
theta : {float}, default=.7
Theta parameter for the Decomposable First Periodic kernel.
Ignored by other kernels.
period : {float}, default=default_period
Period parameter for the First periodic kernel. If optional modules
have been imported then default_period is 2 * pi. Otherwise it uses
autocorrelation methods to determine the period.
solver : {callable}, default=scipy.optimize.fmin_l_bfgs_b
Solver able to find the minimum of the ridge problem.
scipy.optimize.fmin_l_bfgs_b(*solver_params)[0] must return the
optimal solution.
autocorr_params : {mapping of string to any}
Additional parameters (keyword arguments) for the period detection
for periodic kernels. If None, parameter choice is left to the
period detection method.
solver_params : {mapping of string to any}, optional
Additional parameters (keyword arguments) for solver function
passed as callable object.
self.ovkernel = ovkernel
self.lbda = lbda
self.lbda_m = lbda_m
self.A = A
self.gamma = gamma
self.gamma_m = gamma_m
self.theta = theta
self.period = period
self.autocorr_params = autocorr_params
self.solver = solver
self.solver_params = solver_params
def _validate_params(self):
# check on self.ovkernel is performed in method __get_kernel
if self.lbda < 0:
raise ValueError('lbda must be positive')
if self.lbda_m < 0:
raise ValueError('lbda_m must be positive')
# if self.A < 0: # Check whether A is S PD would be really expensive
# raise ValueError('A must be a symmetric positive operator')
if self.gamma is not None:
if self.gamma < 0:
raise ValueError('sigma must be positive or default (None)')
if self.theta < 0:
raise ValueError('theta must be positive')
if isinstance(self.period, (int, float)):
if self.period < 0:
raise ValueError('period must be positive')
# TODO, add supported solver check
def _default_decomposable_op(self, y):
# TODO: check NaN values (semi-sup learning)
if self.A is not None:
return self.A
elif y.ndim == 2:
return eye(y.shape[1])
return eye(1)
def _default_period(self, X, y):
if self.period is 'autocorr':
autocorr_params = self.autocorr_params or {}
return get_period(X, y, **autocorr_params)
elif isinstance(self.period, (int, float)):
return self.period
raise ValueError('period must be a positive number or a valid '
def _get_kernel_map(self, X, y):
# When adding a new kernel, update this table and the _get_kernel_map
# method
if callable(self.ovkernel):
ovkernel = self.ovkernel
elif type(self.ovkernel) is str:
# 1) check string and assign the right parameters
if self.ovkernel == 'DGauss':
self.A_ = self._default_decomposable_op(y)
kernel_params = {'A': self.A_, 'scalar_kernel': rbf_kernel,
'scalar_kernel_params': {'gamma': self.gamma}}
elif self.ovkernel == 'DPeriodic':
self.A_ = self._default_decomposable_op(y)
self.period_ = self._default_period(X, y)
kernel_params = {'A': self.A_,
'scalar_kernel': first_periodic_kernel,
'scalar_kernel_params': {'gamma': self.theta,
self.period_}, }
elif self.ovkernel == 'CurlF':
kernel_params = {'gamma': self.gamma}
raise NotImplementedError('unsupported kernel')
# 2) Uses lookup table to select the right kernel from string
ovkernel = PAIRWISE_KERNEL_FUNCTIONS[self.ovkernel](
raise NotImplementedError('unsupported kernel')
return ovkernel(X)
def _decision_function(self, X):
pred = self.linop_(X) * self.dual_coefs_
return reshape(pred, (X.shape[0], self.linop_.p)) \
if self.linop_.p > 1 else pred
[docs] def fit(self, X, y):
"""Fit OVK ridge regression model.
X : {array-like, sparse matrix}, shape = [n_samples, n_features]
Training data.
y : {array-like}, shape = [n_samples] or [n_samples, n_targets]
Target values. numpy.NaN for missing targets (semi-supervised
self : returns an instance of self.
X = check_array(X, force_all_finite=True, accept_sparse=False,
y = check_array(y, force_all_finite=False, accept_sparse=False,
if y.ndim == 1:
y = check_array(y, force_all_finite=True, accept_sparse=False,
solver_params = self.solver_params or {}
self.linop_ = self._get_kernel_map(X, y)
Gram = self.linop_(X)
risk = OVKRidgeRisk(self.lbda)
if not issubdtype(y.dtype, number):
raise ValueError("Unknown label type: %r" % y.dtype)
if y.ndim > 1:
is_sup = ~all(isnan(y), axis=1)
is_sup = ~isnan(y)
if sum(~is_sup) > 0:
self.L_ = _graph_Laplacian(rbf_kernel(X[~is_sup, :],
self.L_ = empty((0, 0))
p = y.shape[1] if y.ndim > 1 else 1
weight, zeronan = _SemisupLinop(self.lbda_m, is_sup, self.L_, p).gen()
self.solver_res_ = minimize(risk.functional_grad_val,
args=(y.ravel(), Gram, weight, zeronan),
self.dual_coefs_ = self.solver_res_.x
return self
[docs] def predict(self, X):
"""Predict using the OVK ridge model.
X : {array-like, sparse matrix}, shape = [n_samples, n_features]
C : {array}, shape = [n_samples] or [n_samples, n_targets]
Returns predicted values.
check_is_fitted(self, ['dual_coefs_', 'linop_'], all_or_any=all)
X = check_array(X, force_all_finite=True, accept_sparse=False,
return self._decision_function(X)