2026-04-12-v3-agentic-implementation.md
# V3 Agentic Session API Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Add session-based workflow engine to ADEngine: `start → plan → run → analyze → iterate → report`, with multi-detector consensus, quality assessment, and actionable iteration.

**Architecture:** New `InvestigationState` dataclass in `pyod/utils/investigation.py`. Session methods added to existing `ADEngine` class in `pyod/utils/ad_engine.py`. All existing methods unchanged. Session methods wrap existing helpers (`plan_detection`, `run_detection`, `analyze_results`, `generate_report`).

**Tech Stack:** Python dataclasses, numpy, scipy.stats (spearmanr, rankdata — both already dependencies).

**Spec:** `docs/superpowers/specs/2026-04-12-v3-agentic-design.md`

---

## File Structure

### New files

| File | Responsibility |
|------|----------------|
| `pyod/utils/investigation.py` | `InvestigationState` dataclass, `PHASES`, `ACTION_TYPES` enums, helper constructors |
| `pyod/test/test_ad_engine_v3.py` | Tests for session workflow methods |

### Modified files

| File | Change |
|------|--------|
| `pyod/utils/ad_engine.py` | Add 7 session methods: `start`, `plan`, `run`, `analyze`, `iterate`, `report`, `investigate` |
| `skills/od-expert/SKILL.md` | Update with V3 session workflow instructions |
| `CHANGES.txt` | Add V3 entry |

---

## Dependency graph

```
Task 1 (InvestigationState) → Task 2 (start + plan) → Task 3 (run + consensus)
  → Task 4 (analyze + quality) → Task 5 (iterate) → Task 6 (report + investigate)
  → Task 7 (CHANGES.txt) → Task 8 (od-expert skill)
```

All tasks are sequential — each builds on the previous.
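The multi-detector consensus named in the goal (and implemented in Task 3) can be previewed standalone: rank-normalize each detector's scores, average the ranks, majority-vote the labels, and measure pairwise Spearman agreement. The snippet below is an illustrative sketch with toy uniform scores standing in for real detector output; variable names here are not part of the planned API:

```python
# Toy preview of the Task 3 consensus math (not the real detector output).
import numpy as np
from scipy.stats import rankdata, spearmanr

rng = np.random.RandomState(0)
# Stand-ins for per-detector outputs: 3 detectors, 50 samples.
scores = [rng.rand(50) for _ in range(3)]
labels = [(s > np.percentile(s, 90)).astype(int) for s in scores]

n = len(scores[0])
# Rank-normalize each detector's scores onto a common (0, 1] scale.
rank_scores = np.array([rankdata(s) / n for s in scores])
consensus_scores = rank_scores.mean(axis=0)

# Majority vote: flagged if more than half the detectors flagged it.
votes = np.array(labels).sum(axis=0)
consensus_labels = (votes > len(scores) / 2).astype(int)

# Pairwise Spearman correlation, clipped at 0 as in the plan's run().
corrs = [max(0.0, spearmanr(scores[i], scores[j])[0])
         for i in range(len(scores))
         for j in range(i + 1, len(scores))]
agreement = float(np.mean(corrs))  # 0 = no agreement, 1 = perfect
```

Rank normalization matters because raw scores from different detector families live on incompatible scales; ranks make the average meaningful.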

---

### Task 1: InvestigationState dataclass

**Files:**
- Create: `pyod/utils/investigation.py`

- [ ] **Step 1: Create `investigation.py`**

```python
# pyod/utils/investigation.py
# -*- coding: utf-8 -*-
"""Investigation state for ADEngine session workflow."""

import time
from dataclasses import dataclass, field

PHASES = ('profiled', 'planned', 'detected', 'analyzed')

ACTION_TYPES = (
    'plan',
    'run',
    'analyze',
    'report_to_user',
    'confirm_with_user',
    'iterate',
    'done',
)


@dataclass
class InvestigationState:
    """Typed state object for an ADEngine investigation session.

    Tracks the full workflow: profiling, planning, detection,
    analysis, and iteration. Each session method updates the state
    and sets ``next_action`` to guide the agent.

    Attributes
    ----------
    phase : str
        One of ``PHASES``: 'profiled', 'planned', 'detected', 'analyzed'.
    iteration : int
        Current iteration (0 = first run).
    history : list
        List of HistoryEntry dicts.
    data : object
        Reference to input data (not copied).
    profile : dict
        Output of ``profile_data()``.
    plans : list
        List of DetectionPlan dicts (top-N).
    results : list
        List of DetectorResult dicts.
    consensus : dict or None
        ConsensusResult dict.
    analysis : dict or None
        InvestigationAnalysis dict.
    quality : dict or None
        QualityAssessment dict.
    next_action : dict
        NextAction dict guiding the agent.
    """
    phase: str
    iteration: int = 0
    history: list = field(default_factory=list)
    data: object = None
    profile: dict = field(default_factory=dict)
    plans: list = field(default_factory=list)
    results: list = field(default_factory=list)
    consensus: dict = None
    analysis: dict = None
    quality: dict = None
    next_action: dict = field(default_factory=dict)


def _make_history_entry(phase, action, iteration, detail=''):
    """Create a HistoryEntry dict."""
    return {
        'phase': phase,
        'action': action,
        'iteration': iteration,
        'timestamp': time.time(),
        'detail': detail,
    }
```

- [ ] **Step 2: Verify import works**

Run: `python -c "from pyod.utils.investigation import InvestigationState, PHASES, ACTION_TYPES; print('OK', PHASES)"`
Expected: `OK ('profiled', 'planned', 'detected', 'analyzed')`

- [ ] **Step 3: Commit**

```bash
git add pyod/utils/investigation.py
git commit -m "feat: add InvestigationState dataclass for V3 session workflow"
```

---

### Task 2: start() and plan() session methods

**Files:**
- Modify: `pyod/utils/ad_engine.py`
- Create: `pyod/test/test_ad_engine_v3.py` (start with first tests)

- [ ] **Step 1: Write failing tests for start() and plan()**

Create `pyod/test/test_ad_engine_v3.py`:

```python
# -*- coding: utf-8 -*-
"""Tests for ADEngine V3 session workflow."""

import os
import sys
import unittest

import numpy as np

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from pyod.utils.ad_engine import ADEngine
from pyod.utils.investigation import InvestigationState, PHASES


class TestSessionStartPlan(unittest.TestCase):
    def setUp(self):
        self.engine = ADEngine()
        self.X = np.random.RandomState(42).randn(200, 10)

    def test_start_returns_state(self):
        state = self.engine.start(self.X)
        assert isinstance(state, InvestigationState)
        assert state.phase == 'profiled'
        assert state.profile['data_type'] == 'tabular'
        assert state.profile['n_samples'] == 200
        assert state.next_action['action'] == 'plan'

    def test_start_with_data_type(self):
        state = self.engine.start(self.X, data_type='time_series')
        assert state.profile['data_type'] == 'time_series'

    def test_plan_returns_state(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        assert state.phase == 'planned'
        assert len(state.plans) >= 1
        assert len(state.plans) <= 3
        assert state.next_action['action'] == 'run'

    def test_plan_has_detector_names(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        for p in state.plans:
            assert 'detector_name' in p
            assert len(p['detector_name']) > 0

    def test_plan_with_exclude(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(
            state, constraints={'exclude_detectors': ['IForest']})
        names = [p['detector_name'] for p in state.plans]
        assert 'IForest' not in names

    def test_plan_max_detectors_1(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(
            state, constraints={'max_detectors': 1})
        assert len(state.plans) == 1

    def test_plan_max_detectors_2(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(
            state, constraints={'max_detectors': 2})
        assert len(state.plans) <= 2

    def test_history_tracking(self):
        state = self.engine.start(self.X)
        assert len(state.history) == 1
        assert state.history[0]['action'] == 'start'
        state = self.engine.plan(state)
        assert len(state.history) == 2
        assert state.history[1]['action'] == 'plan'


if __name__ == '__main__':
    unittest.main()
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `python -m pytest pyod/test/test_ad_engine_v3.py -v`
Expected: FAIL — `ADEngine has no attribute 'start'`

- [ ] **Step 3: Implement start() and plan()**

Add to `pyod/utils/ad_engine.py`, at the end of the class (before the knowledge query section, around line 830):

```python
    # ------------------------------------------------------------------
    # V3 Session workflow
    # ------------------------------------------------------------------

    def start(self, X, data_type=None):
        """Start an investigation session.

        Profiles the data and returns an InvestigationState.

        Parameters
        ----------
        X : array-like, Data, list, or dict
            Input data (any modality).
        data_type : str or None
            Explicit type override.

        Returns
        -------
        state : InvestigationState
        """
        from .investigation import InvestigationState, _make_history_entry

        profile = self.profile_data(X, data_type=data_type)
        state = InvestigationState(
            phase='profiled',
            data=X,
            profile=profile,
            next_action={
                'action': 'plan',
                'reason': 'Data profiled as %s with %d samples. '
                          'Ready to select detectors.'
                          % (profile['data_type'],
                             profile.get('n_samples', 0)),
            },
        )
        state.history.append(_make_history_entry(
            'profiled', 'start', 0,
            'Profiled %s data' % profile['data_type']))
        return state

    def plan(self, state, priority='balanced', constraints=None):
        """Plan detection: select top-N detectors.

        Wraps ``plan_detection()`` and extracts primary + alternatives
        into ``state.plans`` (up to 3 detectors, v1 limit).

        Parameters
        ----------
        state : InvestigationState
        priority : str
        constraints : dict or None

        Returns
        -------
        state : InvestigationState
        """
        from .investigation import _make_history_entry

        constraints = constraints or {}
        result = self.plan_detection(
            state.profile, priority=priority, constraints=constraints)

        # Extract primary + alternatives into flat list
        plans = []
        if result.get('detector_name'):
            plans.append(result)
        for alt in result.get('alternatives', []):
            if alt.get('detector_name'):
                plans.append(alt)

        # Honor max_detectors (v1 cap at 3)
        max_det = max(1, min(
            int(constraints.get('max_detectors', 3)), 3))
        state.plans = plans[:max_det]
        state.phase = 'planned'
        names = [p['detector_name'] for p in state.plans]
        state.next_action = {
            'action': 'run',
            'reason': 'Top %d detectors selected: %s. Ready to run.'
                      % (len(state.plans), ', '.join(names)),
        }
        state.history.append(_make_history_entry(
            'planned', 'plan', state.iteration,
            'Selected %d detectors: %s'
            % (len(state.plans), ', '.join(names))))
        return state
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `python -m pytest pyod/test/test_ad_engine_v3.py -v`
Expected: 8 tests PASS

- [ ] **Step 5: Commit**

```bash
git add pyod/utils/ad_engine.py pyod/test/test_ad_engine_v3.py
git commit -m "feat: add start() and plan() session methods to ADEngine"
```

---

### Task 3: run() method — multi-detector execution + consensus

**Files:**
- Modify: `pyod/utils/ad_engine.py`
- Modify: `pyod/test/test_ad_engine_v3.py`

- [ ] **Step 1: Write failing tests for run()**

Add to `pyod/test/test_ad_engine_v3.py`:

```python
class TestSessionRun(unittest.TestCase):
    def setUp(self):
        self.engine = ADEngine()
        self.X = np.random.RandomState(42).randn(200, 10)

    def test_run_returns_state(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        state = self.engine.run(state)
        assert state.phase == 'detected'
        assert len(state.results) > 0
        assert state.consensus is not None
        assert state.next_action['action'] == 'analyze'

    def test_results_have_scores(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        state = self.engine.run(state)
        for r in state.results:
            if r['status'] == 'success':
                assert r['scores_train'] is not None
                assert len(r['scores_train']) == 200

    def test_consensus_scores(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        state = self.engine.run(state)
        assert len(state.consensus['scores']) == 200
        assert len(state.consensus['labels']) == 200
        assert 0 <= state.consensus['agreement'] <= 1

    def test_consensus_single_detector(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(
            state, constraints={'exclude_detectors': [
                'ECOD', 'KNN', 'HBOS', 'LOF', 'COPOD', 'CBLOF',
                'PCA', 'INNE']})
        state = self.engine.run(state)
        # Single detector: agreement = 0.5
        assert state.consensus['agreement'] == 0.5
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `python -m pytest pyod/test/test_ad_engine_v3.py::TestSessionRun -v`
Expected: FAIL — `ADEngine has no attribute 'run'`

- [ ] **Step 3: Implement run()**

Add to `pyod/utils/ad_engine.py` after `plan()`:

```python
    def run(self, state):
        """Run detection with all planned detectors.

        Wraps ``run_detection()`` per plan. Computes consensus via
        rank normalization and majority vote. Records errors per
        detector without stopping.

        Parameters
        ----------
        state : InvestigationState

        Returns
        -------
        state : InvestigationState
        """
        from .investigation import _make_history_entry
        from scipy.stats import rankdata, spearmanr

        results = []
        for plan in state.plans:
            try:
                raw = self.run_detection(state.data, plan)
                entry = dict(raw)
                entry['detector_name'] = plan['detector_name']
                # Keep the plan so analyze() can tie-break on its confidence.
                entry['plan'] = plan
                entry['status'] = 'success'
                entry['error'] = None
                results.append(entry)
            except Exception as e:
                results.append({
                    'detector_name': plan['detector_name'],
                    'status': 'error',
                    'error': str(e),
                    'plan': plan,
                })

        state.results = results
        state.phase = 'detected'

        # Compute consensus from successful detectors
        successful = [r for r in results if r['status'] == 'success']

        if len(successful) == 0:
            state.consensus = None
            state.next_action = {
                'action': 'confirm_with_user',
                'reason': 'All %d detectors failed. Check data format '
                          'or try a different detector family.'
                          % len(results),
            }
        elif len(successful) == 1:
            r = successful[0]
            state.consensus = {
                'scores': r['scores_train'],
                'labels': r['labels_train'],
                'n_detectors': 1,
                'agreement': 0.5,
                'disagreements': [],
            }
            state.next_action = {
                'action': 'analyze',
                'reason': 'Detection complete (1 detector).',
            }
        else:
            n_samples = len(successful[0]['scores_train'])
            # Rank-normalize scores per detector
            rank_scores = np.array([
                rankdata(r['scores_train']) / n_samples
                for r in successful
            ])
            consensus_scores = np.mean(rank_scores, axis=0)

            # Majority-vote labels
            all_labels = np.array([
                r['labels_train'] for r in successful])
            vote_count = np.sum(all_labels, axis=0)
            consensus_labels = (
                vote_count > len(successful) / 2).astype(int)

            # Pairwise Spearman agreement
            correlations = []
            for i in range(len(successful)):
                for j in range(i + 1, len(successful)):
                    rho, _ = spearmanr(
                        successful[i]['scores_train'],
                        successful[j]['scores_train'])
                    correlations.append(
                        max(0.0, rho) if np.isfinite(rho) else 0.0)
            agreement = float(np.mean(correlations)) if correlations else 0.5

            # Disagreements: indices where detectors disagree
            disagreements = []
            for idx in range(n_samples):
                votes = all_labels[:, idx]
                if not (votes.all() or not votes.any()):
                    disagreements.append(int(idx))

            state.consensus = {
                'scores': consensus_scores,
                'labels': consensus_labels,
                'n_detectors': len(successful),
                'agreement': agreement,
                'disagreements': disagreements,
            }
            state.next_action = {
                'action': 'analyze',
                'reason': 'Detection complete (%d detectors, '
                          'agreement=%.2f).'
                          % (len(successful), agreement),
            }

        state.history.append(_make_history_entry(
            'detected', 'run', state.iteration,
            '%d/%d detectors succeeded'
            % (len(successful), len(results))))
        return state
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `python -m pytest pyod/test/test_ad_engine_v3.py -v`
Expected: 12 tests PASS

- [ ] **Step 5: Commit**

```bash
git add pyod/utils/ad_engine.py pyod/test/test_ad_engine_v3.py
git commit -m "feat: add run() session method with multi-detector consensus"
```

---

### Task 4: analyze() method — quality assessment

**Files:**
- Modify: `pyod/utils/ad_engine.py`
- Modify: `pyod/test/test_ad_engine_v3.py`

- [ ] **Step 1: Write failing tests for analyze()**

Add to `pyod/test/test_ad_engine_v3.py`:

```python
class TestSessionAnalyze(unittest.TestCase):
    def setUp(self):
        self.engine = ADEngine()
        self.X = np.random.RandomState(42).randn(200, 10)

    def _run_to_detected(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        state = self.engine.run(state)
        return state

    def test_analyze_returns_state(self):
        state = self._run_to_detected()
        state = self.engine.analyze(state)
        assert state.phase == 'analyzed'
        assert state.analysis is not None
        assert state.quality is not None

    def test_quality_metrics(self):
        state = self._run_to_detected()
        state = self.engine.analyze(state)
        q = state.quality
        assert 0 <= q['separation'] <= 1
        assert 0 <= q['agreement'] <= 1
        assert 0 <= q['stability'] <= 1
        assert 0 <= q['overall'] <= 1
        assert q['verdict'] in ('high', 'medium', 'low')
        assert len(q['explanation']) > 0

    def test_analysis_has_best_detector(self):
        state = self._run_to_detected()
        state = self.engine.analyze(state)
        a = state.analysis
        assert 'best_detector' in a
        assert 'best_detector_index' in a
        assert 'consensus_analysis' in a
        assert 'per_detector_analysis' in a
        assert 'summary' in a

    def test_per_detector_aligned_with_results(self):
        state = self._run_to_detected()
        state = self.engine.analyze(state)
        assert len(state.analysis['per_detector_analysis']) == len(state.results)

    def test_next_action_after_analyze(self):
        state = self._run_to_detected()
        state = self.engine.analyze(state)
        assert state.next_action['action'] in (
            'report_to_user', 'iterate')

    def test_quality_separation_edge_case(self):
        """All same label → separation = 0."""
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        state = self.engine.run(state)
        # Force all labels to 0 for test
        state.consensus['labels'] = np.zeros(200, dtype=int)
        state = self.engine.analyze(state)
        assert state.quality['separation'] == 0.0
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `python -m pytest pyod/test/test_ad_engine_v3.py::TestSessionAnalyze -v`
Expected: FAIL

- [ ] **Step 3: Implement analyze()**

Add to `pyod/utils/ad_engine.py` after `run()`:

```python
    def analyze(self, state):
        """Analyze detection results with quality assessment.

        Computes per-detector analysis, consensus analysis, quality
        metrics (separation, agreement, stability), and selects
        the best detector.

        Parameters
        ----------
        state : InvestigationState

        Returns
        -------
        state : InvestigationState
        """
        from .investigation import _make_history_entry

        state.phase = 'analyzed'

        # All-error path
        successful = [r for r in state.results
                      if r['status'] == 'success']
        if not successful:
            state.analysis = None
            state.quality = {
                'separation': 0.0, 'agreement': 0.0,
                'stability': 0.0, 'overall': 0.0,
                'verdict': 'low',
                'explanation': 'All detectors failed.',
            }
            state.next_action = {
                'action': 'confirm_with_user',
                'reason': 'All detectors failed. Check data format '
                          'or try a different detector family.',
            }
            state.history.append(_make_history_entry(
                'analyzed', 'analyze', state.iteration,
                'All detectors failed'))
            return state

        # Per-detector analysis (aligned with state.results)
        per_det = []
        for r in state.results:
            if r['status'] == 'success':
                try:
                    a = self.analyze_results(r, X=state.data)
                except Exception:
                    a = None
                per_det.append(a)
            else:
                per_det.append(None)

        # Consensus analysis (lightweight, not via analyze_results)
        c = state.consensus
        c_scores = c['scores']
        c_labels = c['labels']
        n_anomalies = int(c_labels.sum())
        n_samples = len(c_labels)
        top_k = min(10, n_samples)
        top_indices = np.argsort(c_scores)[::-1][:top_k]
        consensus_analysis = {
            'n_anomalies': n_anomalies,
            'anomaly_ratio': n_anomalies / max(n_samples, 1),
            'score_distribution': {
                'mean': float(np.mean(c_scores)),
                'std': float(np.std(c_scores)),
                'min': float(np.min(c_scores)),
                'max': float(np.max(c_scores)),
                'median': float(np.median(c_scores)),
                'q25': float(np.percentile(c_scores, 25)),
                'q75': float(np.percentile(c_scores, 75)),
            },
            'top_anomalies': [
                {'index': int(i), 'score': float(c_scores[i])}
                for i in top_indices],
            'summary': '%d anomalies detected out of %d samples '
                       '(%.1f%%) by consensus of %d detectors.'
                       % (n_anomalies, n_samples,
                          100 * n_anomalies / max(n_samples, 1),
                          c['n_detectors']),
        }

        # Best detector selection
        best_idx = self._select_best_detector(
            state.results, c_scores)

        state.analysis = {
            'consensus_analysis': consensus_analysis,
            'per_detector_analysis': per_det,
            'best_detector': state.results[best_idx]['detector_name'],
            'best_detector_index': best_idx,
            'summary': consensus_analysis['summary'],
        }

        # Quality metrics
        state.quality = self._compute_quality(
            c_scores, c_labels, state.results, c)
        state.analysis['summary'] += (
            ' Quality: %s (%.2f).'
            % (state.quality['verdict'], state.quality['overall']))

        # Next action based on quality
        if state.quality['overall'] >= 0.4:
            state.next_action = {
                'action': 'report_to_user',
                'reason': 'Results ready (quality=%s, %.2f).'
                          % (state.quality['verdict'],
                             state.quality['overall']),
                'summary': state.analysis['summary'],
                'confidence': state.quality['overall'],
            }
        else:
            state.next_action = {
                'action': 'iterate',
                'reason': 'Low result quality (%.2f). Consider '
                          'trying different detectors.'
                          % state.quality['overall'],
                'suggestion': 'Exclude lowest-agreement detector '
                              'and re-run.',
            }

        state.history.append(_make_history_entry(
            'analyzed', 'analyze', state.iteration,
            'Quality: %s (%.2f)' % (
                state.quality['verdict'],
                state.quality['overall'])))
        return state

    def _select_best_detector(self, results, consensus_scores):
        """Select best detector via Spearman with consensus.

        Fallback chain (per spec):
        1. Highest finite Spearman correlation
        2. If tied: highest plan confidence
        3. If still tied: fastest runtime
        4. If ALL correlations are NaN: first successful detector
        """
        from scipy.stats import spearmanr

        successful = [
            (i, r) for i, r in enumerate(results)
            if r['status'] == 'success']
        if len(successful) == 1:
            return successful[0][0]

        # Compute Spearman for each successful detector
        rhos = []
        for i, r in successful:
            rho, _ = spearmanr(r['scores_train'], consensus_scores)
            rhos.append(float(rho) if np.isfinite(rho) else None)

        # If ALL NaN: return first successful (spec rule 4)
        if all(rho is None for rho in rhos):
            return successful[0][0]

        # Find best by finite Spearman, then tie-break
        best_j = 0  # index into successful list
        best_rho = -1.0
        for j, (i, r) in enumerate(successful):
            rho = rhos[j]
            if rho is None:
                continue
            if rho > best_rho:
                best_rho = rho
                best_j = j
            elif rho == best_rho:
                # Tie-break: plan confidence
                curr_conf = r.get('plan', {}).get('confidence', 0)
                prev_conf = successful[best_j][1].get(
                    'plan', {}).get('confidence', 0)
                if curr_conf > prev_conf:
                    best_j = j
                elif curr_conf == prev_conf:
                    # Tie-break: fastest
                    if r.get('runtime_seconds', 999) < successful[
                            best_j][1].get('runtime_seconds', 999):
                        best_j = j
        return successful[best_j][0]

    def _compute_quality(self, scores, labels, results, consensus):
        """Compute quality metrics: separation, agreement, stability."""
        # Separation
        if labels.sum() == 0 or labels.sum() == len(labels):
            separation = 0.0
        else:
            anomaly_mean = float(np.mean(scores[labels == 1]))
            inlier_mean = float(np.mean(scores[labels == 0]))
            separation = float(np.clip(
                anomaly_mean / (inlier_mean + 1e-10) - 1, 0, 1))

        # Agreement (from consensus)
        agreement = float(consensus.get('agreement', 0.5))

        # Stability: Jaccard of top-k under +/-20% perturbation
        n_anomalies = int(labels.sum())
        n_samples = len(labels)
        if n_anomalies == 0:
            stability = 0.0
        else:
            k = n_anomalies
            k_low = max(1, int(k * 0.8))
            k_high = min(n_samples, int(k * 1.2))
            sorted_idx = np.argsort(scores)[::-1]
            top_k = set(sorted_idx[:k].tolist())
            top_low = set(sorted_idx[:k_low].tolist())
            top_high = set(sorted_idx[:k_high].tolist())

            def _jaccard(a, b):
                if not a and not b:
                    return 1.0
                return len(a & b) / len(a | b)

            stability = 0.5 * (
                _jaccard(top_k, top_low)
                + _jaccard(top_k, top_high))

        overall = float(np.mean([separation, agreement, stability]))
        if overall >= 0.7:
            verdict = 'high'
        elif overall >= 0.4:
            verdict = 'medium'
        else:
            verdict = 'low'

        return {
            'separation': separation,
            'agreement': agreement,
            'stability': stability,
            'overall': overall,
            'verdict': verdict,
            'explanation': 'Separation=%.2f, agreement=%.2f, '
                           'stability=%.2f.' % (
                               separation, agreement, stability),
        }
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `python -m pytest pyod/test/test_ad_engine_v3.py -v`
Expected: 18 tests PASS

- [ ] **Step 5: Commit**

```bash
git add pyod/utils/ad_engine.py pyod/test/test_ad_engine_v3.py
git commit -m "feat: add analyze() with quality metrics and best-detector selection"
```

---

### Task 5: iterate() method — feedback handling

**Files:**
- Modify: `pyod/utils/ad_engine.py`
- Modify: `pyod/test/test_ad_engine_v3.py`

- [ ] **Step 1: Write failing tests for iterate()**

Add to `pyod/test/test_ad_engine_v3.py`:

```python
class TestSessionIterate(unittest.TestCase):
    def setUp(self):
        self.engine = ADEngine()
        self.X = np.random.RandomState(42).randn(200, 10)

    def _run_to_analyzed(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        state = self.engine.run(state)
        state = self.engine.analyze(state)
        return state

    def test_structured_adjust_contamination(self):
        state = self._run_to_analyzed()
        state = self.engine.iterate(
            state, {'action': 'adjust_contamination', 'value': 0.05})
        assert state.phase == 'planned'
        assert state.iteration == 1
        assert state.next_action['action'] == 'run'

    def test_structured_exclude(self):
        state = self._run_to_analyzed()
        excluded = state.plans[0]['detector_name']
        state = self.engine.iterate(
            state, {'action': 'exclude', 'detectors': [excluded]})
        names = [p['detector_name'] for p in state.plans]
        assert excluded not in names

    def test_structured_rerun(self):
        state = self._run_to_analyzed()
        old_plans = [p['detector_name'] for p in state.plans]
        state = self.engine.iterate(state, {'action': 'rerun'})
        new_plans = [p['detector_name'] for p in state.plans]
        assert old_plans == new_plans
        assert state.phase == 'planned'

    def test_nl_high_confidence(self):
        state = self._run_to_analyzed()
        state = self.engine.iterate(
            state, 'try without IForest')
        # Should either execute or ask confirmation
        assert state.next_action['action'] in ('run', 'confirm_with_user')

    def test_nl_low_confidence(self):
        state = self._run_to_analyzed()
        state = self.engine.iterate(
            state, 'hmm something seems off')
        # Ambiguous → confirm
        assert state.next_action['action'] == 'confirm_with_user'

    def test_iteration_counter(self):
        state = self._run_to_analyzed()
        assert state.iteration == 0
        state = self.engine.iterate(state, {'action': 'rerun'})
        assert state.iteration == 1
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `python -m pytest pyod/test/test_ad_engine_v3.py::TestSessionIterate -v`
Expected: FAIL

- [ ] **Step 3: Implement iterate()**

Add to `pyod/utils/ad_engine.py` after `_compute_quality()`:

```python
    def iterate(self, state, feedback):
        """Iterate based on feedback.

        Structured dicts execute immediately. NL strings are
        parsed with confidence; ambiguous feedback triggers
        ``'confirm_with_user'``.

        Parameters
        ----------
        state : InvestigationState
        feedback : str or dict

        Returns
        -------
        state : InvestigationState
        """
        if isinstance(feedback, dict):
            return self._iterate_structured(state, feedback)
        return self._iterate_nl(state, str(feedback))

    def _iterate_structured(self, state, feedback):
        """Handle structured feedback dict."""
        from .investigation import _make_history_entry

        action = feedback.get('action', '')
        state.iteration += 1

        if action == 'adjust_contamination':
            value = feedback['value']
            for p in state.plans:
                params = dict(p.get('params', {}))
                params['contamination'] = value
                p['params'] = params
            detail = 'Adjusted contamination to %.3f' % value

        elif action == 'exclude':
            to_exclude = set(feedback.get('detectors', []))
            state.plans = [
                p for p in state.plans
                if p['detector_name'] not in to_exclude]
            if not state.plans:
                # Re-plan without excluded detectors
                result = self.plan_detection(
                    state.profile,
                    constraints={'exclude_detectors': list(to_exclude)})
                state.plans = [result]
                for alt in result.get('alternatives', []):
                    if alt.get('detector_name'):
                        state.plans.append(alt)
            detail = 'Excluded: %s' % ', '.join(to_exclude)

        elif action == 'include':
            to_include = feedback.get('detectors', [])
            existing = {p['detector_name'] for p in state.plans}
            for name in to_include:
                if name not in existing:
                    algo = self.kb.get_algorithm(name)
                    if algo and algo.get('status') in (
                            'shipped', 'experimental'):
                        state.plans.append(self._make_plan(
                            detector_name=name, params={},
                            reason='Added by user', confidence=0.5))
            detail = 'Included: %s' % ', '.join(to_include)

        elif action == 'rerun':
            detail = 'Re-running same plan'

        else:
            state.next_action = {
                'action': 'confirm_with_user',
                'reason': 'Unknown action: %s' % action,
            }
            return state

        state.phase = 'planned'
        state.results = []
        state.consensus = None
        state.analysis = None
        state.quality = None
        state.next_action = {
            'action': 'run',
            'reason': 'Plan adjusted. ' + detail,
            'adjustment': detail,
        }
        state.history.append(_make_history_entry(
            'planned', 'iterate', state.iteration, detail))
        return state

    def _iterate_nl(self, state, feedback):
        """Parse NL feedback into structured action."""
        from .investigation import _make_history_entry

        lower = feedback.lower()
        proposed = None
        confidence = 0.0

        # High-confidence patterns
        if 'without' in lower or 'exclude' in lower:
            # Try to extract detector name
            for r in state.results:
                name = r.get('detector_name', '')
                if name.lower() in lower:
                    proposed = {'action': 'exclude',
                                'detectors': [name]}
                    confidence = 0.9
                    break
            if proposed is None:
                proposed = {'action': 'exclude', 'detectors': []}
                confidence = 0.3

        elif 'false positive' in lower or 'too many' in lower:
            current = state.plans[0].get('params', {}).get(
                'contamination', 0.1) if state.plans else 0.1
            proposed = {'action': 'adjust_contamination',
                        'value': max(current * 0.5, 0.01)}
            confidence = 0.7

        elif 'missed' in lower or 'false negative' in lower:
            current = state.plans[0].get('params', {}).get(
                'contamination', 0.1) if state.plans else 0.1
            proposed = {'action': 'adjust_contamination',
                        'value': min(current * 1.5, 0.5)}
            confidence = 0.7

        elif 'rerun' in lower or 'again' in lower:
            proposed = {'action': 'rerun'}
            confidence = 0.9

        if proposed is None:
            proposed = {'action': 'rerun'}
            confidence = 0.0

        if confidence >= 0.8:
            return self._iterate_structured(state, proposed)

        # Low confidence → ask for confirmation
        state.next_action = {
            'action': 'confirm_with_user',
            'reason': 'Interpreted "%s" as: %s (confidence=%.1f).'
                      % (feedback, proposed.get('action', '?'),
                         confidence),
            'suggestion': 'Proposed: %s. Proceed?' % str(proposed),
            'proposed_change': proposed,
        }
        state.history.append(_make_history_entry(
            state.phase, 'iterate_nl', state.iteration,
            'NL feedback: "%s" → confidence=%.1f'
            % (feedback, confidence)))
        return state
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `python -m pytest pyod/test/test_ad_engine_v3.py -v`
Expected: 24 tests PASS

- [ ] **Step 5: Commit**

```bash
git add pyod/utils/ad_engine.py pyod/test/test_ad_engine_v3.py
git commit -m "feat: add iterate() with structured and NL feedback handling"
```

---

### Task 6: report() and investigate() — output + convenience

**Files:**
- Modify: `pyod/utils/ad_engine.py`
- Modify: `pyod/test/test_ad_engine_v3.py`

- [ ] **Step 1: Write failing tests**

Add to `pyod/test/test_ad_engine_v3.py`:

```python
class TestSessionReport(unittest.TestCase):
    def setUp(self):
        self.engine = ADEngine()
        self.X = np.random.RandomState(42).randn(200, 10)

    def _run_to_analyzed(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        state = self.engine.run(state)
        state = self.engine.analyze(state)
        return state

    def test_report_text(self):
        state = self._run_to_analyzed()
        report = self.engine.report(state, format='text')
        assert isinstance(report, str)
        assert 'Anomaly' in report
        assert 'consensus' in report.lower() or 'quality' in report.lower()

    def test_report_json(self):
        state = self._run_to_analyzed()
        report = self.engine.report(state, format='json')
        assert isinstance(report, dict)
        assert 'session' in report
        assert 'best_detector' in report

    def test_report_no_analysis_raises(self):
        state = self.engine.start(self.X)
        state = self.engine.plan(state)
        state = self.engine.run(state)
        # No analyze() called
        state.analysis = None
        with self.assertRaises(ValueError):
            self.engine.report(state)


class TestSessionInvestigate(unittest.TestCase):
    def setUp(self):
        self.engine = ADEngine()
        self.X = np.random.RandomState(42).randn(200, 10)

    def test_investigate_returns_analyzed_state(self):
        state = self.engine.investigate(self.X)
        assert isinstance(state, InvestigationState)
        assert state.phase == 'analyzed'
        assert state.analysis is not None
        assert state.quality is not None
        assert len(state.results) > 0

    def test_investigate_with_data_type(self):
        state = self.engine.investigate(
            self.X, data_type='tabular')
        assert state.profile['data_type'] == 'tabular'
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `python -m pytest pyod/test/test_ad_engine_v3.py::TestSessionReport -v`
Expected: FAIL

- [ ] **Step 3: Implement report() and investigate()**

Add to `pyod/utils/ad_engine.py` after `_iterate_nl()`:

```python
    def report(self, state, format='text'):
        """Generate investigation report.

        Text format wraps ``generate_report()`` for best detector,
        prepending session-level context. JSON format returns a
        native dict.

        Parameters
        ----------
        state : InvestigationState
        format : str
            'text' or 'json'.
1208 1209 Returns 1210 ------- 1211 report : str or dict 1212 """ 1213 if state.analysis is None: 1214 raise ValueError( 1215 "No successful detectors to report on. " 1216 "Use iterate() to adjust the plan.") 1217 1218 best_idx = state.analysis['best_detector_index'] 1219 best_result = state.results[best_idx] 1220 best_analysis = state.analysis['per_detector_analysis'][ 1221 best_idx] 1222 1223 if format == 'json': 1224 return { 1225 'session': { 1226 'consensus': { 1227 'scores': state.consensus[ 1228 'scores'].tolist(), 1229 'labels': state.consensus[ 1230 'labels'].tolist(), 1231 'n_detectors': state.consensus[ 1232 'n_detectors'], 1233 'agreement': state.consensus[ 1234 'agreement'], 1235 'disagreements': state.consensus[ 1236 'disagreements'], 1237 }, 1238 'quality': state.quality, 1239 'comparison': { 1240 'agreement': state.consensus[ 1241 'agreement'], 1242 'disagreements': state.consensus[ 1243 'disagreements'], 1244 }, 1245 }, 1246 'best_detector': { 1247 'name': best_result['detector_name'], 1248 'scores': best_result[ 1249 'scores_train'].tolist(), 1250 'labels': best_result[ 1251 'labels_train'].tolist(), 1252 'threshold': best_result['threshold'], 1253 'analysis': best_analysis, 1254 }, 1255 } 1256 1257 # Text format 1258 lines = [] 1259 lines.append('# Investigation Report') 1260 lines.append('') 1261 1262 # Session section 1263 lines.append('## Session Summary') 1264 c = state.consensus 1265 q = state.quality 1266 lines.append('- **Detectors run:** %d' % c['n_detectors']) 1267 lines.append('- **Detector agreement:** %.2f' 1268 % c['agreement']) 1269 lines.append('- **Quality verdict:** %s (%.2f)' 1270 % (q['verdict'], q['overall'])) 1271 lines.append('- **Iterations:** %d' % state.iteration) 1272 if c['disagreements']: 1273 lines.append('- **Disagreements:** %d samples' 1274 % len(c['disagreements'])) 1275 lines.append('') 1276 1277 # Best detector report (via generate_report) 1278 detector_report = self.generate_report( 1279 best_result, 
best_analysis, format='text') 1280 lines.append(detector_report) 1281 1282 return '\n'.join(lines) 1283 1284 def investigate(self, X, data_type=None, priority='balanced'): 1285 """One-shot investigation: start → plan → run → analyze. 1286 1287 Parameters 1288 ---------- 1289 X : array-like 1290 Input data. 1291 data_type : str or None 1292 priority : str 1293 1294 Returns 1295 ------- 1296 state : InvestigationState 1297 """ 1298 state = self.start(X, data_type=data_type) 1299 state = self.plan(state, priority=priority) 1300 state = self.run(state) 1301 state = self.analyze(state) 1302 return state 1303 ``` 1304 1305 - [ ] **Step 4: Run tests to verify they pass** 1306 1307 Run: `python -m pytest pyod/test/test_ad_engine_v3.py -v` 1308 Expected: 29 tests PASS 1309 1310 - [ ] **Step 5: Commit** 1311 1312 ```bash 1313 git add pyod/utils/ad_engine.py pyod/test/test_ad_engine_v3.py 1314 git commit -m "feat: add report() and investigate() to complete V3 session API" 1315 ``` 1316 1317 --- 1318 1319 ### Task 7: Documentation 1320 1321 **Files:** 1322 - Modify: `CHANGES.txt` 1323 1324 - [ ] **Step 1: Add CHANGES.txt entry** 1325 1326 Append to `CHANGES.txt`: 1327 1328 ``` 1329 v<2.2.0>, <04/12/2026> -- V3 Agentic Session API: add InvestigationState workflow engine to ADEngine. Session methods (start, plan, run, analyze, iterate, report, investigate) enable multi-detector comparison with rank-normalized consensus, result quality assessment (separation, agreement, stability), and actionable iteration with structured and natural-language feedback. One-shot investigate() runs the full expert workflow. 
```

- [ ] **Step 2: Commit**

```bash
git add CHANGES.txt
git commit -m "docs: add V3 agentic session API to CHANGES.txt"
```

---

### Task 8: od-expert skill update

**Files:**
- Modify: `skills/od-expert/SKILL.md`

- [ ] **Step 1: Update od-expert skill to use session API**

Update `skills/od-expert/SKILL.md` to instruct the agent to use the V3 session workflow. The key change: instead of calling individual methods (`profile_data`, `plan_detection`, `run_detection`, etc.), the skill should guide the agent through the session API (`start → plan → run → analyze → iterate → report`).

Add the following workflow section to the skill:

```markdown
## V3 Session Workflow

Use the ADEngine session API for the full anomaly detection lifecycle:

1. **Start:** `state = engine.start(data)` — profiles the data
2. **Plan:** `state = engine.plan(state)` — selects top-N detectors
3. **Run:** `state = engine.run(state)` — runs all detectors, computes consensus
4. **Analyze:** `state = engine.analyze(state)` — quality assessment, best detector
5. **Follow `state.next_action`:**
   - `'report_to_user'`: present `state.next_action['summary']` to the user
   - `'iterate'`: present the suggestion, ask if the user wants to proceed
   - `'confirm_with_user'` with `proposed_change`: present the suggestion; on approval call `engine.iterate(state, state.next_action['proposed_change'])`
   - `'confirm_with_user'` without `proposed_change` (error/retry): present the reason, ask the user what to try next
6. **On user feedback:** `state = engine.iterate(state, feedback)`
   - Structured: `{"action": "exclude", "detectors": ["IForest"]}`
   - Natural language: `"too many false positives"` (may need confirmation)
7. **Report:** `report = engine.report(state)` — generates the final report

One-shot shortcut: `state = engine.investigate(data)` runs steps 1-4 automatically.
```

- [ ] **Step 2: Commit**

```bash
git add skills/od-expert/SKILL.md
git commit -m "docs: update od-expert skill to use V3 session API"
```

---

## Self-Review

**Spec coverage:**
- Section 4.1 (state machine): Task 1 (InvestigationState), all session methods implement transitions
- Section 4.2 (API): Tasks 2-6 implement all 7 session methods
- Section 4.3 (typed schemas): Task 1 (dataclass + enums), schemas enforced in Tasks 3-6
- Section 4.4 (behaviors): consensus in Task 3, quality in Task 4, iterate in Task 5, report wrapping in Task 6
- Section 5 (skill integration): Task 8 updates od-expert skill with V3 session workflow
- Section 6 (backward compat): all existing methods unchanged, `run()` avoids `detect()` conflict
- Section 7 (scope): all in-scope items covered, no out-of-scope items included

**Placeholder scan:** No TBD, TODO, or "similar to Task N" found. All code blocks complete.

**Type consistency:** `InvestigationState` used consistently. Method names match spec: `start`, `plan`, `run`, `analyze`, `iterate`, `report`, `investigate`. Schema field names match across tasks.
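As an appendix, the branching on `state.next_action` in step 5 of the od-expert workflow can be sketched as a small agent-side dispatcher. This is a minimal illustration under stated assumptions, not code from the plan: only the `ACTION_TYPES` values and the `next_action` keys (`action`, `reason`, `summary`, `proposed_change`) come from the schemas above; the `next_step` helper and its return labels (`'present'`, `'ask_user'`, `'confirm'`, `'call_engine'`, `'stop'`) are hypothetical names chosen for this sketch.

```python
def next_step(next_action):
    """Map a state.next_action dict to the agent's follow-up.

    Sketch of step 5 of the od-expert workflow; return labels are
    illustrative, not part of the plan's schemas.
    """
    action = next_action.get('action')
    if action in ('plan', 'run', 'analyze'):
        # Engine-driven phases: call the matching session method next
        return ('call_engine', action)
    if action == 'report_to_user':
        return ('present', next_action.get('summary', ''))
    if action == 'iterate':
        # Present the suggestion and ask whether to proceed
        return ('ask_user', next_action.get('reason', ''))
    if action == 'confirm_with_user':
        proposed = next_action.get('proposed_change')
        if proposed is not None:
            # On approval the agent calls engine.iterate(state, proposed)
            return ('confirm', proposed)
        # Error/retry path: no concrete proposal to confirm
        return ('ask_user', next_action.get('reason', ''))
    if action == 'done':
        return ('stop', None)
    return ('ask_user', 'Unexpected action: %r' % action)


# Example: a low-confidence NL interpretation asks for confirmation
step = next_step({
    'action': 'confirm_with_user',
    'reason': 'Interpreted "drop noisy detector" as: exclude (confidence=0.3).',
    'proposed_change': {'action': 'exclude', 'detectors': []},
})
# → ('confirm', {'action': 'exclude', 'detectors': []})
```

The point of the sketch is that the skill never hard-codes a sequence after `analyze()`: the engine steers via `next_action`, and the agent only presents, confirms, or calls back into the session API.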