task_router.py
1 """ 2 Router for task management 3 --------------------------------- 4 This module implements routes for tracking and managing asynchronous tasks. 5 """ 6 7 import logging 8 from typing import Optional 9 from fastapi import APIRouter, Depends, HTTPException 10 11 # Import response models 12 from .response_models import ( 13 TaskStatusResponse, 14 TaskListResponse, 15 SuccessResponse, 16 ErrorResponse 17 ) 18 19 # Import task manager 20 from inference_engine import ( 21 get_task_status, 22 list_tasks, 23 cancel_task, 24 delete_task, 25 TaskNotFoundException 26 ) 27 28 # Import for authentication 29 from auth import get_current_active_user, User 30 31 # Logging configuration 32 logger = logging.getLogger("api.tasks") 33 34 # Create router 35 task_router = APIRouter( 36 prefix="/tasks", 37 tags=["Tasks"], 38 responses={ 39 400: {"model": ErrorResponse, "description": "Invalid request"}, 40 401: {"model": ErrorResponse, "description": "Unauthorized"}, 41 404: {"model": ErrorResponse, "description": "Task not found"}, 42 500: {"model": ErrorResponse, "description": "Server error"} 43 } 44 ) 45 46 @task_router.get("/{task_id}", response_model=TaskStatusResponse) 47 async def get_task( 48 task_id: str, 49 current_user: User = Depends(get_current_active_user) 50 ): 51 """Retrieves the status of a task""" 52 task = get_task_status(task_id) 53 54 if not task: 55 raise HTTPException(status_code=404, detail=f"Task {task_id} not found") 56 57 # Check that the user has access to this task 58 if not current_user.is_admin and task.get("user_id") != current_user.username: 59 raise HTTPException(status_code=403, detail="You are not authorized to access this task") 60 61 return task 62 63 @task_router.get("", response_model=TaskListResponse) 64 async def list_user_tasks( 65 limit: int = 10, 66 offset: int = 0, 67 status: Optional[str] = None, 68 task_type: Optional[str] = None, 69 current_user: User = Depends(get_current_active_user) 70 ): 71 """Lists the user's tasks""" 72 # If admin, don't filter by user 73 user_filter = None if current_user.is_admin else current_user.username 74 75 tasks_result = list_tasks( 76 user_id=user_filter, 77 task_type=task_type, 78 status=status, 79 limit=limit, 80 offset=offset 81 ) 82 83 return TaskListResponse( 84 total=tasks_result.get("total", 0), 85 limit=tasks_result.get("limit", limit), 86 offset=tasks_result.get("offset", offset), 87 tasks=tasks_result.get("tasks", {}) 88 ) 89 90 @task_router.delete("/{task_id}", response_model=SuccessResponse) 91 async def cancel_user_task( 92 task_id: str, 93 current_user: User = Depends(get_current_active_user) 94 ): 95 """Cancels an ongoing task""" 96 task = get_task_status(task_id) 97 98 if not task: 99 raise HTTPException(status_code=404, detail=f"Task {task_id} not found") 100 101 # Check that the user has access to this task 102 if not current_user.is_admin and task.get("user_id") != current_user.username: 103 raise HTTPException(status_code=403, detail="You are not authorized to cancel this task") 104 105 result = cancel_task(task_id) 106 107 if result: 108 return SuccessResponse( 109 success=True, 110 message=f"Task {task_id} canceled successfully" 111 ) 112 else: 113 return SuccessResponse( 114 success=False, 115 message=f"Unable to cancel task {task_id}, it may already be completed" 116 )