# pyod/models/ts_sand.py
  1  # -*- coding: utf-8 -*-
  2  """SAND: Streaming Anomaly detection with Normalization and Drift adaptation.
  3  
  4  Simplified PyOD adaptation of:
  5  Boniol, P., Paparrizos, J., Palpanas, T. and Franklin, M.J., 2021.
  6  SAND: Streaming subsequence anomaly detection.
  7  *Proceedings of the VLDB Endowment*, 14(10), pp. 1717-1729.
  8  
  9  This implementation extracts z-normalized sliding-window subsequences,
 10  initializes k centroids using k-Shape on the first batch, then scores
 11  remaining subsequences by SBD (shape-based distance) to their nearest
 12  centroid.  Centroids are updated every ``batch_size`` subsequences via
 13  exponential moving average with parameter ``alpha``.
 14  """
 15  # Author: Yue Zhao <yzhao062@gmail.com>
 16  # License: BSD 2 clause
 17  
 18  import numpy as np
 19  from sklearn.utils.validation import check_is_fitted
 20  
 21  from .base import BaseDetector
 22  from ._ts_utils import (validate_ts_input, sliding_windows,
 23                           map_scores_to_timestamps, aggregate_channel_scores)
 24  from .ts_kshape import _znormalize, _sbd, _kshape
 25  
 26  
 27  class SAND(BaseDetector):
 28      """SAND streaming anomaly detector for time series.
 29  
 30      Extracts z-normalized sliding-window subsequences, initializes
 31      k centroids via k-Shape on the first batch, and scores each
 32      subsequence by its SBD to the nearest centroid.  Centroids are
 33      updated every ``batch_size`` subsequences using an exponential
 34      moving average, enabling drift adaptation.
 35  
 36      Parameters
 37      ----------
 38      window_size : int, optional (default=50)
 39          Subsequence length for the sliding window.
 40  
 41      n_clusters : int, optional (default=5)
 42          Number of centroids for k-Shape clustering.
 43  
 44      alpha : float, optional (default=0.5)
 45          Smoothing factor for centroid updates.
 46          ``new = alpha * batch_centroid + (1 - alpha) * old_centroid``.
 47  
 48      batch_size : int, optional (default=100)
 49          Number of subsequences between centroid updates.
 50  
 51      max_iter : int, optional (default=50)
 52          Maximum iterations for initial k-Shape clustering.
 53  
 54      contamination : float, optional (default=0.1)
 55          Expected proportion of outliers.  Must be in (0, 0.5].
 56  
 57      channel_aggregation : str, optional (default='max')
 58          How to aggregate per-channel scores for multivariate input.
 59          One of ``'max'`` or ``'mean'``.
 60  
 61      random_state : int or None, optional (default=42)
 62          Random seed for reproducible centroid initialization.
 63  
 64      Attributes
 65      ----------
 66      decision_scores_ : numpy array of shape (n_timestamps,)
 67          Outlier scores of the training data.  Higher is more abnormal.
 68  
 69      threshold_ : float
 70          Score threshold derived from ``contamination``.
 71  
 72      labels_ : numpy array of shape (n_timestamps,)
 73          Binary labels (0: inlier, 1: outlier).
 74  
 75      centroids_ : list of numpy arrays
 76          Per-channel centroids after processing all training data.
 77  
 78      Examples
 79      --------
 80      >>> from pyod.models.ts_sand import SAND
 81      >>> import numpy as np
 82      >>> X_train = np.random.randn(500)
 83      >>> clf = SAND(n_clusters=3, window_size=20, contamination=0.1)
 84      >>> clf.fit(X_train)
 85      >>> scores = clf.decision_function(np.random.randn(200))
 86  
 87      References
 88      ----------
 89      .. [1] Boniol, P., Paparrizos, J., Palpanas, T. and Franklin, M.J.,
 90         2021. SAND: Streaming subsequence anomaly detection.
 91         *Proceedings of the VLDB Endowment*, 14(10), pp. 1717-1729.
 92      """
 93  
 94      def __init__(self, window_size=50, n_clusters=5, alpha=0.5,
 95                   batch_size=100, max_iter=50, contamination=0.1,
 96                   channel_aggregation='max', random_state=42):
 97          super(SAND, self).__init__(contamination=contamination)
 98          self.window_size = window_size
 99          self.n_clusters = n_clusters
100          self.alpha = alpha
101          self.batch_size = batch_size
102          self.max_iter = max_iter
103          self.channel_aggregation = channel_aggregation
104          self.random_state = random_state
105  
106      def _get_min_length(self):
107          """Minimum time series length required."""
108          return self.window_size + 1
109  
110      @staticmethod
111      def _znorm_windows(windows):
112          """Z-normalize each row of a window matrix."""
113          return np.array([_znormalize(w) for w in windows])
114  
115      @staticmethod
116      def _score_subsequences(subs, centroids):
117          """Score each subsequence by SBD to nearest centroid.
118  
119          Parameters
120          ----------
121          subs : np.ndarray of shape (n_subs, length)
122          centroids : np.ndarray of shape (n_clusters, length)
123  
124          Returns
125          -------
126          scores : np.ndarray of shape (n_subs,)
127          """
128          scores = np.empty(len(subs))
129          for i, s in enumerate(subs):
130              best_dist = np.inf
131              for c in centroids:
132                  d, _ = _sbd(s, c)
133                  if d < best_dist:
134                      best_dist = d
135              scores[i] = best_dist
136          return scores
137  
138      def _streaming_fit_channel(self, windows):
139          """Fit on one channel: initialize centroids, stream with updates.
140  
141          Parameters
142          ----------
143          windows : np.ndarray of shape (n_windows, window_size)
144              Z-normalized sliding windows for one channel.
145  
146          Returns
147          -------
148          scores : np.ndarray of shape (n_windows,)
149          centroids : np.ndarray of shape (n_clusters, window_size)
150          """
151          n = len(windows)
152          batch_size = min(self.batch_size, n)
153          rng = np.random.RandomState(self.random_state)
154  
155          # Initialize centroids on the first batch using k-Shape
156          init_batch = windows[:batch_size]
157          centroids, _, _ = _kshape(init_batch, self.n_clusters,
158                                    max_iter=self.max_iter,
159                                    random_state=rng)
160  
161          # Score all subsequences in a streaming fashion
162          scores = np.empty(n)
163  
164          # Process in batches for centroid updates
165          pos = 0
166          while pos < n:
167              end = min(pos + batch_size, n)
168              batch = windows[pos:end]
169  
170              # Score this batch against current centroids
171              scores[pos:end] = self._score_subsequences(batch, centroids)
172  
173              # Update centroids if this is not the last batch
174              if end < n and len(batch) >= self.n_clusters:
175                  batch_rng = np.random.RandomState(self.random_state)
176                  batch_centroids, _, _ = _kshape(
177                      batch, self.n_clusters,
178                      max_iter=self.max_iter,
179                      random_state=batch_rng)
180                  for k in range(self.n_clusters):
181                      updated = (self.alpha * batch_centroids[k]
182                                 + (1.0 - self.alpha) * centroids[k])
183                      centroids[k] = _znormalize(updated)
184  
185              pos = end
186  
187          return scores, centroids
188  
189      def fit(self, X, y=None):
190          """Fit the SAND detector on time series data.
191  
192          Parameters
193          ----------
194          X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels)
195              Training time series data.
196  
197          y : Ignored
198              Not used, present for API consistency.
199  
200          Returns
201          -------
202          self : object
203              Fitted estimator.
204          """
205          X = validate_ts_input(X)
206          n_timestamps, n_channels = X.shape
207          min_len = self._get_min_length()
208          if n_timestamps < min_len:
209              raise ValueError(
210                  "Time series length %d is shorter than minimum "
211                  "required length %d (window_size=%d)"
212                  % (n_timestamps, min_len, self.window_size))
213  
214          n_windows = n_timestamps - self.window_size + 1
215          init_batch = min(self.batch_size, n_windows)
216          if init_batch < self.n_clusters:
217              raise ValueError(
218                  "Not enough subsequences in initial batch (%d) for "
219                  "n_clusters=%d. Need a longer series, larger batch_size, "
220                  "or fewer clusters." % (init_batch, self.n_clusters))
221  
222          self._set_n_classes(y)
223  
224          # Store channel count for validation in decision_function
225          self.n_channels_ = n_channels
226  
227          # Process each channel independently
228          self.centroids_ = []
229          per_channel_results = []
230  
231          for ch in range(n_channels):
232              channel = X[:, ch].reshape(-1, 1)
233              windows = sliding_windows(channel, self.window_size, step=1)
234              windows = self._znorm_windows(windows)
235  
236              ch_window_scores, ch_centroids = self._streaming_fit_channel(
237                  windows)
238              self.centroids_.append(ch_centroids)
239  
240              # Map window scores back to timestamps
241              ts_scores, valid_mask = map_scores_to_timestamps(
242                  ch_window_scores, self.window_size, step=1,
243                  n_timestamps=n_timestamps, aggregation='max')
244              per_channel_results.append((ts_scores, valid_mask))
245  
246          # Aggregate channels
247          if n_channels == 1:
248              scores, valid_mask = per_channel_results[0]
249          else:
250              filled_scores = []
251              combined_valid = per_channel_results[0][1].copy()
252              for ch_scores, ch_valid in per_channel_results:
253                  filled = ch_scores.copy()
254                  filled[~ch_valid] = 0.0
255                  filled_scores.append(filled)
256                  combined_valid &= ch_valid
257              scores = aggregate_channel_scores(
258                  filled_scores, method=self.channel_aggregation)
259              valid_mask = combined_valid
260  
261          # Masked-score workflow: threshold from valid subset
262          valid_scores = scores[valid_mask]
263          self.decision_scores_ = valid_scores
264          self._process_decision_scores()
265  
266          # Reconstruct full-length arrays
267          full_scores = scores.copy()
268          full_scores[~valid_mask] = self.threshold_
269          full_labels = (full_scores > self.threshold_).astype(int)
270          self.decision_scores_ = full_scores
271          self.labels_ = full_labels
272  
273          return self
274  
275      def decision_function(self, X):
276          """Predict raw anomaly scores for a test time series.
277  
278          Parameters
279          ----------
280          X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels)
281              Test time series data.
282  
283          Returns
284          -------
285          anomaly_scores : numpy array of shape (n_timestamps,)
286              Anomaly scores.  Higher is more abnormal.
287          """
288          check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_',
289                                  'centroids_'])
290          X = validate_ts_input(X)
291          n_timestamps, n_channels = X.shape
292  
293          if n_channels != self.n_channels_:
294              raise ValueError(
295                  "Channel count mismatch: model fitted on %d channels, "
296                  "got %d" % (self.n_channels_, n_channels))
297  
298          per_channel_results = []
299          for ch in range(n_channels):
300              channel = X[:, ch].reshape(-1, 1)
301              windows = sliding_windows(channel, self.window_size, step=1)
302              windows = self._znorm_windows(windows)
303  
304              ch_window_scores = self._score_subsequences(
305                  windows,
306                  self.centroids_[ch])
307  
308              ts_scores, valid_mask = map_scores_to_timestamps(
309                  ch_window_scores, self.window_size, step=1,
310                  n_timestamps=n_timestamps, aggregation='max')
311              per_channel_results.append((ts_scores, valid_mask))
312  
313          if n_channels == 1:
314              scores, valid_mask = per_channel_results[0]
315          else:
316              filled_scores = []
317              combined_valid = per_channel_results[0][1].copy()
318              for ch_scores, ch_valid in per_channel_results:
319                  filled = ch_scores.copy()
320                  filled[~ch_valid] = 0.0
321                  filled_scores.append(filled)
322                  combined_valid &= ch_valid
323              scores = aggregate_channel_scores(
324                  filled_scores, method=self.channel_aggregation)
325              valid_mask = combined_valid
326  
327          scores[~valid_mask] = self.threshold_
328          return scores