# -*- coding: utf-8 -*-
"""Unit tests for ``pyod.models.sampling.Sampling`` (test_sampling.py)."""

import os
import sys
import unittest

import numpy as np
# noinspection PyProtectedMember
from numpy.testing import (
    assert_allclose,
    assert_array_less,
    assert_equal,
    assert_raises,
)
from scipy.stats import rankdata
from sklearn.base import clone
from sklearn.metrics import roc_auc_score

# temporary solution for relative imports in case pyod is not installed
# if pyod is installed, no need to use the following line
# NOTE: this must run before the pyod imports below, otherwise it cannot
# influence how they are resolved.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from pyod.models.sampling import Sampling  # noqa: E402
from pyod.utils.data import generate_data  # noqa: E402


class TestSampling(unittest.TestCase):
    """Exercise the public detector API of ``Sampling`` with default metric."""

    def setUp(self):
        """Generate a reproducible data set and fit a fresh detector."""
        self.n_train = 200
        self.n_test = 100
        self.contamination = 0.1
        self.roc_floor = 0.8
        self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
            n_train=self.n_train,
            n_test=self.n_test,
            contamination=self.contamination,
            random_state=42,
        )

        self.clf = Sampling(contamination=self.contamination, random_state=42)
        self.clf.fit(self.X_train)

    def test_parameters(self):
        """Fitted attributes must exist and be populated after ``fit``."""
        assert (
            hasattr(self.clf, "decision_scores_")
            and self.clf.decision_scores_ is not None
        )
        assert hasattr(self.clf, "labels_") and self.clf.labels_ is not None
        assert (
            hasattr(self.clf, "threshold_")
            and self.clf.threshold_ is not None
        )

    def test_train_scores(self):
        """One training score is stored per training sample."""
        assert_equal(len(self.clf.decision_scores_), self.X_train.shape[0])

    def test_prediction_scores(self):
        """Test scores have the right length and reach the ROC floor."""
        pred_scores = self.clf.decision_function(self.X_test)

        # check score shapes
        assert_equal(pred_scores.shape[0], self.X_test.shape[0])

        # check performance
        assert roc_auc_score(self.y_test, pred_scores) >= self.roc_floor

    def test_prediction_labels(self):
        """Predicted labels have the same shape as the ground truth."""
        pred_labels = self.clf.predict(self.X_test)
        assert_equal(pred_labels.shape, self.y_test.shape)

    def test_prediction_proba(self):
        """Default probabilities lie in [0, 1]."""
        pred_proba = self.clf.predict_proba(self.X_test)
        assert pred_proba.min() >= 0
        assert pred_proba.max() <= 1

    def test_prediction_proba_linear(self):
        """Probabilities from the "linear" method lie in [0, 1]."""
        pred_proba = self.clf.predict_proba(self.X_test, method="linear")
        assert pred_proba.min() >= 0
        assert pred_proba.max() <= 1

    def test_prediction_proba_unify(self):
        """Probabilities from the "unify" method lie in [0, 1]."""
        pred_proba = self.clf.predict_proba(self.X_test, method="unify")
        assert pred_proba.min() >= 0
        assert pred_proba.max() <= 1

    def test_prediction_proba_parameter(self):
        """An unknown probability method is rejected with ValueError."""
        with assert_raises(ValueError):
            self.clf.predict_proba(self.X_test, method="something")

    def test_prediction_labels_confidence(self):
        """``predict`` can also return a per-sample confidence in [0, 1]."""
        pred_labels, confidence = self.clf.predict(
            self.X_test, return_confidence=True
        )
        assert_equal(pred_labels.shape, self.y_test.shape)
        assert_equal(confidence.shape, self.y_test.shape)
        assert confidence.min() >= 0
        assert confidence.max() <= 1

    def test_prediction_proba_linear_confidence(self):
        """``predict_proba`` can also return a confidence in [0, 1]."""
        pred_proba, confidence = self.clf.predict_proba(
            self.X_test, method="linear", return_confidence=True
        )
        assert pred_proba.min() >= 0
        assert pred_proba.max() <= 1

        assert_equal(confidence.shape, self.y_test.shape)
        assert confidence.min() >= 0
        assert confidence.max() <= 1

    def test_prediction_with_rejection(self):
        """Rejection-aware labels keep the shape of the ground truth."""
        pred_labels = self.clf.predict_with_rejection(
            self.X_test, return_stats=False
        )
        assert_equal(pred_labels.shape, self.y_test.shape)

    def test_prediction_with_rejection_stats(self):
        """Rejection statistics fall inside their valid ranges."""
        _, [expected_rejrate, ub_rejrate, ub_cost] = (
            self.clf.predict_with_rejection(self.X_test, return_stats=True)
        )
        assert expected_rejrate >= 0
        assert expected_rejrate <= 1
        assert ub_rejrate >= 0
        assert ub_rejrate <= 1
        assert ub_cost >= 0

    def test_fit_predict(self):
        """``fit_predict`` returns one label per training sample."""
        pred_labels = self.clf.fit_predict(self.X_train)
        assert_equal(pred_labels.shape, self.y_train.shape)

    def test_fit_predict_score(self):
        """Supported scorers run; an unknown scorer is rejected."""
        self.clf.fit_predict_score(self.X_test, self.y_test)
        self.clf.fit_predict_score(
            self.X_test, self.y_test, scoring="roc_auc_score"
        )
        self.clf.fit_predict_score(
            self.X_test, self.y_test, scoring="prc_n_score"
        )
        with assert_raises(NotImplementedError):
            self.clf.fit_predict_score(
                self.X_test, self.y_test, scoring="something"
            )

    def test_predict_rank(self):
        """Ranks follow the score ordering and stay within [0, n_train]."""
        pred_scores = self.clf.decision_function(self.X_test)
        pred_ranks = self.clf._predict_rank(self.X_test)

        # assert the order is preserved
        assert_allclose(rankdata(pred_ranks), rankdata(pred_scores), atol=2)
        assert_array_less(pred_ranks, self.X_train.shape[0] + 1)
        assert_array_less(-0.1, pred_ranks)

    def test_predict_rank_normalized(self):
        """Normalized ranks follow the score ordering and stay in [0, 1]."""
        pred_scores = self.clf.decision_function(self.X_test)
        pred_ranks = self.clf._predict_rank(self.X_test, normalized=True)

        # assert the order is preserved
        assert_allclose(rankdata(pred_ranks), rankdata(pred_scores), atol=2)
        assert_array_less(pred_ranks, 1.01)
        assert_array_less(-0.1, pred_ranks)

    def test_model_clone(self):
        """The detector is compatible with ``sklearn.base.clone``."""
        clone_clf = clone(self.clf)
        # clone() must hand back a new estimator of the same class
        assert isinstance(clone_clf, Sampling)
        assert clone_clf is not self.clf

    def tearDown(self):
        pass


class TestSamplingSubsetBound(unittest.TestCase):
    """Check how ``fit`` reacts to in-range and out-of-range ``subset_size``."""

    def setUp(self):
        """Build detectors with one valid and four invalid subset sizes."""
        self.n_train = 200
        self.n_test = 100
        self.contamination = 0.1
        self.roc_floor = 0.8
        self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
            n_train=self.n_train,
            n_test=self.n_test,
            contamination=self.contamination,
            random_state=42,
        )

        self.clf_float = Sampling(
            subset_size=0.1, contamination=self.contamination, random_state=42
        )
        # fraction above 1.0: more than the whole training set
        self.clf_float_upper = Sampling(subset_size=1.5, random_state=42)
        # negative fraction: exercises the lower bound, which must differ
        # from the upper-bound case to be a meaningful test
        self.clf_float_lower = Sampling(subset_size=-0.5, random_state=42)
        # absolute size larger than n_train
        self.clf_int_upper = Sampling(subset_size=1000, random_state=42)
        # negative absolute size
        self.clf_int_lower = Sampling(subset_size=-1, random_state=42)

    def test_fit(self):
        """A valid size fits; every out-of-range size raises ValueError."""
        self.clf_float.fit(self.X_train)
        with assert_raises(ValueError):
            self.clf_float_upper.fit(self.X_train)
        with assert_raises(ValueError):
            self.clf_float_lower.fit(self.X_train)
        with assert_raises(ValueError):
            self.clf_int_upper.fit(self.X_train)
        with assert_raises(ValueError):
            self.clf_int_lower.fit(self.X_train)

    def tearDown(self):
        pass


class TestSamplingMahalanobis(unittest.TestCase):
    """Check that ``Sampling`` can be fitted with the Mahalanobis metric."""

    def setUp(self):
        """Fit a detector using the training covariance as metric parameter."""
        self.n_train = 200
        self.n_test = 100
        self.contamination = 0.1
        self.roc_floor = 0.8
        self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
            n_train=self.n_train,
            n_test=self.n_test,
            contamination=self.contamination,
            random_state=42,
        )
        # calculate covariance for mahalanobis distance
        X_train_cov = np.cov(self.X_train, rowvar=False)

        self.clf = Sampling(
            metric="mahalanobis",
            metric_params={"V": X_train_cov},
            contamination=self.contamination,
            random_state=42,
        )
        self.clf.fit(self.X_train)

    def test_fit(self):
        """Re-fitting succeeds and yields one score per training sample."""
        self.clf.fit(self.X_train)
        assert_equal(len(self.clf.decision_scores_), self.X_train.shape[0])

    def tearDown(self):
        pass


if __name__ == "__main__":
    unittest.main()