/ api / task_router.py
task_router.py
  1  """
  2  Router for task management
  3  ---------------------------------
  4  This module implements routes for tracking and managing asynchronous tasks.
  5  """
  6  
  7  import logging
  8  from typing import Optional
  9  from fastapi import APIRouter, Depends, HTTPException
 10  
 11  # Import response models
 12  from .response_models import (
 13      TaskStatusResponse,
 14      TaskListResponse,
 15      SuccessResponse,
 16      ErrorResponse
 17  )
 18  
 19  # Import task manager
 20  from inference_engine import (
 21      get_task_status,
 22      list_tasks,
 23      cancel_task,
 24      delete_task,
 25      TaskNotFoundException
 26  )
 27  
 28  # Import for authentication
 29  from auth import get_current_active_user, User
 30  
 31  # Logging configuration
 32  logger = logging.getLogger("api.tasks")
 33  
 34  # Create router
 35  task_router = APIRouter(
 36      prefix="/tasks",
 37      tags=["Tasks"],
 38      responses={
 39          400: {"model": ErrorResponse, "description": "Invalid request"},
 40          401: {"model": ErrorResponse, "description": "Unauthorized"},
 41          404: {"model": ErrorResponse, "description": "Task not found"},
 42          500: {"model": ErrorResponse, "description": "Server error"}
 43      }
 44  )
 45  
 46  @task_router.get("/{task_id}", response_model=TaskStatusResponse)
 47  async def get_task(
 48      task_id: str,
 49      current_user: User = Depends(get_current_active_user)
 50  ):
 51      """Retrieves the status of a task"""
 52      task = get_task_status(task_id)
 53      
 54      if not task:
 55          raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
 56      
 57      # Check that the user has access to this task
 58      if not current_user.is_admin and task.get("user_id") != current_user.username:
 59          raise HTTPException(status_code=403, detail="You are not authorized to access this task")
 60      
 61      return task
 62  
 63  @task_router.get("", response_model=TaskListResponse)
 64  async def list_user_tasks(
 65      limit: int = 10,
 66      offset: int = 0,
 67      status: Optional[str] = None,
 68      task_type: Optional[str] = None,
 69      current_user: User = Depends(get_current_active_user)
 70  ):
 71      """Lists the user's tasks"""
 72      # If admin, don't filter by user
 73      user_filter = None if current_user.is_admin else current_user.username
 74      
 75      tasks_result = list_tasks(
 76          user_id=user_filter,
 77          task_type=task_type,
 78          status=status,
 79          limit=limit,
 80          offset=offset
 81      )
 82      
 83      return TaskListResponse(
 84          total=tasks_result.get("total", 0),
 85          limit=tasks_result.get("limit", limit),
 86          offset=tasks_result.get("offset", offset),
 87          tasks=tasks_result.get("tasks", {})
 88      )
 89  
 90  @task_router.delete("/{task_id}", response_model=SuccessResponse)
 91  async def cancel_user_task(
 92      task_id: str,
 93      current_user: User = Depends(get_current_active_user)
 94  ):
 95      """Cancels an ongoing task"""
 96      task = get_task_status(task_id)
 97      
 98      if not task:
 99          raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
100      
101      # Check that the user has access to this task
102      if not current_user.is_admin and task.get("user_id") != current_user.username:
103          raise HTTPException(status_code=403, detail="You are not authorized to cancel this task")
104      
105      result = cancel_task(task_id)
106      
107      if result:
108          return SuccessResponse(
109              success=True,
110              message=f"Task {task_id} canceled successfully"
111          )
112      else:
113          return SuccessResponse(
114              success=False,
115              message=f"Unable to cancel task {task_id}, it may already be completed"
116          )