queue.py
1 """ 2 Queue management endpoints for Ag3ntum API. 3 4 Provides endpoints for: 5 - GET /queue/status - Get current queue status 6 """ 7 import logging 8 from datetime import datetime, timezone 9 from typing import Optional 10 11 from fastapi import APIRouter, Depends, HTTPException, Request, status 12 from sqlalchemy import select 13 from sqlalchemy.ext.asyncio import AsyncSession 14 15 from ...db.database import get_db 16 from ...db.models import Session 17 from ..deps import get_current_user_id 18 from ..models import QueueStatusResponse, QueuedSessionInfo 19 20 logger = logging.getLogger(__name__) 21 22 router = APIRouter(prefix="/queue", tags=["queue"]) 23 24 25 @router.get("/status", response_model=QueueStatusResponse) 26 async def get_queue_status( 27 request: Request, 28 user_id: str = Depends(get_current_user_id), 29 db: AsyncSession = Depends(get_db), 30 ) -> QueueStatusResponse: 31 """ 32 Get current queue status. 33 34 Returns global queue statistics and user's queued tasks. 35 """ 36 # Get queue components from app state 37 task_queue = getattr(request.app.state, "task_queue", None) 38 quota_manager = getattr(request.app.state, "quota_manager", None) 39 40 if task_queue is None or quota_manager is None: 41 # Queue system not enabled - return zeros 42 return QueueStatusResponse( 43 global_queue_length=0, 44 global_active_tasks=0, 45 user_active_tasks=0, 46 user_queued_tasks=[], 47 max_concurrent_global=4, # Default 48 max_concurrent_user=2, # Default 49 ) 50 51 # Get global stats 52 queue_length = await task_queue.get_queue_length() 53 global_active = quota_manager.get_global_active() 54 user_active = await task_queue.get_user_active_count(user_id) 55 56 # Get user's queued sessions from database 57 result = await db.execute( 58 select(Session).where( 59 Session.user_id == user_id, 60 Session.status == "queued", 61 ).order_by(Session.queue_position) 62 ) 63 queued_sessions = result.scalars().all() 64 65 user_queued_tasks = [ 66 QueuedSessionInfo( 67 session_id=s.id, 68 queue_position=s.queue_position, 69 queued_at=s.queued_at, 70 is_auto_resume=s.is_auto_resume, 71 ) 72 for s in queued_sessions 73 ] 74 75 return QueueStatusResponse( 76 global_queue_length=queue_length, 77 global_active_tasks=global_active, 78 user_active_tasks=user_active, 79 user_queued_tasks=user_queued_tasks, 80 max_concurrent_global=quota_manager.config.global_max_concurrent, 81 max_concurrent_user=quota_manager.config.per_user_max_concurrent, 82 )