/ transcription_utils.py
transcription_utils.py
  1  """
  2  Utilitaires de transcription vidéo pour l'API
  3  --------------------------------------------
  4  Ce module fournit des fonctions pour la transcription audio de vidéos,
  5  avec et sans identification des locuteurs (diarization).
  6  """
  7  
  8  import os
  9  import logging
 10  import whisper
 11  import numpy as np
 12  import traceback
 13  from tempfile import NamedTemporaryFile
 14  from pathlib import Path
 15  
 16  # Logging
 17  logger = logging.getLogger("transcription_utils")
 18  
 19  # Vérifier les dépendances optionnelles
 20  try:
 21      from moviepy.editor import AudioFileClip
 22      MOVIEPY_AVAILABLE = True
 23  except ImportError:
 24      MOVIEPY_AVAILABLE = False
 25      logger.warning("MoviePy non disponible. La conversion audio sera désactivée.")
 26  
 27  try:
 28      from pyannote.audio import Pipeline
 29      PYANNOTE_AVAILABLE = True
 30  except ImportError:
 31      PYANNOTE_AVAILABLE = False
 32      logger.warning("Pyannote.audio non disponible. La diarization sera désactivée.")
 33  
 34  # Modèle Whisper global pour réutilisation
 35  whisper_model = None
 36  
 37  # Configuration
 38  HUGGINGFACE_TOKEN = os.environ.get("HUGGINGFACE_TOKEN", "")
 39  WHISPER_MODEL_SIZE = os.environ.get("WHISPER_MODEL_SIZE", "medium")
 40  AUDIO_TMP_DIR = Path("uploads/audio")
 41  AUDIO_TMP_DIR.mkdir(parents=True, exist_ok=True)
 42  
 43  def get_whisper_model(model_size=None):
 44      """Charge ou récupère le modèle Whisper"""
 45      global whisper_model
 46      
 47      if whisper_model is None:
 48          model_size = model_size or WHISPER_MODEL_SIZE
 49          logger.info(f"Chargement du modèle Whisper {model_size}...")
 50          whisper_model = whisper.load_model(model_size)
 51          
 52      return whisper_model
 53  
 54  def extract_audio(video_path, audio_path=None, progress=None):
 55      """
 56      Extrait l'audio d'une vidéo
 57      
 58      Args:
 59          video_path: Chemin vers le fichier vidéo
 60          audio_path: Chemin de sortie pour le fichier audio (facultatif)
 61          progress: Fonction de suivi de progression (facultatif)
 62          
 63      Returns:
 64          Chemin vers le fichier audio extrait
 65      """
 66      if not MOVIEPY_AVAILABLE:
 67          raise ImportError("MoviePy est requis pour l'extraction audio")
 68      
 69      if progress:
 70          progress(0.1, desc="Extraction de l'audio de la vidéo...")
 71      
 72      try:
 73          # Créer un fichier temporaire si aucun chemin n'est spécifié
 74          if audio_path is None:
 75              audio_file = NamedTemporaryFile(delete=False, suffix=".wav", dir=AUDIO_TMP_DIR)
 76              audio_path = audio_file.name
 77              audio_file.close()
 78          
 79          # Extraire l'audio
 80          audio = AudioFileClip(video_path)
 81          audio.write_audiofile(audio_path, codec="pcm_s16le", verbose=False, logger=None)
 82          
 83          if progress:
 84              progress(0.3, desc="Extraction audio terminée")
 85          
 86          return audio_path
 87          
 88      except Exception as e:
 89          error_msg = f"Erreur lors de l'extraction audio: {str(e)}"
 90          logger.error(error_msg)
 91          logger.error(traceback.format_exc())
 92          raise Exception(error_msg)
 93  
 94  def transcribe_audio(audio_path, model_size=None, progress=None):
 95      """
 96      Transcrit un fichier audio en texte
 97      
 98      Args:
 99          audio_path: Chemin vers le fichier audio
100          model_size: Taille du modèle Whisper à utiliser
101          progress: Fonction de suivi de progression (facultatif)
102          
103      Returns:
104          Texte transcrit ou segments détaillés selon le paramètre detailed
105      """
106      try:
107          if progress:
108              progress(0.4, desc="Chargement du modèle de transcription...")
109          
110          # Charger le modèle Whisper
111          model = get_whisper_model(model_size)
112          
113          if progress:
114              progress(0.5, desc="Transcription audio en cours...")
115          
116          # Transcrire l'audio
117          result = model.transcribe(audio_path)
118          
119          if progress:
120              progress(0.8, desc="Transcription terminée")
121          
122          return result
123          
124      except Exception as e:
125          error_msg = f"Erreur lors de la transcription: {str(e)}"
126          logger.error(error_msg)
127          logger.error(traceback.format_exc())
128          raise Exception(error_msg)
129  
130  def diarize_audio(audio_path, huggingface_token=None, progress=None):
131      """
132      Identifie les locuteurs dans un fichier audio
133      
134      Args:
135          audio_path: Chemin vers le fichier audio
136          huggingface_token: Token Hugging Face pour l'accès au modèle
137          progress: Fonction de suivi de progression (facultatif)
138          
139      Returns:
140          Liste de segments avec informations sur les locuteurs
141      """
142      if not PYANNOTE_AVAILABLE:
143          raise ImportError("Pyannote.audio est requis pour la diarization")
144      
145      if not huggingface_token and not HUGGINGFACE_TOKEN:
146          raise ValueError("Un token Hugging Face est requis pour la diarization")
147      
148      token = huggingface_token or HUGGINGFACE_TOKEN
149      
150      try:
151          if progress:
152              progress(0.6, desc="Chargement du modèle de diarization...")
153          
154          # Initialiser le pipeline de diarization
155          pipeline = Pipeline.from_pretrained(
156              "pyannote/speaker-diarization-3.1",
157              use_auth_token=token
158          )
159          
160          if progress:
161              progress(0.7, desc="Identification des locuteurs en cours...")
162          
163          # Effectuer la diarization
164          diarization = pipeline(audio_path)
165          
166          # Extraire les segments avec locuteurs
167          speaker_segments = []
168          for segment, _, speaker in diarization.itertracks(yield_label=True):
169              speaker_segments.append((segment.start, segment.end, speaker))
170          
171          if progress:
172              progress(0.9, desc="Identification des locuteurs terminée")
173          
174          return speaker_segments
175          
176      except Exception as e:
177          error_msg = f"Erreur lors de la diarization: {str(e)}"
178          logger.error(error_msg)
179          logger.error(traceback.format_exc())
180          raise Exception(error_msg)
181  
182  def assign_speakers(transcription, diarization):
183      """
184      Associe les locuteurs identifiés aux segments de transcription
185      
186      Args:
187          transcription: Résultat de la transcription avec Whisper
188          diarization: Résultat de la diarization avec Pyannote
189          
190      Returns:
191          Liste de segments avec texte et locuteur attribué
192      """
193      final_transcription = []
194      
195      segments = transcription["segments"]
196      
197      for segment in segments:
198          start, end, text = segment["start"], segment["end"], segment["text"]
199          speaker = "Unknown"
200          
201          # Trouver le locuteur principal pour ce segment
202          speaker_times = {}
203          
204          for d_start, d_end, d_speaker in diarization:
205              # Calculer le chevauchement
206              overlap_start = max(d_start, start)
207              overlap_end = min(d_end, end)
208              
209              if overlap_start < overlap_end:
210                  overlap_duration = overlap_end - overlap_start
211                  
212                  if d_speaker in speaker_times:
213                      speaker_times[d_speaker] += overlap_duration
214                  else:
215                      speaker_times[d_speaker] = overlap_duration
216          
217          # Sélectionner le locuteur avec le plus de temps de parole dans ce segment
218          if speaker_times:
219              speaker = max(speaker_times, key=speaker_times.get)
220          
221          # Ajouter le segment avec son locuteur
222          final_transcription.append({
223              "start": start,
224              "end": end,
225              "speaker": speaker,
226              "text": text
227          })
228      
229      return final_transcription
230  
231  def process_monologue(video_path, output_txt=None, model_size=None, progress=None):
232      """
233      Transcrit une vidéo sans identification des locuteurs
234      
235      Args:
236          video_path: Chemin vers le fichier vidéo
237          output_txt: Chemin de sortie pour le fichier texte (facultatif)
238          model_size: Taille du modèle Whisper à utiliser
239          progress: Fonction de suivi de progression (facultatif)
240          
241      Returns:
242          Texte transcrit et chemin vers le fichier texte s'il a été créé
243      """
244      try:
245          # Extraire l'audio
246          audio_path = extract_audio(video_path, progress=progress)
247          
248          # Transcrire l'audio
249          result = transcribe_audio(audio_path, model_size, progress=progress)
250          transcription = result["text"]
251          
252          # Enregistrer le résultat si un chemin est spécifié
253          if output_txt:
254              with open(output_txt, "w", encoding="utf-8") as f:
255                  f.write(transcription)
256              
257              if progress:
258                  progress(1.0, desc=f"Transcription enregistrée dans {output_txt}")
259          else:
260              if progress:
261                  progress(1.0, desc="Transcription terminée")
262          
263          # Nettoyer le fichier audio temporaire
264          if audio_path.startswith(str(AUDIO_TMP_DIR)):
265              try:
266                  os.unlink(audio_path)
267              except:
268                  pass
269          
270          return {
271              "transcription": transcription,
272              "segments": result["segments"],
273              "file_path": output_txt
274          }
275          
276      except Exception as e:
277          error_msg = f"Erreur lors du traitement vidéo (monologue): {str(e)}"
278          logger.error(error_msg)
279          logger.error(traceback.format_exc())
280          raise Exception(error_msg)
281  
282  def process_multiple_speakers(video_path, output_txt=None, model_size=None, huggingface_token=None, progress=None):
283      """
284      Transcrit une vidéo avec identification des locuteurs
285      
286      Args:
287          video_path: Chemin vers le fichier vidéo
288          output_txt: Chemin de sortie pour le fichier texte (facultatif)
289          model_size: Taille du modèle Whisper à utiliser
290          huggingface_token: Token Hugging Face pour l'accès au modèle
291          progress: Fonction de suivi de progression (facultatif)
292          
293      Returns:
294          Liste de segments avec texte et locuteur, et chemin vers le fichier texte s'il a été créé
295      """
296      try:
297          # Extraire l'audio
298          audio_path = extract_audio(video_path, progress=progress)
299          
300          # Transcrire l'audio
301          result = transcribe_audio(audio_path, model_size, progress=progress)
302          
303          # Identifier les locuteurs
304          diarization = diarize_audio(audio_path, huggingface_token, progress=progress)
305          
306          # Associer les locuteurs à la transcription
307          final_transcription = assign_speakers(result, diarization)
308          
309          # Enregistrer le résultat si un chemin est spécifié
310          if output_txt:
311              with open(output_txt, "w", encoding="utf-8") as f:
312                  for segment in final_transcription:
313                      f.write(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['speaker']}: {segment['text']}\n")
314              
315              if progress:
316                  progress(1.0, desc=f"Transcription enregistrée dans {output_txt}")
317          else:
318              if progress:
319                  progress(1.0, desc="Transcription terminée")
320          
321          # Nettoyer le fichier audio temporaire
322          if audio_path.startswith(str(AUDIO_TMP_DIR)):
323              try:
324                  os.unlink(audio_path)
325              except:
326                  pass
327          
328          return {
329              "segments": final_transcription,
330              "file_path": output_txt
331          }
332          
333      except Exception as e:
334          error_msg = f"Erreur lors du traitement vidéo (locuteurs multiples): {str(e)}"
335          logger.error(error_msg)
336          logger.error(traceback.format_exc())
337          raise Exception(error_msg)