/ pyod / test / test_cof.py
test_cof.py
  1  # -*- coding: utf-8 -*-
  2  
  3  
  4  import os
  5  import sys
  6  import unittest
  7  
  8  # noinspection PyProtectedMember
  9  from numpy.testing import assert_allclose
 10  from numpy.testing import assert_array_less
 11  from numpy.testing import assert_equal
 12  from numpy.testing import assert_raises
 13  from scipy.stats import rankdata
 14  from sklearn.base import clone
 15  from sklearn.metrics import roc_auc_score
 16  
 17  # temporary solution for relative imports in case pyod is not installed
 18  # if pyod is installed, no need to use the following line
 19  sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
 20  
 21  from pyod.models.cof import COF
 22  from pyod.utils.data import generate_data
 23  
 24  
 25  class TestFastCOF(unittest.TestCase):
 26      def setUp(self):
 27          self.n_train = 100
 28          self.n_test = 50
 29          self.contamination = 0.1
 30          self.roc_floor = 0.8
 31          self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
 32              n_train=self.n_train, n_test=self.n_test,
 33              contamination=self.contamination, random_state=42)
 34  
 35          self.clf = COF(contamination=self.contamination)
 36          self.clf.fit(self.X_train)
 37  
 38      def test_parameters(self):
 39          assert (hasattr(self.clf, 'decision_scores_') and
 40                  self.clf.decision_scores_ is not None)
 41          assert (hasattr(self.clf, 'labels_') and
 42                  self.clf.labels_ is not None)
 43          assert (hasattr(self.clf, 'threshold_') and
 44                  self.clf.threshold_ is not None)
 45          assert (hasattr(self.clf, 'n_neighbors_') and
 46                  self.clf.n_neighbors_ is not None)
 47  
 48      def test_train_scores(self):
 49          assert_equal(len(self.clf.decision_scores_), self.X_train.shape[0])
 50  
 51      def test_prediction_scores(self):
 52          pred_scores = self.clf.decision_function(self.X_test)
 53  
 54          # check score shapes
 55          assert_equal(pred_scores.shape[0], self.X_test.shape[0])
 56  
 57          # check performance
 58          assert (roc_auc_score(self.y_test, pred_scores) >= self.roc_floor)
 59  
 60      def test_prediction_labels(self):
 61          pred_labels = self.clf.predict(self.X_test)
 62          assert_equal(pred_labels.shape, self.y_test.shape)
 63  
 64      def test_prediction_proba(self):
 65          pred_proba = self.clf.predict_proba(self.X_test)
 66          assert (pred_proba.min() >= 0)
 67          assert (pred_proba.max() <= 1)
 68  
 69      def test_prediction_proba_linear(self):
 70          pred_proba = self.clf.predict_proba(self.X_test, method='linear')
 71          assert (pred_proba.min() >= 0)
 72          assert (pred_proba.max() <= 1)
 73  
 74      def test_prediction_proba_unify(self):
 75          pred_proba = self.clf.predict_proba(self.X_test, method='unify')
 76          assert (pred_proba.min() >= 0)
 77          assert (pred_proba.max() <= 1)
 78  
 79      def test_prediction_proba_parameter(self):
 80          with assert_raises(ValueError):
 81              self.clf.predict_proba(self.X_test, method='something')
 82  
 83      def test_prediction_labels_confidence(self):
 84          pred_labels, confidence = self.clf.predict(self.X_test,
 85                                                     return_confidence=True)
 86          assert_equal(pred_labels.shape, self.y_test.shape)
 87          assert_equal(confidence.shape, self.y_test.shape)
 88          assert (confidence.min() >= 0)
 89          assert (confidence.max() <= 1)
 90  
 91      def test_prediction_proba_linear_confidence(self):
 92          pred_proba, confidence = self.clf.predict_proba(self.X_test,
 93                                                          method='linear',
 94                                                          return_confidence=True)
 95          assert (pred_proba.min() >= 0)
 96          assert (pred_proba.max() <= 1)
 97  
 98          assert_equal(confidence.shape, self.y_test.shape)
 99          assert (confidence.min() >= 0)
100          assert (confidence.max() <= 1)
101  
102      def test_prediction_with_rejection(self):
103          pred_labels = self.clf.predict_with_rejection(self.X_test,
104                                                        return_stats=False)
105          assert_equal(pred_labels.shape, self.y_test.shape)
106  
107      def test_prediction_with_rejection_stats(self):
108          _, [expected_rejrate, ub_rejrate,
109              ub_cost] = self.clf.predict_with_rejection(self.X_test,
110                                                         return_stats=True)
111          assert (expected_rejrate >= 0)
112          assert (expected_rejrate <= 1)
113          assert (ub_rejrate >= 0)
114          assert (ub_rejrate <= 1)
115          assert (ub_cost >= 0)
116  
117      def test_fit_predict(self):
118          pred_labels = self.clf.fit_predict(self.X_train)
119          assert_equal(pred_labels.shape, self.y_train.shape)
120  
121      def test_fit_predict_score(self):
122          self.clf.fit_predict_score(self.X_test, self.y_test)
123          self.clf.fit_predict_score(self.X_test, self.y_test,
124                                     scoring='roc_auc_score')
125          self.clf.fit_predict_score(self.X_test, self.y_test,
126                                     scoring='prc_n_score')
127          with assert_raises(NotImplementedError):
128              self.clf.fit_predict_score(self.X_test, self.y_test,
129                                         scoring='something')
130  
131      def test_predict_rank(self):
132          pred_scores = self.clf.decision_function(self.X_test)
133          pred_ranks = self.clf._predict_rank(self.X_test)
134          print(pred_ranks)
135  
136          # assert the order is reserved
137          assert_allclose(rankdata(pred_ranks), rankdata(pred_scores), atol=2)
138          assert_array_less(pred_ranks, self.X_train.shape[0] + 1)
139          assert_array_less(-0.1, pred_ranks)
140  
141      def test_predict_rank_normalized(self):
142          pred_socres = self.clf.decision_function(self.X_test)
143          pred_ranks = self.clf._predict_rank(self.X_test, normalized=True)
144  
145          # assert the order is reserved
146          assert_allclose(rankdata(pred_ranks), rankdata(pred_socres), atol=2)
147          assert_array_less(pred_ranks, 1.01)
148          assert_array_less(-0.1, pred_ranks)
149  
150      def test_check_parameters(self):
151          with assert_raises(ValueError):
152              COF(contamination=0.1, n_neighbors=-1)
153          with assert_raises(ValueError):
154              COF(contamination=10., n_neighbors=5)
155          with assert_raises(TypeError):
156              COF(contamination=0.1, n_neighbors='not int')
157          cof_ = COF(contamination=0.1, n_neighbors=10000)
158          cof_.fit(self.X_train)
159          assert self.X_train.shape[0] > cof_.n_neighbors_
160  
161      def test_model_clone(self):
162          clone_clf = clone(self.clf)
163  
164      def tearDown(self):
165          pass
166  
167  
class TestMemoryCOF(unittest.TestCase):
    """Test a ``COF`` detector constructed with ``method="memory"``.

    ``setUp`` fits a fresh detector on synthetic data before every test, so
    tests that refit ``self.clf`` do not influence one another.
    """

    def setUp(self):
        """Build a fixed-seed train/test split and fit the detector on it."""
        self.n_train = 100
        self.n_test = 50
        self.contamination = 0.1
        # Minimum ROC AUC the detector must reach on the held-out split.
        self.roc_floor = 0.8
        self.X_train, self.X_test, self.y_train, self.y_test = generate_data(
            n_train=self.n_train, n_test=self.n_test,
            contamination=self.contamination, random_state=42)

        self.clf = COF(contamination=self.contamination, method="memory")
        self.clf.fit(self.X_train)

    def test_parameters(self):
        """Fitting must populate the fitted attributes checked below."""
        assert (hasattr(self.clf, 'decision_scores_') and
                self.clf.decision_scores_ is not None)
        assert (hasattr(self.clf, 'labels_') and
                self.clf.labels_ is not None)
        assert (hasattr(self.clf, 'threshold_') and
                self.clf.threshold_ is not None)
        assert (hasattr(self.clf, 'n_neighbors_') and
                self.clf.n_neighbors_ is not None)

    def test_train_scores(self):
        """There is exactly one training score per training sample."""
        assert_equal(len(self.clf.decision_scores_), self.X_train.shape[0])

    def test_prediction_scores(self):
        """Test scores have one entry per sample and reach the ROC floor."""
        pred_scores = self.clf.decision_function(self.X_test)

        # check score shapes
        assert_equal(pred_scores.shape[0], self.X_test.shape[0])

        # check performance
        assert (roc_auc_score(self.y_test, pred_scores) >= self.roc_floor)

    def test_prediction_labels(self):
        """Predicted labels have the same shape as the ground truth."""
        pred_labels = self.clf.predict(self.X_test)
        assert_equal(pred_labels.shape, self.y_test.shape)

    def test_prediction_proba(self):
        """Default ``predict_proba`` output stays within [0, 1]."""
        pred_proba = self.clf.predict_proba(self.X_test)
        assert (pred_proba.min() >= 0)
        assert (pred_proba.max() <= 1)

    def test_prediction_proba_linear(self):
        """``predict_proba(method='linear')`` output stays within [0, 1]."""
        pred_proba = self.clf.predict_proba(self.X_test, method='linear')
        assert (pred_proba.min() >= 0)
        assert (pred_proba.max() <= 1)

    def test_prediction_proba_unify(self):
        """``predict_proba(method='unify')`` output stays within [0, 1]."""
        pred_proba = self.clf.predict_proba(self.X_test, method='unify')
        assert (pred_proba.min() >= 0)
        assert (pred_proba.max() <= 1)

    def test_prediction_proba_parameter(self):
        """An unknown ``predict_proba`` method raises ``ValueError``."""
        with assert_raises(ValueError):
            self.clf.predict_proba(self.X_test, method='something')

    def test_prediction_labels_confidence(self):
        """``predict`` can also return a per-sample confidence in [0, 1]."""
        pred_labels, confidence = self.clf.predict(self.X_test,
                                                   return_confidence=True)
        assert_equal(pred_labels.shape, self.y_test.shape)
        assert_equal(confidence.shape, self.y_test.shape)
        assert (confidence.min() >= 0)
        assert (confidence.max() <= 1)

    def test_prediction_proba_linear_confidence(self):
        """``predict_proba`` can also return a confidence in [0, 1]."""
        pred_proba, confidence = self.clf.predict_proba(
            self.X_test, method='linear', return_confidence=True)
        assert (pred_proba.min() >= 0)
        assert (pred_proba.max() <= 1)

        assert_equal(confidence.shape, self.y_test.shape)
        assert (confidence.min() >= 0)
        assert (confidence.max() <= 1)

    def test_prediction_with_rejection(self):
        """Without stats, rejection-aware prediction returns labels only."""
        pred_labels = self.clf.predict_with_rejection(self.X_test,
                                                      return_stats=False)
        assert_equal(pred_labels.shape, self.y_test.shape)

    def test_prediction_with_rejection_stats(self):
        """Returned rejection rates lie in [0, 1]; the cost is >= 0."""
        _, [expected_rejrate, ub_rejrate,
            ub_cost] = self.clf.predict_with_rejection(self.X_test,
                                                       return_stats=True)
        assert (expected_rejrate >= 0)
        assert (expected_rejrate <= 1)
        assert (ub_rejrate >= 0)
        assert (ub_rejrate <= 1)
        assert (ub_cost >= 0)

    def test_fit_predict(self):
        """``fit_predict`` returns one label per training sample."""
        pred_labels = self.clf.fit_predict(self.X_train)
        assert_equal(pred_labels.shape, self.y_train.shape)

    def test_fit_predict_score(self):
        """Supported scorings run; an unknown scoring name is rejected."""
        self.clf.fit_predict_score(self.X_test, self.y_test)
        self.clf.fit_predict_score(self.X_test, self.y_test,
                                   scoring='roc_auc_score')
        self.clf.fit_predict_score(self.X_test, self.y_test,
                                   scoring='prc_n_score')
        with assert_raises(NotImplementedError):
            self.clf.fit_predict_score(self.X_test, self.y_test,
                                       scoring='something')

    def test_predict_rank(self):
        """Raw ranks follow the score ordering and stay within bounds."""
        pred_scores = self.clf.decision_function(self.X_test)
        pred_ranks = self.clf._predict_rank(self.X_test)

        # assert the order is preserved (positions may differ by up to 2)
        assert_allclose(rankdata(pred_ranks), rankdata(pred_scores), atol=2)
        # ranks must lie between 0 and the number of training samples
        assert_array_less(pred_ranks, self.X_train.shape[0] + 1)
        assert_array_less(-0.1, pred_ranks)

    def test_predict_rank_normalized(self):
        """Normalized ranks follow the score ordering and lie in [0, 1]."""
        pred_scores = self.clf.decision_function(self.X_test)
        pred_ranks = self.clf._predict_rank(self.X_test, normalized=True)

        # assert the order is preserved (positions may differ by up to 2)
        assert_allclose(rankdata(pred_ranks), rankdata(pred_scores), atol=2)
        assert_array_less(pred_ranks, 1.01)
        assert_array_less(-0.1, pred_ranks)

    def test_check_parameters(self):
        """Invalid constructor arguments raise; oversized k is reduced."""
        with assert_raises(ValueError):
            COF(contamination=0.1, n_neighbors=-1)
        with assert_raises(ValueError):
            COF(contamination=10., n_neighbors=5)
        with assert_raises(TypeError):
            COF(contamination=0.1, n_neighbors='not int')
        # Asking for more neighbors than samples must end up with a fitted
        # ``n_neighbors_`` smaller than the training set size.
        cof_ = COF(contamination=0.1, n_neighbors=10000)
        cof_.fit(self.X_train)
        assert self.X_train.shape[0] > cof_.n_neighbors_

    def test_model_clone(self):
        """``sklearn.base.clone`` yields a ``COF`` with equal parameters."""
        clone_clf = clone(self.clf)
        assert isinstance(clone_clf, COF)
        assert clone_clf.get_params() == self.clf.get_params()
272  
273  
# Run the test cases with unittest's runner when this file is executed
# directly as a script.
if __name__ == '__main__':
    unittest.main()