/ video_api.py
video_api.py
  1  from fastapi import APIRouter, File, UploadFile, BackgroundTasks, HTTPException, Depends, Form, Request, Query
  2  from fastapi.responses import JSONResponse
  3  from typing import Dict, List, Optional, Union, Any
  4  import os
  5  import uuid
  6  import time
  7  import json
  8  import aiofiles
  9  import logging
 10  import asyncio
 11  from tempfile import NamedTemporaryFile
 12  from pathlib import Path
 13  
 14  # Par:
 15  from video_models import extract_video_content, analyze_manipulation_strategies, extract_nonverbal, analyze_nonverbal
 16  
 17  # Import des dépendances d'authentification
 18  from auth import validate_api_key, authorize_advanced_models
 19  from database import record_api_usage
 20  from auth_models import UsageRecord
 21  
 22  # Configuration pour le stockage des vidéos
 23  UPLOAD_DIR = Path("uploads/videos")
 24  RESULTS_DIR = Path("results/videos")
 25  UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
 26  RESULTS_DIR.mkdir(parents=True, exist_ok=True)
 27  
 28  # Logging
 29  logger = logging.getLogger("video_api")
 30  
 31  # Création du router
 32  video_router = APIRouter(
 33      prefix="/api/video",
 34      tags=["video analysis"],
 35      responses={401: {"description": "Non autorisé"}},
 36  )
 37  
 38  # Stockage en mémoire des tâches en cours
 39  video_tasks = {}
 40  
 41  # Classe pour suivre la progression
 42  class ProgressTracker:
 43      def __init__(self, task_id: str):
 44          self.task_id = task_id
 45          self.progress = 0
 46          self.message = "Initializing..."
 47          
 48      def __call__(self, progress: float, desc: str = None):
 49          self.progress = float(progress) * 100
 50          if desc:
 51              self.message = desc
 52          
 53          # Mettre à jour l'état de la tâche
 54          if self.task_id in video_tasks:
 55              video_tasks[self.task_id]["progress"] = self.progress
 56              video_tasks[self.task_id]["message"] = self.message
 57  
 58  # Fonction pour vérifier si l'extension est autorisée
 59  def is_allowed_video_file(filename: str) -> bool:
 60      allowed_extensions = [".mp4", ".avi", ".mov", ".mkv", ".webm"]
 61      return any(filename.lower().endswith(ext) for ext in allowed_extensions)
 62  
 63  # Fonction pour sauvegarder une vidéo uploadée
 64  async def save_uploaded_video(file: UploadFile) -> str:
 65      if not is_allowed_video_file(file.filename):
 66          raise HTTPException(
 67              status_code=400,
 68              detail="Format de fichier non autorisé. Formats acceptés: .mp4, .avi, .mov, .mkv, .webm"
 69          )
 70      
 71      # Générer un nom de fichier unique
 72      file_extension = os.path.splitext(file.filename)[1].lower()
 73      filename = f"{uuid.uuid4().hex}{file_extension}"
 74      file_path = UPLOAD_DIR / filename
 75      
 76      # Sauvegarder le fichier
 77      async with aiofiles.open(file_path, "wb") as out_file:
 78          content = await file.read()
 79          await out_file.write(content)
 80      
 81      return str(file_path)
 82  
 83  # Fonction pour traiter l'analyse de manipulation vidéo
 84  async def process_manipulation_analysis(
 85      task_id: str,
 86      video_path: str,
 87      api_key_info: Any
 88  ):
 89      progress_tracker = ProgressTracker(task_id)
 90      
 91      try:
 92          # Mettre à jour l'état de la tâche
 93          video_tasks[task_id]["status"] = "running"
 94          video_tasks[task_id]["started_at"] = time.time()
 95          
 96          # Étape 1 : Extraction du contenu vidéo
 97          start_time = time.time()
 98          extraction_text, extraction_path = extract_video_content(video_path, progress_tracker)
 99          
100          if extraction_path is None:
101              raise Exception(f"Échec de l'extraction: {extraction_text}")
102          
103          # Mettre à jour la progression
104          progress_tracker(0.5, "Starting manipulation strategy analysis...")
105          
106          # Étape 2 : Analyse des stratégies de manipulation
107          analysis = analyze_manipulation_strategies(extraction_text, extraction_path, progress_tracker)
108          
109          # Nettoyer le fichier temporaire
110          if extraction_path and os.path.exists(extraction_path):
111              os.unlink(extraction_path)
112          
113          # Sauvegarder les résultats
114          result_file = RESULTS_DIR / f"{task_id}_manipulation_analysis.json"
115          results = {
116              "extraction": extraction_text,
117              "analysis": analysis,
118              "processing_time": time.time() - start_time
119          }
120          
121          with open(result_file, "w", encoding="utf-8") as f:
122              json.dump(results, f, ensure_ascii=False, indent=2)
123          
124          # Mettre à jour l'état de la tâche
125          video_tasks[task_id].update({
126              "status": "completed",
127              "results": results,
128              "completed_at": time.time(),
129              "message": "Analysis complete",
130              "progress": 100,
131              "result_file": str(result_file)
132          })
133          
134          logger.info(f"Tâche d'analyse de manipulation vidéo {task_id} terminée avec succès")
135          
136      except Exception as e:
137          logger.error(f"Erreur lors de l'analyse de manipulation vidéo {task_id}: {str(e)}")
138          video_tasks[task_id].update({
139              "status": "failed",
140              "error": str(e),
141              "completed_at": time.time(),
142              "message": f"Error: {str(e)}",
143              "progress": 100
144          })
145          
146          # Supprimer le fichier vidéo en cas d'erreur
147          try:
148              if os.path.exists(video_path):
149                  os.unlink(video_path)
150          except Exception:
151              pass
152  
153  # Fonction pour traiter l'analyse non-verbale
154  async def process_nonverbal_analysis(
155      task_id: str,
156      video_path: str,
157      api_key_info: Any
158  ):
159      progress_tracker = ProgressTracker(task_id)
160      
161      try:
162          # Mettre à jour l'état de la tâche
163          video_tasks[task_id]["status"] = "running"
164          video_tasks[task_id]["started_at"] = time.time()
165          
166          # Étape 1 : Extraction des indices non-verbaux
167          start_time = time.time()
168          extraction_text, extraction_path = extract_nonverbal(video_path, progress_tracker)
169          
170          if extraction_path is None:
171              raise Exception(f"Échec de l'extraction: {extraction_text}")
172          
173          # Mettre à jour la progression
174          progress_tracker(0.5, "Starting non-verbal behavior analysis...")
175          
176          # Étape 2 : Analyse des comportements non-verbaux
177          analysis = analyze_nonverbal(extraction_text, extraction_path, progress_tracker)
178          
179          # Nettoyer le fichier temporaire
180          if extraction_path and os.path.exists(extraction_path):
181              os.unlink(extraction_path)
182          
183          # Sauvegarder les résultats
184          result_file = RESULTS_DIR / f"{task_id}_nonverbal_analysis.json"
185          results = {
186              "extraction": extraction_text,
187              "analysis": analysis,
188              "processing_time": time.time() - start_time
189          }
190          
191          with open(result_file, "w", encoding="utf-8") as f:
192              json.dump(results, f, ensure_ascii=False, indent=2)
193          
194          # Mettre à jour l'état de la tâche
195          video_tasks[task_id].update({
196              "status": "completed",
197              "results": results,
198              "completed_at": time.time(),
199              "message": "Analysis complete",
200              "progress": 100,
201              "result_file": str(result_file)
202          })
203          
204          logger.info(f"Tâche d'analyse non-verbale {task_id} terminée avec succès")
205          
206      except Exception as e:
207          logger.error(f"Erreur lors de l'analyse non-verbale {task_id}: {str(e)}")
208          video_tasks[task_id].update({
209              "status": "failed",
210              "error": str(e),
211              "completed_at": time.time(),
212              "message": f"Error: {str(e)}",
213              "progress": 100
214          })
215          
216          # Supprimer le fichier vidéo en cas d'erreur
217          try:
218              if os.path.exists(video_path):
219                  os.unlink(video_path)
220          except Exception:
221              pass
222  
223  @video_router.post("/manipulation-analysis", tags=["video analysis"])
224  async def start_manipulation_analysis(
225      background_tasks: BackgroundTasks,
226      file: UploadFile = File(...),
227      keep_video: bool = Form(False),
228      api_key_info = Depends(validate_api_key)
229  ):
230      """
231      Démarre une analyse des stratégies de manipulation dans une vidéo.
232      
233      Args:
234          file: Fichier vidéo à analyser
235          keep_video: Si True, conserve la vidéo après l'analyse
236          
237      Returns:
238          Un objet de réponse avec l'ID de la tâche pour vérifier l'état plus tard
239      """
240      # Vérifier l'autorisation pour les modèles avancés (cette analyse est considérée comme avancée)
241      authorize_advanced_models(api_key_info)
242      
243      # Sauvegarder la vidéo uploadée
244      video_path = await save_uploaded_video(file)
245      
246      # Générer un ID de tâche
247      task_id = str(uuid.uuid4())
248      
249      # Enregistrer la nouvelle tâche
250      video_tasks[task_id] = {
251          "type": "manipulation-analysis",
252          "status": "pending",
253          "video_path": video_path,
254          "filename": file.filename,
255          "created_at": time.time(),
256          "keep_video": keep_video,
257          "message": "Task queued",
258          "progress": 0,
259          "user_id": api_key_info.user_id,
260          "api_key_id": api_key_info.id
261      }
262      
263      # Lancer la tâche en arrière-plan
264      background_tasks.add_task(
265          process_manipulation_analysis,
266          task_id=task_id,
267          video_path=video_path,
268          api_key_info=api_key_info
269      )
270      
271      # Enregistrer l'utilisation
272      usage_record = UsageRecord(
273          user_id=api_key_info.user_id,
274          api_key_id=api_key_info.key,
275          request_path="/api/video/manipulation-analysis",
276          request_method="POST",
277          tokens_input=0,  # À compléter plus tard
278          tokens_output=0,  # À compléter plus tard
279          processing_time=0.0,  # À mettre à jour une fois terminé
280          status_code=200
281      )
282      record_api_usage(usage_record)
283      
284      logger.info(f"Tâche d'analyse de manipulation vidéo créée avec ID: {task_id}")
285      
286      return {
287          "task_id": task_id,
288          "status": "pending",
289          "message": "Video manipulation analysis task started"
290      }
291  
292  @video_router.post("/nonverbal-analysis", tags=["video analysis"])
293  async def start_nonverbal_analysis(
294      background_tasks: BackgroundTasks,
295      file: UploadFile = File(...),
296      keep_video: bool = Form(False),
297      api_key_info = Depends(validate_api_key)
298  ):
299      """
300      Démarre une analyse des comportements non-verbaux dans une vidéo.
301      
302      Args:
303          file: Fichier vidéo à analyser
304          keep_video: Si True, conserve la vidéo après l'analyse
305          
306      Returns:
307          Un objet de réponse avec l'ID de la tâche pour vérifier l'état plus tard
308      """
309      # Vérifier l'autorisation pour les modèles avancés (cette analyse est considérée comme avancée)
310      authorize_advanced_models(api_key_info)
311      
312      # Sauvegarder la vidéo uploadée
313      video_path = await save_uploaded_video(file)
314      
315      # Générer un ID de tâche
316      task_id = str(uuid.uuid4())
317      
318      # Enregistrer la nouvelle tâche
319      video_tasks[task_id] = {
320          "type": "nonverbal-analysis",
321          "status": "pending",
322          "video_path": video_path,
323          "filename": file.filename,
324          "created_at": time.time(),
325          "keep_video": keep_video,
326          "message": "Task queued",
327          "progress": 0,
328          "user_id": api_key_info.user_id,
329          "api_key_id": api_key_info.id
330      }
331      
332      # Lancer la tâche en arrière-plan
333      background_tasks.add_task(
334          process_nonverbal_analysis,
335          task_id=task_id,
336          video_path=video_path,
337          api_key_info=api_key_info
338      )
339      
340      # Enregistrer l'utilisation
341      usage_record = UsageRecord(
342          user_id=api_key_info.user_id,
343          api_key_id=api_key_info.key,
344          request_path="/api/video/nonverbal-analysis",
345          request_method="POST",
346          tokens_input=0,  # À compléter plus tard
347          tokens_output=0,  # À compléter plus tard
348          processing_time=0.0,  # À mettre à jour une fois terminé
349          status_code=200
350      )
351      record_api_usage(usage_record)
352      
353      logger.info(f"Tâche d'analyse non-verbale créée avec ID: {task_id}")
354      
355      return {
356          "task_id": task_id,
357          "status": "pending",
358          "message": "Non-verbal behavior analysis task started"
359      }
360  
361  @video_router.get("/tasks/{task_id}", tags=["video analysis"])
362  async def get_video_task_status(
363      task_id: str,
364      api_key_info = Depends(validate_api_key)
365  ):
366      """
367      Vérifie l'état d'une tâche d'analyse vidéo.
368      
369      Args:
370          task_id: L'identifiant de la tâche
371          
372      Returns:
373          L'état actuel de la tâche d'analyse vidéo
374      """
375      if task_id not in video_tasks:
376          raise HTTPException(status_code=404, detail=f"Tâche {task_id} non trouvée")
377      
378      task_info = video_tasks[task_id]
379      
380      # Vérifier si la tâche appartient à l'utilisateur associé à la clé API
381      if task_info.get("user_id") != api_key_info.user_id:
382          raise HTTPException(
383              status_code=403,
384              detail="Vous n'avez pas accès à cette tâche"
385          )
386      
387      # Préparer la réponse
388      response = {
389          "task_id": task_id,
390          "type": task_info.get("type"),
391          "status": task_info.get("status"),
392          "message": task_info.get("message"),
393          "progress": task_info.get("progress", 0),
394          "filename": task_info.get("filename"),
395          "created_at": task_info.get("created_at"),
396          "started_at": task_info.get("started_at"),
397          "completed_at": task_info.get("completed_at")
398      }
399      
400      # Ajouter les résultats si la tâche est terminée
401      if task_info.get("status") == "completed":
402          response["results"] = task_info.get("results")
403          response["result_file"] = task_info.get("result_file")
404          
405      # Ajouter les informations d'erreur si la tâche a échoué
406      elif task_info.get("status") == "failed":
407          response["error"] = task_info.get("error")
408          
409      return response
410  
411  @video_router.get("/tasks", tags=["video analysis"])
412  async def list_video_tasks(
413      limit: int = Query(20, ge=1, le=100),
414      offset: int = Query(0, ge=0),
415      status: Optional[str] = Query(None, regex="^(pending|running|completed|failed)$"),
416      type: Optional[str] = Query(None, regex="^(manipulation-analysis|nonverbal-analysis)$"),
417      api_key_info = Depends(validate_api_key)
418  ):
419      """
420      Liste les tâches d'analyse vidéo de l'utilisateur.
421      
422      Args:
423          limit: Nombre maximum de tâches à retourner
424          offset: Décalage pour la pagination
425          status: Filtrer par statut (pending, running, completed, failed)
426          type: Filtrer par type d'analyse
427          
428      Returns:
429          Liste des tâches d'analyse vidéo
430      """
431      # Filtrer les tâches par utilisateur
432      user_tasks = {
433          k: v for k, v in video_tasks.items()
434          if v.get("user_id") == api_key_info.user_id
435      }
436      
437      # Filtrer par statut si spécifié
438      if status:
439          user_tasks = {
440              k: v for k, v in user_tasks.items()
441              if v.get("status") == status
442          }
443          
444      # Filtrer par type si spécifié
445      if type:
446          user_tasks = {
447              k: v for k, v in user_tasks.items()
448              if v.get("type") == type
449          }
450      
451      # Trier par date de création (plus récent en premier)
452      sorted_tasks = sorted(
453          user_tasks.items(),
454          key=lambda x: x[1].get("created_at", 0),
455          reverse=True
456      )
457      
458      # Appliquer la pagination
459      paginated_tasks = sorted_tasks[offset:offset+limit]
460      
461      # Préparer la réponse
462      response = {
463          "total": len(user_tasks),
464          "limit": limit,
465          "offset": offset,
466          "tasks": {}
467      }
468      
469      for task_id, task_info in paginated_tasks:
470          response["tasks"][task_id] = {
471              "type": task_info.get("type"),
472              "status": task_info.get("status"),
473              "message": task_info.get("message"),
474              "progress": task_info.get("progress", 0),
475              "filename": task_info.get("filename"),
476              "created_at": task_info.get("created_at"),
477              "completed_at": task_info.get("completed_at", None)
478          }
479      
480      return response
481  
482  @video_router.delete("/tasks/{task_id}", tags=["video analysis"])
483  async def delete_video_task(
484      task_id: str,
485      api_key_info = Depends(validate_api_key)
486  ):
487      """
488      Supprime une tâche d'analyse vidéo et ses résultats.
489      
490      Args:
491          task_id: L'identifiant de la tâche
492          
493      Returns:
494          Message de confirmation
495      """
496      if task_id not in video_tasks:
497          raise HTTPException(status_code=404, detail=f"Tâche {task_id} non trouvée")
498      
499      task_info = video_tasks[task_id]
500      
501      # Vérifier si la tâche appartient à l'utilisateur associé à la clé API
502      if task_info.get("user_id") != api_key_info.user_id:
503          raise HTTPException(
504              status_code=403,
505              detail="Vous n'avez pas accès à cette tâche"
506          )
507      
508      # Supprimer le fichier vidéo si existant et non conservé
509      if not task_info.get("keep_video", False) and "video_path" in task_info:
510          video_path = task_info["video_path"]
511          if os.path.exists(video_path):
512              try:
513                  os.unlink(video_path)
514              except Exception as e:
515                  logger.warning(f"Erreur lors de la suppression de la vidéo {video_path}: {str(e)}")
516      
517      # Supprimer le fichier de résultats si existant
518      if "result_file" in task_info:
519          result_file = task_info["result_file"]
520          if os.path.exists(result_file):
521              try:
522                  os.unlink(result_file)
523              except Exception as e:
524                  logger.warning(f"Erreur lors de la suppression du fichier de résultats {result_file}: {str(e)}")
525      
526      # Supprimer la tâche
527      del video_tasks[task_id]
528      
529      return {
530          "message": f"Tâche {task_id} supprimée avec succès"
531      }