/ BandwidthProcessor / bandwidth_processor.py
bandwidth_processor.py
  1  import yt_dlp
  2  import sys
  3  import ffmpeg
  4  import os
  5  from scipy.io.wavfile import write
  6  import numpy as np
  7  from scipy.signal import resample
  8  import json
  9  import time
 10  
 11  from datetime import datetime
 12  import pymysql as mysql
 13  import socket
 14  import threading
 15  from langdetect import detect
 16  import logging
 17  from functions import *
 18  
 19  
 20  
 21  
 22  environment = 'production'  # Default to 'production' if not set
 23  if len(sys.argv)> 1 and  sys.argv[1] is not None:
 24      environment = sys.argv[1]
 25  
 26  home_dir = "/home/cyril/dev/VideoTranslator"
 27  temp_dir = home_dir + '/temp'
 28  downloads_dir = home_dir + '/downloads'
 29  output_dir = home_dir + '/output'
 30  IPs = (
 31      '162.199.220.174',
 32      'translatizer.com'
 33  )
 34  
 35  pw = 'H0ur1!'
 36  host = '127.0.0.1'
 37  BANDWIDTH_PORT = 2212
 38  GPU_PORT = 2213
 39  if environment == 'production':
 40      logging.basicConfig(filename='bandwidth_processor.log', level=logging.INFO)
 41      home_dir = "/home/ubuntu/apps/VideoTranslator"
 42      output_dir = home_dir + '/output'
 43      temp_dir = home_dir + '/temp'
 44      downloads_dir = home_dir + '/downloads'
 45      pw = 'Cyr1lH0ur1!'
 46      host = 'translatizer.com'
 47      BANDWIDTH_PORT = 62212
 48      GPU_PORT = 62213
 49  else:
 50      logging.basicConfig(level=logging.INFO,
 51                          format='%(asctime)s - %(levelname)s - %(message)s')
 52  # MySQL Configuration
 53  db_config = {
 54      'user': 'appuser',
 55      'password': pw,
 56      'host': host,  # or the IP address of your MySQL server
 57      'database': 'videotranslator',
 58  }
 59  
 60  
 61  
 62  def get_destination_ip():
 63      retValue = ''
 64      if environment == 'debug':
 65          retValue = '127.0.0.1'
 66      else:
 67          if os.environ.get('PUBLIC_IP') == IPs[0]:
 68              retValue = IPs[1]
 69          else:
 70              retValue = IPs[0]
 71      return retValue
 72  
 73  
 74  
 75  def merge_audio_files(original_file, translated_file, output_file):
 76      """
 77      Merge two mono audio files into one stereo file using FFmpeg.
 78  
 79      Parameters:
 80      - original_file (str): Path to the first mono audio file.
 81      - translated_file (str): Path to the second mono audio file.
 82      - output_file (str): Path to the output stereo audio file.
 83  
 84      Returns:
 85      - str: Output message from FFmpeg.
 86      """
 87      """
 88      Merge two mono audio files into one stereo file using FFmpeg.
 89  
 90      Parameters:
 91      - original_file (str): Path to the first mono audio file.
 92      - translated_file (str): Path to the second mono audio file.
 93      - output_file (str): Path to the output stereo audio file.
 94      """
 95  
 96      input_args = ["-i", original_file, "-i", translated_file]
 97      filter_args = [
 98          "-filter_complex",
 99          "[0:a]volume=0.2,pan=mono|c0=c0[a1]; [1:a]volume=0.8,pan=mono|c0=c0[a2]; [a1][a2]amerge=inputs=2[aout]",
100          "-ac", "2",
101          "-map", "[aout]"
102      ]
103      output_args = [output_file]
104  
105      cmd = ["ffmpeg"] + input_args + filter_args + output_args
106  
107      ffmpeg.run(cmd)
108  
109      # Example usage:
110      # merge_audio_files("original_trunc.wav", "translated.wav", "merged.wav")
111  
112      import subprocess
113  
114  
115  
116  
def split_elements_for_caption(input_list):
    """Split timed text elements into caption chunks of at most 10 words.

    Each element is a dict with 'start', 'end' and 'text'. The element's
    duration is spread evenly over its words, and every chunk gets the time
    window covered by the words it contains.

    Elements whose text contains no words are skipped; they previously caused
    a ZeroDivisionError.

    Returns:
        list[dict]: chunks with 'start', 'end' and 'text' keys, in input order.
    """
    output_list = []

    for element in input_list:
        words = element['text'].split()
        if not words:
            # Nothing to display and no way to compute a per-word duration.
            continue

        element_start = element['start']
        duration_per_word = (element['end'] - element_start) / len(words)

        # Walk the words in strides of 10; the last chunk may be shorter.
        for i in range(0, len(words), 10):
            chunk_words = words[i:i + 10]
            chunk_start = element_start + i * duration_per_word
            output_list.append({
                'start': chunk_start,
                'end': chunk_start + len(chunk_words) * duration_per_word,
                'text': ' '.join(chunk_words),
            })

    return output_list
141  
def convert_to_transcript(result):
    """Flatten result['segments'] into a list of transcript entries.

    Each entry copies the segment's 'start', 'end' and 'text' and carries a
    'speaker_id' of None.
    """
    return [
        {
            'start': segment['start'],
            'end': segment['end'],
            'speaker_id': None,
            'text': segment['text'],
        }
        for segment in result['segments']
    ]
154  
def split_text(result, min_words=35, max_words=60):
    """Group word-level timings into parts of roughly sentence-sized text.

    All words from result['segments'][*]['words'] are taken in order. A part
    is closed once it holds at least ``min_words`` words and the current word
    ends with '.' or '?', or unconditionally once it holds ``max_words``
    words. Leftover words form a final, shorter part.

    Word strings are concatenated without a separator, exactly as supplied.
    # presumably each word carries its own leading space — confirm with the producer

    Parameters:
        result (dict): has a 'segments' list; each segment has a 'words' list
            of dicts with 'start', 'end' and 'word'.
        min_words (int): minimum part length before a sentence end closes it.
        max_words (int): hard upper bound on the part length.

    Returns:
        list[dict]: parts with 'start', 'end', 'speaker_id' (None) and 'text'.
    """
    words = [
        {
            'start': word['start'],
            'end': word['end'],
            'speaker_id': None,
            'word': word['word'],
        }
        for segment in result['segments']
        for word in segment['words']
    ]

    def build_part(part_words):
        # A part spans from its first word's start to its last word's end.
        return {
            'start': part_words[0]['start'],
            'end': part_words[-1]['end'],
            'speaker_id': None,
            'text': ''.join(w['word'] for w in part_words),
        }

    parts = []
    pending = []
    for word in words:
        pending.append(word)
        count = len(pending)
        # endswith() is safe for empty word strings, unlike indexing [-1].
        at_sentence_end = word['word'].endswith(('.', '?'))
        if count >= min_words and (count >= max_words or at_sentence_end):
            parts.append(build_part(pending))
            pending = []

    # Add any remaining words as a final part.
    if pending:
        parts.append(build_part(pending))
    return parts
208  
209  
def fetch_queue_in_db_from_id(db_config, queue_id):
    """Look up the URL and target language code of one queue row.

    Parameters:
        db_config (dict): keyword arguments for mysql.connect().
        queue_id: value of queue.id to look up.

    Returns:
        tuple: (url, target_language_code), or (None, None) when no row has
        that id.
    """
    conn = mysql.connect(**db_config)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT  url, target_language_code FROM queue  WHERE id = %s", (queue_id,))
            item = cursor.fetchone()
    finally:
        # Always release the connection; it used to be leaked on every call.
        conn.close()

    if item:
        return item[0], item[1]
    return None, None
222  
def index_exists(index_name, cursor):
    """Count INFORMATION_SCHEMA.STATISTICS rows for an index in the current database.

    Parameters:
        index_name (str): name of the index to look for.
        cursor: an open DB-API cursor using the ``%s`` parameter style.

    Returns:
        The COUNT(1) value from the query: 0 when no index with that name
        exists in the current schema, non-zero otherwise.
    """
    # The name is passed as a bound parameter rather than concatenated into
    # the SQL text, so quotes in index_name cannot alter the statement.
    query = ("SELECT COUNT(1) AS IndexIsThere FROM INFORMATION_SCHEMA.STATISTICS  "
             "WHERE table_schema=DATABASE() AND index_name=%s;")
    cursor.execute(query, (index_name,))

    # Fetch the result
    result = cursor.fetchone()
    return result[0]
230  
231  
232  
233  
def get_audio_codec(video_path):
    """Return the codec name of the first audio stream reported by ffprobe.

    Parameters:
        video_path (str): path of the media file to inspect.

    Returns:
        str | None: the 'codec_name' of the first stream whose 'codec_type'
        is 'audio'; None when there is no such stream or ffprobe produced no
        usable JSON (for example because the file could not be read).
    """
    cmd = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_streams',
        video_path
    ]

    # stderr is discarded rather than merged into stdout so that diagnostics
    # can never corrupt the JSON document being parsed.
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    try:
        data = json.loads(result.stdout.decode('utf-8'))
    except ValueError:
        return None

    # 'streams' is absent when ffprobe fails, so default to an empty list.
    for stream in data.get('streams', []):
        if stream.get('codec_type') == 'audio':
            return stream.get('codec_name')

    return None
253  
254  
def extractAudioFromVideo(filename, desired_samplerate=24000):
    """Decode a downloaded file's audio to mono WAV bytes.

    Parameters:
        filename (str): file name inside downloads_dir.
        desired_samplerate (int): output sample rate in Hz.

    Returns:
        bytes | None: WAV data captured from ffmpeg's stdout, or None when
        ffmpeg reports an error (the error output is logged).
    """
    source_path = downloads_dir + '/' + filename
    # Logs True/False: whether the source file is present before decoding.
    logging.info(os.path.exists(source_path))
    try:
        out, _ = (
            ffmpeg.input(source_path)
            .output('pipe:', format='wav', ac=1, ar=desired_samplerate)
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        # Use a %s placeholder: passing the text as a bare extra argument made
        # the logging module fail to format the record and drop the message.
        logging.error("FFmpeg Error: %s", e.stderr.decode(errors='replace'))
        return None

    return out
269  
270  
def normalize_text(t):
    """Return *t* with every '%' character replaced by the text 'o/o'."""
    return "o/o".join(t.split("%"))
273  
274  
def format_drawtext_filter(captions, video):
    """Build a comma-separated chain of ffmpeg drawtext filters for captions.

    Every line of every caption becomes one drawtext entry, centred
    horizontally, in yellow on a half-transparent black box, and enabled only
    between the caption's 'start' and 'end' (rounded to 2 decimals). The font
    size is video['width'] / 20, truncated to an int.

    Single and double quotes in the text are replaced by U+2019 and '%' is
    rewritten by normalize_text() before the text is embedded.
    """
    font_size = int(video['width'] / 20)
    drawtext_entries = []
    for caption in captions:
        quoted_safe = str(caption['text']).replace("'", "\u2019").replace('"', "\u2019")
        lines = normalize_text(quoted_safe).split('\n')
        is_two_line_caption = len(lines) == 2
        for position, line in enumerate(lines):
            # The first line of a two-line caption sits higher (h/1.2); every
            # other line is placed at h/1.1.
            divisor = "1.2" if is_two_line_caption and position == 0 else "1.1"
            window_start = round(float(caption['start']), 2)
            window_end = round(float(caption['end']), 2)
            drawtext_entries.append(
                f"drawtext=text='{line}':x=(w-text_w)/2:y=h/{divisor}-th:fontsize={font_size}"
                ":fontcolor=yellow:box=1:boxcolor=black@0.5:"
                f"enable='between(t,{window_start},{window_end})'"
            )
    return ",".join(drawtext_entries)
305  
306  import subprocess
307  
308  
309  
310  
def _build_caption_burn_cmd(input_path, output_video, drawtext_filter, use_cuda,
                            start=None, duration=None):
    """Build the ffmpeg argv that burns captions and the intro watermark into one output."""
    filter_complex = (
        "[1:v]format=rgba,colorchannelmixer=aa=0.5[watermark];  "
        "[0:v][watermark] overlay=x=10:y=25:enable='between(t,0,10)'[watermarked];  "
        "[watermarked] drawtext=text='Translated with':fontsize=24:fontcolor=white@0.5:x=10:y=5:enable='between(t,0,10)', "
        + drawtext_filter + "[output]"
    )
    cmd = ["ffmpeg", "-y"]
    if use_cuda:
        cmd += ["-hwaccel", "cuda", "-hwaccel_device", "0"]
    if start is not None:
        # Placed before -i so the seek and length apply to the source input.
        cmd += ["-ss", str(round(float(start), 2)), "-t", str(round(float(duration), 2))]
    cmd += [
        "-i", input_path,
        "-i", home_dir + "/static/Translatizer_logo.png",
        "-filter_complex", filter_complex,
        "-map", "[output]",
        "-map", "0:a",
        "-c:v",
    ]
    cmd += ["h264_nvenc", "-pix_fmt", "yuv420p"] if use_cuda else ["libx264"]
    cmd.append(output_video)
    return cmd


def _run_ffmpeg(cmd):
    """Run an ffmpeg argv without a shell and log a non-zero exit status."""
    completed = subprocess.run(cmd)
    if completed.returncode != 0:
        logging.error("ffmpeg exited with status %s: %s", completed.returncode, cmd)
    return completed.returncode


def add_subtitles_to_video(subtitles, language, video):
    """Render the downloaded video with burned-in captions and a watermark.

    The captions are prepared by split_captions_for_display() and grouped by
    split_in_smaller_chunks(..., 900). With a single group the video is
    rendered straight to output_dir. With several groups each one is rendered
    as a separate part into temp_dir, the parts are concatenated into the
    final file, and the part files are deleted.

    In the 'production' environment encoding uses libx264; in every other
    environment it uses CUDA decoding and h264_nvenc.

    ffmpeg is invoked with an argument list rather than through a shell, so
    caption text is never interpreted by the shell. A failing ffmpeg run is
    logged and does not raise, matching the previous best-effort behaviour.

    Parameters:
        subtitles: caption entries accepted by split_captions_for_display().
        language (str): language code used in the output file name.
        video (dict): needs 'filename', 'url_hash', 'id' and 'width'.

    The final file is output_dir/<url_hash>_<language>.mp4.
    """
    input_path = downloads_dir + '/' + video['filename']
    part_prefix = temp_dir + '/' + video['url_hash'] + "_" + language + "_"
    final_output = output_dir + '/' + video['url_hash'] + "_" + language + ".mp4"
    use_cuda = environment != 'production'

    subtitles_list = split_in_smaller_chunks(split_captions_for_display(subtitles), 900)
    part_count = len(subtitles_list)
    single_part = part_count == 1
    if part_count > 1:
        print_with_current_milli_time(logging, "Breaking video_id=" + str(video['id']) + " in " + str(part_count) + " parts")

    part_paths = []
    part_start_time = 0
    for i, part_subtitles in enumerate(subtitles_list):
        if single_part:
            output_video = final_output
        else:
            output_video = part_prefix + str(i) + ".mp4"
            part_paths.append(output_video)
        if os.path.exists(output_video):
            os.remove(output_video)

        drawtext_filter = format_drawtext_filter(part_subtitles, video)
        # Each part runs from the end of the previous one to its last caption's end.
        video_duration = part_subtitles[-1]['end'] - part_start_time

        if single_part:
            cmd = _build_caption_burn_cmd(input_path, output_video, drawtext_filter, use_cuda)
        else:
            cmd = _build_caption_burn_cmd(input_path, output_video, drawtext_filter, use_cuda,
                                          start=part_start_time, duration=video_duration)
        _run_ffmpeg(cmd)

        part_start_time += video_duration

    if part_count > 1:
        concat_cmd = ["ffmpeg", "-y"]
        for part_path in part_paths:
            concat_cmd += ["-i", part_path]
        concat_filter = "".join("[" + str(i) + ":v:0][" + str(i) + ":a:0]" for i in range(part_count))
        concat_filter += "concat=n=" + str(part_count) + ":v=1:a=1[outv][outa]"
        concat_cmd += [
            "-filter_complex", concat_filter,
            "-map", "[outv]",
            "-map", "[outa]",
            "-c:v", "libx264",
            final_output,
        ]
        _run_ffmpeg(concat_cmd)

        # Part files are only intermediates; failing to delete one is not fatal.
        for part_path in part_paths:
            try:
                os.remove(part_path)
            except OSError as e:
                logging.info(e)
422  
423  
424  
425  
def generate_srt(subtitles):
    """Return SRT text for an iterable of (start, end, text) triples.

    Entries are numbered from 1. Each consists of the index line, a
    "start --> end" line rendered with format_timestamp(), the text, and a
    blank line.
    """
    entries = [
        f"{index}\n{format_timestamp(start)} --> {format_timestamp(end)}\n{text}\n\n"
        for index, (start, end, text) in enumerate(subtitles, 1)
    ]
    return "".join(entries)
434  
435  
def get_wav_duration(wav_bytes, sample_rate):
    """Estimate the duration in seconds of WAV data held in memory.

    The computation subtracts 44 bytes for the header, treats every 2 bytes
    of the remainder as one sample, and divides the sample count by
    ``sample_rate``.
    """
    header_bytes = 44
    bytes_per_sample = 2
    sample_count = (len(wav_bytes) - header_bytes) / bytes_per_sample
    return sample_count / sample_rate
444  
445  
446  
447  
448  
449  
450  
def save_wav_to_disk(wav_array, sampling_rate, output_file):
    """Scale *wav_array* by the int16 maximum, cast to int16 and write it as WAV.

    # assumes wav_array holds float samples in [-1.0, 1.0] — TODO confirm
    """
    pcm_type = np.int16
    full_scale = np.iinfo(pcm_type).max
    pcm_samples = (wav_array * full_scale).astype(pcm_type)
    write(output_file, sampling_rate, pcm_samples)
454  
455  
def write_wav_files(wav_arrays, sampling_rate, output_dir, prefix="wav"):
    """Write each array as an int16 WAV file and return the paths in order.

    Files are named "<output_dir>/<prefix>_<n>.wav" with n counting from 1.
    Samples are scaled by the int16 maximum before the cast.
    """
    pcm_type = np.int16
    full_scale = np.iinfo(pcm_type).max

    file_paths = []
    for number, samples in enumerate(wav_arrays, start=1):
        pcm_samples = (samples * full_scale).astype(pcm_type)
        target = f"{output_dir}/{prefix}_{number}.wav"
        file_paths.append(target)
        write(target, sampling_rate, pcm_samples)

    return file_paths
470  
471  
472  
def download_video(url):
    """Download *url* with yt-dlp into downloads_dir and describe the result.

    The file is named after the SHA-256 hex digest of the URL. Twitter/X URLs
    are fetched at best quality; every other site is limited to a height of
    480 pixels.

    Parameters:
        url (str): address of the video page.

    Returns:
        tuple: (video, file_name) where ``video`` is a dict with 'id' (None),
        'url', 'url_hash', 'thumbnail_url', 'filename', 'language_id' (None),
        'width', 'height' and a one-element 'metadata' list (title,
        description, comma-joined tags), and ``file_name`` is the downloaded
        file's base name.

    Raises:
        KeyError: if yt-dlp reports no 'width' or 'height' for the video.
    """
    # NOTE(review): hashlib is not imported at the top of the visible file
    # (it may arrive through `from functions import *`), so import it here.
    import hashlib

    url_hash = hashlib.sha256(url.encode()).hexdigest()
    # Define the output template for the downloaded file
    output_template = os.path.join(downloads_dir, url_hash + '.%(ext)s')
    # "//x.com" also catches bare x.com links, which ".x.com" alone misses.
    if "twitter.com" in url or ".x.com" in url or "//x.com" in url:
        video_format = 'bestvideo+bestaudio/best'
    else:
        video_format = 'bestvideo[height<=480]+bestaudio/best[height<=480]'
    options = {
        'format': video_format,
        'outtmpl': output_template,
        'writeinfojson': True,
        'quiet': True,
        'nocheckcertificate': True,
    }

    with yt_dlp.YoutubeDL(options) as ydl:
        # Fetch the metadata and download the media in one call.
        info_dict = ydl.extract_info(url, download=True)
        file_name_with_path = ydl.prepare_filename(info_dict)

    output_name = os.path.basename(file_name_with_path)
    video = {
        'id': None,
        'url': url,
        'url_hash': url_hash,
        'thumbnail_url': info_dict.get('thumbnail', ''),
        'filename': output_name,
        'language_id': None,
        'metadata': [],
        'width': info_dict['width'],
        'height': info_dict['height']
    }
    metadata = {
        'language_id': None,
        'title': info_dict.get('title', ''),
        'description': info_dict.get('description', ''),
        # 'tags' may be present but None; treat that like "no tags".
        'tags': ', '.join(info_dict.get('tags') or []),
    }
    video['metadata'].append(metadata)

    print_with_current_milli_time(logging, "Download complete for video " + url)
    return video, output_name
523  
524  
def process_item(url, queue_id):
    """Prepare one queue item and hand its audio to the GPU sender queue.

    Steps: look the video up by URL (downloading it when it has no database
    id yet), detect the language of its title and save the video, extract
    16 kHz mono audio, and enqueue the audio for the sender worker.

    Parameters:
        url (str): video URL taken from the queue row.
        queue_id: id of the queue row being processed.

    If audio extraction fails nothing is enqueued and an error is logged;
    queueing None would crash the sender thread when it builds the payload.
    """
    sample_rate = 16000

    video = fetch_video_in_db_from_url(db_config, url)

    if video['id'] is None:
        print_with_current_milli_time(logging, "Downloading video " + url)
        video, filename = download_video(video['url'])
    else:
        print_with_current_milli_time(logging, "Getting video from DB...")
        filename = video['filename']

    title = video['metadata'][0]['title']
    if title is not None:
        language_detected = detect(title)
        language_id, _nllb_code, _language_name = fetch_language_values(db_config, language_detected)
        video['language_id'] = language_id
        video['metadata'][0]['language_id'] = language_id
        save_to_database(db_config, video)

    original_audio_array = extractAudioFromVideo(filename, sample_rate)
    if original_audio_array is None:
        logging.error("Audio extraction failed for queue id=%s; nothing sent to GPU server", queue_id)
        return

    print_with_current_milli_time(logging, "Sending audio to GPU server...")
    drop_audio_array_in_queue(original_audio_array, queue_id, video['id'])
555  
556  
557  
def process_video(queue_id, video_id):
    """Render the subtitled video for a queue item and mark the item finished.

    The 'chatgpt' transcript is preferred; when it is missing or empty the
    default transcript is used instead. After rendering, the queue row gets
    time_ended set to the current UTC time and status set to 1.

    Parameters:
        queue_id: id of the queue row.
        video_id: id of the video row to render.
    """
    _url, target_language_code = fetch_queue_in_db_from_id(db_config, queue_id)
    video = fetch_video_in_db_from_id(db_config, video_id)

    print_with_current_milli_time(logging, "Creating video...")
    good_segments = get_transcript_from_db(db_config, video, target_language_code, 'chatgpt')
    if good_segments is None or len(good_segments) == 0:
        good_segments = get_transcript_from_db(db_config, video, target_language_code)
    add_subtitles_to_video(good_segments, target_language_code, video)
    print_with_current_milli_time(logging, "Video complete")

    now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    conn = mysql.connect(**db_config)
    try:
        with conn.cursor() as cursor:
            cursor.execute("UPDATE queue SET time_ended = %s, status = 1 WHERE id = %s", (now, queue_id))
        conn.commit()
    finally:
        # Release the connection even when the update fails.
        conn.close()
574  
575  
576  '''def drop_process_video_request_in_queue(queue_id, video_id):
577      processVideoq.put((queue_id, video_id))
578  def processVideoWorker(processVideoq):
579      while True:
580          # Get item from queue
581          item = processVideoq.get()
582  
583          if item is not None:
584              queue_id, video_id = item
585              process_video(queue_id, video_id)'''
586  
def processVideoWorker(queue_id, video_id):
    """Thread target: render one queue item's video and record failures.

    On any exception the error is logged and the queue row's status is set
    to 0. In every case the queue id is removed from
    items_currently_in_video_queue afterwards.
    """
    try:
        print_with_current_milli_time(logging, "Creating video for queue_id=" + str(queue_id))
        process_video(queue_id, video_id)
        print_with_current_milli_time(logging, "Video finished for queue_id=" + str(queue_id))
    except Exception as e:
        # Log first so the cause is recorded even if the database is unreachable.
        print_with_current_milli_time(logging, "Error processing queue ID:" + str(queue_id))
        print_with_current_milli_time(logging, "An exception occurred:" + str(e))
        conn = mysql.connect(**db_config)
        try:
            with conn.cursor() as cursor:
                cursor.execute("UPDATE queue SET status = 0 WHERE id = %s",
                               (queue_id,))
            conn.commit()
        finally:
            conn.close()
    finally:
        # Runs on success and on failure, including a failure of the error
        # handling above, so the id is never left behind in the list.
        try:
            items_currently_in_video_queue.remove(queue_id)
        except ValueError:
            pass  # already removed; nothing to clean up
603  
604  
605  
606  
607  
608  
def worker(q):
    """Thread target: forward queued audio payloads to the GPU host, forever.

    Each item taken from *q* is an (audio_data, queue_id, video_id) tuple.
    Sending is retried, with a one-second pause after every attempt, until
    send_audio_array_to_host() reports success; the queue id is then removed
    from items_currently_in_audio_queue. None items are ignored.
    """
    while True:
        # Blocks until an item is available.
        item = q.get()
        if item is None:
            continue

        audio_data, queue_id, video_id = item
        print_with_current_milli_time(
            logging,
            "sending queue id=" + str(queue_id) + " video_id=" + str(video_id) + " to gpu")

        sent = False
        while sent is False:
            sent = send_audio_array_to_host(
                audio_data, queue_id, video_id, get_destination_ip(), GPU_PORT)
            if sent:
                print_with_current_milli_time(
                    logging,
                    "Successfully sent queue id=" + str(queue_id) + " video_id=" + str(video_id) + " to gpu")
                items_currently_in_audio_queue.remove(queue_id)
            time.sleep(1)
628  
629  
630  
631  import queue
632  
def drop_audio_array_in_queue(audio_data, queue_id, video_id):
    """Put one (audio_data, queue_id, video_id) tuple on the module-level queue ``q``."""
    payload = (audio_data, queue_id, video_id)
    q.put(payload)
def send_audio_array_to_host(audio_data, queue_id, video_id, host, port):
    """Send one audio payload to *host*:*port* over a new TCP connection.

    The payload is prefixed with a text header made of three fields, each
    padded to 5 characters and followed by a comma: the literal "AUDIO", the
    queue id and the video id.

    Parameters:
        audio_data (bytes): raw payload appended after the header.
        queue_id (int): queue row id written into the header.
        video_id (int): video row id written into the header.
        host (str): destination host name or address.
        port (int): destination TCP port.

    Returns:
        bool: True when everything was sent; False when the connection or the
        transfer failed, after a 3-second pause so the caller can retry.
    """
    data_type = "AUDIO"
    header = f"{data_type:<5},{queue_id:5},{video_id:5},".encode('utf-8')

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((host, port))
            s.sendall(header + audio_data)
    except OSError as e:
        # OSError covers refused connections as well as timeouts, DNS
        # failures and resets, all of which should lead to a retry rather
        # than an exception that ends the calling thread.
        logging.warning("Could not send audio for queue id=%s to %s:%s: %s",
                        queue_id, host, port, e)
        time.sleep(3)
        return False

    print_with_current_milli_time(logging, "Sent audio to GPU server...")
    return True
649  
# Configuration
HOST = '0.0.0.0'

# Matches the length of the header built in send_audio_array_to_host(): three
# 5-character fields, each followed by a comma (when the ids fit in 5 chars).
HEADER_SIZE = 18

# Queue ids currently being handled by the audio and video stages; used to
# avoid starting the same item twice.
items_currently_in_audio_queue = list()
items_currently_in_video_queue = list()
656  
657  
658  
659  
660  
661  
662  
def check_queue():
    """Poll the queue table once per second and dispatch work; never returns.

    Each pass:
    1. Takes the oldest row with neither time_started nor time_ended, stamps
       time_started with the current UTC time, and starts a process_item()
       thread for it unless its id is already in
       items_currently_in_audio_queue.
    2. Takes the oldest row with time_content_processed set and time_ended
       unset and, unless its id is already in items_currently_in_video_queue,
       starts a processVideoWorker() thread when the video is found in the
       database.
    """
    while True:
        conn = mysql.connect(**db_config)
        try:
            cursor = conn.cursor()

            # Stage 1: items that have not been started yet.
            cursor.execute("SELECT id, url, target_language_code FROM queue  WHERE time_started IS NULL and time_ended IS NULL order by id LIMIT 1")
            item = cursor.fetchone()
            if item:
                queue_id, url, _target_language_code = item
                print_with_current_milli_time(logging, "Received queue item " + str(queue_id))
                # Set time_started to now
                now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
                cursor.execute("UPDATE queue SET time_started = %s WHERE id = %s", (now, queue_id))
                conn.commit()
                if queue_id not in items_currently_in_audio_queue:
                    items_currently_in_audio_queue.append(queue_id)
                    # Create and start the thread together: starting it
                    # outside this guard could hit a missing thread or try to
                    # restart one that had already run.
                    threading.Thread(target=process_item,
                                     args=(url, queue_id)).start()

            # Stage 2: items whose content is processed and awaiting rendering.
            cursor.execute("SELECT id, url FROM queue  WHERE time_content_processed IS NOT NULL and time_ended IS NULL order by id LIMIT 1")
            item = cursor.fetchone()
            if item:
                queue_id, url = item
                if queue_id not in items_currently_in_video_queue:
                    items_currently_in_video_queue.append(queue_id)
                    video = fetch_video_in_db_from_url(db_config, url)
                    if video['id'] is not None:
                        threading.Thread(target=processVideoWorker,
                                         args=(queue_id, video['id'])).start()
        finally:
            # Close the per-pass connection even if a query raised.
            conn.close()

        # Wait for 1 second before checking again
        time.sleep(1)
729  
730  
# Work queue shared by drop_audio_array_in_queue() (producer) and worker()
# (consumer).
q = queue.Queue()


if __name__ == '__main__':
    # Background thread that sends queued audio to the GPU host.
    t = threading.Thread(target=worker, kwargs={'q': q})
    t.start()

    # Poll the database queue on the main thread; this call does not return.
    check_queue()