/ video_api.py
video_api.py
1 from fastapi import APIRouter, File, UploadFile, BackgroundTasks, HTTPException, Depends, Form, Request, Query 2 from fastapi.responses import JSONResponse 3 from typing import Dict, List, Optional, Union, Any 4 import os 5 import uuid 6 import time 7 import json 8 import aiofiles 9 import logging 10 import asyncio 11 from tempfile import NamedTemporaryFile 12 from pathlib import Path 13 14 # Par: 15 from video_models import extract_video_content, analyze_manipulation_strategies, extract_nonverbal, analyze_nonverbal 16 17 # Import des dépendances d'authentification 18 from auth import validate_api_key, authorize_advanced_models 19 from database import record_api_usage 20 from auth_models import UsageRecord 21 22 # Configuration pour le stockage des vidéos 23 UPLOAD_DIR = Path("uploads/videos") 24 RESULTS_DIR = Path("results/videos") 25 UPLOAD_DIR.mkdir(parents=True, exist_ok=True) 26 RESULTS_DIR.mkdir(parents=True, exist_ok=True) 27 28 # Logging 29 logger = logging.getLogger("video_api") 30 31 # Création du router 32 video_router = APIRouter( 33 prefix="/api/video", 34 tags=["video analysis"], 35 responses={401: {"description": "Non autorisé"}}, 36 ) 37 38 # Stockage en mémoire des tâches en cours 39 video_tasks = {} 40 41 # Classe pour suivre la progression 42 class ProgressTracker: 43 def __init__(self, task_id: str): 44 self.task_id = task_id 45 self.progress = 0 46 self.message = "Initializing..." 47 48 def __call__(self, progress: float, desc: str = None): 49 self.progress = float(progress) * 100 50 if desc: 51 self.message = desc 52 53 # Mettre à jour l'état de la tâche 54 if self.task_id in video_tasks: 55 video_tasks[self.task_id]["progress"] = self.progress 56 video_tasks[self.task_id]["message"] = self.message 57 58 # Fonction pour vérifier si l'extension est autorisée 59 def is_allowed_video_file(filename: str) -> bool: 60 allowed_extensions = [".mp4", ".avi", ".mov", ".mkv", ".webm"] 61 return any(filename.lower().endswith(ext) for ext in allowed_extensions) 62 63 # Fonction pour sauvegarder une vidéo uploadée 64 async def save_uploaded_video(file: UploadFile) -> str: 65 if not is_allowed_video_file(file.filename): 66 raise HTTPException( 67 status_code=400, 68 detail="Format de fichier non autorisé. Formats acceptés: .mp4, .avi, .mov, .mkv, .webm" 69 ) 70 71 # Générer un nom de fichier unique 72 file_extension = os.path.splitext(file.filename)[1].lower() 73 filename = f"{uuid.uuid4().hex}{file_extension}" 74 file_path = UPLOAD_DIR / filename 75 76 # Sauvegarder le fichier 77 async with aiofiles.open(file_path, "wb") as out_file: 78 content = await file.read() 79 await out_file.write(content) 80 81 return str(file_path) 82 83 # Fonction pour traiter l'analyse de manipulation vidéo 84 async def process_manipulation_analysis( 85 task_id: str, 86 video_path: str, 87 api_key_info: Any 88 ): 89 progress_tracker = ProgressTracker(task_id) 90 91 try: 92 # Mettre à jour l'état de la tâche 93 video_tasks[task_id]["status"] = "running" 94 video_tasks[task_id]["started_at"] = time.time() 95 96 # Étape 1 : Extraction du contenu vidéo 97 start_time = time.time() 98 extraction_text, extraction_path = extract_video_content(video_path, progress_tracker) 99 100 if extraction_path is None: 101 raise Exception(f"Échec de l'extraction: {extraction_text}") 102 103 # Mettre à jour la progression 104 progress_tracker(0.5, "Starting manipulation strategy analysis...") 105 106 # Étape 2 : Analyse des stratégies de manipulation 107 analysis = analyze_manipulation_strategies(extraction_text, extraction_path, progress_tracker) 108 109 # Nettoyer le fichier temporaire 110 if extraction_path and os.path.exists(extraction_path): 111 os.unlink(extraction_path) 112 113 # Sauvegarder les résultats 114 result_file = RESULTS_DIR / f"{task_id}_manipulation_analysis.json" 115 results = { 116 "extraction": extraction_text, 117 "analysis": analysis, 118 "processing_time": time.time() - start_time 119 } 120 121 with open(result_file, "w", encoding="utf-8") as f: 122 json.dump(results, f, ensure_ascii=False, indent=2) 123 124 # Mettre à jour l'état de la tâche 125 video_tasks[task_id].update({ 126 "status": "completed", 127 "results": results, 128 "completed_at": time.time(), 129 "message": "Analysis complete", 130 "progress": 100, 131 "result_file": str(result_file) 132 }) 133 134 logger.info(f"Tâche d'analyse de manipulation vidéo {task_id} terminée avec succès") 135 136 except Exception as e: 137 logger.error(f"Erreur lors de l'analyse de manipulation vidéo {task_id}: {str(e)}") 138 video_tasks[task_id].update({ 139 "status": "failed", 140 "error": str(e), 141 "completed_at": time.time(), 142 "message": f"Error: {str(e)}", 143 "progress": 100 144 }) 145 146 # Supprimer le fichier vidéo en cas d'erreur 147 try: 148 if os.path.exists(video_path): 149 os.unlink(video_path) 150 except Exception: 151 pass 152 153 # Fonction pour traiter l'analyse non-verbale 154 async def process_nonverbal_analysis( 155 task_id: str, 156 video_path: str, 157 api_key_info: Any 158 ): 159 progress_tracker = ProgressTracker(task_id) 160 161 try: 162 # Mettre à jour l'état de la tâche 163 video_tasks[task_id]["status"] = "running" 164 video_tasks[task_id]["started_at"] = time.time() 165 166 # Étape 1 : Extraction des indices non-verbaux 167 start_time = time.time() 168 extraction_text, extraction_path = extract_nonverbal(video_path, progress_tracker) 169 170 if extraction_path is None: 171 raise Exception(f"Échec de l'extraction: {extraction_text}") 172 173 # Mettre à jour la progression 174 progress_tracker(0.5, "Starting non-verbal behavior analysis...") 175 176 # Étape 2 : Analyse des comportements non-verbaux 177 analysis = analyze_nonverbal(extraction_text, extraction_path, progress_tracker) 178 179 # Nettoyer le fichier temporaire 180 if extraction_path and os.path.exists(extraction_path): 181 os.unlink(extraction_path) 182 183 # Sauvegarder les résultats 184 result_file = RESULTS_DIR / f"{task_id}_nonverbal_analysis.json" 185 results = { 186 "extraction": extraction_text, 187 "analysis": analysis, 188 "processing_time": time.time() - start_time 189 } 190 191 with open(result_file, "w", encoding="utf-8") as f: 192 json.dump(results, f, ensure_ascii=False, indent=2) 193 194 # Mettre à jour l'état de la tâche 195 video_tasks[task_id].update({ 196 "status": "completed", 197 "results": results, 198 "completed_at": time.time(), 199 "message": "Analysis complete", 200 "progress": 100, 201 "result_file": str(result_file) 202 }) 203 204 logger.info(f"Tâche d'analyse non-verbale {task_id} terminée avec succès") 205 206 except Exception as e: 207 logger.error(f"Erreur lors de l'analyse non-verbale {task_id}: {str(e)}") 208 video_tasks[task_id].update({ 209 "status": "failed", 210 "error": str(e), 211 "completed_at": time.time(), 212 "message": f"Error: {str(e)}", 213 "progress": 100 214 }) 215 216 # Supprimer le fichier vidéo en cas d'erreur 217 try: 218 if os.path.exists(video_path): 219 os.unlink(video_path) 220 except Exception: 221 pass 222 223 @video_router.post("/manipulation-analysis", tags=["video analysis"]) 224 async def start_manipulation_analysis( 225 background_tasks: BackgroundTasks, 226 file: UploadFile = File(...), 227 keep_video: bool = Form(False), 228 api_key_info = Depends(validate_api_key) 229 ): 230 """ 231 Démarre une analyse des stratégies de manipulation dans une vidéo. 232 233 Args: 234 file: Fichier vidéo à analyser 235 keep_video: Si True, conserve la vidéo après l'analyse 236 237 Returns: 238 Un objet de réponse avec l'ID de la tâche pour vérifier l'état plus tard 239 """ 240 # Vérifier l'autorisation pour les modèles avancés (cette analyse est considérée comme avancée) 241 authorize_advanced_models(api_key_info) 242 243 # Sauvegarder la vidéo uploadée 244 video_path = await save_uploaded_video(file) 245 246 # Générer un ID de tâche 247 task_id = str(uuid.uuid4()) 248 249 # Enregistrer la nouvelle tâche 250 video_tasks[task_id] = { 251 "type": "manipulation-analysis", 252 "status": "pending", 253 "video_path": video_path, 254 "filename": file.filename, 255 "created_at": time.time(), 256 "keep_video": keep_video, 257 "message": "Task queued", 258 "progress": 0, 259 "user_id": api_key_info.user_id, 260 "api_key_id": api_key_info.id 261 } 262 263 # Lancer la tâche en arrière-plan 264 background_tasks.add_task( 265 process_manipulation_analysis, 266 task_id=task_id, 267 video_path=video_path, 268 api_key_info=api_key_info 269 ) 270 271 # Enregistrer l'utilisation 272 usage_record = UsageRecord( 273 user_id=api_key_info.user_id, 274 api_key_id=api_key_info.key, 275 request_path="/api/video/manipulation-analysis", 276 request_method="POST", 277 tokens_input=0, # À compléter plus tard 278 tokens_output=0, # À compléter plus tard 279 processing_time=0.0, # À mettre à jour une fois terminé 280 status_code=200 281 ) 282 record_api_usage(usage_record) 283 284 logger.info(f"Tâche d'analyse de manipulation vidéo créée avec ID: {task_id}") 285 286 return { 287 "task_id": task_id, 288 "status": "pending", 289 "message": "Video manipulation analysis task started" 290 } 291 292 @video_router.post("/nonverbal-analysis", tags=["video analysis"]) 293 async def start_nonverbal_analysis( 294 background_tasks: BackgroundTasks, 295 file: UploadFile = File(...), 296 keep_video: bool = Form(False), 297 api_key_info = Depends(validate_api_key) 298 ): 299 """ 300 Démarre une analyse des comportements non-verbaux dans une vidéo. 301 302 Args: 303 file: Fichier vidéo à analyser 304 keep_video: Si True, conserve la vidéo après l'analyse 305 306 Returns: 307 Un objet de réponse avec l'ID de la tâche pour vérifier l'état plus tard 308 """ 309 # Vérifier l'autorisation pour les modèles avancés (cette analyse est considérée comme avancée) 310 authorize_advanced_models(api_key_info) 311 312 # Sauvegarder la vidéo uploadée 313 video_path = await save_uploaded_video(file) 314 315 # Générer un ID de tâche 316 task_id = str(uuid.uuid4()) 317 318 # Enregistrer la nouvelle tâche 319 video_tasks[task_id] = { 320 "type": "nonverbal-analysis", 321 "status": "pending", 322 "video_path": video_path, 323 "filename": file.filename, 324 "created_at": time.time(), 325 "keep_video": keep_video, 326 "message": "Task queued", 327 "progress": 0, 328 "user_id": api_key_info.user_id, 329 "api_key_id": api_key_info.id 330 } 331 332 # Lancer la tâche en arrière-plan 333 background_tasks.add_task( 334 process_nonverbal_analysis, 335 task_id=task_id, 336 video_path=video_path, 337 api_key_info=api_key_info 338 ) 339 340 # Enregistrer l'utilisation 341 usage_record = UsageRecord( 342 user_id=api_key_info.user_id, 343 api_key_id=api_key_info.key, 344 request_path="/api/video/nonverbal-analysis", 345 request_method="POST", 346 tokens_input=0, # À compléter plus tard 347 tokens_output=0, # À compléter plus tard 348 processing_time=0.0, # À mettre à jour une fois terminé 349 status_code=200 350 ) 351 record_api_usage(usage_record) 352 353 logger.info(f"Tâche d'analyse non-verbale créée avec ID: {task_id}") 354 355 return { 356 "task_id": task_id, 357 "status": "pending", 358 "message": "Non-verbal behavior analysis task started" 359 } 360 361 @video_router.get("/tasks/{task_id}", tags=["video analysis"]) 362 async def get_video_task_status( 363 task_id: str, 364 api_key_info = Depends(validate_api_key) 365 ): 366 """ 367 Vérifie l'état d'une tâche d'analyse vidéo. 368 369 Args: 370 task_id: L'identifiant de la tâche 371 372 Returns: 373 L'état actuel de la tâche d'analyse vidéo 374 """ 375 if task_id not in video_tasks: 376 raise HTTPException(status_code=404, detail=f"Tâche {task_id} non trouvée") 377 378 task_info = video_tasks[task_id] 379 380 # Vérifier si la tâche appartient à l'utilisateur associé à la clé API 381 if task_info.get("user_id") != api_key_info.user_id: 382 raise HTTPException( 383 status_code=403, 384 detail="Vous n'avez pas accès à cette tâche" 385 ) 386 387 # Préparer la réponse 388 response = { 389 "task_id": task_id, 390 "type": task_info.get("type"), 391 "status": task_info.get("status"), 392 "message": task_info.get("message"), 393 "progress": task_info.get("progress", 0), 394 "filename": task_info.get("filename"), 395 "created_at": task_info.get("created_at"), 396 "started_at": task_info.get("started_at"), 397 "completed_at": task_info.get("completed_at") 398 } 399 400 # Ajouter les résultats si la tâche est terminée 401 if task_info.get("status") == "completed": 402 response["results"] = task_info.get("results") 403 response["result_file"] = task_info.get("result_file") 404 405 # Ajouter les informations d'erreur si la tâche a échoué 406 elif task_info.get("status") == "failed": 407 response["error"] = task_info.get("error") 408 409 return response 410 411 @video_router.get("/tasks", tags=["video analysis"]) 412 async def list_video_tasks( 413 limit: int = Query(20, ge=1, le=100), 414 offset: int = Query(0, ge=0), 415 status: Optional[str] = Query(None, regex="^(pending|running|completed|failed)$"), 416 type: Optional[str] = Query(None, regex="^(manipulation-analysis|nonverbal-analysis)$"), 417 api_key_info = Depends(validate_api_key) 418 ): 419 """ 420 Liste les tâches d'analyse vidéo de l'utilisateur. 421 422 Args: 423 limit: Nombre maximum de tâches à retourner 424 offset: Décalage pour la pagination 425 status: Filtrer par statut (pending, running, completed, failed) 426 type: Filtrer par type d'analyse 427 428 Returns: 429 Liste des tâches d'analyse vidéo 430 """ 431 # Filtrer les tâches par utilisateur 432 user_tasks = { 433 k: v for k, v in video_tasks.items() 434 if v.get("user_id") == api_key_info.user_id 435 } 436 437 # Filtrer par statut si spécifié 438 if status: 439 user_tasks = { 440 k: v for k, v in user_tasks.items() 441 if v.get("status") == status 442 } 443 444 # Filtrer par type si spécifié 445 if type: 446 user_tasks = { 447 k: v for k, v in user_tasks.items() 448 if v.get("type") == type 449 } 450 451 # Trier par date de création (plus récent en premier) 452 sorted_tasks = sorted( 453 user_tasks.items(), 454 key=lambda x: x[1].get("created_at", 0), 455 reverse=True 456 ) 457 458 # Appliquer la pagination 459 paginated_tasks = sorted_tasks[offset:offset+limit] 460 461 # Préparer la réponse 462 response = { 463 "total": len(user_tasks), 464 "limit": limit, 465 "offset": offset, 466 "tasks": {} 467 } 468 469 for task_id, task_info in paginated_tasks: 470 response["tasks"][task_id] = { 471 "type": task_info.get("type"), 472 "status": task_info.get("status"), 473 "message": task_info.get("message"), 474 "progress": task_info.get("progress", 0), 475 "filename": task_info.get("filename"), 476 "created_at": task_info.get("created_at"), 477 "completed_at": task_info.get("completed_at", None) 478 } 479 480 return response 481 482 @video_router.delete("/tasks/{task_id}", tags=["video analysis"]) 483 async def delete_video_task( 484 task_id: str, 485 api_key_info = Depends(validate_api_key) 486 ): 487 """ 488 Supprime une tâche d'analyse vidéo et ses résultats. 489 490 Args: 491 task_id: L'identifiant de la tâche 492 493 Returns: 494 Message de confirmation 495 """ 496 if task_id not in video_tasks: 497 raise HTTPException(status_code=404, detail=f"Tâche {task_id} non trouvée") 498 499 task_info = video_tasks[task_id] 500 501 # Vérifier si la tâche appartient à l'utilisateur associé à la clé API 502 if task_info.get("user_id") != api_key_info.user_id: 503 raise HTTPException( 504 status_code=403, 505 detail="Vous n'avez pas accès à cette tâche" 506 ) 507 508 # Supprimer le fichier vidéo si existant et non conservé 509 if not task_info.get("keep_video", False) and "video_path" in task_info: 510 video_path = task_info["video_path"] 511 if os.path.exists(video_path): 512 try: 513 os.unlink(video_path) 514 except Exception as e: 515 logger.warning(f"Erreur lors de la suppression de la vidéo {video_path}: {str(e)}") 516 517 # Supprimer le fichier de résultats si existant 518 if "result_file" in task_info: 519 result_file = task_info["result_file"] 520 if os.path.exists(result_file): 521 try: 522 os.unlink(result_file) 523 except Exception as e: 524 logger.warning(f"Erreur lors de la suppression du fichier de résultats {result_file}: {str(e)}") 525 526 # Supprimer la tâche 527 del video_tasks[task_id] 528 529 return { 530 "message": f"Tâche {task_id} supprimée avec succès" 531 }