concurrent-locking.test.js
/**
 * Concurrent Locking Tests
 *
 * Tests for row-level task locking to enable horizontal scaling.
 * Verifies that multiple agent instances can run concurrently without conflicts.
 */

import { test, beforeEach, afterEach } from 'node:test';
import assert from 'node:assert';
import { fileURLToPath } from 'url';
import { dirname, join } from 'path';
import { unlinkSync, existsSync } from 'fs';
import Database from 'better-sqlite3';
import { BaseAgent, resetDb as resetBaseAgentDb } from '../../src/agents/base-agent.js';
import {
  createAgentTask,
  getTaskById,
  resetDbConnection,
} from '../../src/agents/utils/task-manager.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

// Test database path - use timestamp to ensure uniqueness
const TEST_DB_PATH = join(__dirname, `../test-concurrent-locking-${Date.now()}.db`);

// Suffixes of the files removed during cleanup: the main database file plus
// the sidecar files SQLite may create next to it (write-ahead log, shared
// memory index, rollback journal) depending on the journal mode in use.
const DB_FILE_SUFFIXES = ['', '-wal', '-shm', '-journal'];

// Grace period (ms) between resetting connections and deleting the files.
const RELEASE_DELAY_MS = 100;

/**
 * Minimal agent used by the tests: it records the id of every task it is
 * handed and then marks that task as completed.
 */
class TestAgent extends BaseAgent {
  constructor(agentName, contextFiles = ['base.md']) {
    super(agentName, contextFiles);
    this.processedTasks = [];
  }

  /**
   * Handle one task: wait briefly, remember its id, then complete it.
   * @param {{id: number}} task - Task handed over by the base agent.
   */
  async processTask(task) {
    // Simulate some work so concurrently polling agents get a chance to interleave
    await new Promise(resolve => setTimeout(resolve, 10));
    this.processedTasks.push(task.id);
    await this.completeTask(task.id, { test: true });
  }
}

/**
 * Block the current thread for `ms` milliseconds without spinning the CPU.
 * @param {number} ms - Time to sleep in milliseconds.
 */
function sleepSync(ms) {
  // Nothing ever notifies this private buffer, so the wait always times out.
  Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms);
}

/**
 * Setup test database: (re)create the agent tables and seed the
 * 'developer' and 'qa' rows in agent_state.
 */
function setupTestDb() {
  const db = new Database(TEST_DB_PATH);

  try {
    db.pragma('foreign_keys = ON');

    // Create necessary tables
    db.exec(`
      DROP TABLE IF EXISTS agent_outcomes;
      DROP TABLE IF EXISTS agent_logs;
      DROP TABLE IF EXISTS agent_tasks;
      DROP TABLE IF EXISTS agent_state;

      CREATE TABLE agent_tasks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        task_type TEXT NOT NULL,
        assigned_to TEXT NOT NULL,
        created_by TEXT DEFAULT 'system',
        parent_task_id INTEGER REFERENCES agent_tasks(id) ON DELETE CASCADE,
        priority INTEGER DEFAULT 5 CHECK(priority BETWEEN 1 AND 10),
        status TEXT DEFAULT 'pending' CHECK(status IN (
          'pending',
          'running',
          'completed',
          'failed',
          'blocked',
          'awaiting_po_approval',
          'awaiting_architect_approval'
        )),
        context_json TEXT,
        result_json TEXT,
        error_message TEXT,
        retry_count INTEGER DEFAULT 0,
        reviewed_by TEXT,
        approval_json TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        started_at TIMESTAMP,
        completed_at TIMESTAMP
      );

      CREATE TABLE agent_state (
        agent_name TEXT PRIMARY KEY,
        last_active DATETIME DEFAULT CURRENT_TIMESTAMP,
        current_task_id INTEGER REFERENCES agent_tasks(id),
        status TEXT DEFAULT 'idle' CHECK(status IN ('idle', 'working', 'blocked')),
        metrics_json TEXT
      );

      CREATE TABLE agent_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        task_id INTEGER REFERENCES agent_tasks(id) ON DELETE CASCADE,
        agent_name TEXT NOT NULL,
        log_level TEXT NOT NULL CHECK(log_level IN ('info', 'warn', 'error', 'debug')),
        message TEXT NOT NULL,
        data_json TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );

      CREATE TABLE agent_outcomes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        task_id INTEGER NOT NULL REFERENCES agent_tasks(id) ON DELETE CASCADE,
        agent_name TEXT NOT NULL,
        task_type TEXT NOT NULL,
        outcome TEXT NOT NULL CHECK(outcome IN ('success', 'failure')),
        context_json TEXT,
        result_json TEXT,
        duration_ms INTEGER,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );

      -- Initialize agent state
      INSERT INTO agent_state (agent_name, status) VALUES ('developer', 'idle');
      INSERT INTO agent_state (agent_name, status) VALUES ('qa', 'idle');
    `);
  } finally {
    // Always release the handle, even when schema creation fails, so the
    // file can still be deleted by cleanupTestDb().
    db.close();
  }
}

/**
 * Cleanup test database.
 *
 * Best-effort: resets the cached connections, then deletes the database file
 * and any sidecar files next to it. Failures are reported as warnings and
 * never thrown, so a cleanup problem cannot mask the real test result.
 */
function cleanupTestDb() {
  try {
    // Reset all database connections
    resetDbConnection();
    resetBaseAgentDb();

    const leftovers = DB_FILE_SUFFIXES
      .map(suffix => `${TEST_DB_PATH}${suffix}`)
      .filter(filePath => existsSync(filePath));

    // Nothing on disk (e.g. the very first beforeEach): skip the delay entirely
    if (leftovers.length === 0) {
      return;
    }

    // Small delay to ensure connections are released before deleting
    sleepSync(RELEASE_DELAY_MS);

    for (const filePath of leftovers) {
      try {
        unlinkSync(filePath);
      } catch (error) {
        // The file may have disappeared since the existsSync() check above
        // (presumably removed by SQLite itself); anything else is worth reporting.
        if (error.code !== 'ENOENT') {
          console.warn(`cleanupTestDb: could not delete ${filePath}: ${error.message}`);
        }
      }
    }
  } catch (error) {
    // Cleanup stays best-effort, but do not hide the failure completely
    console.warn(`cleanupTestDb: cleanup failed: ${error.message}`);
  }
}

/**
 * Create `count` tasks one after another and return their ids in creation order.
 * @param {number} count - Number of tasks to create.
 * @param {(index: number) => object} buildTask - Returns the task definition
 *   passed to createAgentTask for the given zero-based index.
 * @returns {Promise<Array>} Values returned by createAgentTask, in order.
 */
async function createTasks(count, buildTask) {
  const taskIds = [];
  for (let i = 0; i < count; i++) {
    taskIds.push(await createAgentTask(buildTask(i)));
  }
  return taskIds;
}

/**
 * Task definition shared by the multi-instance 'developer' tests.
 * @param {number} index - Zero-based task index stored in the task context.
 * @returns {object} Argument for createAgentTask.
 */
function developerTask(index) {
  return {
    task_type: 'test_task',
    assigned_to: 'developer',
    created_by: 'test',
    priority: 5,
    context: { test_id: index },
  };
}

/**
 * Assert that every listed task has status 'completed'.
 * @param {Array} taskIds - Ids to look up with getTaskById.
 * @param {string} [label='Task'] - Prefix used in the assertion message.
 */
function assertAllCompleted(taskIds, label = 'Task') {
  for (const taskId of taskIds) {
    const task = getTaskById(taskId);
    // Optional chaining turns a missing row into an assertion failure
    // (with the task id in the message) instead of a TypeError.
    assert.strictEqual(task?.status, 'completed', `${label} ${taskId} should be completed`);
  }
}

/**
 * Add up the per-agent counts returned by pollTasks.
 * @param {number[]} counts - One processed-task count per agent.
 * @returns {number} Total number of processed tasks.
 */
function sumCounts(counts) {
  return counts.reduce((sum, count) => sum + count, 0);
}

// Setup hooks
beforeEach(() => {
  cleanupTestDb();
  setupTestDb();
  process.env.DATABASE_PATH = TEST_DB_PATH;
  process.env.AGENT_REALTIME_NOTIFICATIONS = 'false'; // Disable spawning
  resetDbConnection();
  resetBaseAgentDb();
});

afterEach(() => {
  resetDbConnection();
  resetBaseAgentDb();
  cleanupTestDb();
  delete process.env.DATABASE_PATH;
  delete process.env.AGENT_REALTIME_NOTIFICATIONS;
  delete process.env.AGENT_ENABLE_ROW_LOCKING;
  delete process.env.AGENT_ALLOW_CONCURRENT_INSTANCES;
});

test('Row-level locking prevents duplicate task processing', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'true';

  // Create 10 tasks
  const taskIds = await createTasks(10, developerTask);

  // Create 3 agent instances, then initialize them one after another
  const agents = Array.from({ length: 3 }, () => new TestAgent('developer'));
  for (const agent of agents) {
    await agent.initialize();
  }

  // Run all agents concurrently
  const results = await Promise.all(agents.map(agent => agent.pollTasks(10)));

  // Verify results
  assert.strictEqual(sumCounts(results), 10, 'All 10 tasks should be processed');

  // Verify no duplicate processing
  const allProcessed = agents.flatMap(agent => agent.processedTasks);
  assert.strictEqual(allProcessed.length, 10, 'Exactly 10 tasks should be processed');

  const uniqueProcessed = new Set(allProcessed);
  assert.strictEqual(uniqueProcessed.size, 10, 'No task should be processed more than once');

  // Verify all tasks completed
  assertAllCompleted(taskIds);
});

test('Agent-level locking blocks concurrent instances when disabled', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'false'; // Agent-level locking enabled

  // Create 10 tasks
  await createTasks(10, developerTask);

  // Create 3 agent instances, then initialize them one after another
  const agents = Array.from({ length: 3 }, () => new TestAgent('developer'));
  for (const agent of agents) {
    await agent.initialize();
  }

  // Run all agents concurrently
  const results = await Promise.all(agents.map(agent => agent.pollTasks(10)));

  // Only one agent should acquire the lock and process tasks
  assert.strictEqual(sumCounts(results), 10, 'Only one agent should process all tasks');

  // Count how many agents processed tasks
  const agentsWithWork = agents.filter(agent => agent.processedTasks.length > 0).length;

  assert.strictEqual(
    agentsWithWork,
    1,
    'Only one agent should have processed tasks (agent-level lock)'
  );
});

test('Row-level locking handles race conditions correctly', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'true';

  // Create 100 tasks to increase race condition likelihood
  const taskIds = await createTasks(100, developerTask);

  // Create 5 concurrent agent instances
  const agents = [];
  for (let i = 0; i < 5; i++) {
    const agent = new TestAgent('developer');
    await agent.initialize();
    agents.push(agent);
  }

  // Run all agents concurrently
  const results = await Promise.all(agents.map(agent => agent.pollTasks(100)));

  // Verify results
  assert.strictEqual(sumCounts(results), 100, 'All 100 tasks should be processed');

  // Verify no duplicate processing
  const allProcessed = agents.flatMap(agent => agent.processedTasks);
  assert.strictEqual(allProcessed.length, 100, 'Exactly 100 tasks should be processed');

  const uniqueProcessed = new Set(allProcessed);
  assert.strictEqual(uniqueProcessed.size, 100, 'No task should be processed more than once');

  // Verify all tasks completed
  assertAllCompleted(taskIds);

  // Verify work was distributed among agents
  const agentsWithWork = agents.filter(agent => agent.processedTasks.length > 0).length;

  assert.ok(
    agentsWithWork >= 2,
    `Work should be distributed (${agentsWithWork} agents processed tasks)`
  );
});

test('acquireNextTask returns null when no tasks available', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';

  const agent = new TestAgent('developer');
  await agent.initialize();

  // No tasks created
  const task = agent.acquireNextTask();

  assert.strictEqual(task, null, 'Should return null when no tasks available');
});

test('acquireNextTask respects priority ordering', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';

  // Create tasks with different priorities (deliberately not in priority order)
  const lowPriorityId = await createAgentTask({
    task_type: 'test_task',
    assigned_to: 'developer',
    priority: 3,
    context: { priority: 'low' },
  });

  const highPriorityId = await createAgentTask({
    task_type: 'test_task',
    assigned_to: 'developer',
    priority: 9,
    context: { priority: 'high' },
  });

  const mediumPriorityId = await createAgentTask({
    task_type: 'test_task',
    assigned_to: 'developer',
    priority: 5,
    context: { priority: 'medium' },
  });

  const agent = new TestAgent('developer');
  await agent.initialize();

  // Acquire tasks one by one
  const task1 = agent.acquireNextTask();
  const task2 = agent.acquireNextTask();
  const task3 = agent.acquireNextTask();

  // Fail with a readable assertion rather than a TypeError on `.id`
  // if fewer than three tasks could be acquired.
  assert.ok(task1 && task2 && task3, 'All three created tasks should be acquired');

  // Verify priority ordering (high -> medium -> low)
  assert.strictEqual(task1.id, highPriorityId, 'First task should be high priority');
  assert.strictEqual(task2.id, mediumPriorityId, 'Second task should be medium priority');
  assert.strictEqual(task3.id, lowPriorityId, 'Third task should be low priority');
});

test('Backwards compatibility - row locking disabled', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'false'; // Disabled
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'false';

  // Create 5 tasks (no created_by / priority, so the defaults apply)
  const taskIds = await createTasks(5, index => ({
    task_type: 'test_task',
    assigned_to: 'developer',
    context: { test_id: index },
  }));

  const agent = new TestAgent('developer');
  await agent.initialize();

  // Process tasks
  const processed = await agent.pollTasks(5);

  // Verify results
  assert.strictEqual(processed, 5, 'All tasks should be processed');

  // Verify all tasks completed
  assertAllCompleted(taskIds);
});

test('Different agents do not interfere with each other', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'true';

  // Create tasks for different agents, interleaved so both queues fill up together
  const devTaskIds = [];
  const qaTaskIds = [];

  for (let i = 0; i < 5; i++) {
    devTaskIds.push(
      await createAgentTask({
        task_type: 'test_task',
        assigned_to: 'developer',
        context: { agent: 'dev', id: i },
      })
    );

    qaTaskIds.push(
      await createAgentTask({
        task_type: 'test_task',
        assigned_to: 'qa',
        context: { agent: 'qa', id: i },
      })
    );
  }

  const devAgent = new TestAgent('developer');
  const qaAgent = new TestAgent('qa');

  await devAgent.initialize();
  await qaAgent.initialize();

  // Run both agents concurrently
  const [devProcessed, qaProcessed] = await Promise.all([
    devAgent.pollTasks(10),
    qaAgent.pollTasks(10),
  ]);

  // Verify results
  assert.strictEqual(devProcessed, 5, 'Developer should process 5 tasks');
  assert.strictEqual(qaProcessed, 5, 'QA should process 5 tasks');

  // Verify correct tasks were processed
  assertAllCompleted(devTaskIds, 'Dev task');
  assertAllCompleted(qaTaskIds, 'QA task');
});