# pyod/test/test_embedding.py
  1  # -*- coding: utf-8 -*-
  2  
  3  import os
  4  import sys
  5  import unittest
  6  
  7  import numpy as np
  8  from numpy.testing import assert_equal
  9  from sklearn.base import clone
 10  
 11  sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
 12  
 13  from pyod.models.embedding import EmbeddingOD
 14  
 15  
 16  def _mock_encoder(X):
 17      """Deterministic mock encoder for testing."""
 18      rng = np.random.RandomState(42)
 19      return rng.randn(len(X), 20)
 20  
 21  
 22  class TestEmbeddingOD(unittest.TestCase):
 23      def setUp(self):
 24          self.n_train = 200
 25          self.n_test = 100
 26          self.contamination = 0.1
 27          self.X_train = [f"train_{i}" for i in range(self.n_train)]
 28          self.X_test = [f"test_{i}" for i in range(self.n_test)]
 29  
 30          self.clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
 31                                 contamination=self.contamination)
 32          self.clf.fit(self.X_train)
 33  
 34      def test_parameters(self):
 35          assert (hasattr(self.clf, 'decision_scores_') and
 36                  self.clf.decision_scores_ is not None)
 37          assert (hasattr(self.clf, 'labels_') and
 38                  self.clf.labels_ is not None)
 39          assert (hasattr(self.clf, 'threshold_') and
 40                  self.clf.threshold_ is not None)
 41          assert (hasattr(self.clf, '_mu') and
 42                  self.clf._mu is not None)
 43          assert (hasattr(self.clf, '_sigma') and
 44                  self.clf._sigma is not None)
 45  
 46      def test_train_scores(self):
 47          assert_equal(len(self.clf.decision_scores_), self.n_train)
 48  
 49      def test_prediction_scores(self):
 50          pred_scores = self.clf.decision_function(self.X_test)
 51          assert_equal(pred_scores.shape[0], self.n_test)
 52  
 53      def test_prediction_labels(self):
 54          pred_labels = self.clf.predict(self.X_test)
 55          assert_equal(pred_labels.shape[0], self.n_test)
 56          assert set(pred_labels).issubset({0, 1})
 57  
 58      def test_prediction_proba(self):
 59          pred_proba = self.clf.predict_proba(self.X_test)
 60          assert (pred_proba.min() >= 0)
 61          assert (pred_proba.max() <= 1)
 62  
 63      def test_prediction_proba_linear(self):
 64          pred_proba = self.clf.predict_proba(self.X_test, method='linear')
 65          assert (pred_proba.min() >= 0)
 66          assert (pred_proba.max() <= 1)
 67  
 68      def test_prediction_proba_unify(self):
 69          pred_proba = self.clf.predict_proba(self.X_test, method='unify')
 70          assert (pred_proba.min() >= 0)
 71          assert (pred_proba.max() <= 1)
 72  
 73      def test_prediction_proba_parameter(self):
 74          with self.assertRaises(ValueError):
 75              self.clf.predict_proba(self.X_test, method='something')
 76  
 77      def test_prediction_labels_confidence(self):
 78          pred_labels, confidence = self.clf.predict(self.X_test,
 79                                                      return_confidence=True)
 80          assert_equal(pred_labels.shape[0], self.n_test)
 81          assert_equal(confidence.shape[0], self.n_test)
 82          assert (confidence.min() >= 0)
 83          assert (confidence.max() <= 1)
 84  
 85      def test_prediction_with_rejection(self):
 86          pred_labels = self.clf.predict_with_rejection(self.X_test,
 87                                                         return_stats=False)
 88          assert_equal(pred_labels.shape[0], self.n_test)
 89  
 90      def test_detector_string_resolution(self):
 91          for name in ['KNN', 'LOF', 'ECOD', 'IForest', 'HBOS',
 92                        'COPOD', 'PCA', 'OCSVM', 'INNE']:
 93              clf = EmbeddingOD(encoder=_mock_encoder, detector=name)
 94              clf.fit(self.X_train)
 95              assert hasattr(clf, 'decision_scores_')
 96  
 97      def test_detector_instance(self):
 98          from pyod.models.knn import KNN
 99          clf = EmbeddingOD(encoder=_mock_encoder,
100                            detector=KNN(n_neighbors=3))
101          clf.fit(self.X_train)
102          assert hasattr(clf, 'decision_scores_')
103  
104      def test_detector_instance_is_cloned(self):
105          from pyod.models.knn import KNN
106          original = KNN(n_neighbors=3)
107          clf = EmbeddingOD(encoder=_mock_encoder, detector=original)
108          clf.fit(self.X_train)
109          # original should not be fitted (it was cloned)
110          assert not hasattr(original, 'decision_scores_')
111  
112      def test_invalid_detector_raises(self):
113          with self.assertRaises(ValueError):
114              EmbeddingOD(encoder=_mock_encoder,
115                          detector='NoSuchDetector').fit(self.X_train)
116  
117      def test_standardize(self):
118          clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
119                            standardize=True)
120          clf.fit(self.X_train)
121          assert hasattr(clf, 'scaler_')
122  
123      def test_no_standardize(self):
124          clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
125                            standardize=False)
126          clf.fit(self.X_train)
127          assert not hasattr(clf, 'scaler_')
128  
129      def test_reduce_dim(self):
130          clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
131                            reduce_dim=5)
132          clf.fit(self.X_train)
133          assert hasattr(clf, 'pca_')
134          scores = clf.decision_function(self.X_test)
135          assert_equal(scores.shape[0], self.n_test)
136  
137      def test_cache_embeddings(self):
138          clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
139                            cache_embeddings=True)
140          clf.fit(self.X_train)
141          assert hasattr(clf, 'train_embeddings_')
142          assert_equal(clf.train_embeddings_.shape[0], self.n_train)
143  
144      def test_model_clone(self):
145          clone_clf = clone(self.clf)
146  
147      def test_default_detector_is_lunar(self):
148          clf = EmbeddingOD(encoder=_mock_encoder)
149          assert clf.detector == 'LUNAR'
150  
151  
class TestEmbeddingODPresets(unittest.TestCase):
    """Checks for the ``for_text`` / ``for_image`` preset constructors."""

    @staticmethod
    def _check_pair(preset, encoder, detector):
        # A preset is identified by the encoder/detector pair it selects.
        assert preset.encoder == encoder
        assert preset.detector == detector

    # --- text presets -----------------------------------------------------

    def test_for_text_returns_instance(self):
        preset = EmbeddingOD.for_text(quality='fast')
        assert isinstance(preset, EmbeddingOD)
        self._check_pair(preset, 'all-MiniLM-L6-v2', 'KNN')

    def test_for_text_balanced(self):
        preset = EmbeddingOD.for_text(quality='balanced')
        self._check_pair(preset, 'all-mpnet-base-v2', 'LUNAR')

    def test_for_text_best(self):
        preset = EmbeddingOD.for_text(quality='best')
        self._check_pair(preset, 'text-embedding-3-large', 'LUNAR')
        assert preset.cache_embeddings is True

    def test_for_text_override(self):
        # An explicit keyword wins over the preset's own detector choice.
        preset = EmbeddingOD.for_text(quality='fast', detector='LOF')
        assert preset.detector == 'LOF'

    def test_for_text_invalid_quality(self):
        with self.assertRaises(ValueError):
            EmbeddingOD.for_text(quality='invalid')

    # --- image presets ----------------------------------------------------

    def test_for_image_returns_instance(self):
        preset = EmbeddingOD.for_image(quality='fast')
        assert isinstance(preset, EmbeddingOD)
        self._check_pair(preset, 'dinov2-small', 'KNN')

    def test_for_image_balanced(self):
        preset = EmbeddingOD.for_image(quality='balanced')
        self._check_pair(preset, 'dinov2-base', 'LOF')

    def test_for_image_best(self):
        preset = EmbeddingOD.for_image(quality='best')
        self._check_pair(preset, 'dinov2-large', 'KNN')

    def test_for_image_override(self):
        preset = EmbeddingOD.for_image(quality='fast', detector='ECOD')
        assert preset.detector == 'ECOD'
197  
198  
# ``import importlib`` alone does not guarantee that the ``util`` submodule
# is loaded; import it explicitly so the skip condition below cannot fail
# with AttributeError at collection time.
import importlib.util


@unittest.skipUnless(
    importlib.util.find_spec('sentence_transformers') is not None,
    "sentence-transformers not installed")
class TestEmbeddingODIntegration(unittest.TestCase):
    """End-to-end test with real sentence-transformers encoder."""

    def setUp(self):
        # Training corpus: eight weather sentences repeated ten times.
        self.normal_train = [
            "Sunny weather expected throughout the week",
            "Light rain showers predicted for tomorrow morning",
            "Temperature will reach 75 degrees today",
            "Clear skies and mild winds this afternoon",
            "A cold front will bring cooler temperatures",
            "Morning fog expected to clear by noon",
            "High pressure system bringing warm weather",
            "Partly cloudy with a chance of evening showers",
        ] * 10  # 80 normal training samples

        self.test_normal = [
            "Thunderstorms likely later this evening",
            "Weekend forecast shows pleasant conditions",
        ] * 5  # 10 normal
        self.test_anomaly = [
            "The stock market crashed by 500 points today",
            "Scientists discovered alien life on Mars",
            "The football team won the championship game",
        ]  # 3 anomalous (different topic)

        self.X_test = self.test_normal + self.test_anomaly
        # Derive the label counts from the fixtures so the labels cannot
        # drift out of sync when the sentence lists are edited.
        self.y_test = np.array([0] * len(self.test_normal) +
                               [1] * len(self.test_anomaly))

    def test_text_detection_knn(self):
        clf = EmbeddingOD(encoder='all-MiniLM-L6-v2', detector='KNN',
                          contamination=0.1)
        clf.fit(self.normal_train)

        scores = clf.decision_function(self.X_test)
        assert_equal(scores.shape[0], len(self.X_test))

        labels = clf.predict(self.X_test)
        assert set(labels).issubset({0, 1})

        proba = clf.predict_proba(self.X_test)
        assert proba.min() >= 0
        assert proba.max() <= 1

    def test_for_text_preset(self):
        clf = EmbeddingOD.for_text(quality='fast')
        clf.fit(self.normal_train)
        scores = clf.decision_function(self.X_test)
        assert_equal(scores.shape[0], len(self.X_test))
253  
254  
255  from pyod.models.embedding import MultiModalOD
256  from pyod.models.knn import KNN
257  
258  
def _mock_encoder_a(X):
    """Return a reproducible ``(len(X), 15)`` embedding matrix (seed 10).

    Only the number of samples in ``X`` influences the result.
    """
    n_samples, n_features = len(X), 15
    generator = np.random.RandomState(10)
    return generator.standard_normal(size=(n_samples, n_features))
262  
263  
def _mock_encoder_b(X):
    """Return a reproducible ``(len(X), 12)`` embedding matrix (seed 20).

    Only the number of samples in ``X`` influences the result.
    """
    n_samples, n_features = len(X), 12
    generator = np.random.RandomState(20)
    return generator.standard_normal(size=(n_samples, n_features))
267  
268  
class TestMultiModalOD(unittest.TestCase):
    """Tests for ``MultiModalOD`` over a mocked text modality and a
    tabular modality."""

    def setUp(self):
        self.n_train = 200
        self.n_test = 100
        self.train_data = {
            'text': [f"train_{i}" for i in range(self.n_train)],
            'tabular': np.random.RandomState(42).randn(self.n_train, 5),
        }
        self.test_data = {
            'text': [f"test_{i}" for i in range(self.n_test)],
            'tabular': np.random.RandomState(43).randn(self.n_test, 5),
        }

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    def _make_clf(self, **kwargs):
        """Build an unfitted text + tabular ``MultiModalOD``.

        Extra keyword arguments are forwarded to ``MultiModalOD``.
        """
        return MultiModalOD(
            modalities={
                'text': EmbeddingOD(encoder=_mock_encoder_a, detector='KNN'),
                'tabular': KNN(),
            },
            **kwargs)

    def _fit_clf(self, **kwargs):
        """Build the two-modality detector and fit it on the train data."""
        clf = self._make_clf(**kwargs)
        clf.fit(self.train_data)
        return clf

    def _assert_train_scores(self, clf):
        # A fitted detector exposes one training score per training sample.
        assert hasattr(clf, 'decision_scores_')
        assert_equal(len(clf.decision_scores_), self.n_train)

    def _tabular_only(self, stop=None):
        """Test batch whose text modality is missing (``None``)."""
        return {'text': None,
                'tabular': self.test_data['tabular'][:stop]}

    # ------------------------------------------------------------------
    # fitting / prediction
    # ------------------------------------------------------------------
    def test_fit_and_predict(self):
        clf = self._fit_clf()
        self._assert_train_scores(clf)

        scores = clf.decision_function(self.test_data)
        assert_equal(scores.shape[0], self.n_test)

    def test_predict_labels(self):
        clf = self._fit_clf()
        labels = clf.predict(self.test_data)
        assert_equal(labels.shape[0], self.n_test)
        assert set(labels).issubset({0, 1})

    def test_combination_average(self):
        self._assert_train_scores(self._fit_clf(combination='average'))

    def test_combination_maximization(self):
        self._assert_train_scores(self._fit_clf(combination='maximization'))

    def test_combination_median(self):
        self._assert_train_scores(self._fit_clf(combination='median'))

    def test_invalid_combination_raises(self):
        clf = self._make_clf(combination='invalid')
        with self.assertRaises(ValueError):
            clf.fit(self.train_data)

    def test_missing_modality_raises(self):
        clf = self._make_clf()
        with self.assertRaises(KeyError):
            clf.fit({'text': self.train_data['text']})

    def test_non_dict_input_raises(self):
        clf = MultiModalOD(modalities={
            'tabular': KNN(),
        })
        # Seeded generator keeps the test independent of global RNG state,
        # like every other fixture in this module.
        not_a_dict = np.random.RandomState(0).randn(50, 5)
        with self.assertRaises(TypeError):
            clf.fit(not_a_dict)

    def test_three_modalities(self):
        clf = MultiModalOD(modalities={
            'text': EmbeddingOD(encoder=_mock_encoder_a, detector='KNN'),
            'image': EmbeddingOD(encoder=_mock_encoder_b, detector='LOF'),
            'tabular': KNN(),
        })
        train = {
            'text': self.train_data['text'],
            'image': [f"img_{i}" for i in range(self.n_train)],
            'tabular': self.train_data['tabular'],
        }
        clf.fit(train)
        assert len(clf.detectors_) == 3

    def test_no_standardize(self):
        self._assert_train_scores(self._fit_clf(standardize_scores=False))

    # ------------------------------------------------------------------
    # missing modalities at test time
    # ------------------------------------------------------------------
    def test_missing_modality_at_test_time(self):
        clf = self._fit_clf()
        # At test time, text modality is missing
        scores = clf.decision_function(self._tabular_only())
        assert_equal(scores.shape[0], self.n_test)

    def test_missing_modality_score_stability(self):
        """Same sample should get same score regardless of batch size."""
        clf = self._fit_clf()

        # Score one sample with missing text
        score_single = clf.decision_function(self._tabular_only(1))[0]

        # Score same sample in a batch of 10
        score_batch = clf.decision_function(self._tabular_only(10))[0]

        # Scores should be identical (using training scalers)
        np.testing.assert_allclose(score_single, score_batch)

    def test_missing_modality_predict(self):
        """predict() should work with missing modalities."""
        clf = self._fit_clf()
        labels = clf.predict(self._tabular_only())
        assert_equal(labels.shape[0], self.n_test)
        assert set(labels).issubset({0, 1})

    def test_all_modalities_missing_raises(self):
        clf = self._fit_clf()
        with self.assertRaises(ValueError):
            clf.decision_function({'text': None, 'tabular': None})

    def test_detectors_are_cloned(self):
        original_det = KNN()
        clf = MultiModalOD(modalities={'tabular': original_det})
        clf.fit({'tabular': self.train_data['tabular']})
        # The detector passed in must stay unfitted; only a copy is trained.
        assert not hasattr(original_det, 'decision_scores_')
445  
446  
# Run every test case in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()