/ pyod / test / test_stat_models.py
test_stat_models.py
  1  # -*- coding: utf-8 -*-
  2  
  3  
  4  import os
  5  import sys
  6  import unittest
  7  
  8  import numpy as np
  9  # noinspection PyProtectedMember
 10  from numpy.testing import assert_allclose
 11  from numpy.testing import assert_equal
 12  from numpy.testing import assert_raises
 13  
 14  # temporary solution for relative imports in case pyod is not installed
 15  # if pyod is installed, no need to use the following line
 16  sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
 17  
 18  from pyod.utils.stat_models import pairwise_distances_no_broadcast
 19  from pyod.utils.stat_models import wpearsonr
 20  from pyod.utils.stat_models import pearsonr_mat
 21  from pyod.utils.stat_models import column_ecdf
 22  import statsmodels.distributions
 23  import time
 24  
 25  
 26  class TestStatModels(unittest.TestCase):
 27      def setUp(self):
 28          self.a = [1, 2, 3, 2, 3, 1, 0, 5]
 29          self.b = [1, 2, 1, 2, 2, 1, 0, 2]
 30          self.w = [2, 2, 1, 2, 4, 1, 0, 2]
 31  
 32          self.mat = np.random.rand(10, 20)
 33          self.w_mat = np.random.rand(10, 1)
 34  
 35          self.X = np.array([[1, 2, 3],
 36                             [3, 4, 5],
 37                             [3, 6, 7],
 38                             [4, 1, 1]])
 39          self.Y = np.array([[2, 2, 2],
 40                             [3, 3, 3],
 41                             [4, 4, 3],
 42                             [0, 1, 2]])
 43  
 44      def test_pairwise_distances_no_broadcast(self):
 45          assert_allclose(pairwise_distances_no_broadcast(self.X, self.Y),
 46                          [1.41421356, 2.23606798, 4.58257569, 4.12310563])
 47  
 48          with assert_raises(ValueError):
 49              pairwise_distances_no_broadcast([1, 2, 3], [6])
 50  
 51      def test_wpearsonr(self):
 52          # TODO: if unweight version changes, wp[0] format should be changed
 53          wp = wpearsonr(self.a, self.b)
 54          assert_allclose(wp[0], 0.6956083, atol=0.01)
 55  
 56          wp = wpearsonr(self.a, self.b, w=self.w)
 57          assert_allclose(wp, 0.5477226, atol=0.01)
 58  
 59      def test_pearsonr_mat(self):
 60          pear_mat = pearsonr_mat(self.mat)
 61          assert_equal(pear_mat.shape, (10, 10))
 62  
 63          pear_mat = pearsonr_mat(self.mat, self.w_mat)
 64          assert_equal(pear_mat.shape, (10, 10))
 65  
 66          assert (np.min(pear_mat) >= -1)
 67          assert (np.max(pear_mat) <= 1)
 68  
 69      def test_njit_probability_reordering(self):
 70          # trigger the njit compiler for one of the functions
 71          # in the stat models packages
 72          column_ecdf(self.mat)
 73  
 74      def test_column_ecdf(self):
 75          def ecdf(X):
 76              """Calculated the empirical CDF of a given dataset using the statsmodels function.
 77              Parameters
 78              ----------
 79              X : numpy array of shape (n_samples, n_features)
 80                  The training dataset.
 81              Returns
 82              -------
 83              ecdf(X) : float
 84                  Empirical CDF of X
 85              """
 86              ecdf = statsmodels.distributions.ECDF(X)
 87              return ecdf(X)
 88  
 89          # run a test case that has equal elements per feature column to show also
 90          # this highly unlikely case works
 91          mat = np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1], [2, 2, 2]])
 92          assert_equal(column_ecdf(mat), np.apply_along_axis(ecdf, 0, mat))
 93  
 94          # run the models multiple times for random matrices
 95          new = []
 96          old = []
 97          for _ in range(50):
 98              # create random matrix for testing
 99              mat = np.random.rand(1000, 100)
100  
101              # execute and measure the time of our own function
102              t = time.time()
103              result = column_ecdf(mat)
104              new.append(time.time() - t)
105  
106              # execute the statsmodels function and measure execution time
107              t = time.time()
108              expected = np.apply_along_axis(ecdf, 0, mat)
109              old.append(time.time() - t)
110  
111              # check that the results are equal
112              assert_equal(result, expected)
113          #
114          # print(f'Statsmodels ECDF took {sum(old) / len(old) * 1000:0.1f} ms '
115          #       f'and own implementation {sum(new) / len(new) * 1000:0.1f} ms per run.')
116  
117      def tearDown(self):
118          pass
119  
120  
# Run the test suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()