/ pyod / test / test_knn.py
test_knn.py
  1  # -*- coding: utf-8 -*-
  2  
  3  
  4  import os
  5  import sys
  6  import unittest
  7  from unittest.mock import patch
  8  
  9  import numpy as np
 10  # noinspection PyProtectedMember
 11  from numpy.testing import assert_allclose
 12  from numpy.testing import assert_array_less
 13  from numpy.testing import assert_equal
 14  from numpy.testing import assert_raises
 15  from scipy.stats import rankdata
 16  from sklearn.base import clone
 17  from sklearn.metrics import roc_auc_score
 18  
 19  # temporary solution for relative imports in case pyod is not installed
 20  # if pyod is installed, no need to use the following line
 21  sys.path.append(
 22      os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
 23  
 24  from pyod.models.knn import KNN
 25  from pyod.utils.data import generate_data
 26  
 27  
 28  class TestKnn(unittest.TestCase):
 29      def setUp(self):
 30          self.n_train = 200
 31          self.n_test = 100
 32          self.contamination = 0.1
 33          self.roc_floor = 0.8
 34          self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
 35              n_train=self.n_train, n_test=self.n_test,
 36              contamination=self.contamination, random_state=42)
 37  
 38          self.clf = KNN(contamination=self.contamination)
 39          self.clf.fit(self.X_train)
 40  
 41      def test_parameters(self):
 42          assert (hasattr(self.clf, 'decision_scores_') and
 43                  self.clf.decision_scores_ is not None)
 44          assert (hasattr(self.clf, 'labels_') and
 45                  self.clf.labels_ is not None)
 46          assert (hasattr(self.clf, 'threshold_') and
 47                  self.clf.threshold_ is not None)
 48          assert (hasattr(self.clf, '_mu') and
 49                  self.clf._mu is not None)
 50          assert (hasattr(self.clf, '_sigma') and
 51                  self.clf._sigma is not None)
 52  
 53      def test_train_scores(self):
 54          assert_equal(len(self.clf.decision_scores_), self.X_train.shape[0])
 55  
 56      def test_prediction_scores(self):
 57          pred_scores = self.clf.decision_function(self.X_test)
 58  
 59          # check score shapes
 60          assert_equal(pred_scores.shape[0], self.X_test.shape[0])
 61  
 62          # check performance
 63          assert (roc_auc_score(self.y_test, pred_scores) >= self.roc_floor)
 64  
 65      def test_prediction_labels(self):
 66          pred_labels = self.clf.predict(self.X_test)
 67          assert_equal(pred_labels.shape, self.y_test.shape)
 68  
 69      def test_prediction_proba(self):
 70          pred_proba = self.clf.predict_proba(self.X_test)
 71          assert (pred_proba.min() >= 0)
 72          assert (pred_proba.max() <= 1)
 73  
 74      def test_prediction_proba_linear(self):
 75          pred_proba = self.clf.predict_proba(self.X_test, method='linear')
 76          assert (pred_proba.min() >= 0)
 77          assert (pred_proba.max() <= 1)
 78  
 79      def test_prediction_proba_unify(self):
 80          pred_proba = self.clf.predict_proba(self.X_test, method='unify')
 81          assert (pred_proba.min() >= 0)
 82          assert (pred_proba.max() <= 1)
 83  
 84      def test_prediction_proba_parameter(self):
 85          with assert_raises(ValueError):
 86              self.clf.predict_proba(self.X_test, method='something')
 87  
 88      def test_prediction_labels_confidence(self):
 89          pred_labels, confidence = self.clf.predict(self.X_test,
 90                                                     return_confidence=True)
 91          assert_equal(pred_labels.shape, self.y_test.shape)
 92          assert_equal(confidence.shape, self.y_test.shape)
 93          assert (confidence.min() >= 0)
 94          assert (confidence.max() <= 1)
 95  
 96      def test_prediction_proba_linear_confidence(self):
 97          pred_proba, confidence = self.clf.predict_proba(self.X_test,
 98                                                          method='linear',
 99                                                          return_confidence=True)
100          assert (pred_proba.min() >= 0)
101          assert (pred_proba.max() <= 1)
102  
103          assert_equal(confidence.shape, self.y_test.shape)
104          assert (confidence.min() >= 0)
105          assert (confidence.max() <= 1)
106  
107      def test_fit_predict(self):
108          pred_labels = self.clf.fit_predict(self.X_train)
109          assert_equal(pred_labels.shape, self.y_train.shape)
110  
111      def test_fit_predict_score(self):
112          self.clf.fit_predict_score(self.X_test, self.y_test)
113          self.clf.fit_predict_score(self.X_test, self.y_test,
114                                     scoring='roc_auc_score')
115          self.clf.fit_predict_score(self.X_test, self.y_test,
116                                     scoring='prc_n_score')
117          with assert_raises(NotImplementedError):
118              self.clf.fit_predict_score(self.X_test, self.y_test,
119                                         scoring='something')
120  
121      def test_predict_rank(self):
122          pred_socres = self.clf.decision_function(self.X_test)
123          pred_ranks = self.clf._predict_rank(self.X_test)
124  
125          # assert the order is reserved
126          assert_allclose(rankdata(pred_ranks), rankdata(pred_socres), atol=2)
127          assert_array_less(pred_ranks, self.X_train.shape[0] + 1)
128          assert_array_less(-0.1, pred_ranks)
129  
130      def test_predict_rank_normalized(self):
131          pred_socres = self.clf.decision_function(self.X_test)
132          pred_ranks = self.clf._predict_rank(self.X_test, normalized=True)
133  
134          # assert the order is reserved
135          assert_allclose(rankdata(pred_ranks), rankdata(pred_socres), atol=2)
136          assert_array_less(pred_ranks, 1.01)
137          assert_array_less(-0.1, pred_ranks)
138  
139      def test_model_clone(self):
140          clone_clf = clone(self.clf)
141  
142      def tearDown(self):
143          pass
144  
145  
146  class TestKnnMean(unittest.TestCase):
147  
148      def setUp(self):
149          self.n_train = 200
150          self.n_test = 100
151          self.contamination = 0.1
152          self.roc_floor = 0.8
153          self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
154              n_train=self.n_train, n_test=self.n_test,
155              contamination=self.contamination, random_state=42)
156  
157          self.clf = KNN(contamination=self.contamination, method='mean')
158  
159      def test_fit(self):
160          self.clf.fit(self.X_train)
161  
162      def test_decision_function(self):
163          self.clf.fit(self.X_train)
164          self.clf.decision_function(self.X_train)
165          self.clf.decision_function(self.X_test)
166  
167      def test_model_clone(self):
168          clone_clf = clone(self.clf)
169  
170      def tearDown(self):
171          pass
172  
173  
174  class TestKnnMedian(unittest.TestCase):
175  
176      def setUp(self):
177          self.n_train = 200
178          self.n_test = 100
179          self.contamination = 0.1
180          self.roc_floor = 0.8
181          self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
182              n_train=self.n_train, n_test=self.n_test,
183              contamination=self.contamination, random_state=42)
184  
185          self.clf = KNN(contamination=self.contamination, method='median')
186  
187      def test_fit(self):
188          self.clf.fit(self.X_train)
189  
190      def test_decision_function(self):
191          self.clf.fit(self.X_train)
192          self.clf.decision_function(self.X_train)
193          self.clf.decision_function(self.X_test)
194  
195      def test_model_clone(self):
196          clone_clf = clone(self.clf)
197  
198      def tearDown(self):
199          pass
200  
201  
202  class TestKnnMahalanobis(unittest.TestCase):
203      def setUp(self):
204          self.n_train = 200
205          self.n_test = 100
206          self.contamination = 0.1
207          self.roc_floor = 0.8
208          self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
209              n_train=self.n_train, n_test=self.n_test,
210              contamination=self.contamination, random_state=42)
211  
212          # calculate covariance for mahalanobis distance
213          X_train_cov = np.cov(self.X_train, rowvar=False)
214  
215          self.clf = KNN(algorithm='auto', metric='mahalanobis',
216                         metric_params={'V': X_train_cov})
217          self.clf.fit(self.X_train)
218  
219      def test_parameters(self):
220          assert (hasattr(self.clf, 'decision_scores_') and
221                  self.clf.decision_scores_ is not None)
222          assert (hasattr(self.clf, 'labels_') and
223                  self.clf.labels_ is not None)
224          assert (hasattr(self.clf, 'threshold_') and
225                  self.clf.threshold_ is not None)
226          assert (hasattr(self.clf, '_mu') and
227                  self.clf._mu is not None)
228          assert (hasattr(self.clf, '_sigma') and
229                  self.clf._sigma is not None)
230  
231      def test_train_scores(self):
232          assert_equal(len(self.clf.decision_scores_), self.X_train.shape[0])
233  
234      def test_prediction_scores(self):
235          pred_scores = self.clf.decision_function(self.X_test)
236  
237          # check score shapes
238          assert_equal(pred_scores.shape[0], self.X_test.shape[0])
239  
240          # check performance
241          assert (roc_auc_score(self.y_test, pred_scores) >= self.roc_floor)
242  
243      def test_prediction_labels(self):
244          pred_labels = self.clf.predict(self.X_test)
245          assert_equal(pred_labels.shape, self.y_test.shape)
246  
247      def test_prediction_proba(self):
248          pred_proba = self.clf.predict_proba(self.X_test)
249          assert (pred_proba.min() >= 0)
250          assert (pred_proba.max() <= 1)
251  
252      def test_prediction_proba_linear(self):
253          pred_proba = self.clf.predict_proba(self.X_test, method='linear')
254          assert (pred_proba.min() >= 0)
255          assert (pred_proba.max() <= 1)
256  
257      def test_prediction_proba_unify(self):
258          pred_proba = self.clf.predict_proba(self.X_test, method='unify')
259          assert (pred_proba.min() >= 0)
260          assert (pred_proba.max() <= 1)
261  
262      def test_prediction_proba_parameter(self):
263          with assert_raises(ValueError):
264              self.clf.predict_proba(self.X_test, method='something')
265  
266      def test_prediction_labels_confidence(self):
267          pred_labels, confidence = self.clf.predict(self.X_test,
268                                                     return_confidence=True)
269          assert_equal(pred_labels.shape, self.y_test.shape)
270          assert_equal(confidence.shape, self.y_test.shape)
271          assert (confidence.min() >= 0)
272          assert (confidence.max() <= 1)
273  
274      def test_prediction_proba_linear_confidence(self):
275          pred_proba, confidence = self.clf.predict_proba(self.X_test,
276                                                          method='linear',
277                                                          return_confidence=True)
278          assert (pred_proba.min() >= 0)
279          assert (pred_proba.max() <= 1)
280  
281          assert_equal(confidence.shape, self.y_test.shape)
282          assert (confidence.min() >= 0)
283          assert (confidence.max() <= 1)
284  
285      def test_prediction_with_rejection(self):
286          pred_labels = self.clf.predict_with_rejection(self.X_test,
287                                                        return_stats=False)
288          assert_equal(pred_labels.shape, self.y_test.shape)
289  
290      def test_prediction_with_rejection_stats(self):
291          _, [expected_rejrate, ub_rejrate,
292              ub_cost] = self.clf.predict_with_rejection(self.X_test,
293                                                         return_stats=True)
294          assert (expected_rejrate >= 0)
295          assert (expected_rejrate <= 1)
296          assert (ub_rejrate >= 0)
297          assert (ub_rejrate <= 1)
298          assert (ub_cost >= 0)
299  
300      def test_fit_predict(self):
301          pred_labels = self.clf.fit_predict(self.X_train)
302          assert_equal(pred_labels.shape, self.y_train.shape)
303  
304      def test_fit_predict_score(self):
305          self.clf.fit_predict_score(self.X_test, self.y_test)
306          self.clf.fit_predict_score(self.X_test, self.y_test,
307                                     scoring='roc_auc_score')
308          self.clf.fit_predict_score(self.X_test, self.y_test,
309                                     scoring='prc_n_score')
310          with assert_raises(NotImplementedError):
311              self.clf.fit_predict_score(self.X_test, self.y_test,
312                                         scoring='something')
313  
314      def test_predict_rank(self):
315          pred_socres = self.clf.decision_function(self.X_test)
316          pred_ranks = self.clf._predict_rank(self.X_test)
317  
318          # assert the order is reserved
319          assert_allclose(rankdata(pred_ranks), rankdata(pred_socres), atol=3)
320          assert_array_less(pred_ranks, self.X_train.shape[0] + 1)
321          assert_array_less(-0.1, pred_ranks)
322  
323      def test_predict_rank_normalized(self):
324          pred_socres = self.clf.decision_function(self.X_test)
325          pred_ranks = self.clf._predict_rank(self.X_test, normalized=True)
326  
327          # assert the order is reserved
328          assert_allclose(rankdata(pred_ranks), rankdata(pred_socres), atol=3)
329          assert_array_less(pred_ranks, 1.01)
330          assert_array_less(-0.1, pred_ranks)
331  
332      def test_model_clone(self):
333          clone_clf = clone(self.clf)
334  
335      def tearDown(self):
336          pass
337  
338  
339  # TODO: add a testcase for #158
340  class TestKnnTree(unittest.TestCase):
341      def setUp(self):
342          pass
343  
344      def tearDown(self):
345          pass
346  
347  
348  class TestKnnNearestNeighborsConfig(unittest.TestCase):
349      def setUp(self):
350          self.n_train = 300
351          self.n_test = 80
352          self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
353              n_train=self.n_train, n_test=self.n_test,
354              contamination=0.1, random_state=42)
355  
356      def test_neighbor_params_propagation(self):
357          clf = KNN(n_neighbors=7, algorithm='kd_tree', n_jobs=-1)
358          clf.fit(self.X_train)
359          assert_equal(clf.neigh_.algorithm, 'kd_tree')
360          assert_equal(clf.neigh_.n_jobs, -1)
361          scores = clf.decision_function(self.X_test)
362          assert_equal(scores.shape[0], self.X_test.shape[0])
363  
364      def test_decision_function_uses_batch_kneighbors(self):
365          clf = KNN(n_neighbors=5, algorithm='brute', n_jobs=1)
366          clf.fit(self.X_train)
367  
368          with patch.object(clf.neigh_, 'kneighbors',
369                            wraps=clf.neigh_.kneighbors) as mocked_kneighbors:
370              scores = clf.decision_function(self.X_test)
371              mocked_kneighbors.assert_called_once()
372  
373          assert_equal(scores.shape[0], self.X_test.shape[0])
374  
375  
376  if __name__ == '__main__':
377      unittest.main()