#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
The :mod:`samplesizelib.linear.heuristic` contains classes:
- :class:`samplesizelib.linear.heuristic.CrossValidationEstimator`
- :class:`samplesizelib.linear.heuristic.BootstrapEstimator`
"""
from __future__ import print_function
__docformat__ = 'restructuredtext'
from multiprocessing import Pool
import numpy as np
import scipy.stats as sps
from sklearn.metrics import roc_curve
from ..shared.estimator import SampleSizeEstimator
from ..shared.utils import Dataset
class CrossValidationEstimator(SampleSizeEstimator):
    r"""
    Cross-validation based sample size estimator.

    For a grid of subsample sizes ``m`` the estimator draws ``averaging``
    random subsamples, splits each into train/test parts and records the
    gap between the per-object train and test log-likelihood of the fitted
    model.  The reported ``m*`` is the smallest grid size whose average
    gap falls below ``epsilon``.

    :param statmodel: the machine learning algorithm
    :type statmodel: RegressionModel or LogisticModel
    :param averaging: number of subsamples drawn for every size ``m``
    :type averaging: int
    :param epsilon: tolerance on the train/test likelihood gap
    :type epsilon: float
    :param begin: first subsample size (defaults to ``2 * n_features``)
    :type begin: int
    :param end: last subsample size (defaults to ``len(dataset) - 1``)
    :type end: int
    :param num: step between consecutive subsample sizes
    :type num: int
    :param test_size: fraction of every subsample used as the test part
    :type test_size: float
    :param multiprocess: if True, evaluate subsamples in a process pool
    :type multiprocess: bool
    :param progressbar: if True, display progress over subsample sizes
    :type progressbar: bool
    """

    def __init__(self, statmodel, **kwards):
        r"""Constructor method
        """
        super().__init__()
        self.statmodel = statmodel

        self.averaging = int(kwards.pop('averaging', 100))
        if self.averaging <= 0:
            raise ValueError(
                "The averaging should be positive but get {}".format(
                    self.averaging))

        self.test_size = kwards.pop('test_size', 0.5)
        if self.test_size < 0 or self.test_size > 1:
            raise ValueError(
                "The test_size must be between 0 and 1 but get {}".format(
                    self.test_size))

        self.epsilon = kwards.pop('epsilon', 0.05)
        if self.epsilon <= 0:
            raise ValueError(
                "The epsilon must be positive value but get {}".format(
                    self.epsilon))

        self.begin = kwards.pop('begin', None)
        if self.begin is not None and self.begin < 0:
            raise ValueError(
                "The begin must be positive value but get {}".format(
                    self.begin))

        self.end = kwards.pop('end', None)
        if self.end is not None and self.end < 0:
            raise ValueError(
                "The end must be positive value but get {}".format(
                    self.end))
        if (self.end is not None and self.begin is not None
                and self.end <= self.begin):
            raise ValueError(
                "The end value must be greater than the begin value but {}<={}".format(
                    self.end, self.begin))

        self.num = kwards.pop('num', 5)
        if self.num <= 0:
            raise ValueError(
                "The num must be positive value but get {}".format(
                    self.num))
        if (self.end is not None and self.begin is not None
                and self.num >= self.end - self.begin):
            raise ValueError(
                "The num value must be smaler than (end - begin) but {}>={}".format(
                    self.num, self.end - self.begin))

        self.multiprocess = kwards.pop('multiprocess', False)
        if not isinstance(self.multiprocess, bool):
            raise ValueError(
                "The multiprocess must be bool value but get {}".format(
                    self.multiprocess))

        self.progressbar = kwards.pop('progressbar', False)
        if not isinstance(self.progressbar, bool):
            raise ValueError(
                "The progressbar must be bool value but get {}".format(
                    self.progressbar))

        # Anything left over is an unknown keyword — fail loudly.
        if kwards:
            raise ValueError("Invalid parameters: %s" % str(kwards))

        self.dataset = None

    def _RS(self, dataset):
        r"""
        Return the train/test gap of the per-object log-likelihood.

        The model is fitted on the train part only; the gap between the
        averaged train and test log-likelihood measures how strongly the
        model overfits a subsample of this size.

        :param dataset: subsample to evaluate
        :type dataset: Dataset
        :return: ``S_train - S_test`` likelihood gap
        :rtype: float
        """
        X_train, X_test, y_train, y_test = dataset.train_test_split(
            self.test_size)
        w_hat = self.statmodel(y_train, X_train).fit()
        S_train = self.statmodel(
            y_train, X_train).loglike(w_hat) / y_train.shape[0]
        S_test = self.statmodel(
            y_test, X_test).loglike(w_hat) / y_test.shape[0]
        return S_train - S_test

    def _score_subsample(self, m):
        r"""
        Draw one random subsample of size ``m`` and score it with
        :meth:`_RS`.

        :param m: subsample size
        :type m: int
        :return: likelihood gap of the drawn subsample
        :rtype: float
        """
        X_m, y_m = self.dataset.sample(m)
        return self._RS(Dataset(X_m, y_m))

    def forward(self, features, target):
        r"""
        Returns sample size prediction for the given dataset.

        :param features: The tensor of shape
            `num_elements` :math:`\times` `num_feature`.
        :type features: array.
        :param target: The tensor of shape `num_elements`.
        :type target: array.
        :return: sample size estimation for the given dataset.
        :rtype: dict
        """
        self.dataset = Dataset(features, target)

        # Resolve the size grid, falling back to data-driven defaults.
        end = len(self.dataset) - 1 if self.end is None else self.end
        begin = 2 * self.dataset.n if self.begin is None else self.begin
        if end <= begin:
            raise ValueError(
                "The end value must be greater than the begin value but {}<={}".format(
                    end, begin))
        if self.num >= end - begin:
            raise ValueError(
                "The num value must be smaler than (end - begin) but {}>={}".format(
                    self.num, end - begin))

        subset_sizes = np.arange(begin, end, self.num, dtype=np.int64)
        points_one = np.ones(self.averaging, dtype=np.int64)

        pool = None
        if self.multiprocess:
            pool = Pool()
            mapping = pool.map
        else:
            mapping = map

        iterator = (self._progressbar(subset_sizes) if self.progressbar
                    else subset_sizes)

        list_of_answers = []
        try:
            for i, m in enumerate(iterator):
                # `averaging` independent subsamples of the same size m.
                list_of_answers.append(
                    np.asarray(
                        list(mapping(self._score_subsample, m*points_one))))
                self._set_status(100.*(i + 1)/len(subset_sizes))
        finally:
            # Release worker processes even if a subsample evaluation
            # raises (the original leaked the pool on that path).
            if pool is not None:
                pool.close()
                pool.join()

        list_of_answers = np.asarray(list_of_answers)
        list_of_E = np.mean(list_of_answers, axis=1)
        list_of_S = np.std(list_of_answers, axis=1)

        # Scan sizes from largest to smallest; m* ends up as the smallest
        # grid size whose mean gap is below epsilon.
        # NOTE(review): there is no `break`, so a size whose gap exceeds
        # epsilon between two passing sizes is skipped over — confirm
        # this is the intended stopping rule.
        m_size = end
        for m, mean in zip(reversed(subset_sizes), reversed(list_of_E)):
            if mean < self.epsilon:
                m_size = m

        return {'m*': m_size,
                'E': np.array(list_of_E),
                'S': np.array(list_of_S),
                'm': np.array(subset_sizes),
                }
class BootstrapEstimator(SampleSizeEstimator):
    r"""
    Bootstrap based sample size estimator.

    For a grid of subsample sizes ``m`` the estimator fits the model on
    ``averaging`` random subsamples and measures the spread of the fitted
    parameter vectors: the width of the central 95% interval
    (97.5th minus 2.5th percentile) of every coordinate over the
    repetitions.  The reported ``m*`` is the smallest grid size whose
    widest coordinate interval falls below ``epsilon``.

    :param statmodel: the machine learning algorithm
    :type statmodel: RegressionModel or LogisticModel
    :param averaging: number of subsamples drawn for every size ``m``
    :type averaging: int
    :param epsilon: tolerance on the parameter interval width
    :type epsilon: float
    :param begin: first subsample size (defaults to ``2 * n_features``)
    :type begin: int
    :param end: last subsample size (defaults to ``len(dataset) - 1``)
    :type end: int
    :param num: step between consecutive subsample sizes
    :type num: int
    :param multiprocess: if True, evaluate subsamples in a process pool
    :type multiprocess: bool
    :param progressbar: if True, display progress over subsample sizes
    :type progressbar: bool
    """

    def __init__(self, statmodel, **kwards):
        r"""Constructor method
        """
        super().__init__()
        self.statmodel = statmodel

        self.averaging = int(kwards.pop('averaging', 100))
        if self.averaging <= 0:
            raise ValueError(
                "The averaging should be positive but get {}".format(
                    self.averaging))

        self.epsilon = kwards.pop('epsilon', 0.5)
        if self.epsilon <= 0:
            raise ValueError(
                "The epsilon must be positive value but get {}".format(
                    self.epsilon))

        self.begin = kwards.pop('begin', None)
        if self.begin is not None and self.begin < 0:
            raise ValueError(
                "The begin must be positive value but get {}".format(
                    self.begin))

        self.end = kwards.pop('end', None)
        if self.end is not None and self.end < 0:
            raise ValueError(
                "The end must be positive value but get {}".format(
                    self.end))
        if (self.end is not None and self.begin is not None
                and self.end <= self.begin):
            raise ValueError(
                "The end value must be greater than the begin value but {}<={}".format(
                    self.end, self.begin))

        self.num = kwards.pop('num', 5)
        if self.num <= 0:
            raise ValueError(
                "The num must be positive value but get {}".format(
                    self.num))
        if (self.end is not None and self.begin is not None
                and self.num >= self.end - self.begin):
            raise ValueError(
                "The num value must be smaler than (end - begin) but {}>={}".format(
                    self.num, self.end - self.begin))

        self.multiprocess = kwards.pop('multiprocess', False)
        if not isinstance(self.multiprocess, bool):
            raise ValueError(
                "The multiprocess must be bool value but get {}".format(
                    self.multiprocess))

        self.progressbar = kwards.pop('progressbar', False)
        if not isinstance(self.progressbar, bool):
            raise ValueError(
                "The progressbar must be bool value but get {}".format(
                    self.progressbar))

        # Anything left over is an unknown keyword — fail loudly.
        if kwards:
            raise ValueError("Invalid parameters: %s" % str(kwards))

        self.dataset = None

    def _bFunction(self, dataset):
        r"""
        Fit the model on one bootstrap subsample and return its
        parameter vector.

        For a non-binary target (regression case) the model is refitted
        on targets reconstructed from predictions plus mean-centered
        residuals; for a binary target the directly fitted parameters
        are returned.

        :param dataset: subsample to fit on
        :type dataset: Dataset
        :return: fitted parameter vector
        """
        X, y = dataset.sample()
        w_hat = self.statmodel(y, X).fit()
        if len(set(y)) != 2:
            # Regression branch: residual bootstrap around the fit.
            y_hat = self.statmodel(y, X).predict(w_hat)
            residuals = y - y_hat
            y_new = y_hat + (residuals - residuals.mean())
            w_res = self.statmodel(y_new, X).fit()
        else:
            w_res = w_hat
        return w_res

    def _score_subsample(self, m):
        r"""
        Draw one random subsample of size ``m`` and return its fitted
        parameter vector via :meth:`_bFunction`.

        :param m: subsample size
        :type m: int
        :return: fitted parameter vector
        """
        X_m, y_m = self.dataset.sample(m)
        return self._bFunction(Dataset(X_m, y_m))

    def forward(self, features, target):
        r"""
        Returns sample size prediction for the given dataset.

        :param features: The tensor of shape
            `num_elements` :math:`\times` `num_feature`.
        :type features: array.
        :param target: The tensor of shape `num_elements`.
        :type target: array.
        :return: sample size estimation for the given dataset.
        :rtype: dict
        """
        self.dataset = Dataset(features, target)

        # Resolve the size grid, falling back to data-driven defaults.
        end = len(self.dataset) - 1 if self.end is None else self.end
        begin = 2 * self.dataset.n if self.begin is None else self.begin
        if end <= begin:
            raise ValueError(
                "The end value must be greater than the begin value but {}<={}".format(
                    end, begin))
        if self.num >= end - begin:
            raise ValueError(
                "The num value must be smaler than (end - begin) but {}>={}".format(
                    self.num, end - begin))

        subset_sizes = np.arange(begin, end, self.num, dtype=np.int64)
        points_one = np.ones(self.averaging, dtype=np.int64)

        pool = None
        if self.multiprocess:
            pool = Pool()
            mapping = pool.map
        else:
            mapping = map

        iterator = (self._progressbar(subset_sizes) if self.progressbar
                    else subset_sizes)

        list_of_answers = []
        try:
            for i, m in enumerate(iterator):
                # `averaging` independent bootstrap fits of the same size m.
                list_of_answers.append(
                    np.asarray(
                        list(mapping(self._score_subsample, m*points_one))))
                self._set_status(100.*(i + 1)/len(subset_sizes))
        finally:
            # Release worker processes even if a bootstrap fit raises
            # (the original leaked the pool on that path).
            if pool is not None:
                pool.close()
                pool.join()

        # Shape: (n_sizes, averaging, n_params) -> central 95% interval
        # width per parameter, then the widest coordinate per size.
        list_of_answers = np.asarray(list_of_answers)
        percentile_diff = np.abs(
            np.percentile(list_of_answers, 2.5, axis=1)
            - np.percentile(list_of_answers, 97.5, axis=1))
        list_of_E = np.max(percentile_diff, axis=-1)
        list_of_S = np.zeros_like(list_of_E)

        # Scan sizes from largest to smallest; m* ends up as the smallest
        # grid size whose interval width is below epsilon.
        # NOTE(review): there is no `break`, so a size exceeding epsilon
        # between two passing sizes is skipped over — confirm intended.
        m_size = end
        for m, mean in zip(reversed(subset_sizes), reversed(list_of_E)):
            if mean < self.epsilon:
                m_size = m

        return {'m*': m_size,
                'E': np.array(list_of_E),
                'S': np.array(list_of_S),
                'm': np.array(subset_sizes),
                }
class LogisticRegressionEstimator(SampleSizeEstimator):
    r"""
    Logistic regression (two-proportion) sample size estimator.

    The model is fitted twice — with and without feature ``ind`` — and
    each fit is reduced to a positive-prediction rate at its ROC-optimal
    threshold.  The classic normal-approximation formula for comparing
    two proportions at significance ``alpha`` and power ``1 - beta``
    then yields the required sample size.

    :param statmodel: the machine learning algorithm
    :type statmodel: RegressionModel or LogisticModel
    :param ind: index of the feature column dropped for the null model
    :type ind: int
    :param alpha: two-sided significance level
    :type alpha: float
    :param beta: type II error rate (power is ``1 - beta``)
    :type beta: float
    """

    def __init__(self, statmodel, **kwards):
        r"""Constructor method
        """
        super().__init__()
        self.statmodel = statmodel

        self.ind = int(kwards.pop('ind', 0))
        if self.ind < 0:
            raise ValueError(
                "The ind must be positive value but get {}".format(
                    self.ind))

        self.alpha = kwards.pop('alpha', 0.05)
        if self.alpha < 0 or self.alpha > 1:
            raise ValueError(
                "The alpha must be between 0 and 1 but get {}".format(
                    self.alpha))

        self.beta = kwards.pop('beta', 0.2)
        if self.beta < 0 or self.beta > 1:
            raise ValueError(
                "The beta must be between 0 and 1 but get {}".format(
                    self.beta))

        # Anything left over is an unknown keyword — fail loudly.
        if kwards:
            raise ValueError("Invalid parameters: %s" % str(kwards))

        self.dataset = None

    def forward(self, features, target):
        r"""
        Returns sample size prediction for the given dataset.

        :param features: The tensor of shape
            `num_elements` :math:`\times` `num_feature`.
        :type features: array.
        :param target: The tensor of shape `num_elements`.
        :type target: array.
        :return: sample size estimation for the given dataset.
        :rtype: dict
        """
        y, X = target, features
        self.dataset = Dataset(features, target)

        # Null model: feature `ind` removed; full model: all features.
        X_reduced = np.delete(X, self.ind, axis=1)
        w_hat0 = self.statmodel(y, X_reduced).fit()
        w_hat1 = self.statmodel(y, X).fit()
        predict0 = self.statmodel(y, X_reduced).predict(w_hat0)
        predict1 = self.statmodel(y, X).predict(w_hat1)

        # Pick each model's threshold from its own ROC curve.
        fpr0, tpr0, threshold0 = roc_curve(y, predict0)
        fpr1, tpr1, threshold1 = roc_curve(y, predict1)
        c0 = threshold0[np.argmax(
            (tpr0 - threshold0)**2 - (fpr0 - threshold0)**2)]
        c1 = threshold1[np.argmax(
            (tpr1 - threshold1)**2 - (fpr1 - threshold1)**2)]

        # Positive-prediction rate of each model at its own threshold.
        # BUGFIX: the original compared predict1 against c0, leaving c1
        # computed but unused; each model should use its own cutoff.
        p0 = np.mean(predict0 > c0)
        p1 = np.mean(predict1 > c1)

        # Two-proportion normal-approximation sample size.
        # NOTE(review): if p0 == p1 this divides by zero, and int()
        # truncates rather than taking the conservative ceiling —
        # confirm both are acceptable for callers.
        t_alpha = sps.norm.ppf(1 - 0.5*self.alpha)
        t_beta = sps.norm.ppf(1 - self.beta)
        m_size = ((np.sqrt(p0*(1 - p0))*t_alpha
                   + t_beta*np.sqrt(p1*(1 - p1)))**2)/((p0 - p1)**2)

        return {'m*': int(m_size),
                }