ts_matrix_profile.py
1 # -*- coding: utf-8 -*- 2 """MatrixProfile: Time series anomaly detection using the STOMP algorithm. 3 4 Computes the Matrix Profile via the STOMP (Scalable Time-series Ordered 5 Matrix Profile) algorithm, which identifies anomalous subsequences by 6 measuring z-normalized Euclidean distance to the nearest non-trivial 7 match. 8 9 See :cite:`yeh2016matrix` for details. 10 11 Reference: 12 Yeh, C.C.M., Zhu, Y., Ulanova, L., Begum, N., Ding, Y., Dau, H.A., 13 Silva, D.F., Mueen, A. and Keogh, E., 2016. Matrix profile I: 14 All pairs similarity joins for time series: a unifying view that 15 includes motifs, discords and shapelets. In ICDM, pp. 1317-1322. 16 """ 17 # Author: Yue Zhao <yzhao062@gmail.com> 18 # License: BSD 2 clause 19 20 import numpy as np 21 from sklearn.utils.validation import check_is_fitted 22 23 from .base import BaseDetector 24 from ._ts_utils import (validate_ts_input, map_scores_to_timestamps, 25 aggregate_channel_scores) 26 27 28 def _compute_matrix_profile(T, m): 29 """Compute the Matrix Profile of a 1-D time series using STOMP. 30 31 Parameters 32 ---------- 33 T : np.ndarray of shape (n,) 34 Input time series (single channel). 35 m : int 36 Subsequence (window) length. 37 38 Returns 39 ------- 40 mp : np.ndarray of shape (n - m + 1,) 41 Matrix Profile values (nearest-neighbor z-normalized distances). 42 """ 43 n = len(T) 44 n_subseq = n - m + 1 45 exclusion_zone = m // 4 46 47 # --- Precompute rolling mean and std using cumulative sums --- 48 cumsum = np.cumsum(T) 49 cumsum2 = np.cumsum(T ** 2) 50 51 # sum of T[i:i+m] for each subsequence i 52 subseq_sum = np.empty(n_subseq) 53 subseq_sum[0] = cumsum[m - 1] 54 subseq_sum[1:] = cumsum[m:] - cumsum[:n - m] 55 56 subseq_sum2 = np.empty(n_subseq) 57 subseq_sum2[0] = cumsum2[m - 1] 58 subseq_sum2[1:] = cumsum2[m:] - cumsum2[:n - m] 59 60 mu = subseq_sum / m 61 sigma_sq = subseq_sum2 / m - mu ** 2 62 sigma_sq = np.maximum(sigma_sq, 0.0) # numerical stability 63 sigma = np.sqrt(sigma_sq) 64 65 # Mask for constant subsequences (std < 1e-10) 66 const_mask = sigma < 1e-10 67 68 # Initialize Matrix Profile with infinity 69 mp = np.full(n_subseq, np.inf) 70 71 # --- First column (j=0): compute distance profile using MASS (FFT) --- 72 # Pad to next power of 2 for FFT efficiency 73 fft_size = 1 74 while fft_size < 2 * n: 75 fft_size *= 2 76 77 T_fft = np.fft.rfft(T, n=fft_size) 78 79 # First query subsequence (reversed, then padded) 80 query = T[:m][::-1] 81 Q_fft = np.fft.rfft(query, n=fft_size) 82 83 # QT[i] = dot product of T[i:i+m] and T[0:m] 84 QT_full = np.fft.irfft(T_fft * Q_fft, n=fft_size) 85 QT = QT_full[m - 1:m - 1 + n_subseq].copy() 86 87 # Compute distance for j=0 88 _update_mp(mp, QT, mu, sigma, const_mask, m, 0, exclusion_zone, n_subseq) 89 90 # Keep a copy of QT for incremental updates 91 QT_prev = QT.copy() 92 93 # --- STOMP: incremental updates for j=1..n_subseq-1 --- 94 for j in range(1, n_subseq): 95 # Incremental QT update: 96 # QT_new[i] = QT_old[i-1] - T[i-1]*T[j-1] + T[i+m-1]*T[j+m-1] 97 QT_new = np.empty(n_subseq) 98 99 # QT_new[0] must be computed as a direct dot product 100 QT_new[0] = np.dot(T[:m], T[j:j + m]) 101 102 # Vectorized incremental update for i=1..n_subseq-1 103 i_indices = np.arange(1, n_subseq) 104 QT_new[1:] = (QT_prev[:-1] 105 - T[i_indices - 1] * T[j - 1] 106 + T[i_indices + m - 1] * T[j + m - 1]) 107 108 _update_mp(mp, QT_new, mu, sigma, const_mask, m, j, 109 exclusion_zone, n_subseq) 110 111 QT_prev = QT_new 112 113 return mp 114 115 116 def _update_mp(mp, QT, mu, sigma, const_mask, m, j, exclusion_zone, n_subseq): 117 """Update the Matrix Profile for column j. 118 119 Converts QT values to z-normalized distances and updates the 120 profile wherever the new distance is smaller. 121 122 Parameters 123 ---------- 124 mp : np.ndarray, modified in-place 125 QT : np.ndarray of shape (n_subseq,) 126 mu : np.ndarray of shape (n_subseq,) 127 sigma : np.ndarray of shape (n_subseq,) 128 const_mask : np.ndarray of bool 129 m : int 130 j : int, current column index 131 exclusion_zone : int 132 n_subseq : int 133 """ 134 # Compute z-normalized distance: 135 # d = sqrt(2*m*(1 - (QT - m*mu_i*mu_j) / (m*sigma_i*sigma_j))) 136 # where i ranges over all subsequences 137 138 # Denominator: m * sigma_i * sigma_j 139 denom = m * sigma * sigma[j] 140 141 # Numerator inside the (1 - ...) term 142 numerator = QT - m * mu * mu[j] 143 144 # Compute the argument of sqrt 145 # Avoid division by zero: where denom is 0 (constant subsequences), 146 # distance is inf 147 with np.errstate(divide='ignore', invalid='ignore'): 148 corr = np.where(denom > 0, numerator / denom, 0.0) 149 150 dist_sq = 2 * m * (1 - corr) 151 152 # Clip for numerical stability 153 dist_sq = np.maximum(dist_sq, 0.0) 154 dist = np.sqrt(dist_sq) 155 156 # Set distance to inf for constant subsequences 157 dist[const_mask] = np.inf 158 if const_mask[j]: 159 dist[:] = np.inf 160 161 # Apply exclusion zone: ignore indices where |i - j| <= exclusion_zone 162 ez_start = max(0, j - exclusion_zone) 163 ez_end = min(n_subseq, j + exclusion_zone + 1) 164 dist[ez_start:ez_end] = np.inf 165 166 # Update Matrix Profile where distance is smaller 167 mask = dist < mp 168 mp[mask] = dist[mask] 169 170 171 class MatrixProfile(BaseDetector): 172 """Matrix Profile time series anomaly detector using STOMP. 173 174 Computes the Matrix Profile via the STOMP algorithm, identifying 175 anomalous subsequences by measuring z-normalized Euclidean distance 176 to the nearest non-trivial neighbor. Subsequences with high 177 Matrix Profile values (discords) are anomalous. 178 179 This is a **transductive** detector: it only scores the training 180 data. ``decision_function``, ``predict``, ``predict_proba``, and 181 ``predict_confidence`` raise ``NotImplementedError`` when called 182 with new data. 183 184 Parameters 185 ---------- 186 window_size : int, optional (default=50) 187 Subsequence length for the Matrix Profile computation. 188 189 contamination : float in (0., 0.5), optional (default=0.1) 190 Expected proportion of outliers in the dataset. 191 192 channel_aggregation : str, optional (default='max') 193 How to aggregate per-channel scores for multivariate data. 194 One of 'max' or 'mean'. 195 196 Attributes 197 ---------- 198 decision_scores_ : numpy array of shape (n_timestamps,) 199 Outlier scores of the training data. Higher is more abnormal. 200 201 threshold_ : float 202 Score threshold based on ``contamination``. 203 204 labels_ : numpy array of shape (n_timestamps,) 205 Binary labels of training data (0: inlier, 1: outlier). 206 207 References 208 ---------- 209 Yeh, C.C.M., Zhu, Y., Ulanova, L., Begum, N., Ding, Y., Dau, H.A., 210 Silva, D.F., Mueen, A. and Keogh, E., 2016. Matrix profile I: 211 All pairs similarity joins for time series. In ICDM, pp. 1317-1322. 212 213 Examples 214 -------- 215 >>> from pyod.models.ts_matrix_profile import MatrixProfile 216 >>> import numpy as np 217 >>> X_train = np.random.randn(300) 218 >>> clf = MatrixProfile(window_size=20, contamination=0.1) 219 >>> clf.fit(X_train) 220 >>> scores = clf.decision_scores_ 221 """ 222 223 def __init__(self, window_size=50, contamination=0.1, 224 channel_aggregation='max'): 225 super(MatrixProfile, self).__init__(contamination=contamination) 226 self.window_size = window_size 227 self.channel_aggregation = channel_aggregation 228 229 def fit(self, X, y=None): 230 """Fit the Matrix Profile detector on time series data. 231 232 Parameters 233 ---------- 234 X : array-like of shape (n_timestamps,) or (n_timestamps, n_channels) 235 Training time series data. 236 237 y : Ignored 238 Not used, present for API consistency. 239 240 Returns 241 ------- 242 self : object 243 Fitted estimator. 244 """ 245 X = validate_ts_input(X) 246 n_timestamps, n_channels = X.shape 247 m = self.window_size 248 249 if n_timestamps < m + 1: 250 raise ValueError( 251 "Time series length %d is too short for window_size=%d. " 252 "Need at least %d timestamps." % (n_timestamps, m, m + 1)) 253 254 self._set_n_classes(y) 255 256 # Compute Matrix Profile per channel 257 per_channel_ts_scores = [] 258 for ch in range(n_channels): 259 ts = X[:, ch] 260 mp = _compute_matrix_profile(ts, m) 261 262 # Map subsequence-level MP scores back to timestamps 263 # step=1 since MP computes all subsequences 264 scores, valid_mask = map_scores_to_timestamps( 265 mp, m, step=1, n_timestamps=n_timestamps, 266 aggregation=self.channel_aggregation) 267 per_channel_ts_scores.append((scores, valid_mask)) 268 269 if n_channels == 1: 270 scores, valid_mask = per_channel_ts_scores[0] 271 else: 272 # Aggregate across channels 273 # First, fill NaN positions in each channel with 0 for aggregation 274 filled_scores = [] 275 combined_valid = per_channel_ts_scores[0][1].copy() 276 for ch_scores, ch_valid in per_channel_ts_scores: 277 filled = ch_scores.copy() 278 filled[~ch_valid] = 0.0 279 filled_scores.append(filled) 280 combined_valid &= ch_valid 281 scores = aggregate_channel_scores( 282 filled_scores, method=self.channel_aggregation) 283 valid_mask = combined_valid 284 285 # Masked-score workflow: compute threshold on valid scores only 286 valid_scores = scores[valid_mask] 287 self.decision_scores_ = valid_scores 288 self._process_decision_scores() 289 290 # Reconstruct full-length arrays 291 full_scores = scores.copy() 292 full_scores[~valid_mask] = self.threshold_ 293 full_labels = (full_scores > self.threshold_).astype(int) 294 self.decision_scores_ = full_scores 295 self.labels_ = full_labels 296 297 return self 298 299 def decision_function(self, X): 300 """Not supported (transductive detector). 301 302 Raises 303 ------ 304 NotImplementedError 305 """ 306 raise NotImplementedError( 307 "MatrixProfile is a transductive detector and does not support " 308 "decision_function on new data. Access decision_scores_ after " 309 "fit() for training scores.") 310 311 def predict(self, X, return_confidence=False): 312 """Not supported (transductive detector). 313 314 Raises 315 ------ 316 NotImplementedError 317 """ 318 raise NotImplementedError( 319 "MatrixProfile is a transductive detector and does not support " 320 "predict on new data. Access labels_ after fit() for training " 321 "labels.") 322 323 def predict_proba(self, X, method='linear', return_confidence=False): 324 """Not supported (transductive detector). 325 326 Raises 327 ------ 328 NotImplementedError 329 """ 330 raise NotImplementedError( 331 "MatrixProfile is a transductive detector and does not support " 332 "predict_proba on new data.") 333 334 def predict_confidence(self, X): 335 """Not supported (transductive detector). 336 337 Raises 338 ------ 339 NotImplementedError 340 """ 341 raise NotImplementedError( 342 "MatrixProfile is a transductive detector and does not support " 343 "predict_confidence on new data.")