test_stat_models.py
# -*- coding: utf-8 -*-
"""Unit tests for the helpers imported from ``pyod.utils.stat_models``."""

import os
import sys
import time
import unittest

import numpy as np
import statsmodels.distributions
# noinspection PyProtectedMember
from numpy.testing import assert_allclose
from numpy.testing import assert_equal
from numpy.testing import assert_raises

# temporary solution for relative imports in case pyod is not installed
# if pyod is installed, no need to use the following line
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from pyod.utils.stat_models import pairwise_distances_no_broadcast
from pyod.utils.stat_models import wpearsonr
from pyod.utils.stat_models import pearsonr_mat
from pyod.utils.stat_models import column_ecdf

# Fixed seed so that a failing random case can be reproduced exactly.
_RANDOM_SEED = 42

# Number of random matrices compared against statsmodels in test_column_ecdf.
_N_RANDOM_TRIALS = 50


def _reference_ecdf(column):
    """Evaluate the statsmodels empirical CDF of ``column`` at its own values.

    Used as the reference result that ``column_ecdf`` is compared against.

    Parameters
    ----------
    column : array-like
        Sample values. The tests pass one feature column at a time through
        ``np.apply_along_axis(..., 0, mat)``.

    Returns
    -------
    values : numpy array
        Result of calling the fitted ``ECDF`` step function on ``column``.
    """
    step_function = statsmodels.distributions.ECDF(column)
    return step_function(column)


class TestStatModels(unittest.TestCase):
    """Tests for the distance, correlation and ECDF utilities."""

    def setUp(self):
        # Small hand-written vectors for the (weighted) Pearson tests.
        self.a = [1, 2, 3, 2, 3, 1, 0, 5]
        self.b = [1, 2, 1, 2, 2, 1, 0, 2]
        self.w = [2, 2, 1, 2, 4, 1, 0, 2]

        # A private, seeded generator keeps every test reproducible and
        # leaves numpy's global random state untouched.
        self.rng = np.random.RandomState(_RANDOM_SEED)
        self.mat = self.rng.rand(10, 20)
        self.w_mat = self.rng.rand(10, 1)

        # Row-aligned matrices for the pairwise distance test.
        self.X = np.array([[1, 2, 3],
                           [3, 4, 5],
                           [3, 6, 7],
                           [4, 1, 1]])
        self.Y = np.array([[2, 2, 2],
                           [3, 3, 3],
                           [4, 4, 3],
                           [0, 1, 2]])

    def test_pairwise_distances_no_broadcast(self):
        # One distance per row pair (X[i], Y[i]).
        assert_allclose(pairwise_distances_no_broadcast(self.X, self.Y),
                        [1.41421356, 2.23606798, 4.58257569, 4.12310563])

        # Inputs of different lengths must be rejected.
        with assert_raises(ValueError):
            pairwise_distances_no_broadcast([1, 2, 3], [6])

    def test_wpearsonr(self):
        # TODO: the unweighted call is indexed with [0] while the weighted
        # call is compared directly; if the unweighted return format changes,
        # the wp[0] access below has to change with it.
        wp = wpearsonr(self.a, self.b)
        assert_allclose(wp[0], 0.6956083, atol=0.01)

        wp = wpearsonr(self.a, self.b, w=self.w)
        assert_allclose(wp, 0.5477226, atol=0.01)

    def test_pearsonr_mat(self):
        # Unweighted: one coefficient per pair of the 10 rows.
        pear_mat = pearsonr_mat(self.mat)
        assert_equal(pear_mat.shape, (10, 10))

        # Weighted: same shape, and every entry is a valid correlation.
        pear_mat = pearsonr_mat(self.mat, self.w_mat)
        assert_equal(pear_mat.shape, (10, 10))

        # unittest assertions instead of bare ``assert`` so the checks are
        # not stripped under ``python -O`` and report the offending value.
        self.assertGreaterEqual(np.min(pear_mat), -1)
        self.assertLessEqual(np.max(pear_mat), 1)

    def test_njit_probability_reordering(self):
        # trigger the njit compiler for one of the functions
        # in the stat models packages
        column_ecdf(self.mat)

    def test_column_ecdf(self):
        # run a test case that has equal elements per feature column to show
        # that this highly unlikely case works as well
        mat = np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1], [2, 2, 2]])
        assert_equal(column_ecdf(mat),
                     np.apply_along_axis(_reference_ecdf, 0, mat))

        # compare against the statsmodels result on several random matrices
        for _ in range(_N_RANDOM_TRIALS):
            mat = self.rng.rand(1000, 100)

            result = column_ecdf(mat)
            expected = np.apply_along_axis(_reference_ecdf, 0, mat)

            # check that the results are equal
            assert_equal(result, expected)

    def tearDown(self):
        # Nothing to release: all fixtures are plain in-memory objects.
        pass


if __name__ == '__main__':
    unittest.main()