# video_router.py
"""
Router for video analysis features
----------------------------------------------
This module implements routes for extracting and analyzing video content,
including nonverbal analysis and manipulation strategies.
"""

import os
import logging
import time
import traceback
import uuid
from typing import Optional, Dict, Any, List
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Depends, Body, BackgroundTasks, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# Import response models
from .response_models import (
    VideoExtractionResponse,
    NonverbalAnalysisResponse,
    ErrorResponse,
    TaskResponse
)

# Import video processing functions
from video_models import (
    extract_video_content,
    extract_nonverbal,
    analyze_nonverbal,
    analyze_manipulation_strategies
)

# Import for authentication
from auth import get_current_active_user, User

# Import configuration
from config import video_config, system_prompts

# Import task manager
from inference_engine import (
    TaskType,
    create_task,
    update_task,
    get_task_status,
    ProgressTracker
)

# Import prompt manager
from utils.prompt_manager import get_prompt_manager

# Logging configuration
logger = logging.getLogger("api.video")

# Create router
video_router = APIRouter(
    prefix="/video",
    tags=["Video"],
    responses={
        400: {"model": ErrorResponse, "description": "Invalid request"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        404: {"model": ErrorResponse, "description": "File not found"},
        500: {"model": ErrorResponse, "description": "Server error"}
    }
)

# Configuration
ALLOWED_EXTENSIONS = {'mp4', 'mov', 'avi', 'mkv', 'webm'}
UPLOAD_FOLDER = 'uploads'
RESULTS_FOLDER = 'results/videos'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULTS_FOLDER, exist_ok=True)

# Uploads are copied to disk in pieces of this size so that a large video is
# never held in memory in full.
_UPLOAD_CHUNK_SIZE = 1024 * 1024

# Default prompt templates used when `system_prompts` has no matching entry
_NONVERBAL_FALLBACK_PROMPT = "Analyze the nonverbal cues in the following video content: {text}"
_MANIPULATION_FALLBACK_PROMPT = "Analyze the manipulation strategies in the following video content: {text}"


# Pydantic models for requests
class AnalysisRequest(BaseModel):
    """JSON body accepted by the synchronous analysis endpoints."""
    # Previously extracted text that should be analyzed
    extraction_text: str
    # Optional path forwarded unchanged to the analysis function
    extraction_path: Optional[str] = None


def allowed_file(filename: Optional[str]) -> bool:
    """Checks if the file has an allowed extension.

    Returns False for a missing/empty filename (an upload may carry no
    filename) instead of raising.
    """
    if not filename or '.' not in filename:
        return False
    return filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


async def save_uploaded_file(file: UploadFile) -> str:
    """Saves an uploaded file under UPLOAD_FOLDER and returns its path.

    Raises:
        HTTPException: 400 when no file was provided or its extension is not
            in ALLOWED_EXTENSIONS.
    """
    if not file:
        raise HTTPException(status_code=400, detail="No file was provided")

    if not allowed_file(file.filename):
        raise HTTPException(status_code=400, detail="Unsupported file type")

    # Secure filename: keep only alphanumerics and a few harmless characters,
    # which also removes any path separators.
    filename = "".join(c for c in file.filename if c.isalnum() or c in "._- ")
    timestamp = int(time.time())
    # The timestamp has one-second resolution; the random token keeps two
    # uploads of the same name in the same second from overwriting each other.
    unique_token = uuid.uuid4().hex[:8]
    safe_path = os.path.join(UPLOAD_FOLDER, f"{timestamp}_{unique_token}_{filename}")

    # Save file chunk by chunk instead of reading the whole upload at once
    with open(safe_path, "wb") as f:
        while True:
            chunk = await file.read(_UPLOAD_CHUNK_SIZE)
            if not chunk:
                break
            f.write(chunk)

    return safe_path


def create_output_filename(original_filename: str, prefix: str = "video") -> str:
    """Creates a filename (inside RESULTS_FOLDER) for the video analysis output.

    `prefix` may originate from a request form field, so anything that is not
    alphanumeric, '_' or '-' is dropped to keep the result inside
    RESULTS_FOLDER. An empty prefix after cleaning falls back to "video".
    """
    safe_prefix = "".join(c for c in prefix if c.isalnum() or c in "_-") or "video"
    base_name = os.path.basename(original_filename)
    name_without_ext = os.path.splitext(base_name)[0]
    timestamp = int(time.time())
    return os.path.join(RESULTS_FOLDER, f"{safe_prefix}_{name_without_ext}_{timestamp}.txt")


def progress_callback(progress: float, desc: str) -> None:
    """Progress function (for compatibility): logs the progress at debug level."""
    logger.debug("Progress: %.1f%% - %s", progress * 100, desc)


def _build_analysis_prompt(prompt_key: str, fallback_template: str, extraction_text: str):
    """Formats the configured prompt `prompt_key`, or `fallback_template` if absent.

    The template is expected to contain a `{text}` placeholder, which is
    filled with `extraction_text` by the prompt manager.
    """
    prompt_manager = get_prompt_manager()
    if prompt_key in system_prompts:
        template = system_prompts[prompt_key]
    else:
        # Fallback to default prompt if not found
        template = fallback_template
    return prompt_manager.format_prompt_direct(template, text=extraction_text)


def _attach_plain_explanation(result, explanation):
    """Returns `result` carrying `explanation` under "plain_explanation".

    A dict is updated in place (and returned). Any other value (e.g. a plain
    string analysis) cannot take a key, so it is wrapped in a new dict under
    "analysis" instead of raising TypeError.
    """
    if isinstance(result, dict):
        result["plain_explanation"] = explanation
        return result
    return {"analysis": result, "plain_explanation": explanation}


# Analysis function with prompt manager
def formatted_analyze_nonverbal(extraction_text: str, extraction_path: Optional[str], progress=None):
    """Modified version of analyze_nonverbal using the prompt manager.

    Builds the "nonverbal_analysis" prompt around `extraction_text` and
    passes it, with `extraction_path` and `progress`, to analyze_nonverbal.
    """
    prompt = _build_analysis_prompt(
        "nonverbal_analysis", _NONVERBAL_FALLBACK_PROMPT, extraction_text
    )
    # Call the original function with the formatted prompt
    return analyze_nonverbal(prompt, extraction_path, progress=progress)


# Analysis function with prompt manager
def formatted_analyze_manipulation(extraction_text: str, extraction_path: Optional[str], progress=None):
    """Modified version of analyze_manipulation_strategies using the prompt manager.

    Builds the "manipulation_analysis" prompt around `extraction_text` and
    passes it, with `extraction_path` and `progress`, to
    analyze_manipulation_strategies.
    """
    prompt = _build_analysis_prompt(
        "manipulation_analysis", _MANIPULATION_FALLBACK_PROMPT, extraction_text
    )
    # Call the original function with the formatted prompt
    return analyze_manipulation_strategies(prompt, extraction_path, progress=progress)


async def process_video_task(task_id: str, task_type: str, video_path: str, **kwargs):
    """Asynchronous function to process a video extraction task in the background.

    Marks the task as running, runs the extraction matching `task_type`, and
    stores either the results ("completed") or the error ("failed") through
    update_task. Extra keyword arguments are accepted and ignored.

    NOTE(review): the extraction functions are called synchronously inside
    this coroutine, so they run on the event loop while they work. Consider
    offloading them to a worker thread once it is confirmed that the
    extraction code and ProgressTracker are safe to use off the loop thread.
    """
    try:
        # Update status
        update_task(task_id, {
            "status": "running",
            "message": "Video processing in progress..."
        })

        # Initialize progress tracker
        progress_tracker = ProgressTracker(task_id)

        # Determine task type and function to call
        if task_type == TaskType.VIDEO_NONVERBAL:
            update_task(task_id, {"message": "Extracting nonverbal cues..."})
            content, temp_path = extract_nonverbal(video_path, progress=progress_tracker)
            success_message = "Nonverbal cues extraction completed successfully"
        elif task_type == TaskType.VIDEO_MANIPULATION:
            update_task(task_id, {"message": "Extracting video content..."})
            content, temp_path = extract_video_content(video_path, progress=progress_tracker)
            success_message = "Video content extraction completed successfully"
        else:
            raise ValueError(f"Unsupported video task type: {task_type}")

        # Update with results
        update_task(task_id, {
            "status": "completed",
            "results": {
                "content": content,
                "file_path": temp_path
            },
            "message": success_message
        })

        logger.info("Video task %s completed successfully", task_id)

    except Exception as e:
        # Top-level boundary of a background task: record the failure on the
        # task instead of letting the exception escape.
        logger.exception("Error during video task %s: %s", task_id, e)
        update_task(task_id, {
            "status": "failed",
            "error": str(e),
            "message": f"Error: {str(e)}"
        })


async def _execute_analysis_task(task_id: str, analysis_type: str, extraction_text: str,
                                 extraction_path: Optional[str]):
    """Runs a nonverbal/manipulation analysis in the background and stores the outcome.

    `analysis_type` "nonverbal" selects the nonverbal analysis; any other
    value selects the manipulation analysis (the endpoint validates the value
    before scheduling).

    NOTE(review): the analysis call is synchronous and therefore runs on the
    event loop; see the note on process_video_task.
    """
    try:
        # Update status
        update_task(task_id, {
            "status": "running",
            "message": f"{analysis_type} analysis in progress..."
        })

        # Initialize progress tracker
        progress_tracker = ProgressTracker(task_id)

        # Execute appropriate analysis with formatted prompts
        if analysis_type == "nonverbal":
            analysis = formatted_analyze_nonverbal(
                extraction_text, extraction_path, progress=progress_tracker
            )
            success_message = "Nonverbal cues analysis completed successfully"
        else:
            analysis = formatted_analyze_manipulation(
                extraction_text, extraction_path, progress=progress_tracker
            )
            success_message = "Manipulation strategies analysis completed successfully"

        # Get JSONSimplifier if available. Imported here (not at module top)
        # because `main` imports this router; post-processing is optional, so
        # an import failure must not fail an otherwise successful analysis.
        json_simplifier = None
        try:
            from main import app
            json_simplifier = getattr(app.state, "json_simplifier", None)
        except ImportError:
            logger.warning("Application module unavailable; skipping JSON simplification")

        # Prepare result
        result = analysis

        # Apply post-processor if available
        if json_simplifier and json_simplifier.should_process("video"):
            processed = json_simplifier.process({"result": result}, "video")
            result = processed.get("result", result)

            # If plain text explanation is available, add it to results
            if "plain_explanation" in processed:
                result = _attach_plain_explanation(result, processed["plain_explanation"])

        # Update with results
        update_task(task_id, {
            "status": "completed",
            "results": result,
            "message": success_message
        })

        logger.info("Analysis task %s completed successfully", task_id)

    except Exception as e:
        # Top-level boundary of a background task: record the failure.
        logger.exception("Error during analysis task %s: %s", task_id, e)
        update_task(task_id, {
            "status": "failed",
            "error": str(e),
            "message": f"Error: {str(e)}"
        })


def _run_analysis_request(request: Request, analysis_req: AnalysisRequest, analyze, success_message: str):
    """Runs `analyze` for a synchronous endpoint and builds its response model.

    `analyze` is one of the formatted_analyze_* functions. The result is
    passed through the application's JSONSimplifier when one is registered on
    `request.app.state` and enabled for "video".
    """
    analysis = analyze(
        analysis_req.extraction_text,
        analysis_req.extraction_path,
        progress=progress_callback
    )

    # Apply JSONSimplifier post-processor if available
    result = {"analysis": analysis, "message": success_message}
    json_simplifier = getattr(request.app.state, "json_simplifier", None)
    if json_simplifier and json_simplifier.should_process("video"):
        result = json_simplifier.process(result, "video")

    return NonverbalAnalysisResponse(**result)


@video_router.post('/extract', response_model=VideoExtractionResponse)
async def video_extraction(
    video: UploadFile = File(...),
    current_user: User = Depends(get_current_active_user)
):
    """Extracts content from a video"""
    try:
        # Save uploaded video
        video_path = await save_uploaded_file(video)
        logger.info("Video saved to %s", video_path)

        # Extract video content
        # NOTE(review): synchronous call inside an async endpoint; it occupies
        # the event loop for the duration of the extraction.
        content, temp_path = extract_video_content(video_path, progress=progress_callback)

        return VideoExtractionResponse(
            content=content,
            file_path=temp_path,
            message="Video extraction successful"
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error during video extraction: %s", e)
        raise HTTPException(status_code=500, detail=f"Error during video extraction: {str(e)}") from e


@video_router.post('/extract_nonverbal', response_model=VideoExtractionResponse)
async def nonverbal_extraction(
    video: UploadFile = File(...),
    current_user: User = Depends(get_current_active_user)
):
    """Extracts nonverbal cues from a video"""
    try:
        # Save uploaded video
        video_path = await save_uploaded_file(video)
        logger.info("Video saved to %s", video_path)

        # Extract nonverbal cues
        # NOTE(review): synchronous call inside an async endpoint (see /extract).
        content, temp_path = extract_nonverbal(video_path, progress=progress_callback)

        return VideoExtractionResponse(
            content=content,
            file_path=temp_path,
            message="Nonverbal cues extraction successful"
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error during nonverbal cues extraction: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error during nonverbal cues extraction: {str(e)}"
        ) from e


@video_router.post('/analyze_nonverbal', response_model=NonverbalAnalysisResponse)
async def analyze_nonverbal_api(
    request: Request,
    analysis_req: AnalysisRequest = Body(...),
    current_user: User = Depends(get_current_active_user)
):
    """Analyzes nonverbal cues in a video"""
    try:
        # Analyze nonverbal cues with prompt manager
        return _run_analysis_request(
            request,
            analysis_req,
            formatted_analyze_nonverbal,
            "Nonverbal cues analysis successful"
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error during nonverbal cues analysis: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error during nonverbal cues analysis: {str(e)}"
        ) from e


@video_router.post('/analyze_manipulation', response_model=NonverbalAnalysisResponse)
async def analyze_manipulation_api(
    request: Request,
    analysis_req: AnalysisRequest = Body(...),
    current_user: User = Depends(get_current_active_user)
):
    """Analyzes manipulation strategies in a video"""
    try:
        # Analyze manipulation strategies with prompt manager
        return _run_analysis_request(
            request,
            analysis_req,
            formatted_analyze_manipulation,
            "Manipulation strategies analysis successful"
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error during manipulation strategies analysis: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error during manipulation strategies analysis: {str(e)}"
        ) from e


@video_router.post('/async_extract', response_model=TaskResponse)
async def async_video_extraction(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
    extract_type: str = Form("standard"),  # 'standard' or 'nonverbal'
    current_user: User = Depends(get_current_active_user)
):
    """Starts an asynchronous video extraction (in background)"""
    try:
        # Save uploaded video
        video_path = await save_uploaded_file(video)
        logger.info("Video saved to %s for asynchronous extraction", video_path)

        # Determine task type (anything other than "nonverbal" is treated as
        # a standard content extraction)
        if extract_type.lower() == "nonverbal":
            task_type = TaskType.VIDEO_NONVERBAL
            task_desc = "nonverbal cues extraction"
        else:
            task_type = TaskType.VIDEO_MANIPULATION
            task_desc = "video content extraction"

        # Task parameters
        task_params = {
            "video_path": video_path,
            "extract_type": extract_type,
            "output_txt": create_output_filename(video_path, prefix=extract_type)
        }

        # Create task
        task_id = create_task(
            task_type=task_type,
            user_id=current_user.username,
            params=task_params
        )

        # Launch task in background. `task_params` already carries
        # "video_path"; passing it a second time as an explicit keyword raises
        # "got multiple values for keyword argument".
        background_tasks.add_task(
            process_video_task,
            task_id=task_id,
            task_type=task_type,
            **task_params
        )

        return TaskResponse(
            task_id=task_id,
            status="pending",
            message=f"{task_desc} task launched successfully"
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error launching asynchronous video extraction: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error launching asynchronous video extraction: {str(e)}"
        ) from e


@video_router.post('/async_analyze', response_model=TaskResponse)
async def async_video_analysis(
    background_tasks: BackgroundTasks,
    analysis_type: str = Form(...),  # 'nonverbal' or 'manipulation'
    extraction_text: str = Form(...),
    extraction_path: Optional[str] = Form(None),
    current_user: User = Depends(get_current_active_user)
):
    """Starts an asynchronous video analysis (in background)"""
    try:
        # Check analysis type
        if analysis_type not in ("nonverbal", "manipulation"):
            raise HTTPException(
                status_code=400,
                detail="Invalid analysis type. Use 'nonverbal' or 'manipulation'"
            )

        # Task parameters
        task_params = {
            "extraction_text": extraction_text,
            "extraction_path": extraction_path,
            "analysis_type": analysis_type
        }

        # Determine task type
        if analysis_type == "nonverbal":
            task_type = TaskType.VIDEO_NONVERBAL
            task_desc = "nonverbal cues analysis"
        else:
            task_type = TaskType.VIDEO_MANIPULATION
            task_desc = "manipulation strategies analysis"

        # Create task
        task_id = create_task(
            task_type=task_type,
            user_id=current_user.username,
            params=task_params
        )

        # Launch task in background
        background_tasks.add_task(
            _execute_analysis_task,
            task_id=task_id,
            analysis_type=analysis_type,
            extraction_text=extraction_text,
            extraction_path=extraction_path
        )

        return TaskResponse(
            task_id=task_id,
            status="pending",
            message=f"{task_desc} task launched successfully"
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error launching asynchronous video analysis: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error launching asynchronous video analysis: {str(e)}"
        ) from e


@video_router.get('/allowed_extensions')
async def get_allowed_extensions():
    """Retrieves the list of allowed video file extensions"""
    return {
        # Sorted so the response is stable between calls (set order is not)
        "allowed_extensions": sorted(ALLOWED_EXTENSIONS),
        "max_upload_size_mb": video_config["max_upload_size_mb"]
    }


@video_router.get('/task/{task_id}/result', response_model=None)
async def get_task_result(
    task_id: str,
    request: Request,
    current_user: User = Depends(get_current_active_user)
):
    """Retrieves the result of a video analysis task.

    Returns the task status and message while the task is not completed, and
    the stored result (optionally enriched with a plain-text explanation)
    once it is.

    NOTE(review): the task is looked up by id only; nothing here checks that
    it belongs to `current_user`. Confirm whether get_task_status enforces
    ownership.
    """
    try:
        # Get task status
        task = get_task_status(task_id)

        if not task:
            raise HTTPException(status_code=404, detail="Task not found")

        if task["status"] != "completed":
            return {
                "status": task["status"],
                "message": task.get("message", "Task is being processed")
            }

        # Get results; a stored None is treated like a missing result
        result = task.get("results")
        if result is None:
            result = {}

        # Apply JSONSimplifier post-processor if available and not already
        # applied. Only a dict can already carry the explanation key; for a
        # string, `in` would be a substring test.
        already_explained = isinstance(result, dict) and "plain_explanation" in result
        if not already_explained:
            json_simplifier = getattr(request.app.state, "json_simplifier", None)
            if json_simplifier and json_simplifier.should_process("video"):
                processed = json_simplifier.process({"result": result}, "video")

                # If plain text explanation is available, add it to results
                if "plain_explanation" in processed:
                    result = _attach_plain_explanation(result, processed["plain_explanation"])

        return {
            "status": "completed",
            "result": result,
            "message": task.get("message", "Task completed successfully")
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error retrieving task result: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving task result: {str(e)}"
        ) from e