# test_knn.py
# -*- coding: utf-8 -*-
"""Unit tests for the KNN outlier detector in ``pyod.models.knn``.

The test cases cover the default detector, the ``mean`` and ``median``
aggregation methods, a Mahalanobis-distance configuration, and the
propagation of nearest-neighbour settings to the underlying estimator.
Checks that are identical across configurations live in private mixins so
each concrete ``TestCase`` only describes how its detector is built.
"""

import os
import sys
import unittest
from unittest.mock import patch

import numpy as np
# noinspection PyProtectedMember
from numpy.testing import assert_allclose
from numpy.testing import assert_array_less
from numpy.testing import assert_equal
from numpy.testing import assert_raises
from scipy.stats import rankdata
from sklearn.base import clone
from sklearn.metrics import roc_auc_score

# temporary solution for relative imports in case pyod is not installed
# if pyod is installed, no need to use the following line
sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

from pyod.models.knn import KNN
from pyod.utils.data import generate_data


class _SyntheticDataMixin:
    """Supply the synthetic train/test split used by the KNN test cases.

    The names do not start with ``Test`` and the class does not derive from
    ``unittest.TestCase``, so test runners do not collect it on its own.
    """

    n_train = 200
    n_test = 100
    contamination = 0.1
    roc_floor = 0.8

    def _generate_split(self):
        """Populate ``X_train``, ``X_test``, ``y_train`` and ``y_test``."""
        self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
            n_train=self.n_train, n_test=self.n_test,
            contamination=self.contamination, random_state=42)

    def _assert_clone_matches(self):
        """Check that ``clone`` yields a distinct KNN with the same settings."""
        clone_clf = clone(self.clf)
        self.assertIsNot(clone_clf, self.clf)
        self.assertIsInstance(clone_clf, KNN)
        assert_equal(clone_clf.contamination, self.clf.contamination)
        assert_equal(clone_clf.method, self.clf.method)


class _FittedDetectorChecks(_SyntheticDataMixin):
    """Checks shared by test cases whose ``setUp`` fits ``self.clf``.

    Subclasses must also derive from ``unittest.TestCase`` and assign a
    detector fitted on ``self.X_train`` to ``self.clf`` in ``setUp``.
    """

    # absolute tolerance allowed between the score ranking and the rank
    # returned by ``_predict_rank``
    rank_atol = 2

    def _assert_unit_interval(self, values):
        """Assert that every entry of ``values`` lies in ``[0, 1]``."""
        self.assertGreaterEqual(values.min(), 0)
        self.assertLessEqual(values.max(), 1)

    def test_parameters(self):
        fitted_attributes = ('decision_scores_', 'labels_', 'threshold_',
                             '_mu', '_sigma')
        for attribute in fitted_attributes:
            # a missing attribute and a ``None`` attribute both fail here
            self.assertIsNotNone(
                getattr(self.clf, attribute, None),
                msg='{0} is missing or None'.format(attribute))

    def test_train_scores(self):
        assert_equal(len(self.clf.decision_scores_), self.X_train.shape[0])

    def test_prediction_scores(self):
        pred_scores = self.clf.decision_function(self.X_test)

        # check score shapes
        assert_equal(pred_scores.shape[0], self.X_test.shape[0])

        # check performance
        self.assertGreaterEqual(roc_auc_score(self.y_test, pred_scores),
                                self.roc_floor)

    def test_prediction_labels(self):
        pred_labels = self.clf.predict(self.X_test)
        assert_equal(pred_labels.shape, self.y_test.shape)

    def test_prediction_proba(self):
        pred_proba = self.clf.predict_proba(self.X_test)
        self._assert_unit_interval(pred_proba)

    def test_prediction_proba_linear(self):
        pred_proba = self.clf.predict_proba(self.X_test, method='linear')
        self._assert_unit_interval(pred_proba)

    def test_prediction_proba_unify(self):
        pred_proba = self.clf.predict_proba(self.X_test, method='unify')
        self._assert_unit_interval(pred_proba)

    def test_prediction_proba_parameter(self):
        with assert_raises(ValueError):
            self.clf.predict_proba(self.X_test, method='something')

    def test_prediction_labels_confidence(self):
        pred_labels, confidence = self.clf.predict(self.X_test,
                                                   return_confidence=True)
        assert_equal(pred_labels.shape, self.y_test.shape)
        assert_equal(confidence.shape, self.y_test.shape)
        self._assert_unit_interval(confidence)

    def test_prediction_proba_linear_confidence(self):
        pred_proba, confidence = self.clf.predict_proba(
            self.X_test, method='linear', return_confidence=True)
        self._assert_unit_interval(pred_proba)

        assert_equal(confidence.shape, self.y_test.shape)
        self._assert_unit_interval(confidence)

    def test_fit_predict(self):
        pred_labels = self.clf.fit_predict(self.X_train)
        assert_equal(pred_labels.shape, self.y_train.shape)

    def test_fit_predict_score(self):
        self.clf.fit_predict_score(self.X_test, self.y_test)
        self.clf.fit_predict_score(self.X_test, self.y_test,
                                   scoring='roc_auc_score')
        self.clf.fit_predict_score(self.X_test, self.y_test,
                                   scoring='prc_n_score')
        with assert_raises(NotImplementedError):
            self.clf.fit_predict_score(self.X_test, self.y_test,
                                       scoring='something')

    def test_predict_rank(self):
        pred_scores = self.clf.decision_function(self.X_test)
        pred_ranks = self.clf._predict_rank(self.X_test)

        # assert the order is preserved
        assert_allclose(rankdata(pred_ranks), rankdata(pred_scores),
                        atol=self.rank_atol)
        assert_array_less(pred_ranks, self.X_train.shape[0] + 1)
        assert_array_less(-0.1, pred_ranks)

    def test_predict_rank_normalized(self):
        pred_scores = self.clf.decision_function(self.X_test)
        pred_ranks = self.clf._predict_rank(self.X_test, normalized=True)

        # assert the order is preserved
        assert_allclose(rankdata(pred_ranks), rankdata(pred_scores),
                        atol=self.rank_atol)
        assert_array_less(pred_ranks, 1.01)
        assert_array_less(-0.1, pred_ranks)

    def test_model_clone(self):
        self._assert_clone_matches()


class _AggregationMethodChecks(_SyntheticDataMixin):
    """Checks shared by the ``mean`` and ``median`` aggregation test cases.

    The detector is built in ``setUp`` but left unfitted; each test fits it
    as needed. Subclasses set ``knn_method`` and derive from
    ``unittest.TestCase``.
    """

    # value passed as ``method`` to the KNN constructor
    knn_method = None

    def setUp(self):
        self._generate_split()
        self.clf = KNN(contamination=self.contamination,
                       method=self.knn_method)

    def test_fit(self):
        self.clf.fit(self.X_train)

    def test_decision_function(self):
        self.clf.fit(self.X_train)
        for samples in (self.X_train, self.X_test):
            scores = self.clf.decision_function(samples)
            # one score per input sample
            assert_equal(scores.shape[0], samples.shape[0])

    def test_model_clone(self):
        self._assert_clone_matches()


class TestKnn(_FittedDetectorChecks, unittest.TestCase):
    """KNN with default settings, fitted on the training split."""

    rank_atol = 2

    def setUp(self):
        self._generate_split()
        self.clf = KNN(contamination=self.contamination)
        self.clf.fit(self.X_train)


class TestKnnMean(_AggregationMethodChecks, unittest.TestCase):
    """KNN constructed with ``method='mean'``."""

    knn_method = 'mean'


class TestKnnMedian(_AggregationMethodChecks, unittest.TestCase):
    """KNN constructed with ``method='median'``."""

    knn_method = 'median'


class TestKnnMahalanobis(_FittedDetectorChecks, unittest.TestCase):
    """KNN using the Mahalanobis metric, fitted on the training split."""

    rank_atol = 3

    def setUp(self):
        self._generate_split()

        # calculate covariance for mahalanobis distance
        X_train_cov = np.cov(self.X_train, rowvar=False)

        self.clf = KNN(algorithm='auto', metric='mahalanobis',
                       metric_params={'V': X_train_cov})
        self.clf.fit(self.X_train)

    def test_prediction_with_rejection(self):
        pred_labels = self.clf.predict_with_rejection(self.X_test,
                                                      return_stats=False)
        assert_equal(pred_labels.shape, self.y_test.shape)

    def test_prediction_with_rejection_stats(self):
        _, [expected_rejrate, ub_rejrate,
            ub_cost] = self.clf.predict_with_rejection(self.X_test,
                                                       return_stats=True)
        self.assertGreaterEqual(expected_rejrate, 0)
        self.assertLessEqual(expected_rejrate, 1)
        self.assertGreaterEqual(ub_rejrate, 0)
        self.assertLessEqual(ub_rejrate, 1)
        self.assertGreaterEqual(ub_cost, 0)


# TODO: add a testcase for #158
class TestKnnTree(unittest.TestCase):
    """Placeholder for tree-based neighbour search tests (none yet)."""


class TestKnnNearestNeighborsConfig(_SyntheticDataMixin, unittest.TestCase):
    """Check that neighbour-search settings reach the fitted ``neigh_``."""

    n_train = 300
    n_test = 80

    def setUp(self):
        self._generate_split()

    def test_neighbor_params_propagation(self):
        clf = KNN(n_neighbors=7, algorithm='kd_tree', n_jobs=-1)
        clf.fit(self.X_train)
        assert_equal(clf.neigh_.algorithm, 'kd_tree')
        assert_equal(clf.neigh_.n_jobs, -1)
        scores = clf.decision_function(self.X_test)
        assert_equal(scores.shape[0], self.X_test.shape[0])

    def test_decision_function_uses_batch_kneighbors(self):
        clf = KNN(n_neighbors=5, algorithm='brute', n_jobs=1)
        clf.fit(self.X_train)

        # wrap the real method so scoring still works while calls are counted
        with patch.object(clf.neigh_, 'kneighbors',
                          wraps=clf.neigh_.kneighbors) as mocked_kneighbors:
            scores = clf.decision_function(self.X_test)
            mocked_kneighbors.assert_called_once()

        assert_equal(scores.shape[0], self.X_test.shape[0])


if __name__ == '__main__':
    unittest.main()