# -*- coding: utf-8 -*-
"""SAND: Streaming Anomaly detection with Normalization and Drift adaptation.

Simplified PyOD adaptation of:
Boniol, P., Paparrizos, J., Palpanas, T. and Franklin, M.J., 2021.
SAND: Streaming subsequence anomaly detection.
*Proceedings of the VLDB Endowment*, 14(10), pp. 1717-1729.

This implementation extracts z-normalized sliding-window subsequences,
initializes k centroids using k-Shape on the first batch, then scores
remaining subsequences by SBD (shape-based distance) to their nearest
centroid. Centroids are updated every ``batch_size`` subsequences via
exponential moving average with parameter ``alpha``.
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause

import numpy as np
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ._ts_utils import (validate_ts_input, sliding_windows,
                        map_scores_to_timestamps, aggregate_channel_scores)
from .ts_kshape import _znormalize, _sbd, _kshape


class SAND(BaseDetector):
    """SAND streaming anomaly detector for time series.

    Extracts z-normalized sliding-window subsequences, initializes
    k centroids via k-Shape on the first batch, and scores each
    subsequence by its SBD to the nearest centroid. Centroids are
    updated every ``batch_size`` subsequences using an exponential
    moving average, enabling drift adaptation.

    Parameters
    ----------
    window_size : int, optional (default=50)
        Subsequence length for the sliding window.

    n_clusters : int, optional (default=5)
        Number of centroids for k-Shape clustering.

    alpha : float, optional (default=0.5)
        Smoothing factor for centroid updates.
        ``new = alpha * batch_centroid + (1 - alpha) * old_centroid``.

    batch_size : int, optional (default=100)
        Number of subsequences between centroid updates.

    max_iter : int, optional (default=50)
        Maximum iterations for initial k-Shape clustering.

    contamination : float, optional (default=0.1)
        Expected proportion of outliers. Must be in (0, 0.5].

    channel_aggregation : str, optional (default='max')
        How to aggregate per-channel scores for multivariate input.
        One of ``'max'`` or ``'mean'``.

    random_state : int or None, optional (default=42)
        Random seed for reproducible centroid initialization.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_timestamps,)
        Outlier scores of the training data. Higher is more abnormal.

    threshold_ : float
        Score threshold derived from ``contamination``.

    labels_ : numpy array of shape (n_timestamps,)
        Binary labels (0: inlier, 1: outlier).

    centroids_ : list of numpy arrays
        Per-channel centroids after processing all training data.

    Examples
    --------
    >>> from pyod.models.ts_sand import SAND
    >>> import numpy as np
    >>> X_train = np.random.randn(500)
    >>> clf = SAND(n_clusters=3, window_size=20, contamination=0.1)
    >>> clf.fit(X_train)
    >>> scores = clf.decision_function(np.random.randn(200))

    References
    ----------
    .. [1] Boniol, P., Paparrizos, J., Palpanas, T. and Franklin, M.J.,
       2021. SAND: Streaming subsequence anomaly detection.
       *Proceedings of the VLDB Endowment*, 14(10), pp. 1717-1729.
    """

    def __init__(self, window_size=50, n_clusters=5, alpha=0.5,
                 batch_size=100, max_iter=50, contamination=0.1,
                 channel_aggregation='max', random_state=42):
        super(SAND, self).__init__(contamination=contamination)
        self.window_size = window_size
        self.n_clusters = n_clusters
        self.alpha = alpha
        self.batch_size = batch_size
        self.max_iter = max_iter
        self.channel_aggregation = channel_aggregation
        self.random_state = random_state

    def _get_min_length(self):
        """Minimum time series length required (at least two windows)."""
        return self.window_size + 1

    @staticmethod
    def _znorm_windows(windows):
        """Z-normalize each row of a window matrix."""
        return np.array([_znormalize(w) for w in windows])

    def _channel_windows(self, X, ch):
        """Extract z-normalized sliding windows (step=1) for one channel.

        Parameters
        ----------
        X : np.ndarray of shape (n_timestamps, n_channels)
        ch : int
            Channel index.

        Returns
        -------
        windows : np.ndarray of shape (n_windows, window_size)
        """
        channel = X[:, ch].reshape(-1, 1)
        windows = sliding_windows(channel, self.window_size, step=1)
        return self._znorm_windows(windows)

    @staticmethod
    def _score_subsequences(subs, centroids):
        """Score each subsequence by SBD to nearest centroid.

        Parameters
        ----------
        subs : np.ndarray of shape (n_subs, length)
        centroids : np.ndarray of shape (n_clusters, length)

        Returns
        -------
        scores : np.ndarray of shape (n_subs,)
        """
        scores = np.empty(len(subs))
        for i, s in enumerate(subs):
            best_dist = np.inf
            for c in centroids:
                # _sbd returns (distance, aligned_sequence); only the
                # distance is needed for scoring.
                d, _ = _sbd(s, c)
                if d < best_dist:
                    best_dist = d
            scores[i] = best_dist
        return scores

    def _aggregate_channels(self, per_channel_results):
        """Combine per-channel ``(ts_scores, valid_mask)`` pairs.

        Invalid positions are zero-filled before aggregation so they do
        not contribute spurious values, and the combined validity mask
        is the intersection of the per-channel masks. Shared by ``fit``
        and ``decision_function`` so train and test aggregation can
        never drift apart.

        Parameters
        ----------
        per_channel_results : list of (np.ndarray, np.ndarray) tuples
            One ``(scores, valid_mask)`` pair per channel, each of
            shape (n_timestamps,).

        Returns
        -------
        scores : np.ndarray of shape (n_timestamps,)
        valid_mask : np.ndarray of shape (n_timestamps,), dtype bool
        """
        if len(per_channel_results) == 1:
            return per_channel_results[0]
        filled_scores = []
        combined_valid = per_channel_results[0][1].copy()
        for ch_scores, ch_valid in per_channel_results:
            filled = ch_scores.copy()
            filled[~ch_valid] = 0.0
            filled_scores.append(filled)
            combined_valid &= ch_valid
        scores = aggregate_channel_scores(
            filled_scores, method=self.channel_aggregation)
        return scores, combined_valid

    def _streaming_fit_channel(self, windows):
        """Fit on one channel: initialize centroids, stream with updates.

        Parameters
        ----------
        windows : np.ndarray of shape (n_windows, window_size)
            Z-normalized sliding windows for one channel.

        Returns
        -------
        scores : np.ndarray of shape (n_windows,)
        centroids : np.ndarray of shape (n_clusters, window_size)
        """
        n = len(windows)
        batch_size = min(self.batch_size, n)
        rng = np.random.RandomState(self.random_state)

        # Initialize centroids on the first batch using k-Shape
        init_batch = windows[:batch_size]
        centroids, _, _ = _kshape(init_batch, self.n_clusters,
                                  max_iter=self.max_iter,
                                  random_state=rng)

        # Score all subsequences in a streaming fashion
        scores = np.empty(n)

        # Process in batches for centroid updates
        pos = 0
        while pos < n:
            end = min(pos + batch_size, n)
            batch = windows[pos:end]

            # Score this batch against current centroids
            scores[pos:end] = self._score_subsequences(batch, centroids)

            # Update centroids if this is not the last batch; a final
            # (possibly short) batch never triggers an update, and a
            # batch smaller than n_clusters cannot be clustered.
            if end < n and len(batch) >= self.n_clusters:
                # Fresh seeded RNG per batch keeps updates reproducible
                # regardless of how many batches were processed before.
                batch_rng = np.random.RandomState(self.random_state)
                batch_centroids, _, _ = _kshape(
                    batch, self.n_clusters,
                    max_iter=self.max_iter,
                    random_state=batch_rng)
                for k in range(self.n_clusters):
                    # Exponential moving average, re-normalized so the
                    # centroid stays a z-normalized shape.
                    updated = (self.alpha * batch_centroids[k]
                               + (1.0 - self.alpha) * centroids[k])
                    centroids[k] = _znormalize(updated)

            pos = end

        return scores, centroids

    def fit(self, X, y=None):
        """Fit the SAND detector on time series data.

        Parameters
        ----------
        X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels)
            Training time series data.

        y : Ignored
            Not used, present for API consistency.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        X = validate_ts_input(X)
        n_timestamps, n_channels = X.shape
        min_len = self._get_min_length()
        if n_timestamps < min_len:
            raise ValueError(
                "Time series length %d is shorter than minimum "
                "required length %d (window_size=%d)"
                % (n_timestamps, min_len, self.window_size))

        n_windows = n_timestamps - self.window_size + 1
        init_batch = min(self.batch_size, n_windows)
        if init_batch < self.n_clusters:
            raise ValueError(
                "Not enough subsequences in initial batch (%d) for "
                "n_clusters=%d. Need a longer series, larger batch_size, "
                "or fewer clusters." % (init_batch, self.n_clusters))

        self._set_n_classes(y)

        # Store channel count for validation in decision_function
        self.n_channels_ = n_channels

        # Process each channel independently
        self.centroids_ = []
        per_channel_results = []

        for ch in range(n_channels):
            windows = self._channel_windows(X, ch)

            ch_window_scores, ch_centroids = self._streaming_fit_channel(
                windows)
            self.centroids_.append(ch_centroids)

            # Map window scores back to timestamps
            ts_scores, valid_mask = map_scores_to_timestamps(
                ch_window_scores, self.window_size, step=1,
                n_timestamps=n_timestamps, aggregation='max')
            per_channel_results.append((ts_scores, valid_mask))

        # Aggregate channels
        scores, valid_mask = self._aggregate_channels(per_channel_results)

        # Masked-score workflow: threshold from valid subset
        valid_scores = scores[valid_mask]
        self.decision_scores_ = valid_scores
        self._process_decision_scores()

        # Reconstruct full-length arrays; invalid (uncovered) timestamps
        # receive exactly the threshold, so the strict '>' below labels
        # them as inliers.
        full_scores = scores.copy()
        full_scores[~valid_mask] = self.threshold_
        full_labels = (full_scores > self.threshold_).astype(int)
        self.decision_scores_ = full_scores
        self.labels_ = full_labels

        return self

    def decision_function(self, X):
        """Predict raw anomaly scores for a test time series.

        Parameters
        ----------
        X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels)
            Test time series data.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_timestamps,)
            Anomaly scores. Higher is more abnormal.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_',
                               'centroids_'])
        X = validate_ts_input(X)
        n_timestamps, n_channels = X.shape

        if n_channels != self.n_channels_:
            raise ValueError(
                "Channel count mismatch: model fitted on %d channels, "
                "got %d" % (self.n_channels_, n_channels))

        per_channel_results = []
        for ch in range(n_channels):
            windows = self._channel_windows(X, ch)

            # Score against the frozen per-channel centroids from fit;
            # no streaming updates at prediction time.
            ch_window_scores = self._score_subsequences(
                windows,
                self.centroids_[ch])

            ts_scores, valid_mask = map_scores_to_timestamps(
                ch_window_scores, self.window_size, step=1,
                n_timestamps=n_timestamps, aggregation='max')
            per_channel_results.append((ts_scores, valid_mask))

        scores, valid_mask = self._aggregate_channels(per_channel_results)

        # Uncovered timestamps get the training threshold (neutral score)
        scores[~valid_mask] = self.threshold_
        return scores