test_embedding.py
# -*- coding: utf-8 -*-
"""Unit tests for ``EmbeddingOD`` and ``MultiModalOD``.

Most tests use small deterministic mock encoders so that no embedding
model has to be downloaded.  ``TestEmbeddingODIntegration`` is the one
exception and is skipped unless ``sentence_transformers`` is importable.
"""

# ``importlib.util`` is a submodule: it must be imported explicitly rather
# than relying on some other package having loaded it as a side effect.
import importlib.util
import os
import sys
import unittest

import numpy as np
from numpy.testing import assert_equal
from sklearn.base import clone

# Make the repository root importable before the pyod imports below.
sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from pyod.models.embedding import EmbeddingOD
from pyod.models.embedding import MultiModalOD
from pyod.models.knn import KNN


def _mock_encoder(X):
    """Deterministic mock encoder for testing.

    The generator is re-seeded on every call, so the returned
    ``(len(X), 20)`` matrix depends only on ``len(X)``, not on the
    contents of ``X``.
    """
    rng = np.random.RandomState(42)
    return rng.randn(len(X), 20)


def _mock_encoder_a(X):
    """Deterministic mock encoder returning a ``(len(X), 15)`` matrix."""
    rng = np.random.RandomState(10)
    return rng.randn(len(X), 15)


def _mock_encoder_b(X):
    """Deterministic mock encoder returning a ``(len(X), 12)`` matrix."""
    rng = np.random.RandomState(20)
    return rng.randn(len(X), 12)


class TestEmbeddingOD(unittest.TestCase):
    """Tests of ``EmbeddingOD`` driven by the mock encoder."""

    def setUp(self):
        self.n_train = 200
        self.n_test = 100
        self.contamination = 0.1
        self.X_train = [f"train_{i}" for i in range(self.n_train)]
        self.X_test = [f"test_{i}" for i in range(self.n_test)]

        self.clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
                               contamination=self.contamination)
        self.clf.fit(self.X_train)

    def test_parameters(self):
        assert (hasattr(self.clf, 'decision_scores_') and
                self.clf.decision_scores_ is not None)
        assert (hasattr(self.clf, 'labels_') and
                self.clf.labels_ is not None)
        assert (hasattr(self.clf, 'threshold_') and
                self.clf.threshold_ is not None)
        assert (hasattr(self.clf, '_mu') and
                self.clf._mu is not None)
        assert (hasattr(self.clf, '_sigma') and
                self.clf._sigma is not None)

    def test_train_scores(self):
        assert_equal(len(self.clf.decision_scores_), self.n_train)

    def test_prediction_scores(self):
        pred_scores = self.clf.decision_function(self.X_test)
        assert_equal(pred_scores.shape[0], self.n_test)

    def test_prediction_labels(self):
        pred_labels = self.clf.predict(self.X_test)
        assert_equal(pred_labels.shape[0], self.n_test)
        assert set(pred_labels).issubset({0, 1})

    def test_prediction_proba(self):
        pred_proba = self.clf.predict_proba(self.X_test)
        assert (pred_proba.min() >= 0)
        assert (pred_proba.max() <= 1)

    def test_prediction_proba_linear(self):
        pred_proba = self.clf.predict_proba(self.X_test, method='linear')
        assert (pred_proba.min() >= 0)
        assert (pred_proba.max() <= 1)

    def test_prediction_proba_unify(self):
        pred_proba = self.clf.predict_proba(self.X_test, method='unify')
        assert (pred_proba.min() >= 0)
        assert (pred_proba.max() <= 1)

    def test_prediction_proba_parameter(self):
        with self.assertRaises(ValueError):
            self.clf.predict_proba(self.X_test, method='something')

    def test_prediction_labels_confidence(self):
        pred_labels, confidence = self.clf.predict(self.X_test,
                                                   return_confidence=True)
        assert_equal(pred_labels.shape[0], self.n_test)
        assert_equal(confidence.shape[0], self.n_test)
        assert (confidence.min() >= 0)
        assert (confidence.max() <= 1)

    def test_prediction_with_rejection(self):
        pred_labels = self.clf.predict_with_rejection(self.X_test,
                                                      return_stats=False)
        assert_equal(pred_labels.shape[0], self.n_test)

    def test_detector_string_resolution(self):
        for name in ['KNN', 'LOF', 'ECOD', 'IForest', 'HBOS',
                     'COPOD', 'PCA', 'OCSVM', 'INNE']:
            # subTest reports which detector name failed and lets the
            # remaining names still run.
            with self.subTest(detector=name):
                clf = EmbeddingOD(encoder=_mock_encoder, detector=name)
                clf.fit(self.X_train)
                assert hasattr(clf, 'decision_scores_')

    def test_detector_instance(self):
        clf = EmbeddingOD(encoder=_mock_encoder,
                          detector=KNN(n_neighbors=3))
        clf.fit(self.X_train)
        assert hasattr(clf, 'decision_scores_')

    def test_detector_instance_is_cloned(self):
        original = KNN(n_neighbors=3)
        clf = EmbeddingOD(encoder=_mock_encoder, detector=original)
        clf.fit(self.X_train)
        # original should not be fitted (it was cloned)
        assert not hasattr(original, 'decision_scores_')

    def test_invalid_detector_raises(self):
        with self.assertRaises(ValueError):
            EmbeddingOD(encoder=_mock_encoder,
                        detector='NoSuchDetector').fit(self.X_train)

    def test_standardize(self):
        clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
                          standardize=True)
        clf.fit(self.X_train)
        assert hasattr(clf, 'scaler_')

    def test_no_standardize(self):
        clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
                          standardize=False)
        clf.fit(self.X_train)
        assert not hasattr(clf, 'scaler_')

    def test_reduce_dim(self):
        clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
                          reduce_dim=5)
        clf.fit(self.X_train)
        assert hasattr(clf, 'pca_')
        scores = clf.decision_function(self.X_test)
        assert_equal(scores.shape[0], self.n_test)

    def test_cache_embeddings(self):
        clf = EmbeddingOD(encoder=_mock_encoder, detector='KNN',
                          cache_embeddings=True)
        clf.fit(self.X_train)
        assert hasattr(clf, 'train_embeddings_')
        assert_equal(clf.train_embeddings_.shape[0], self.n_train)

    def test_model_clone(self):
        clone_clf = clone(self.clf)
        # clone() must return a separate estimator that carries the same
        # constructor configuration as the fitted original.
        assert isinstance(clone_clf, EmbeddingOD)
        assert clone_clf is not self.clf
        assert_equal(clone_clf.contamination, self.contamination)

    def test_default_detector_is_lunar(self):
        clf = EmbeddingOD(encoder=_mock_encoder)
        assert clf.detector == 'LUNAR'


class TestEmbeddingODPresets(unittest.TestCase):
    """Tests of the ``for_text`` / ``for_image`` preset constructors.

    Only construction is exercised here; no preset is fitted.
    """

    def test_for_text_returns_instance(self):
        clf = EmbeddingOD.for_text(quality='fast')
        assert isinstance(clf, EmbeddingOD)
        assert clf.encoder == 'all-MiniLM-L6-v2'
        assert clf.detector == 'KNN'

    def test_for_text_balanced(self):
        clf = EmbeddingOD.for_text(quality='balanced')
        assert clf.encoder == 'all-mpnet-base-v2'
        assert clf.detector == 'LUNAR'

    def test_for_text_best(self):
        clf = EmbeddingOD.for_text(quality='best')
        assert clf.encoder == 'text-embedding-3-large'
        assert clf.detector == 'LUNAR'
        assert clf.cache_embeddings is True

    def test_for_text_override(self):
        clf = EmbeddingOD.for_text(quality='fast', detector='LOF')
        assert clf.detector == 'LOF'

    def test_for_text_invalid_quality(self):
        with self.assertRaises(ValueError):
            EmbeddingOD.for_text(quality='invalid')

    def test_for_image_returns_instance(self):
        clf = EmbeddingOD.for_image(quality='fast')
        assert isinstance(clf, EmbeddingOD)
        assert clf.encoder == 'dinov2-small'
        assert clf.detector == 'KNN'

    def test_for_image_balanced(self):
        clf = EmbeddingOD.for_image(quality='balanced')
        assert clf.encoder == 'dinov2-base'
        assert clf.detector == 'LOF'

    def test_for_image_best(self):
        clf = EmbeddingOD.for_image(quality='best')
        assert clf.encoder == 'dinov2-large'
        assert clf.detector == 'KNN'

    def test_for_image_override(self):
        clf = EmbeddingOD.for_image(quality='fast', detector='ECOD')
        assert clf.detector == 'ECOD'


@unittest.skipUnless(
    importlib.util.find_spec('sentence_transformers') is not None,
    "sentence-transformers not installed")
class TestEmbeddingODIntegration(unittest.TestCase):
    """End-to-end test with real sentence-transformers encoder."""

    def setUp(self):
        self.normal_train = [
            "Sunny weather expected throughout the week",
            "Light rain showers predicted for tomorrow morning",
            "Temperature will reach 75 degrees today",
            "Clear skies and mild winds this afternoon",
            "A cold front will bring cooler temperatures",
            "Morning fog expected to clear by noon",
            "High pressure system bringing warm weather",
            "Partly cloudy with a chance of evening showers",
        ] * 10  # 80 normal training samples

        self.test_normal = [
            "Thunderstorms likely later this evening",
            "Weekend forecast shows pleasant conditions",
        ] * 5  # 10 normal
        self.test_anomaly = [
            "The stock market crashed by 500 points today",
            "Scientists discovered alien life on Mars",
            "The football team won the championship game",
        ]  # 3 anomalous (different topic)

        self.X_test = self.test_normal + self.test_anomaly
        self.y_test = np.array([0] * 10 + [1] * 3)

    def test_text_detection_knn(self):
        clf = EmbeddingOD(encoder='all-MiniLM-L6-v2', detector='KNN',
                          contamination=0.1)
        clf.fit(self.normal_train)

        scores = clf.decision_function(self.X_test)
        assert_equal(scores.shape[0], len(self.X_test))

        labels = clf.predict(self.X_test)
        assert set(labels).issubset({0, 1})

        proba = clf.predict_proba(self.X_test)
        assert proba.min() >= 0
        assert proba.max() <= 1

    def test_for_text_preset(self):
        clf = EmbeddingOD.for_text(quality='fast')
        clf.fit(self.normal_train)
        scores = clf.decision_function(self.X_test)
        assert_equal(scores.shape[0], len(self.X_test))


class TestMultiModalOD(unittest.TestCase):
    """Tests of ``MultiModalOD`` over a mock text and a tabular modality."""

    def setUp(self):
        self.n_train = 200
        self.n_test = 100
        self.train_data = {
            'text': [f"train_{i}" for i in range(self.n_train)],
            'tabular': np.random.RandomState(42).randn(self.n_train, 5),
        }
        self.test_data = {
            'text': [f"test_{i}" for i in range(self.n_test)],
            'tabular': np.random.RandomState(43).randn(self.n_test, 5),
        }

    def _make_clf(self, **kwargs):
        """Return an unfitted text+tabular ``MultiModalOD``.

        Extra keyword arguments are forwarded to the constructor.
        """
        return MultiModalOD(
            modalities={
                'text': EmbeddingOD(encoder=_mock_encoder_a,
                                    detector='KNN'),
                'tabular': KNN(),
            },
            **kwargs)

    def _make_fitted_clf(self, **kwargs):
        """Return a text+tabular ``MultiModalOD`` fitted on the train set."""
        clf = self._make_clf(**kwargs)
        clf.fit(self.train_data)
        return clf

    def test_fit_and_predict(self):
        clf = self._make_fitted_clf()
        assert hasattr(clf, 'decision_scores_')
        assert_equal(len(clf.decision_scores_), self.n_train)

        scores = clf.decision_function(self.test_data)
        assert_equal(scores.shape[0], self.n_test)

    def test_predict_labels(self):
        clf = self._make_fitted_clf()
        labels = clf.predict(self.test_data)
        assert_equal(labels.shape[0], self.n_test)
        assert set(labels).issubset({0, 1})

    def test_combination_average(self):
        clf = self._make_fitted_clf(combination='average')
        assert hasattr(clf, 'decision_scores_')

    def test_combination_maximization(self):
        clf = self._make_fitted_clf(combination='maximization')
        assert hasattr(clf, 'decision_scores_')

    def test_combination_median(self):
        clf = self._make_fitted_clf(combination='median')
        assert hasattr(clf, 'decision_scores_')

    def test_invalid_combination_raises(self):
        # Construction happens outside the context manager: the error is
        # expected from fit(), not from the constructor.
        clf = self._make_clf(combination='invalid')
        with self.assertRaises(ValueError):
            clf.fit(self.train_data)

    def test_missing_modality_raises(self):
        clf = self._make_clf()
        with self.assertRaises(KeyError):
            clf.fit({'text': self.train_data['text']})

    def test_non_dict_input_raises(self):
        clf = MultiModalOD(modalities={
            'tabular': KNN(),
        })
        with self.assertRaises(TypeError):
            clf.fit(np.random.RandomState(0).randn(50, 5))

    def test_three_modalities(self):
        clf = MultiModalOD(modalities={
            'text': EmbeddingOD(encoder=_mock_encoder_a, detector='KNN'),
            'image': EmbeddingOD(encoder=_mock_encoder_b, detector='LOF'),
            'tabular': KNN(),
        })
        train = {
            'text': self.train_data['text'],
            'image': [f"img_{i}" for i in range(self.n_train)],
            'tabular': self.train_data['tabular'],
        }
        clf.fit(train)
        assert len(clf.detectors_) == 3

    def test_no_standardize(self):
        clf = self._make_fitted_clf(standardize_scores=False)
        assert hasattr(clf, 'decision_scores_')

    def test_missing_modality_at_test_time(self):
        clf = self._make_fitted_clf()
        # At test time, text modality is missing
        scores = clf.decision_function({
            'text': None,
            'tabular': self.test_data['tabular'],
        })
        assert_equal(scores.shape[0], self.n_test)

    def test_missing_modality_score_stability(self):
        """Same sample should get same score regardless of batch size."""
        clf = self._make_fitted_clf()

        # Score one sample with missing text
        single = {'text': None,
                  'tabular': self.test_data['tabular'][:1]}
        score_single = clf.decision_function(single)[0]

        # Score same sample in a batch of 10
        batch = {'text': None,
                 'tabular': self.test_data['tabular'][:10]}
        score_batch = clf.decision_function(batch)[0]

        # Scores should be identical (using training scalers)
        np.testing.assert_allclose(score_single, score_batch)

    def test_missing_modality_predict(self):
        """predict() should work with missing modalities."""
        clf = self._make_fitted_clf()
        labels = clf.predict({
            'text': None,
            'tabular': self.test_data['tabular'],
        })
        assert_equal(labels.shape[0], self.n_test)
        assert set(labels).issubset({0, 1})

    def test_all_modalities_missing_raises(self):
        clf = self._make_fitted_clf()
        with self.assertRaises(ValueError):
            clf.decision_function({'text': None, 'tabular': None})

    def test_detectors_are_cloned(self):
        original_det = KNN()
        clf = MultiModalOD(modalities={'tabular': original_det})
        clf.fit({'tabular': self.train_data['tabular']})
        # The instance passed in must stay unfitted.
        assert not hasattr(original_det, 'decision_scores_')


if __name__ == '__main__':
    unittest.main()