/ api / video_router.py
video_router.py
  1  """
  2  Router for video analysis features
  3  ----------------------------------------------
  4  This module implements routes for extracting and analyzing video content,
  5  including nonverbal analysis and manipulation strategies.
  6  """
  7  
  8  import os
  9  import logging
 10  import time
 11  import traceback
 12  from typing import Optional, Dict, Any, List
 13  from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Depends, Body, BackgroundTasks, Request
 14  from fastapi.responses import JSONResponse
 15  from pydantic import BaseModel
 16  
 17  # Import response models
 18  from .response_models import (
 19      VideoExtractionResponse, 
 20      NonverbalAnalysisResponse,
 21      ErrorResponse,
 22      TaskResponse
 23  )
 24  
 25  # Import video processing functions
 26  from video_models import (
 27      extract_video_content,
 28      extract_nonverbal,
 29      analyze_nonverbal,
 30      analyze_manipulation_strategies
 31  )
 32  
 33  # Import for authentication
 34  from auth import get_current_active_user, User
 35  
 36  # Import configuration
 37  from config import video_config, system_prompts
 38  
 39  # Import task manager
 40  from inference_engine import (
 41      TaskType, 
 42      create_task, 
 43      update_task, 
 44      get_task_status,
 45      ProgressTracker
 46  )
 47  
 48  # Import prompt manager
 49  from utils.prompt_manager import get_prompt_manager
 50  
# Module-level logger; every message in this file is emitted under "api.video"
logger = logging.getLogger("api.video")

# Router for all video endpoints. Every route below is served under the
# "/video" prefix, and the listed error responses are attached to each route's
# OpenAPI documentation.
video_router = APIRouter(
    prefix="/video",
    tags=["Video"],
    responses={
        400: {"model": ErrorResponse, "description": "Invalid request"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        404: {"model": ErrorResponse, "description": "File not found"},
        500: {"model": ErrorResponse, "description": "Server error"}
    }
)

# Configuration
# Extensions (lowercase, without the dot) accepted by allowed_file()
ALLOWED_EXTENSIONS = {'mp4', 'mov', 'avi', 'mkv', 'webm'}
# Relative paths: they resolve against the process working directory
UPLOAD_FOLDER = 'uploads'
RESULTS_FOLDER = 'results/videos'
# Both folders are created at import time so later writes cannot fail on a
# missing directory
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULTS_FOLDER, exist_ok=True)
 72  
 73  # Pydantic models for requests
# JSON request body of the synchronous analysis endpoints
# (/video/analyze_nonverbal and /video/analyze_manipulation).
class AnalysisRequest(BaseModel):
    # Text to analyze; it is substituted into the prompt template's {text}
    # placeholder before the analysis function is called.
    extraction_text: str
    # Optional path forwarded unchanged to the analysis function
    # (presumably the file produced by an earlier extraction — TODO confirm).
    extraction_path: Optional[str] = None
 77  
 78  def allowed_file(filename: str) -> bool:
 79      """Checks if the file has an allowed extension"""
 80      return '.' in filename and \
 81             filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
 82  
 83  async def save_uploaded_file(file: UploadFile) -> str:
 84      """Saves an uploaded file and returns its path"""
 85      if not file:
 86          raise HTTPException(status_code=400, detail="No file was provided")
 87      
 88      if not allowed_file(file.filename):
 89          raise HTTPException(status_code=400, detail="Unsupported file type")
 90      
 91      # Secure filename
 92      filename = "".join(c for c in file.filename if c.isalnum() or c in "._- ")
 93      timestamp = int(time.time())
 94      safe_path = os.path.join(UPLOAD_FOLDER, f"{timestamp}_{filename}")
 95      
 96      # Save file
 97      contents = await file.read()
 98      with open(safe_path, "wb") as f:
 99          f.write(contents)
100      
101      return safe_path
102  
def create_output_filename(original_filename: str, prefix: str = "video") -> str:
    """Build the path of the text file that will hold a video analysis output.

    Args:
        original_filename: Path or name of the source video; only its base
            name without the extension is used.
        prefix: Leading part of the generated file name.

    Returns:
        ``<RESULTS_FOLDER>/<prefix>_<source stem>_<unix timestamp>.txt``.
    """
    stem, _extension = os.path.splitext(os.path.basename(original_filename))
    created_at = int(time.time())
    output_name = f"{prefix}_{stem}_{created_at}.txt"
    return os.path.join(RESULTS_FOLDER, output_name)
109  
def progress_callback(progress: float, desc: str) -> None:
    """Log a progress update at DEBUG level.

    Args:
        progress: Completed fraction; it is multiplied by 100 and shown as a
            percentage with one decimal.
        desc: Description of the current step.
    """
    percent = progress * 100
    logger.debug(f"Progress: {percent:.1f}% - {desc}")
113  
114  # Analysis function with prompt manager
def formatted_analyze_nonverbal(extraction_text: str, extraction_path: Optional[str], progress=None):
    """Run ``analyze_nonverbal`` on a prompt built by the prompt manager.

    Args:
        extraction_text: Text inserted into the template's ``{text}`` placeholder.
        extraction_path: Passed through unchanged to ``analyze_nonverbal``.
        progress: Optional progress callback forwarded to ``analyze_nonverbal``.

    Returns:
        Whatever ``analyze_nonverbal`` returns.
    """
    manager = get_prompt_manager()

    # Prefer the configured template; fall back to the built-in default prompt
    if "nonverbal_analysis" in system_prompts:
        template = system_prompts["nonverbal_analysis"]
    else:
        template = "Analyze the nonverbal cues in the following video content: {text}"

    formatted_prompt = manager.format_prompt_direct(template, text=extraction_text)
    return analyze_nonverbal(formatted_prompt, extraction_path, progress=progress)
134  
135  # Analysis function with prompt manager
def formatted_analyze_manipulation(extraction_text: str, extraction_path: Optional[str], progress=None):
    """Run ``analyze_manipulation_strategies`` on a prompt built by the prompt manager.

    Args:
        extraction_text: Text inserted into the template's ``{text}`` placeholder.
        extraction_path: Passed through unchanged to
            ``analyze_manipulation_strategies``.
        progress: Optional progress callback forwarded to the analysis function.

    Returns:
        Whatever ``analyze_manipulation_strategies`` returns.
    """
    manager = get_prompt_manager()

    # Prefer the configured template; fall back to the built-in default prompt
    if "manipulation_analysis" in system_prompts:
        template = system_prompts["manipulation_analysis"]
    else:
        template = "Analyze the manipulation strategies in the following video content: {text}"

    formatted_prompt = manager.format_prompt_direct(template, text=extraction_text)
    return analyze_manipulation_strategies(formatted_prompt, extraction_path, progress=progress)
155  
async def process_video_task(task_id: str, task_type: str, video_path: str, **kwargs):
    """Run a video extraction task in the background and record its outcome.

    The task is marked "running", the extractor matching ``task_type`` is
    executed, and the task is then marked "completed" (with the extracted
    content and file path as results) or "failed" (with the error text).
    Errors are never propagated to the caller; they are logged and stored on
    the task.

    Args:
        task_id: Identifier of the task to update.
        task_type: ``TaskType.VIDEO_NONVERBAL`` selects ``extract_nonverbal``;
            ``TaskType.VIDEO_MANIPULATION`` selects ``extract_video_content``.
            Any other value fails the task.
        video_path: Path of the video to process.
        **kwargs: Additional task parameters; accepted and ignored.
    """
    try:
        # Local import: fastapi is already a dependency of this module.
        from fastapi.concurrency import run_in_threadpool

        # Update status
        update_task(task_id, {
            "status": "running",
            "message": "Video processing in progress..."
        })

        # Initialize progress tracker
        progress_tracker = ProgressTracker(task_id)

        # Pick the extractor and its messages for this task type
        if task_type == TaskType.VIDEO_NONVERBAL:
            extractor = extract_nonverbal
            start_message = "Extracting nonverbal cues..."
            success_message = "Nonverbal cues extraction completed successfully"
        elif task_type == TaskType.VIDEO_MANIPULATION:
            extractor = extract_video_content
            start_message = "Extracting video content..."
            success_message = "Video content extraction completed successfully"
        else:
            raise ValueError(f"Unsupported video task type: {task_type}")

        update_task(task_id, {"message": start_message})

        # The extractors are synchronous. Calling them directly inside this
        # coroutine would stall the event loop (and every other request) for
        # the whole extraction, so they run in the worker thread pool.
        # NOTE(review): this assumes the extractor and ProgressTracker may be
        # used from a worker thread — confirm.
        content, temp_path = await run_in_threadpool(
            extractor, video_path, progress=progress_tracker
        )
        result = {
            "content": content,
            "file_path": temp_path
        }

        # Update with results
        update_task(task_id, {
            "status": "completed",
            "results": result,
            "message": success_message
        })

        logger.info(f"Video task {task_id} completed successfully")

    except Exception as e:
        # logger.exception records the message together with the traceback
        logger.exception(f"Error during video task {task_id}: {str(e)}")
        update_task(task_id, {
            "status": "failed",
            "error": str(e),
            "message": f"Error: {str(e)}"
        })
207  
@video_router.post('/extract', response_model=VideoExtractionResponse)
async def video_extraction(
    video: UploadFile = File(...),
    current_user: User = Depends(get_current_active_user)
):
    """Extract the content of an uploaded video and return it in the response.

    Args:
        video: Uploaded video file.
        current_user: Authenticated user resolved by
            ``get_current_active_user``; not used beyond requiring it.

    Returns:
        A ``VideoExtractionResponse`` with the extracted content, the file
        path returned by the extractor and a success message.

    Raises:
        HTTPException: Re-raised unchanged when saving the upload rejects the
            file; 500 for any other error.
    """
    try:
        # Save uploaded video
        video_path = await save_uploaded_file(video)
        logger.info(f"Video saved to {video_path}")

        # Extract video content, reporting progress through the shared callback.
        # NOTE(review): this is a synchronous call inside an async handler; if
        # extraction is slow it holds the event loop for its whole duration.
        content, temp_path = extract_video_content(
            video_path,
            progress=progress_callback
        )

        return VideoExtractionResponse(
            content=content,
            file_path=temp_path,
            message="Video extraction successful"
        )

    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the logs and the cause on the raised error
        logger.exception(f"Error during video extraction: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error during video extraction: {str(e)}"
        ) from e
239  
@video_router.post('/extract_nonverbal', response_model=VideoExtractionResponse)
async def nonverbal_extraction(
    video: UploadFile = File(...),
    current_user: User = Depends(get_current_active_user)
):
    """Extract nonverbal cues from an uploaded video and return them.

    Args:
        video: Uploaded video file.
        current_user: Authenticated user resolved by
            ``get_current_active_user``; not used beyond requiring it.

    Returns:
        A ``VideoExtractionResponse`` with the extracted content, the file
        path returned by the extractor and a success message.

    Raises:
        HTTPException: Re-raised unchanged when saving the upload rejects the
            file; 500 for any other error.
    """
    try:
        # Save uploaded video
        video_path = await save_uploaded_file(video)
        logger.info(f"Video saved to {video_path}")

        # Extract nonverbal cues, reporting progress through the shared callback.
        # NOTE(review): this is a synchronous call inside an async handler; if
        # extraction is slow it holds the event loop for its whole duration.
        content, temp_path = extract_nonverbal(
            video_path,
            progress=progress_callback
        )

        return VideoExtractionResponse(
            content=content,
            file_path=temp_path,
            message="Nonverbal cues extraction successful"
        )

    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the logs and the cause on the raised error
        logger.exception(f"Error during nonverbal cues extraction: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error during nonverbal cues extraction: {str(e)}"
        ) from e
271  
@video_router.post('/analyze_nonverbal', response_model=NonverbalAnalysisResponse)
async def analyze_nonverbal_api(
    request: Request,
    analysis_req: AnalysisRequest = Body(...),
    current_user: User = Depends(get_current_active_user)
):
    """Analyze nonverbal cues from previously extracted video text.

    Args:
        request: Incoming request; used to reach the optional
            ``json_simplifier`` stored on the application state.
        analysis_req: Extraction text and optional extraction path.
        current_user: Authenticated user resolved by
            ``get_current_active_user``; not used beyond requiring it.

    Returns:
        A ``NonverbalAnalysisResponse`` built from the analysis result, after
        the JSON simplifier has processed it when one is configured for
        "video".

    Raises:
        HTTPException: 500 for any error raised during the analysis.
    """
    try:
        # Analyze nonverbal cues with prompt manager
        analysis = formatted_analyze_nonverbal(
            analysis_req.extraction_text,
            analysis_req.extraction_path,
            progress=progress_callback
        )

        # Apply JSONSimplifier post-processor if available
        result = {"analysis": analysis, "message": "Nonverbal cues analysis successful"}
        json_simplifier = getattr(request.app.state, "json_simplifier", None)
        if json_simplifier and json_simplifier.should_process("video"):
            result = json_simplifier.process(result, "video")

        return NonverbalAnalysisResponse(**result)

    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the logs and the cause on the raised error
        logger.exception(f"Error during nonverbal cues analysis: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error during nonverbal cues analysis: {str(e)}"
        ) from e
303  
@video_router.post('/analyze_manipulation', response_model=NonverbalAnalysisResponse)
async def analyze_manipulation_api(
    request: Request,
    analysis_req: AnalysisRequest = Body(...),
    current_user: User = Depends(get_current_active_user)
):
    """Analyze manipulation strategies from previously extracted video text.

    Args:
        request: Incoming request; used to reach the optional
            ``json_simplifier`` stored on the application state.
        analysis_req: Extraction text and optional extraction path.
        current_user: Authenticated user resolved by
            ``get_current_active_user``; not used beyond requiring it.

    Returns:
        A ``NonverbalAnalysisResponse`` built from the analysis result, after
        the JSON simplifier has processed it when one is configured for
        "video".

    Raises:
        HTTPException: 500 for any error raised during the analysis.
    """
    try:
        # Analyze manipulation strategies with prompt manager
        analysis = formatted_analyze_manipulation(
            analysis_req.extraction_text,
            analysis_req.extraction_path,
            progress=progress_callback
        )

        # Apply JSONSimplifier post-processor if available
        result = {"analysis": analysis, "message": "Manipulation strategies analysis successful"}
        json_simplifier = getattr(request.app.state, "json_simplifier", None)
        if json_simplifier and json_simplifier.should_process("video"):
            result = json_simplifier.process(result, "video")

        return NonverbalAnalysisResponse(**result)

    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the logs and the cause on the raised error
        logger.exception(f"Error during manipulation strategies analysis: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error during manipulation strategies analysis: {str(e)}"
        ) from e
335  
@video_router.post('/async_extract', response_model=TaskResponse)
async def async_video_extraction(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
    extract_type: str = Form("standard"),  # 'standard' or 'nonverbal'
    current_user: User = Depends(get_current_active_user)
):
    """Start a video extraction in the background and return its task id.

    Args:
        background_tasks: FastAPI background task queue for this request.
        video: Uploaded video file.
        extract_type: ``"nonverbal"`` (case-insensitive) selects the nonverbal
            cues extraction; any other value selects the standard extraction.
        current_user: Authenticated user; the username is recorded as the
            task owner.

    Returns:
        A ``TaskResponse`` with the new task id and status "pending".

    Raises:
        HTTPException: Re-raised unchanged when saving the upload rejects the
            file; 500 for any other error while launching the task.
    """
    try:
        # Save uploaded video
        video_path = await save_uploaded_file(video)
        logger.info(f"Video saved to {video_path} for asynchronous extraction")

        # Determine task type
        if extract_type.lower() == "nonverbal":
            task_type = TaskType.VIDEO_NONVERBAL
            task_desc = "nonverbal cues extraction"
            output_prefix = "nonverbal"
        else:
            task_type = TaskType.VIDEO_MANIPULATION
            task_desc = "video content extraction"
            output_prefix = "standard"

        # Task parameters. The output file prefix comes from the normalized
        # type rather than the raw form value, so client input never ends up
        # in a file path.
        task_params = {
            "video_path": video_path,
            "extract_type": extract_type,
            "output_txt": create_output_filename(video_path, prefix=output_prefix)
        }

        # Create task
        task_id = create_task(
            task_type=task_type,
            user_id=current_user.username,
            params=task_params
        )

        # Launch task in background. video_path is already part of
        # task_params; passing it a second time as an explicit keyword raised
        # "got multiple values for keyword argument 'video_path'" on every call.
        background_tasks.add_task(
            process_video_task,
            task_id=task_id,
            task_type=task_type,
            **task_params
        )

        return TaskResponse(
            task_id=task_id,
            status="pending",
            message=f"{task_desc} task launched successfully"
        )

    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the logs and the cause on the raised error
        logger.exception(f"Error launching asynchronous video extraction: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error launching asynchronous video extraction: {str(e)}"
        ) from e
394  
@video_router.post('/async_analyze', response_model=TaskResponse)
async def async_video_analysis(
    background_tasks: BackgroundTasks,
    analysis_type: str = Form(...),  # 'nonverbal' or 'manipulation'
    extraction_text: str = Form(...),
    extraction_path: Optional[str] = Form(None),
    current_user: User = Depends(get_current_active_user)
):
    """Start a video analysis in the background and return its task id.

    Args:
        background_tasks: FastAPI background task queue for this request.
        analysis_type: ``"nonverbal"`` or ``"manipulation"``.
        extraction_text: Text to analyze.
        extraction_path: Optional path forwarded to the analysis function.
        current_user: Authenticated user; the username is recorded as the
            task owner.

    Returns:
        A ``TaskResponse`` with the new task id and status "pending".

    Raises:
        HTTPException: 400 for an unknown ``analysis_type``; 500 for any other
            error while launching the task.
    """
    try:
        # Check analysis type
        if analysis_type not in ["nonverbal", "manipulation"]:
            raise HTTPException(
                status_code=400,
                detail="Invalid analysis type. Use 'nonverbal' or 'manipulation'"
            )

        # Task parameters
        task_params = {
            "extraction_text": extraction_text,
            "extraction_path": extraction_path,
            "analysis_type": analysis_type
        }

        # Determine task type
        if analysis_type == "nonverbal":
            task_type = TaskType.VIDEO_NONVERBAL
            task_desc = "nonverbal cues analysis"
        else:
            task_type = TaskType.VIDEO_MANIPULATION
            task_desc = "manipulation strategies analysis"

        # Create task
        task_id = create_task(
            task_type=task_type,
            user_id=current_user.username,
            params=task_params
        )

        # Function to execute analysis in background
        async def execute_analysis(task_id: str, analysis_type: str, extraction_text: str, extraction_path: Optional[str]):
            try:
                # Local import: fastapi is already a dependency of this module.
                from fastapi.concurrency import run_in_threadpool

                # Update status
                update_task(task_id, {
                    "status": "running",
                    "message": f"{analysis_type} analysis in progress..."
                })

                # Initialize progress tracker
                progress_tracker = ProgressTracker(task_id)

                # The module-level helpers already build the prompt (configured
                # template or default) and call the matching analysis function.
                if analysis_type == "nonverbal":
                    analyze = formatted_analyze_nonverbal
                    success_message = "Nonverbal cues analysis completed successfully"
                else:
                    analyze = formatted_analyze_manipulation
                    success_message = "Manipulation strategies analysis completed successfully"

                # The analysis is synchronous; running it in the worker thread
                # pool keeps the event loop free while it works.
                # NOTE(review): this assumes the analysis functions and
                # ProgressTracker may be used from a worker thread — confirm.
                analysis = await run_in_threadpool(
                    analyze,
                    extraction_text,
                    extraction_path,
                    progress=progress_tracker
                )

                # Get JSONSimplifier if available (no request object exists in
                # a background task, so the application is imported directly)
                from main import app
                json_simplifier = getattr(app.state, "json_simplifier", None)

                # Prepare result
                result = analysis

                # Apply post-processor if available
                if json_simplifier and json_simplifier.should_process("video"):
                    processed = json_simplifier.process({"result": result}, "video")
                    result = processed.get("result", result)

                    # If plain text explanation is available, add it to results
                    if "plain_explanation" in processed:
                        if isinstance(result, dict):
                            result["plain_explanation"] = processed["plain_explanation"]
                        else:
                            # A non-dict analysis (e.g. plain text) cannot take
                            # a key; item assignment on it used to raise and
                            # mark a successful analysis as failed.
                            result = {
                                "analysis": result,
                                "plain_explanation": processed["plain_explanation"]
                            }

                # Update with results
                update_task(task_id, {
                    "status": "completed",
                    "results": result,
                    "message": success_message
                })

                logger.info(f"Analysis task {task_id} completed successfully")

            except Exception as e:
                # logger.exception records the message together with the traceback
                logger.exception(f"Error during analysis task {task_id}: {str(e)}")
                update_task(task_id, {
                    "status": "failed",
                    "error": str(e),
                    "message": f"Error: {str(e)}"
                })

        # Launch task in background
        background_tasks.add_task(
            execute_analysis,
            task_id=task_id,
            analysis_type=analysis_type,
            extraction_text=extraction_text,
            extraction_path=extraction_path
        )

        return TaskResponse(
            task_id=task_id,
            status="pending",
            message=f"{task_desc} task launched successfully"
        )

    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the logs and the cause on the raised error
        logger.exception(f"Error launching asynchronous video analysis: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error launching asynchronous video analysis: {str(e)}"
        ) from e
547  
@video_router.get('/allowed_extensions')
async def get_allowed_extensions():
    """Return the allowed video file extensions and the upload size limit.

    Returns:
        A dict with ``allowed_extensions`` (alphabetically sorted list) and
        ``max_upload_size_mb`` (read from ``video_config``).
    """
    return {
        # ALLOWED_EXTENSIONS is a set, whose iteration order can differ from
        # one process to the next; sorting gives every client the same list.
        "allowed_extensions": sorted(ALLOWED_EXTENSIONS),
        "max_upload_size_mb": video_config["max_upload_size_mb"]
    }
555  
@video_router.get('/task/{task_id}/result', response_model=None)
async def get_task_result(
    task_id: str,
    request: Request,
    current_user: User = Depends(get_current_active_user)
):
    """Return the result of a video task, or its current status if unfinished.

    Args:
        task_id: Identifier of the task to look up.
        request: Incoming request; used to reach the optional
            ``json_simplifier`` stored on the application state.
        current_user: Authenticated user resolved by
            ``get_current_active_user``; not used beyond requiring it.

    Returns:
        For a task that is not completed: a dict with ``status`` and
        ``message``. For a completed task: a dict with ``status``, ``result``
        and ``message``.

    Raises:
        HTTPException: 404 when the task does not exist; 500 for any other
            error.
    """
    # NOTE(review): the task is looked up by id only; nothing here checks that
    # it belongs to current_user — confirm whether that is intended.
    try:
        # Get task status
        task = get_task_status(task_id)

        if not task:
            raise HTTPException(status_code=404, detail="Task not found")

        if task["status"] != "completed":
            return {
                "status": task["status"],
                "message": task.get("message", "Task is being processed")
            }

        # Get results
        result = task.get("results", {})

        # Apply JSONSimplifier post-processor if available and not already
        # applied. Only a dict can already carry the explanation; analysis
        # tasks may store a non-dict value (e.g. plain text).
        already_explained = isinstance(result, dict) and "plain_explanation" in result
        if not already_explained:
            json_simplifier = getattr(request.app.state, "json_simplifier", None)
            if json_simplifier and json_simplifier.should_process("video"):
                processed = json_simplifier.process({"result": result}, "video")

                # If plain text explanation is available, add it to results
                if "plain_explanation" in processed:
                    if isinstance(result, dict):
                        result["plain_explanation"] = processed["plain_explanation"]
                    else:
                        # Item assignment on a non-dict result used to raise
                        # and turn a finished task into a 500 response.
                        result = {
                            "analysis": result,
                            "plain_explanation": processed["plain_explanation"]
                        }

        return {
            "status": "completed",
            "result": result,
            "message": task.get("message", "Task completed successfully")
        }

    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the logs and the cause on the raised error
        logger.exception(f"Error retrieving task result: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving task result: {str(e)}"
        ) from e