// __quarantined_tests__/agents/concurrent-locking.test.js
  1  /**
  2   * Concurrent Locking Tests
  3   *
  4   * Tests for row-level task locking to enable horizontal scaling.
  5   * Verifies that multiple agent instances can run concurrently without conflicts.
  6   */
  7  
  8  import { test, beforeEach, afterEach } from 'node:test';
  9  import assert from 'node:assert';
 10  import { fileURLToPath } from 'url';
 11  import { dirname, join } from 'path';
 12  import { unlinkSync, existsSync } from 'fs';
 13  import Database from 'better-sqlite3';
 14  import { BaseAgent, resetDb as resetBaseAgentDb } from '../../src/agents/base-agent.js';
 15  import {
 16    createAgentTask,
 17    getTaskById,
 18    resetDbConnection,
 19  } from '../../src/agents/utils/task-manager.js';
 20  
 21  const __filename = fileURLToPath(import.meta.url);
 22  const __dirname = dirname(__filename);
 23  
 24  // Test database path - use timestamp to ensure uniqueness
 25  const TEST_DB_PATH = join(__dirname, `../test-concurrent-locking-${Date.now()}.db`);
 26  
 27  // Test agent implementation
 28  class TestAgent extends BaseAgent {
 29    constructor(agentName, contextFiles = ['base.md']) {
 30      super(agentName, contextFiles);
 31      this.processedTasks = [];
 32    }
 33  
 34    async processTask(task) {
 35      // Simulate some work
 36      await new Promise(resolve => setTimeout(resolve, 10));
 37      this.processedTasks.push(task.id);
 38      await this.completeTask(task.id, { test: true });
 39    }
 40  }
 41  
 42  /**
 43   * Setup test database
 44   */
 45  function setupTestDb() {
 46    const db = new Database(TEST_DB_PATH);
 47    db.pragma('foreign_keys = ON');
 48  
 49    // Create necessary tables
 50    db.exec(`
 51      DROP TABLE IF EXISTS agent_outcomes;
 52      DROP TABLE IF EXISTS agent_logs;
 53      DROP TABLE IF EXISTS agent_tasks;
 54      DROP TABLE IF EXISTS agent_state;
 55  
 56      CREATE TABLE agent_tasks (
 57        id INTEGER PRIMARY KEY AUTOINCREMENT,
 58        task_type TEXT NOT NULL,
 59        assigned_to TEXT NOT NULL,
 60        created_by TEXT DEFAULT 'system',
 61        parent_task_id INTEGER REFERENCES agent_tasks(id) ON DELETE CASCADE,
 62        priority INTEGER DEFAULT 5 CHECK(priority BETWEEN 1 AND 10),
 63        status TEXT DEFAULT 'pending' CHECK(status IN (
 64          'pending',
 65          'running',
 66          'completed',
 67          'failed',
 68          'blocked',
 69          'awaiting_po_approval',
 70          'awaiting_architect_approval'
 71        )),
 72        context_json TEXT,
 73        result_json TEXT,
 74        error_message TEXT,
 75        retry_count INTEGER DEFAULT 0,
 76        reviewed_by TEXT,
 77        approval_json TEXT,
 78        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
 79        started_at TIMESTAMP,
 80        completed_at TIMESTAMP
 81      );
 82  
 83      CREATE TABLE agent_state (
 84        agent_name TEXT PRIMARY KEY,
 85        last_active DATETIME DEFAULT CURRENT_TIMESTAMP,
 86        current_task_id INTEGER REFERENCES agent_tasks(id),
 87        status TEXT DEFAULT 'idle' CHECK(status IN ('idle', 'working', 'blocked')),
 88        metrics_json TEXT
 89      );
 90  
 91      CREATE TABLE agent_logs (
 92        id INTEGER PRIMARY KEY AUTOINCREMENT,
 93        task_id INTEGER REFERENCES agent_tasks(id) ON DELETE CASCADE,
 94        agent_name TEXT NOT NULL,
 95        log_level TEXT NOT NULL CHECK(log_level IN ('info', 'warn', 'error', 'debug')),
 96        message TEXT NOT NULL,
 97        data_json TEXT,
 98        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
 99      );
100  
101      CREATE TABLE agent_outcomes (
102        id INTEGER PRIMARY KEY AUTOINCREMENT,
103        task_id INTEGER NOT NULL REFERENCES agent_tasks(id) ON DELETE CASCADE,
104        agent_name TEXT NOT NULL,
105        task_type TEXT NOT NULL,
106        outcome TEXT NOT NULL CHECK(outcome IN ('success', 'failure')),
107        context_json TEXT,
108        result_json TEXT,
109        duration_ms INTEGER,
110        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
111      );
112  
113      -- Initialize agent state
114      INSERT INTO agent_state (agent_name, status) VALUES ('developer', 'idle');
115      INSERT INTO agent_state (agent_name, status) VALUES ('qa', 'idle');
116    `);
117  
118    db.close();
119  }
120  
/**
 * Cleanup test database
 *
 * Calls both connection-reset helpers, pauses for 100ms, then deletes the file
 * at TEST_DB_PATH. Best-effort: this function never throws. A missing file is
 * ignored; any other failure is reported with console.warn so that a cleanup
 * problem (for example a file that is still locked) is visible in test output.
 */
function cleanupTestDb() {
  try {
    // Reset all database connections
    resetDbConnection();
    resetBaseAgentDb();

    // Small delay to ensure connections are released. Atomics.wait blocks this
    // thread until the timeout expires without spinning the CPU; the value at
    // index 0 is 0 and nothing notifies it, so the call always times out.
    Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, 100);

    // Delete test database file. Unlinking directly (rather than checking for
    // existence first) avoids a check-then-act race.
    unlinkSync(TEST_DB_PATH);
  } catch (error) {
    // The file not existing is the normal case before the first test
    if (error?.code !== 'ENOENT') {
      console.warn(`cleanupTestDb: cleanup failed: ${error?.message ?? error}`);
    }
  }
}
144  
// Setup hooks
beforeEach(() => {
  // Remove any leftover database file, then rebuild the schema from scratch
  cleanupTestDb();
  setupTestDb();

  Object.assign(process.env, {
    DATABASE_PATH: TEST_DB_PATH,
    AGENT_REALTIME_NOTIFICATIONS: 'false', // Disable spawning
  });

  // Reset once more now that DATABASE_PATH is set
  // NOTE(review): presumably so the next connection opens the test database - confirm
  resetDbConnection();
  resetBaseAgentDb();
});

afterEach(() => {
  resetDbConnection();
  resetBaseAgentDb();
  cleanupTestDb();

  // Remove every variable the hooks or the individual tests may have set
  for (const key of [
    'DATABASE_PATH',
    'AGENT_REALTIME_NOTIFICATIONS',
    'AGENT_ENABLE_ROW_LOCKING',
    'AGENT_ALLOW_CONCURRENT_INSTANCES',
  ]) {
    delete process.env[key];
  }
});
164  
// Three 'developer' instances poll the same 10-task queue at once; every task
// must be handled exactly once and end up completed.
test('Row-level locking prevents duplicate task processing', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'true';

  // Create 10 tasks, one after another
  const taskIds = [];
  for (let testId = 0; testId < 10; testId += 1) {
    taskIds.push(
      await createAgentTask({
        task_type: 'test_task',
        assigned_to: 'developer',
        created_by: 'test',
        priority: 5,
        context: { test_id: testId },
      })
    );
  }

  // Create 3 agent instances, then initialize them in order
  const agents = [
    new TestAgent('developer'),
    new TestAgent('developer'),
    new TestAgent('developer'),
  ];
  for (const agent of agents) {
    await agent.initialize();
  }

  // Run all agents concurrently
  const counts = await Promise.all(agents.map((agent) => agent.pollTasks(10)));

  let totalProcessed = 0;
  for (const count of counts) {
    totalProcessed += count;
  }

  // Verify results
  assert.strictEqual(totalProcessed, 10, 'All 10 tasks should be processed');

  // Verify no duplicate processing
  const allProcessed = agents.flatMap((agent) => agent.processedTasks);
  assert.strictEqual(allProcessed.length, 10, 'Exactly 10 tasks should be processed');

  const distinctCount = new Set(allProcessed).size;
  assert.strictEqual(distinctCount, 10, 'No task should be processed more than once');

  // Verify all tasks completed
  for (const taskId of taskIds) {
    const stored = getTaskById(taskId);
    assert.strictEqual(stored.status, 'completed', `Task ${taskId} should be completed`);
  }
});
221  
// With concurrent instances disallowed, three 'developer' instances poll at
// once; all 10 tasks must be processed, and by a single instance only.
test('Agent-level locking blocks concurrent instances when disabled', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'false'; // Agent-level locking enabled

  // Create 10 tasks
  for (let testId = 0; testId < 10; testId += 1) {
    await createAgentTask({
      task_type: 'test_task',
      assigned_to: 'developer',
      created_by: 'test',
      priority: 5,
      context: { test_id: testId },
    });
  }

  // Create 3 agent instances, then initialize them in order
  const agents = [
    new TestAgent('developer'),
    new TestAgent('developer'),
    new TestAgent('developer'),
  ];
  for (const agent of agents) {
    await agent.initialize();
  }

  // Run all agents concurrently
  const counts = await Promise.all(agents.map((agent) => agent.pollTasks(10)));

  let totalProcessed = 0;
  for (const count of counts) {
    totalProcessed += count;
  }

  // Only one agent should acquire the lock and process tasks
  assert.strictEqual(totalProcessed, 10, 'Only one agent should process all tasks');

  // Count how many agents processed tasks
  let agentsWithWork = 0;
  for (const agent of agents) {
    if (agent.processedTasks.length > 0) {
      agentsWithWork += 1;
    }
  }

  assert.strictEqual(
    agentsWithWork,
    1,
    'Only one agent should have processed tasks (agent-level lock)'
  );
});
269  
// Larger variant of the duplicate-processing check: 5 instances, 100 tasks.
// Also requires that at least two instances ended up doing some of the work.
test('Row-level locking handles race conditions correctly', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'true';

  // Create 100 tasks to increase race condition likelihood
  const taskIds = [];
  for (let testId = 0; testId < 100; testId += 1) {
    taskIds.push(
      await createAgentTask({
        task_type: 'test_task',
        assigned_to: 'developer',
        created_by: 'test',
        priority: 5,
        context: { test_id: testId },
      })
    );
  }

  // Create 5 concurrent agent instances, initializing each before the next is built
  const agents = [];
  while (agents.length < 5) {
    const agent = new TestAgent('developer');
    await agent.initialize();
    agents.push(agent);
  }

  // Run all agents concurrently
  const pending = agents.map((agent) => agent.pollTasks(100));
  const counts = await Promise.all(pending);

  let totalProcessed = 0;
  for (const count of counts) {
    totalProcessed += count;
  }

  // Verify results
  assert.strictEqual(totalProcessed, 100, 'All 100 tasks should be processed');

  // Verify no duplicate processing
  const allProcessed = [];
  for (const agent of agents) {
    allProcessed.push(...agent.processedTasks);
  }
  assert.strictEqual(allProcessed.length, 100, 'Exactly 100 tasks should be processed');

  const distinctCount = new Set(allProcessed).size;
  assert.strictEqual(distinctCount, 100, 'No task should be processed more than once');

  // Verify all tasks completed
  for (const taskId of taskIds) {
    const stored = getTaskById(taskId);
    assert.strictEqual(stored.status, 'completed', `Task ${taskId} should be completed`);
  }

  // Verify work was distributed among agents
  const busyAgents = agents.filter((agent) => agent.processedTasks.length > 0);
  const agentsWithWork = busyAgents.length;

  assert.ok(
    agentsWithWork >= 2,
    `Work should be distributed (${agentsWithWork} agents processed tasks)`
  );
});
325  
// With nothing queued, acquireNextTask must return null.
test('acquireNextTask returns null when no tasks available', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';

  // No tasks created before the agent asks for one
  const idleAgent = new TestAgent('developer');
  await idleAgent.initialize();

  const acquired = idleAgent.acquireNextTask();
  assert.strictEqual(acquired, null, 'Should return null when no tasks available');
});
337  
// Tasks are created in the order low (3), high (9), medium (5); acquireNextTask
// must hand them back highest priority first regardless of creation order.
test('acquireNextTask respects priority ordering', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';

  // Create tasks with different priorities
  const lowPriorityId = await createAgentTask({
    task_type: 'test_task',
    assigned_to: 'developer',
    priority: 3,
    context: { priority: 'low' },
  });

  const highPriorityId = await createAgentTask({
    task_type: 'test_task',
    assigned_to: 'developer',
    priority: 9,
    context: { priority: 'high' },
  });

  const mediumPriorityId = await createAgentTask({
    task_type: 'test_task',
    assigned_to: 'developer',
    priority: 5,
    context: { priority: 'medium' },
  });

  const agent = new TestAgent('developer');
  await agent.initialize();

  // Acquire tasks one by one
  const task1 = agent.acquireNextTask();
  const task2 = agent.acquireNextTask();
  const task3 = agent.acquireNextTask();

  // acquireNextTask can return null; check that first so a missing task fails
  // with a readable assertion instead of a TypeError on `.id`
  assert.ok(task1, 'First acquire should return a task');
  assert.ok(task2, 'Second acquire should return a task');
  assert.ok(task3, 'Third acquire should return a task');

  // Verify priority ordering (high -> medium -> low)
  assert.strictEqual(task1.id, highPriorityId, 'First task should be high priority');
  assert.strictEqual(task2.id, mediumPriorityId, 'Second task should be medium priority');
  assert.strictEqual(task3.id, lowPriorityId, 'Third task should be low priority');
});
376  
// With both locking flags set to 'false', a single agent must still process
// and complete every queued task.
test('Backwards compatibility - row locking disabled', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'false'; // Disabled
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'false';

  // Create 5 tasks
  const taskIds = [];
  for (let testId = 0; testId < 5; testId += 1) {
    taskIds.push(
      await createAgentTask({
        task_type: 'test_task',
        assigned_to: 'developer',
        context: { test_id: testId },
      })
    );
  }

  const soloAgent = new TestAgent('developer');
  await soloAgent.initialize();

  // Process tasks
  const processed = await soloAgent.pollTasks(5);

  // Verify results
  assert.strictEqual(processed, 5, 'All tasks should be processed');

  // Verify all tasks completed
  for (const taskId of taskIds) {
    const stored = getTaskById(taskId);
    assert.strictEqual(stored.status, 'completed', `Task ${taskId} should be completed`);
  }
});
407  
// A 'developer' agent and a 'qa' agent poll concurrently against interleaved
// queues of 5 tasks each; each must process exactly its own 5.
test('Different agents do not interfere with each other', async () => {
  process.env.AGENT_ENABLE_ROW_LOCKING = 'true';
  process.env.AGENT_ALLOW_CONCURRENT_INSTANCES = 'true';

  // Create tasks for different agents, alternating developer / qa
  const devTaskIds = [];
  const qaTaskIds = [];

  for (let id = 0; id < 5; id += 1) {
    const devTaskId = await createAgentTask({
      task_type: 'test_task',
      assigned_to: 'developer',
      context: { agent: 'dev', id },
    });
    devTaskIds.push(devTaskId);

    const qaTaskId = await createAgentTask({
      task_type: 'test_task',
      assigned_to: 'qa',
      context: { agent: 'qa', id },
    });
    qaTaskIds.push(qaTaskId);
  }

  const devAgent = new TestAgent('developer');
  const qaAgent = new TestAgent('qa');

  await devAgent.initialize();
  await qaAgent.initialize();

  // Run both agents concurrently
  const devRun = devAgent.pollTasks(10);
  const qaRun = qaAgent.pollTasks(10);
  const [devProcessed, qaProcessed] = await Promise.all([devRun, qaRun]);

  // Verify results
  assert.strictEqual(devProcessed, 5, 'Developer should process 5 tasks');
  assert.strictEqual(qaProcessed, 5, 'QA should process 5 tasks');

  // Verify correct tasks were processed: all developer tasks first, then all QA tasks
  const groups = [
    ['Dev', devTaskIds],
    ['QA', qaTaskIds],
  ];
  for (const [label, ids] of groups) {
    for (const taskId of ids) {
      const stored = getTaskById(taskId);
      assert.strictEqual(stored.status, 'completed', `${label} task ${taskId} should be completed`);
    }
  }
});