base-agent.js
1 /** 2 * Base Agent Class 3 * 4 * Abstract base class for all agents in the system. 5 * Provides common functionality: task polling, messaging, logging, context loading. 6 * 7 * Subclasses must implement processTask(task) method. 8 */ 9 10 import { run, getOne, getAll } from '../utils/db.js'; 11 import { loadContextWithMetadata } from './utils/context-loader.js'; 12 import { buildAgentContext } from './utils/context-builder.js'; 13 import { 14 createAgentTask, 15 getAgentTasks, 16 startTask, 17 completeTask as completeTaskUtil, 18 failTask as failTaskUtil, 19 blockTask as blockTaskUtil, 20 updateTaskStatus, 21 getTaskById, 22 incrementRetryCount, 23 } from './utils/task-manager.js'; 24 import { 25 sendAgentMessage, 26 getUnreadMessages, 27 markMessageRead, 28 sendQuestion, 29 sendAnswer, 30 sendHandoff, 31 sendNotification, 32 hasPendingQuestions, 33 } from './utils/message-manager.js'; 34 import Logger from '../utils/logger.js'; 35 import StructuredLogger from './utils/structured-logger.js'; 36 import * as agentTools from './utils/agent-tools.js'; 37 38 const logger = new Logger('Agents'); 39 40 /** 41 * Base Agent Class 42 * 43 * @example 44 * class DeveloperAgent extends BaseAgent { 45 * constructor() { 46 * super('developer', ['base.md', 'developer.md']); 47 * } 48 * 49 * async processTask(task) { 50 * if (task.task_type === 'fix_bug') { 51 * await this.fixBug(task); 52 * } 53 * } 54 * 55 * async fixBug(task) { 56 * // Implementation... 57 * } 58 * } 59 */ 60 export class BaseAgent { 61 /** 62 * Create an agent 63 * 64 * @param {string} agentName - Agent name ('developer', 'qa', 'security', etc.) 65 * @param {string[]} contextFiles - Context files to load (e.g., ['base.md', 'developer.md']) 66 */ 67 constructor(agentName, contextFiles) { 68 if (!agentName || !contextFiles) { 69 throw new Error('agentName and contextFiles are required'); 70 } 71 72 this.agentName = agentName; 73 this.contextFiles = contextFiles; 74 this.context = null; 75 this.contextMetadata = null; 76 this.isInitialized = false; 77 this.structuredLogger = null; // Initialized in executeTask with task_id 78 } 79 80 /** 81 * Initialize the agent (load context, update state) 82 * 83 * @returns {Promise<void>} 84 */ 85 async initialize() { 86 if (this.isInitialized) return; 87 88 // Load context files 89 this.contextMetadata = await loadContextWithMetadata(this.contextFiles); 90 this.context = this.contextMetadata.context; 91 92 logger.info(`${this.agentName} agent initialized`, { 93 context_size_kb: this.contextMetadata.sizeKB, 94 context_files: this.contextMetadata.files, 95 }); 96 97 // Update agent_state 98 await this.updateAgentState('idle'); 99 100 this.isInitialized = true; 101 } 102 103 /** 104 * Acquire exclusive lock for this agent 105 * 106 * Prevents multiple instances of the same agent from running concurrently. 107 * Uses database-backed locking via agent_state table. 108 * Automatically clears stale locks (older than AGENT_LOCK_STALE_MINUTES). 109 * 110 * @returns {Promise<boolean>} - True if lock acquired, false if agent already running 111 */ 112 async acquireLock() { 113 const staleMinutes = parseInt(process.env.AGENT_LOCK_STALE_MINUTES || '2', 10); 114 115 const result = await getOne( 116 `UPDATE tel.agent_state 117 SET status = 'working', last_active = NOW() 118 WHERE agent_name = $1 119 AND (status = 'idle' OR 120 (status = 'working' AND last_active < NOW() - ($2 || ' minutes')::interval)) 121 RETURNING agent_name`, 122 [this.agentName, staleMinutes] 123 ); 124 125 return !!result; 126 } 127 128 /** 129 * Release exclusive lock for this agent 130 * 131 * Marks agent as idle in agent_state table. 132 * Called automatically in finally block of pollTasks(). 133 * 134 * @returns {Promise<void>} 135 */ 136 async releaseLock() { 137 await run( 138 `UPDATE tel.agent_state 139 SET status = 'idle', last_active = NOW() 140 WHERE agent_name = $1`, 141 [this.agentName] 142 ); 143 } 144 145 /** 146 * Poll for pending tasks and process them 147 * 148 * Uses row-level locking to enable horizontal scaling. 149 * Multiple instances of the same agent can run concurrently without conflicts. 150 * 151 * Agent-level locking is optional (controlled by AGENT_ALLOW_HORIZONTAL_SCALING). 152 * When enabled, multiple instances can process different tasks simultaneously. 153 * 154 * @param {number} [limit=5] - Maximum number of tasks to process 155 * @returns {Promise<number>} - Number of tasks processed 156 */ 157 async pollTasks(limit = 5) { 158 if (!this.isInitialized) { 159 await this.initialize(); 160 } 161 162 // Agent-level locking (optional for backwards compatibility) 163 // Support both old and new env var names (backwards compatibility) 164 const allowConcurrentInstances = 165 process.env.AGENT_ALLOW_HORIZONTAL_SCALING === 'true' || 166 process.env.AGENT_ALLOW_CONCURRENT_INSTANCES === 'true'; 167 168 if (!allowConcurrentInstances) { 169 // Try to acquire agent-level lock 170 if (!await this.acquireLock()) { 171 logger.info(`${this.agentName} agent already running, skipping`); 172 return 0; 173 } 174 } 175 176 try { 177 let processed = 0; 178 179 // Process up to 'limit' tasks 180 for (let i = 0; i < limit; i++) { 181 // Acquire next task with row-level locking 182 const task = await this.acquireNextTask(); 183 184 if (!task) { 185 // No more tasks available 186 break; 187 } 188 189 try { 190 await this.executeTask(task); 191 processed++; 192 } catch (error) { 193 logger.error(`${this.agentName} task execution failed`, { 194 task_id: task.id, 195 task_type: task.task_type, 196 error: error.message, 197 }); 198 199 // Check retry count 200 const retryCount = await incrementRetryCount(task.id); 201 if (retryCount >= 3) { 202 // Max retries exceeded 203 await failTaskUtil(task.id, `Max retries exceeded: ${error.message}`, retryCount); 204 } else { 205 // Retry 206 await updateTaskStatus(task.id, 'pending', { 207 error_message: `Retry ${retryCount}/3: ${error.message}`, 208 }); 209 } 210 } 211 } 212 213 return processed; 214 } finally { 215 // Release agent-level lock if acquired 216 if (!allowConcurrentInstances) { 217 await this.releaseLock(); 218 } 219 } 220 } 221 222 /** 223 * Acquire next pending task with row-level locking 224 * 225 * Uses PostgreSQL row-level locking to atomically claim tasks. 226 * Multiple agent instances can call this concurrently without conflicts. 227 * 228 * Strategy: 229 * 1. SELECT next pending task for this agent (by priority, created_at) 230 * 2. Atomically UPDATE status to 'running' (race-safe via CTE) 231 * 3. If another instance claimed it (0 rows updated), return null 232 * 4. Otherwise, return the claimed task 233 * 234 * @returns {Promise<Object|null>} - Claimed task object, or null if no tasks available 235 * @private 236 */ 237 async acquireNextTask() { 238 const enableRowLocking = process.env.AGENT_ENABLE_ROW_LOCKING !== 'false'; 239 240 if (!enableRowLocking) { 241 // Fallback to old behavior (backwards compatibility) 242 const tasks = await getAgentTasks(this.agentName, 'pending', 1); 243 return tasks.length > 0 ? tasks[0] : null; 244 } 245 246 // Row-level locking via CTE — atomic claim without a separate transaction 247 const task = await getOne( 248 `WITH next_task AS ( 249 SELECT id FROM tel.agent_tasks 250 WHERE assigned_to = $1 AND status = 'pending' 251 ORDER BY priority DESC, created_at ASC 252 LIMIT 1 253 FOR UPDATE SKIP LOCKED 254 ) 255 UPDATE tel.agent_tasks t 256 SET status = 'running', started_at = NOW() 257 FROM next_task 258 WHERE t.id = next_task.id 259 RETURNING t.*`, 260 [this.agentName] 261 ); 262 263 if (!task) { 264 return null; 265 } 266 267 // Parse JSON fields, handle malformed JSON gracefully 268 let contextJson = null; 269 let resultJson = null; 270 try { 271 contextJson = task.context_json ? JSON.parse(task.context_json) : null; 272 } catch (_e) { 273 // Malformed JSON - leave as null; executeTask will handle gracefully 274 } 275 try { 276 resultJson = task.result_json ? JSON.parse(task.result_json) : null; 277 } catch (_e) { 278 // Malformed JSON - leave as null 279 } 280 return { 281 ...task, 282 context_json: contextJson, 283 result_json: resultJson, 284 }; 285 } 286 287 /** 288 * Execute a single task (internal wrapper around processTask) 289 * 290 * @param {Object} task - Task object 291 * @returns {Promise<void>} 292 * @private 293 */ 294 async executeTask(task) { 295 // Initialize structured logger for this task 296 this.structuredLogger = new StructuredLogger(this.agentName, task.id); 297 298 // Mark as running 299 await startTask(task.id); 300 await this.updateAgentState('working', task.id); 301 302 const startTime = Date.now(); 303 304 await this.log('info', 'Task started', { 305 task_id: task.id, 306 task_type: task.task_type, 307 }); 308 309 try { 310 // Validate task has required fields 311 if (!task.task_type) { 312 throw new Error('Task is missing required field: task_type'); 313 } 314 315 // Parse context_json if it's a string 316 if (task.context_json && typeof task.context_json === 'string') { 317 try { 318 task.context_json = JSON.parse(task.context_json); 319 } catch (parseError) { 320 await this.log('warn', 'Failed to parse context_json, treating as empty', { 321 task_id: task.id, 322 error: parseError.message, 323 }); 324 task.context_json = {}; 325 } 326 } 327 328 // Call subclass implementation 329 await this.processTask(task); 330 331 const duration = Date.now() - startTime; 332 333 await this.log('info', 'Task completed', { 334 task_id: task.id, 335 task_type: task.task_type, 336 duration_ms: duration, 337 }); 338 339 // Record successful outcome 340 await this.recordOutcome(task.id, 'success', { 341 task_type: task.task_type, 342 duration_ms: duration, 343 }); 344 } catch (error) { 345 const duration = Date.now() - startTime; 346 347 await this.log('error', 'Task failed', { 348 task_id: task.id, 349 task_type: task.task_type, 350 error: error.message, 351 stack: error.stack, 352 duration_ms: duration, 353 }); 354 355 // Record failure outcome 356 await this.recordOutcome(task.id, 'failure', { 357 task_type: task.task_type, 358 error: error.message, 359 stack: error.stack, 360 duration_ms: duration, 361 }); 362 363 throw error; 364 } finally { 365 await this.updateAgentState('idle'); 366 this.structuredLogger = null; 367 } 368 } 369 370 /** 371 * Process a task (must be implemented by subclasses) 372 * 373 * @param {Object} task - Task object 374 * @returns {Promise<void>} 375 * @abstract 376 */ 377 async processTask(task) { 378 throw new Error('processTask() must be implemented by subclass'); 379 } 380 381 /** 382 * Create a new task 383 * 384 * @param {Object} taskData - Task data (see task-manager.js for details) 385 * @returns {Promise<number>} - Task ID 386 */ 387 async createTask(taskData) { 388 // DEDUPLICATION: Check for existing similar task 389 const existingTask = await this.findDuplicateTask(taskData); 390 391 if (existingTask) { 392 await this.log('info', 'Task already exists (skipping duplicate)', { 393 existing_task_id: existingTask.id, 394 task_type: taskData.task_type, 395 assigned_to: taskData.assigned_to, 396 status: existingTask.status, 397 }); 398 return existingTask.id; 399 } 400 401 const taskId = await createAgentTask({ 402 ...taskData, 403 created_by: this.agentName, 404 }); 405 406 await this.log('info', 'Task created', { 407 task_id: taskId, 408 task_type: taskData.task_type, 409 assigned_to: taskData.assigned_to, 410 priority: taskData.priority, 411 }); 412 413 // NEW: Immediately invoke the assigned agent 414 if (process.env.AGENT_IMMEDIATE_INVOCATION !== 'false' && taskData.assigned_to) { 415 await this.invokeAgentImmediately(taskData.assigned_to).catch(error => { 416 logger.error(`Failed to invoke ${taskData.assigned_to} agent immediately:`, error); 417 // Don't throw - fallback to cron polling 418 }); 419 } 420 421 return taskId; 422 } 423 424 /** 425 * Find duplicate task based on task type and key fields 426 * 427 * @param {Object} taskData - Task data to check for duplicates 428 * @returns {Promise<Object|null>} - Existing task or null 429 */ 430 async findDuplicateTask(taskData) { 431 const { task_type, assigned_to, context } = taskData; 432 433 // Extract deduplication key based on task type 434 let dedupeKey = null; 435 let dedupeField = null; 436 437 switch (task_type) { 438 case 'fix_bug': 439 case 'classify_error': 440 // Dedupe by error_message (first 100 chars for similarity) 441 dedupeKey = context?.error_message?.substring(0, 100); 442 dedupeField = 'error_message'; 443 break; 444 445 case 'design_optimization': 446 // Dedupe by optimization description or pattern 447 dedupeKey = context?.description?.substring(0, 100) || context?.pattern; 448 dedupeField = 'description'; 449 break; 450 451 case 'scan_logs': 452 case 'check_agent_health': 453 case 'check_pipeline_health': 454 case 'check_process_compliance': 455 case 'detect_anomaly': 456 case 'check_slo_compliance': 457 case 'check_loops': 458 case 'check_blocked_tasks': 459 // Monitoring tasks: only one active at a time per type 460 dedupeKey = task_type; 461 dedupeField = null; 462 break; 463 464 default: 465 // For other task types, allow duplicates (features, refactors, etc.) 466 return null; 467 } 468 469 if (!dedupeKey) { 470 return null; 471 } 472 473 try { 474 // Check for existing task with same key and active status 475 // For fix_bug/classify_error: also treat recently-failed tasks as duplicates 476 // to prevent the monitor→developer→fail→recreate loop 477 const recentFailedWindow = 478 task_type === 'fix_bug' || task_type === 'classify_error' ? '6 hours' : null; 479 const activeStatuses = recentFailedWindow 480 ? `status IN ('pending', 'running', 'blocked') OR (status = 'failed' AND created_at > NOW() - INTERVAL '${recentFailedWindow}')` 481 : `status IN ('pending', 'running', 'blocked')`; 482 483 if (dedupeField) { 484 // Search by JSONB field 485 return await getOne( 486 `SELECT id, status FROM tel.agent_tasks 487 WHERE task_type = $1 488 AND assigned_to = $2 489 AND (${activeStatuses}) 490 AND LEFT(context_json->>'${dedupeField}', 100) = $3 491 ORDER BY created_at DESC 492 LIMIT 1`, 493 [task_type, assigned_to, dedupeKey] 494 ) || null; 495 } else { 496 // Search by task_type only (monitoring tasks) 497 return await getOne( 498 `SELECT id, status FROM tel.agent_tasks 499 WHERE task_type = $1 500 AND assigned_to = $2 501 AND (${activeStatuses}) 502 ORDER BY created_at DESC 503 LIMIT 1`, 504 [task_type, assigned_to] 505 ) || null; 506 } 507 } catch (error) { 508 // If deduplication check fails, log and allow task creation 509 logger.error('Deduplication check failed:', error.message); 510 return null; 511 } 512 } 513 514 /** 515 * Complete the current task 516 * 517 * @param {number} taskId - Task ID 518 * @param {Object} [result] - Result object 519 * @returns {Promise<void>} 520 */ 521 async completeTask(taskId, result = null) { 522 await completeTaskUtil(taskId, result); 523 524 await this.log('info', 'Task marked complete', { 525 task_id: taskId, 526 result: result ? Object.keys(result) : null, 527 }); 528 } 529 530 /** 531 * Fail the current task 532 * 533 * @param {number} taskId - Task ID 534 * @param {string} errorMessage - Error message 535 * @returns {Promise<void>} 536 */ 537 async failTask(taskId, errorMessage) { 538 const task = await getTaskById(taskId); 539 const retryCount = task ? task.retry_count : 0; 540 541 await failTaskUtil(taskId, errorMessage, retryCount); 542 543 await this.log('error', 'Task marked failed', { 544 task_id: taskId, 545 error: errorMessage, 546 retry_count: retryCount, 547 }); 548 } 549 550 /** 551 * Block the current task 552 * 553 * @param {number} taskId - Task ID 554 * @param {string} reason - Reason for blocking 555 * @returns {Promise<void>} 556 */ 557 async blockTask(taskId, reason) { 558 await blockTaskUtil(taskId, reason); 559 560 await this.log('warn', 'Task blocked', { 561 task_id: taskId, 562 reason, 563 }); 564 } 565 566 /** 567 * Update task with additional data 568 * 569 * @param {number} taskId - Task ID 570 * @param {Object} updates - Fields to update 571 * @returns {Promise<void>} 572 */ 573 async updateTask(taskId, updates) { 574 const task = await getTaskById(taskId); 575 if (!task) { 576 throw new Error(`Task ${taskId} not found`); 577 } 578 579 await updateTaskStatus(taskId, task.status, updates); 580 } 581 582 /** 583 * Send a message to another agent 584 * 585 * @param {Object} messageData - Message data (see message-manager.js for details) 586 * @returns {Promise<number>} - Message ID 587 */ 588 async sendMessage(messageData) { 589 const msgId = await sendAgentMessage({ 590 ...messageData, 591 from_agent: this.agentName, 592 }); 593 594 await this.log('info', 'Message sent', { 595 message_id: msgId, 596 to_agent: messageData.to_agent, 597 message_type: messageData.message_type, 598 }); 599 600 return msgId; 601 } 602 603 /** 604 * Send a question to another agent 605 * 606 * @param {number} taskId - Related task ID 607 * @param {string} toAgent - Receiving agent 608 * @param {string} question - Question content 609 * @param {Object} [metadata] - Additional metadata 610 * @returns {Promise<number>} - Message ID 611 */ 612 async askQuestion(taskId, toAgent, question, metadata = null) { 613 return await this.sendMessage({ 614 task_id: taskId, 615 to_agent: toAgent, 616 message_type: 'question', 617 content: question, 618 metadata, 619 }); 620 } 621 622 /** 623 * Send an answer to a question 624 * 625 * @param {number} taskId - Related task ID 626 * @param {string} toAgent - Receiving agent 627 * @param {string} answer - Answer content 628 * @param {number} [questionMessageId] - ID of question being answered 629 * @returns {Promise<number>} - Message ID 630 */ 631 async sendAnswer(taskId, toAgent, answer, questionMessageId = null) { 632 const metadata = questionMessageId ? { in_reply_to: questionMessageId } : null; 633 634 return await this.sendMessage({ 635 task_id: taskId, 636 to_agent: toAgent, 637 message_type: 'answer', 638 content: answer, 639 metadata, 640 }); 641 } 642 643 /** 644 * Send a handoff notification 645 * 646 * @param {number} taskId - Related task ID 647 * @param {string} toAgent - Receiving agent 648 * @param {string} message - Handoff message 649 * @param {Object} [metadata] - Additional metadata 650 * @returns {Promise<number>} - Message ID 651 */ 652 async handoff(taskId, toAgent, message, metadata = null) { 653 const msgId = await this.sendMessage({ 654 task_id: taskId, 655 to_agent: toAgent, 656 message_type: 'handoff', 657 content: message, 658 metadata, 659 }); 660 661 // NEW: Immediately invoke the receiving agent 662 if (process.env.AGENT_IMMEDIATE_INVOCATION !== 'false') { 663 await this.invokeAgentImmediately(toAgent).catch(error => { 664 logger.error(`Failed to invoke ${toAgent} agent immediately:`, error); 665 // Don't throw - fallback to cron polling 666 }); 667 } 668 669 return msgId; 670 } 671 672 /** 673 * Send a notification to another agent 674 * 675 * @param {string} toAgent - Receiving agent 676 * @param {string} message - Notification content 677 * @param {number} [taskId] - Related task ID 678 * @param {Object} [metadata] - Additional metadata 679 * @returns {Promise<number>} - Message ID 680 */ 681 async notify(toAgent, message, taskId = null, metadata = null) { 682 return await this.sendMessage({ 683 task_id: taskId, 684 to_agent: toAgent, 685 message_type: 'notification', 686 content: message, 687 metadata, 688 }); 689 } 690 691 /** 692 * Check for unread messages 693 * 694 * @param {number} [limit=10] - Maximum messages to retrieve 695 * @returns {Promise<Array>} - Unread messages 696 */ 697 async getUnreadMessages(limit = 10) { 698 return await getUnreadMessages(this.agentName, limit); 699 } 700 701 /** 702 * Mark a message as read 703 * 704 * @param {number} messageId - Message ID 705 * @returns {Promise<void>} 706 */ 707 async markMessageRead(messageId) { 708 await markMessageRead(messageId); 709 } 710 711 /** 712 * Check if agent has pending questions 713 * 714 * @returns {Promise<boolean>} - True if questions pending 715 */ 716 async hasPendingQuestions() { 717 return await hasPendingQuestions(this.agentName); 718 } 719 720 /** 721 * Log a message to agent_logs table 722 * 723 * Uses StructuredLogger if available (during task execution), 724 * otherwise falls back to direct database write. 725 * 726 * @param {string} level - Log level ('info', 'warn', 'error', 'debug') 727 * @param {string} message - Log message 728 * @param {Object} [data] - Additional data (will be JSON-stringified) 729 * @returns {Promise<void>} 730 */ 731 async log(level, message, data = null) { 732 // Use structured logger if available (during task execution) 733 if (this.structuredLogger) { 734 this.structuredLogger.log(level, message, data || {}); 735 return; 736 } 737 738 // Fallback to direct database write (for logs outside task execution) 739 await run( 740 `INSERT INTO tel.agent_logs (task_id, agent_name, log_level, message, data_json) 741 VALUES ($1, $2, $3, $4, $5)`, 742 [ 743 data?.task_id || null, 744 this.agentName, 745 level.toLowerCase(), // Database CHECK constraint requires lowercase 746 message, 747 data ? JSON.stringify(data) : null, 748 ] 749 ); 750 751 // Also log to standard logger for visibility 752 logger[level](`[${this.agentName}] ${message}`, data); 753 } 754 755 /** 756 * Update agent_state table 757 * 758 * @param {string} status - Status ('idle', 'working', 'blocked') 759 * @param {number} [currentTaskId] - Current task ID (if working) 760 * @returns {Promise<void>} 761 * @private 762 */ 763 async updateAgentState(status, currentTaskId = null) { 764 await run( 765 `INSERT INTO tel.agent_state (agent_name, status, current_task_id, last_active) 766 VALUES ($1, $2, $3, NOW()) 767 ON CONFLICT (agent_name) DO UPDATE SET 768 status = EXCLUDED.status, 769 current_task_id = EXCLUDED.current_task_id, 770 last_active = EXCLUDED.last_active`, 771 [this.agentName, status, currentTaskId] 772 ); 773 } 774 775 /** 776 * Get the agent's context string (for debugging) 777 * 778 * @returns {string|null} - Context string 779 */ 780 getContext() { 781 return this.context; 782 } 783 784 /** 785 * Get the agent's context metadata (for debugging) 786 * 787 * @returns {Object|null} - Context metadata 788 */ 789 getContextMetadata() { 790 return this.contextMetadata; 791 } 792 793 /** 794 * Get enriched context with task history for a specific task 795 * 796 * Builds context that includes: 797 * - Base context (base.md + role-specific context) 798 * - Recent successful tasks (patterns that work) 799 * - Recent failures (mistakes to avoid) 800 * - Related tasks (same file/error type) 801 * 802 * @param {Object} task - Current task to get context for 803 * @returns {Promise<Object>} - Enriched context object 804 * 805 * @example 806 * const context = await agent.getContextForTask(task); 807 * // Use context.fullContext for LLM prompts 808 * // context.historyTokens shows how many tokens the history uses 809 */ 810 async getContextForTask(task) { 811 return await buildAgentContext(this.agentName, this.contextFiles, task); 812 } 813 814 /** 815 * Validate task workflow dependencies before creation 816 * Enforces proper workflow: Architect → PO → Developer → Architect → Developer → QA 817 * 818 * @param {Object} taskData - Task data to validate 819 * @returns {Promise<Object>} - {valid: boolean, reason?: string, requiredPrerequisite?: Object} 820 */ 821 async validateWorkflowDependencies(taskData) { 822 const { task_type, assigned_to, parent_task_id } = taskData; 823 824 // Workflow rules: 825 // 1. implement_feature must have approved design_proposal as parent 826 // 2. Developer implementation must have approved implementation_plan 827 // 3. QA can only test completed developer tasks 828 829 if (task_type === 'implement_feature' && !parent_task_id) { 830 return { 831 valid: false, 832 reason: 'Features require design_proposal parent task', 833 requiredPrerequisite: { 834 task_type: 'design_proposal', 835 assigned_to: 'architect', 836 }, 837 }; 838 } 839 840 if (parent_task_id) { 841 const parent = await getTaskById(parent_task_id); 842 843 if (!parent) { 844 return { 845 valid: false, 846 reason: `Parent task ${parent_task_id} not found`, 847 }; 848 } 849 850 // Check parent is in valid state for implement_feature 851 if (task_type === 'implement_feature') { 852 if (parent.task_type !== 'design_proposal' || parent.status !== 'completed') { 853 return { 854 valid: false, 855 reason: 'Parent design_proposal must be completed and PO-approved', 856 }; 857 } 858 859 // Check PO approval exists 860 const approval = parent.approval_json ? JSON.parse(parent.approval_json) : null; 861 862 if (!approval || approval.decision !== 'approved') { 863 return { 864 valid: false, 865 reason: 'Design requires Product Owner approval before implementation', 866 }; 867 } 868 } 869 } 870 871 return { valid: true }; 872 } 873 874 /** 875 * Request Product Owner approval for design proposal 876 * Transitions task to awaiting_po_approval status 877 * 878 * @param {number} taskId - Task ID 879 * @param {Object} proposal - Design proposal document 880 * @returns {Promise<void>} 881 */ 882 async requestPoApproval(taskId, proposal) { 883 await this.log('info', 'Requesting Product Owner approval', { 884 task_id: taskId, 885 proposal_summary: proposal.summary, 886 }); 887 888 // Use 'blocked' status with metadata indicating waiting for PO 889 await updateTaskStatus(taskId, 'blocked', { 890 error_message: 'Awaiting Product Owner approval for design proposal', 891 result: { design_proposal: proposal }, 892 }); 893 894 // Add to human review queue for PO attention 895 const { addReviewItem } = await import('../utils/human-review-queue.js'); 896 addReviewItem({ 897 file: `Task #${taskId}: ${proposal.title}`, 898 reason: `Design proposal requires Product Owner approval: ${proposal.summary}`, 899 type: 'architecture', 900 priority: proposal.priority === 'high' ? 'high' : 'medium', 901 metadata: JSON.stringify({ task_id: taskId, type: 'po_approval' }), 902 }); 903 } 904 905 /** 906 * Request Architect approval for implementation plan 907 * Transitions task to awaiting_architect_approval status 908 * 909 * @param {number} taskId - Task ID 910 * @param {Object} plan - Implementation plan 911 * @returns {Promise<void>} 912 */ 913 async requestArchitectApproval(taskId, plan) { 914 await this.log('info', 'Requesting Architect approval', { 915 task_id: taskId, 916 plan_summary: plan.summary, 917 }); 918 919 // Use 'blocked' status with metadata indicating waiting for architect 920 await updateTaskStatus(taskId, 'blocked', { 921 error_message: 'Awaiting Architect approval for implementation plan', 922 result: { implementation_plan: plan }, 923 }); 924 925 // Create technical_review task for Architect 926 const task = await getTaskById(taskId); 927 await this.createTask({ 928 task_type: 'technical_review', 929 assigned_to: 'architect', 930 parent_task_id: taskId, 931 priority: task?.priority || 6, 932 context: { 933 implementation_plan: plan, 934 original_task_id: taskId, 935 }, 936 }); 937 } 938 939 /** 940 * Approve a task (records approval metadata) 941 * 942 * @param {number} taskId - Task to approve 943 * @param {string} reviewer - Who approved (human name or agent name) 944 * @param {Object} approval - Approval details {decision, notes, conditions} 945 * @returns {Promise<void>} 946 */ 947 async approveTask(taskId, reviewer, approval) { 948 const approvalData = { 949 decision: approval.decision, // 'approved', 'approved_with_conditions', 'rejected' 950 reviewer, 951 timestamp: new Date().toISOString(), 952 notes: approval.notes, 953 conditions: approval.conditions || [], 954 }; 955 956 await this.log('info', 'Task approved', { 957 task_id: taskId, 958 reviewer, 959 decision: approval.decision, 960 }); 961 962 await updateTaskStatus(taskId, 'completed', { 963 reviewed_by: reviewer, 964 approval_json: JSON.stringify(approvalData), 965 }); 966 } 967 968 /** 969 * Record task outcome for learning 970 * 971 * Writes to agent_outcomes table for pattern analysis. 972 * Used by learnFromPastOutcomes() to improve future task performance. 973 * 974 * @param {number} taskId - Completed task ID 975 * @param {string} outcome - 'success' or 'failure' 976 * @param {Object} [context={}] - Task-specific context (error_type, file_path, etc.) 977 * @param {Object} [result={}] - Result details (what worked, what didn't) 978 * @returns {Promise<void>} 979 */ 980 async recordOutcome(taskId, outcome, context = {}, result = {}) { 981 if (!['success', 'failure'].includes(outcome)) { 982 throw new Error(`Invalid outcome: ${outcome}. Must be 'success' or 'failure'`); 983 } 984 985 const task = await getTaskById(taskId); 986 if (!task) { 987 throw new Error(`Task ${taskId} not found`); 988 } 989 990 await run( 991 `INSERT INTO tel.agent_outcomes (task_id, agent_name, task_type, outcome, context_json, result_json, duration_ms) 992 VALUES ($1, $2, $3, $4, $5, $6, $7)`, 993 [ 994 taskId, 995 this.agentName, 996 task.task_type, 997 outcome, 998 Object.keys(context).length > 0 ? JSON.stringify(context) : null, 999 Object.keys(result).length > 0 ? JSON.stringify(result) : null, 1000 context.duration_ms || null, 1001 ] 1002 ); 1003 1004 await this.log('debug', 'Outcome recorded', { 1005 task_id: taskId, 1006 outcome, 1007 task_type: task.task_type, 1008 }); 1009 } 1010 1011 /** 1012 * Learn from past outcomes for specific task type 1013 * 1014 * Analyzes historical agent_outcomes to identify patterns: 1015 * - Which approaches succeeded vs failed 1016 * - Common error types and their fixes 1017 * - Average execution times 1018 * - Success rates by context patterns 1019 * 1020 * @param {string} taskType - Task type to analyze (e.g., 'fix_bug', 'verify_fix') 1021 * @param {number} [limit=50] - Maximum outcomes to analyze 1022 * @returns {Promise<Object>} - Learning insights 1023 */ 1024 async learnFromPastOutcomes(taskType, limit = 50) { 1025 await this.log('debug', 'Analyzing past outcomes', { 1026 task_type: taskType, 1027 limit, 1028 }); 1029 1030 // Fetch recent outcomes for this agent and task type 1031 const outcomes = await getAll( 1032 `SELECT task_id, outcome, context_json, result_json, duration_ms, created_at 1033 FROM tel.agent_outcomes 1034 WHERE agent_name = $1 AND task_type = $2 1035 ORDER BY created_at DESC 1036 LIMIT $3`, 1037 [this.agentName, taskType, limit] 1038 ); 1039 1040 if (outcomes.length === 0) { 1041 return { 1042 task_type: taskType, 1043 total_outcomes: 0, 1044 success_rate: 0, 1045 insights: 'No historical data available for this task type', 1046 }; 1047 } 1048 1049 // Calculate success rate 1050 const successCount = outcomes.filter(o => o.outcome === 'success').length; 1051 const failureCount = outcomes.filter(o => o.outcome === 'failure').length; 1052 const successRate = (successCount / outcomes.length) * 100; 1053 1054 // Calculate average duration 1055 const durationsMs = outcomes.filter(o => o.duration_ms).map(o => o.duration_ms); 1056 const avgDurationMs = 1057 durationsMs.length > 0 ? durationsMs.reduce((a, b) => a + b, 0) / durationsMs.length : 0; 1058 1059 // Analyze context patterns (group by error_type, file patterns, etc.) 1060 const contextPatterns = this.analyzeContextPatterns(outcomes); 1061 1062 // Analyze what worked vs what didn't 1063 const successPatterns = this.extractSuccessPatterns( 1064 outcomes.filter(o => o.outcome === 'success') 1065 ); 1066 const failurePatterns = this.extractFailurePatterns( 1067 outcomes.filter(o => o.outcome === 'failure') 1068 ); 1069 1070 const insights = { 1071 task_type: taskType, 1072 total_outcomes: outcomes.length, 1073 success_count: successCount, 1074 failure_count: failureCount, 1075 success_rate: Math.round(successRate * 100) / 100, 1076 avg_duration_ms: Math.round(avgDurationMs), 1077 context_patterns: contextPatterns, 1078 success_patterns: successPatterns, 1079 failure_patterns: failurePatterns, 1080 recommendations: this.generateRecommendations(successPatterns, failurePatterns), 1081 }; 1082 1083 await this.log('info', 'Learning analysis complete', { 1084 task_type: taskType, 1085 success_rate: insights.success_rate, 1086 total_outcomes: insights.total_outcomes, 1087 }); 1088 1089 return insights; 1090 } 1091 1092 /** 1093 * Analyze context patterns from outcomes 1094 * 1095 * @param {Array} outcomes - Outcome records 1096 * @returns {Object} - Pattern analysis 1097 * @private 1098 */ 1099 analyzeContextPatterns(outcomes) { 1100 const patterns = {}; 1101 1102 for (const outcome of outcomes) { 1103 if (!outcome.context_json) continue; 1104 1105 try { 1106 const context = JSON.parse(outcome.context_json); 1107 1108 // Group by error_type 1109 if (context.error_type) { 1110 patterns[context.error_type] = patterns[context.error_type] || { 1111 total: 0, 1112 successes: 0, 1113 }; 1114 patterns[context.error_type].total++; 1115 if (outcome.outcome === 'success') { 1116 patterns[context.error_type].successes++; 1117 } 1118 } 1119 } catch (e) { 1120 // Skip malformed JSON 1121 } 1122 } 1123 1124 // Calculate success rates for each pattern 1125 for (const [pattern, data] of Object.entries(patterns)) { 1126 data.success_rate = Math.round((data.successes / data.total) * 100); 1127 } 1128 1129 return patterns; 1130 } 1131 1132 /** 1133 * Extract patterns from successful outcomes 1134 * 1135 * @param {Array} successes - Successful outcome records 1136 * @returns {Array<string>} - Success patterns 1137 * @private 1138 */ 1139 extractSuccessPatterns(successes) { 1140 const patterns = []; 1141 1142 for (const outcome of successes) { 1143 if (!outcome.result_json) continue; 1144 1145 try { 1146 const result = JSON.parse(outcome.result_json); 1147 1148 // Extract file patterns 1149 if (result.files_changed || result.file_path) { 1150 const files = result.files_changed || [result.file_path]; 1151 for (const file of files) { 1152 if (typeof file === 'string') { 1153 patterns.push(`Successfully handled: ${file.split('/').slice(-1)[0]}`); 1154 } 1155 } 1156 } 1157 1158 // Extract approach patterns 1159 if (result.approach || result.action_taken) { 1160 patterns.push(result.approach || result.action_taken); 1161 } 1162 } catch (e) { 1163 // Skip malformed JSON 1164 } 1165 } 1166 1167 // Return unique patterns, limited to top 10 1168 return [...new Set(patterns)].slice(0, 10); 1169 } 1170 1171 /** 1172 * Extract patterns from failed outcomes 1173 * 1174 * @param {Array} failures - Failed outcome records 1175 * @returns {Array<string>} - Failure patterns 1176 * @private 1177 */ 1178 extractFailurePatterns(failures) { 1179 const patterns = []; 1180 1181 for (const outcome of failures) { 1182 if (!outcome.context_json) continue; 1183 1184 try { 1185 const context = JSON.parse(outcome.context_json); 1186 1187 // Extract error patterns 1188 if (context.error) { 1189 // Normalize error messages (remove file paths, line numbers, etc.) 1190 const normalized = context.error 1191 .replace(/\/[^\s]+\.js:\d+:\d+/g, '[file]') 1192 .replace(/\d+/g, 'N') 1193 .substring(0, 100); 1194 patterns.push(normalized); 1195 } 1196 } catch (e) { 1197 // Skip malformed JSON 1198 } 1199 } 1200 1201 // Return unique patterns, limited to top 10 1202 return [...new Set(patterns)].slice(0, 10); 1203 } 1204 1205 /** 1206 * Generate recommendations based on patterns 1207 * 1208 * @param {Array<string>} successPatterns - Success patterns 1209 * @param {Array<string>} failurePatterns - Failure patterns 1210 * @returns {Array<string>} - Recommendations 1211 * @private 1212 */ 1213 generateRecommendations(successPatterns, failurePatterns) { 1214 const recommendations = []; 1215 1216 if (successPatterns.length > 0) { 1217 recommendations.push(`Continue using successful approaches: ${successPatterns[0]}`); 1218 } 1219 1220 if (failurePatterns.length > 0) { 1221 recommendations.push(`Avoid common failure pattern: ${failurePatterns[0]}`); 1222 } 1223 1224 if (successPatterns.length === 0 && failurePatterns.length === 0) { 1225 recommendations.push('Insufficient data for recommendations. Continue monitoring outcomes.'); 1226 } 1227 1228 return recommendations; 1229 } 1230 1231 /** 1232 * Delegate task to correct agent when assigned to wrong agent 1233 * 1234 * Uses centralized task routing configuration to ensure consistency. 1235 * All agents should call this method when they receive a task type they don't handle. 1236 * 1237 * @param {Object} task - Task assigned to wrong agent 1238 * @returns {Promise<void>} 1239 */ 1240 async delegateToCorrectAgent(task) { 1241 const { getAgentForTaskType } = await import('./utils/task-routing.js'); 1242 const correctAgent = getAgentForTaskType(task.task_type); 1243 1244 await this.log('info', 'Task assigned to wrong agent, delegating', { 1245 task_id: task.id, 1246 task_type: task.task_type, 1247 wrong_agent: this.agentName, 1248 correct_agent: correctAgent, 1249 }); 1250 1251 // Create new task assigned to correct agent 1252 const newTaskId = await this.createTask({ 1253 task_type: task.task_type, 1254 assigned_to: correctAgent, 1255 parent_task_id: task.parent_task_id, 1256 priority: task.priority, 1257 context: task.context_json, 1258 }); 1259 1260 // Complete this task with delegation note 1261 await this.completeTask(task.id, { 1262 delegated: true, 1263 new_task_id: newTaskId, 1264 correct_agent: correctAgent, 1265 reason: `Task type '${task.task_type}' belongs to '${correctAgent}' agent`, 1266 }); 1267 } 1268 1269 /** 1270 * Invoke an agent immediately (event-driven invocation) 1271 * 1272 * Spawns a one-shot agent process to handle pending tasks. 1273 * Prevents 5-minute cron delays by invoking agents immediately after handoffs. 1274 * 1275 * @param {string} agentName - Agent to invoke 1276 * @param {number} [maxDepth=10] - Max recursion depth (prevents infinite loops) 1277 * @returns {Promise<void>} 1278 * @private 1279 */ 1280 async invokeAgentImmediately(agentName, maxDepth = 10) { 1281 // Check recursion depth to prevent infinite loops 1282 const currentDepth = parseInt(process.env.AGENT_INVOCATION_DEPTH || '0', 10); 1283 const maxChainDepth = parseInt(process.env.AGENT_MAX_CHAIN_DEPTH || '10', 10); 1284 1285 if (currentDepth >= Math.min(maxDepth, maxChainDepth)) { 1286 await this.log('warn', 'Max invocation chain depth reached, falling back to cron', { 1287 current_depth: currentDepth, 1288 max_depth: maxChainDepth, 1289 target_agent: agentName, 1290 }); 1291 return; 1292 } 1293 1294 await this.log('info', 'Invoking agent immediately', { 1295 target_agent: agentName, 1296 invocation_depth: currentDepth + 1, 1297 }); 1298 1299 // Get agent class (async import) 1300 const AgentClass = await this.getAgentClass(agentName); 1301 if (!AgentClass) { 1302 throw new Error(`Unknown agent: ${agentName}`); 1303 } 1304 1305 // Create agent instance 1306 const agent = new AgentClass(); 1307 1308 // Increment depth for nested invocations 1309 const oldDepth = process.env.AGENT_INVOCATION_DEPTH; 1310 process.env.AGENT_INVOCATION_DEPTH = String(currentDepth + 1); 1311 1312 try { 1313 // Process one task (not all tasks - avoid runaway loops) 1314 await agent.pollTasks(1); 1315 } finally { 1316 // Restore original depth 1317 if (oldDepth === undefined) { 1318 delete process.env.AGENT_INVOCATION_DEPTH; 1319 } else { 1320 process.env.AGENT_INVOCATION_DEPTH = oldDepth; 1321 } 1322 } 1323 } 1324 1325 /** 1326 * Get agent class by name (for immediate invocation) 1327 * 1328 * @param {string} agentName - Agent name 1329 * @returns {typeof BaseAgent|null} - Agent class or null if not found 1330 * @private 1331 */ 1332 getAgentClass(agentName) { 1333 // Lazy import to avoid circular dependencies 1334 const agentMap = { 1335 developer: () => import('./developer.js').then(m => m.DeveloperAgent), 1336 qa: () => import('./qa.js').then(m => m.QAAgent), 1337 security: () => import('./security.js').then(m => m.SecurityAgent), 1338 architect: () => import('./architect.js').then(m => m.ArchitectAgent), 1339 triage: () => import('./triage.js').then(m => m.TriageAgent), 1340 monitor: () => import('./monitor.js').then(m => m.MonitorAgent), 1341 }; 1342 1343 const loader = agentMap[agentName]; 1344 if (!loader) { 1345 logger.error(`Unknown agent: ${agentName}`); 1346 return null; 1347 } 1348 1349 // Return promise that resolves to agent class 1350 return loader(); 1351 } 1352 1353 // ========== DIRECT TOOL ACCESS ========== 1354 // Agents can use these tools directly without LLM API calls 1355 1356 /** 1357 * Read a file (direct tool access) 1358 * 1359 * @param {string} filePath - Path to file 1360 * @returns {Promise<string>} File content 1361 * 1362 * @example 1363 * const content = await this.readFileTool('src/utils/logger.js'); 1364 */ 1365 async readFileTool(filePath) { 1366 return await agentTools.readFile(filePath); 1367 } 1368 1369 /** 1370 * Write a file (direct tool access) 1371 * 1372 * @param {string} filePath - Path to file 1373 * @param {string} content - Content to write 1374 * @returns {Promise<void>} 1375 * 1376 * @example 1377 * await this.writeFileTool('src/test.js', 'const x = 1;'); 1378 */ 1379 async writeFileTool(filePath, content) { 1380 return await agentTools.writeFile(filePath, content); 1381 } 1382 1383 /** 1384 * Search for files matching a pattern (direct tool access) 1385 * 1386 * @param {string} pattern - Search pattern (regex) 1387 * @param {string} directory - Directory to search 1388 * @param {Object} options - Search options 1389 * @returns {Promise<string>} Search results 1390 * 1391 * @example 1392 * const files = await this.searchFilesTool('TODO', 'src/', { filesOnly: true }); 1393 */ 1394 async searchFilesTool(pattern, directory = '.', options = {}) { 1395 return await agentTools.searchFiles(pattern, directory, options); 1396 } 1397 1398 /** 1399 * Search for content in files (direct tool access) 1400 * 1401 * @param {string} pattern - Search pattern (regex) 1402 * @param {string} directory - Directory to search 1403 * @param {Object} options - Search options (contextBefore, contextAfter, glob) 1404 * @returns {Promise<string>} Search results with context 1405 * 1406 * @example 1407 * const results = await this.searchContentTool('function.*export', 'src/', { contextBefore: 2 }); 1408 */ 1409 async searchContentTool(pattern, directory = '.', options = {}) { 1410 return await agentTools.searchContent(pattern, directory, options); 1411 } 1412 1413 /** 1414 * Find files matching a glob pattern (direct tool access) 1415 * 1416 * @param {string} pattern - Glob pattern 1417 * @param {string} directory - Directory to search 1418 * @returns {Promise<string[]>} Array of matching file paths 1419 * 1420 * @example 1421 * const testFiles = await this.globFilesTool('**\/*.test.js', 'tests/'); 1422 */ 1423 async globFilesTool(pattern, directory = '.') { 1424 return await agentTools.globFiles(pattern, directory); 1425 } 1426 1427 /** 1428 * Run a shell command (direct tool access) 1429 * 1430 * @param {string} cmd - Command to run 1431 * @param {Object} options - Execution options (cwd, timeout, maxBuffer) 1432 * @returns {Promise<{stdout: string, stderr: string, exitCode: number}>} Command output 1433 * 1434 * @example 1435 * const result = await this.runCommandTool('npm test'); 1436 * const gitStatus = await this.runCommandTool('git status --short'); 1437 */ 1438 async runCommandTool(cmd, options = {}) { 1439 return await agentTools.runCommand(cmd, options); 1440 } 1441 1442 /** 1443 * Execute multiple operations in parallel (direct tool access) 1444 * 1445 * @param {Array<Function>} operations - Array of async functions 1446 * @returns {Promise<Array>} Results from all operations 1447 * 1448 * @example 1449 * const [file1, file2] = await this.executeInParallelTool([ 1450 * () => this.readFileTool('src/example1.js'), 1451 * () => this.readFileTool('src/example2.js'), 1452 * ]); 1453 */ 1454 async executeInParallelTool(operations) { 1455 return await agentTools.executeInParallel(operations); 1456 } 1457 1458 /** 1459 * Check if a file exists (direct tool access) 1460 * 1461 * @param {string} filePath - Path to file 1462 * @returns {Promise<boolean>} True if file exists 1463 * 1464 * @example 1465 * if (await this.fileExistsTool('src/test.js')) { 1466 * const content = await this.readFileTool('src/test.js'); 1467 * } 1468 */ 1469 async fileExistsTool(filePath) { 1470 return await agentTools.fileExists(filePath); 1471 } 1472 1473 /** 1474 * List files in a directory (direct tool access) 1475 * 1476 * @param {string} directory - Directory path 1477 * @param {Object} options - List options (recursive, filter) 1478 * @returns {Promise<string[]>} Array of file paths 1479 * 1480 * @example 1481 * const files = await this.listFilesTool('src/', { filter: '*.js' }); 1482 */ 1483 async listFilesTool(directory, options = {}) { 1484 return await agentTools.listFiles(directory, options); 1485 } 1486 }