/ src / api / routes / queue.py
queue.py
 1  """
 2  Queue management endpoints for Ag3ntum API.
 3  
 4  Provides endpoints for:
 5  - GET /queue/status - Get current queue status
 6  """
 7  import logging
 8  from datetime import datetime, timezone
 9  from typing import Optional
10  
11  from fastapi import APIRouter, Depends, HTTPException, Request, status
12  from sqlalchemy import select
13  from sqlalchemy.ext.asyncio import AsyncSession
14  
15  from ...db.database import get_db
16  from ...db.models import Session
17  from ..deps import get_current_user_id
18  from ..models import QueueStatusResponse, QueuedSessionInfo
19  
20  logger = logging.getLogger(__name__)
21  
22  router = APIRouter(prefix="/queue", tags=["queue"])
23  
24  
@router.get("/status", response_model=QueueStatusResponse)
async def get_queue_status(
    request: Request,
    user_id: str = Depends(get_current_user_id),
    db: AsyncSession = Depends(get_db),
) -> QueueStatusResponse:
    """
    Report the current state of the task queue.

    The response combines global figures (queue length, number of active
    tasks, concurrency limits) with the calling user's active-task count
    and the list of that user's sessions whose status is "queued",
    ordered by queue position.

    If either the task queue or the quota manager is absent from the
    application state, an all-zero response carrying default limits is
    returned instead.
    """
    # Both components are looked up on the application state; either may be absent.
    app_state = request.app.state
    task_queue = getattr(app_state, "task_queue", None)
    quota_manager = getattr(app_state, "quota_manager", None)

    queue_enabled = task_queue is not None and quota_manager is not None
    if not queue_enabled:
        # Queue system not enabled - nothing to report beyond the defaults.
        return QueueStatusResponse(
            global_queue_length=0,
            global_active_tasks=0,
            user_active_tasks=0,
            user_queued_tasks=[],
            max_concurrent_global=4,  # Default
            max_concurrent_user=2,  # Default
        )

    # Global counters plus the caller's own active-task count.
    pending_total = await task_queue.get_queue_length()
    running_total = quota_manager.get_global_active()
    running_for_user = await task_queue.get_user_active_count(user_id)

    # The caller's queued sessions, ordered by their position in the queue.
    queued_stmt = (
        select(Session)
        .where(
            Session.user_id == user_id,
            Session.status == "queued",
        )
        .order_by(Session.queue_position)
    )
    query_result = await db.execute(queued_stmt)
    session_rows = query_result.scalars().all()

    queued_infos = []
    for row in session_rows:
        queued_infos.append(
            QueuedSessionInfo(
                session_id=row.id,
                queue_position=row.queue_position,
                queued_at=row.queued_at,
                is_auto_resume=row.is_auto_resume,
            )
        )

    limits = quota_manager.config
    return QueueStatusResponse(
        global_queue_length=pending_total,
        global_active_tasks=running_total,
        user_active_tasks=running_for_user,
        user_queued_tasks=queued_infos,
        max_concurrent_global=limits.global_max_concurrent,
        max_concurrent_user=limits.per_user_max_concurrent,
    )