# BandwidthProcessor / bandwidth_processor.py
"""Bandwidth-side worker of the VideoTranslator pipeline.

This part of the module downloads queued videos, extracts their audio for the
GPU server, and burns translated captions into the final video.

Helpers such as ``print_with_current_milli_time``, ``split_captions_for_display``,
``split_in_smaller_chunks``, ``format_timestamp``, ``fetch_video_in_db_from_url``,
``fetch_video_in_db_from_id``, ``fetch_language_values``, ``save_to_database`` and
``get_transcript_from_db`` are not defined in this file; they are presumably
provided by ``from functions import *`` (TODO confirm).
"""
import hashlib
import json
import logging
import os
import socket
import subprocess
import sys
import threading
import time
from datetime import datetime
from urllib.parse import urlparse

import ffmpeg
import numpy as np
import pymysql as mysql
import yt_dlp
from langdetect import detect
from scipy.io.wavfile import write
from scipy.signal import resample

from functions import *


# The first command-line argument selects the environment; anything other
# than 'production' uses the local development settings below.
environment = 'production'  # Default to 'production' if not set
if len(sys.argv) > 1 and sys.argv[1] is not None:
    environment = sys.argv[1]

IPs = (
    '162.199.220.174',
    'translatizer.com'
)

# NOTE(review): the database passwords are hard-coded in source control.
# They are kept here so deployments keep working, but they should be moved to
# an environment variable or a secret store and rotated.
if environment == 'production':
    logging.basicConfig(filename='bandwidth_processor.log', level=logging.INFO)
    home_dir = "/home/ubuntu/apps/VideoTranslator"
    pw = 'Cyr1lH0ur1!'
    host = 'translatizer.com'
    BANDWIDTH_PORT = 62212
    GPU_PORT = 62213
else:
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s')
    home_dir = "/home/cyril/dev/VideoTranslator"
    pw = 'H0ur1!'
    host = '127.0.0.1'
    BANDWIDTH_PORT = 2212
    GPU_PORT = 2213

temp_dir = home_dir + '/temp'
downloads_dir = home_dir + '/downloads'
output_dir = home_dir + '/output'

# MySQL Configuration
db_config = {
    'user': 'appuser',
    'password': pw,
    'host': host,  # or the IP address of your MySQL server
    'database': 'videotranslator',
}

# Filter-graph prefix shared by every caption render: overlays the logo
# (second input) and a "Translated with" label for the first 10 seconds.
# The caption drawtext chain and the "[output]" label are appended to it.
_WATERMARK_FILTER = (
    "[1:v]format=rgba,colorchannelmixer=aa=0.5[watermark]; "
    "[0:v][watermark] overlay=x=10:y=25:enable='between(t,0,10)'[watermarked]; "
    "[watermarked] drawtext=text='Translated with':fontsize=24:"
    "fontcolor=white@0.5:x=10:y=5:enable='between(t,0,10)', "
)


def get_destination_ip():
    """Return the address of the peer (GPU) server for this environment.

    In 'debug' the peer is localhost. Otherwise the peer is whichever entry
    of ``IPs`` is not this machine, decided by comparing the ``PUBLIC_IP``
    environment variable with ``IPs[0]``.
    """
    if environment == 'debug':
        return '127.0.0.1'
    if os.environ.get('PUBLIC_IP') == IPs[0]:
        return IPs[1]
    return IPs[0]


def merge_audio_files(original_file, translated_file, output_file):
    """Mix two audio files into one two-channel file using the ffmpeg CLI.

    The original track is attenuated to 20% volume and the translated track
    to 80%; each is reduced to mono and the two are merged with ``amerge``.

    Parameters:
    - original_file (str): Path to the first audio file.
    - translated_file (str): Path to the second audio file.
    - output_file (str): Path to the output stereo audio file.

    Raises:
    - subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.

    Example usage:
        merge_audio_files("original_trunc.wav", "translated.wav", "merged.wav")
    """
    cmd = [
        "ffmpeg",
        "-i", original_file,
        "-i", translated_file,
        "-filter_complex",
        "[0:a]volume=0.2,pan=mono|c0=c0[a1]; [1:a]volume=0.8,pan=mono|c0=c0[a2]; [a1][a2]amerge=inputs=2[aout]",
        "-ac", "2",
        "-map", "[aout]",
        output_file,
    ]
    # ffmpeg.run() from ffmpeg-python expects a stream specification, not an
    # argv list, so the command is executed directly.
    subprocess.run(cmd, check=True)


def split_elements_for_caption(input_list, max_words=10):
    """Split timed text elements into captions of at most ``max_words`` words.

    Each element's duration is spread evenly over its words, and every chunk
    gets the start/end time interpolated from its word positions. Elements
    whose text contains no words are skipped (they would otherwise cause a
    division by zero).

    Parameters:
    - input_list: iterable of dicts with 'start', 'end' and 'text' keys.
    - max_words (int): maximum number of words per produced caption.

    Returns:
    - list of dicts with 'start', 'end' and 'text' keys.
    """
    output_list = []
    for element in input_list:
        words = element['text'].split()
        if not words:
            continue
        duration_per_word = (element['end'] - element['start']) / len(words)
        for offset in range(0, len(words), max_words):
            chunk_words = words[offset:offset + max_words]
            chunk_start = element['start'] + offset * duration_per_word
            output_list.append({
                'start': chunk_start,
                'end': chunk_start + len(chunk_words) * duration_per_word,
                'text': ' '.join(chunk_words),
            })
    return output_list


def convert_to_transcript(result):
    """Convert a transcription result into a list of transcript parts.

    Parameters:
    - result: dict with a 'segments' list whose items have 'start', 'end'
      and 'text' keys.

    Returns:
    - list of dicts with 'start', 'end', 'speaker_id' (always None) and 'text'.
    """
    return [
        {
            'start': segment['start'],
            'end': segment['end'],
            'speaker_id': None,
            'text': segment['text'],
        }
        for segment in result['segments']
    ]


def _part_from_words(words):
    """Build one transcript part spanning ``words`` (a non-empty list)."""
    return {
        'start': words[0]['start'],
        'end': words[-1]['end'],
        'speaker_id': None,
        # Word tokens are concatenated without a separator, exactly as they
        # come from the transcription result.
        'text': ''.join(word['word'] for word in words),
    }


def split_text(result, min_words=35, max_words=60):
    """Regroup word-level timestamps into parts of ``min_words``..``max_words``.

    A part is closed once it holds at least ``min_words`` words and the
    current word ends with '.' or '?', or unconditionally when it reaches
    ``max_words`` words. Remaining words form a final, shorter part.

    Parameters:
    - result: dict with a 'segments' list; each segment has a 'words' list of
      dicts with 'start', 'end' and 'word' keys.
    - min_words (int): minimum words before a sentence end may close a part.
    - max_words (int): hard upper bound on words per part.

    Returns:
    - list of dicts with 'start', 'end', 'speaker_id' (always None) and 'text'.
    """
    parts = []
    current_words = []
    for segment in result['segments']:
        for word in segment['words']:
            current_words.append(word)
            if len(current_words) < min_words:
                continue
            # endswith() is safe on an empty token, unlike indexing [-1].
            if len(current_words) >= max_words or word['word'].endswith(('.', '?')):
                parts.append(_part_from_words(current_words))
                current_words = []

    # Add any remaining words to the parts list
    if current_words:
        parts.append(_part_from_words(current_words))
    return parts


def fetch_queue_in_db_from_id(db_config, queue_id):
    """Fetch the URL and target language code of a queue row.

    Parameters:
    - db_config (dict): keyword arguments for ``pymysql.connect``.
    - queue_id: primary key of the row in the ``queue`` table.

    Returns:
    - tuple ``(url, target_language_code)``; both are None when no row matches.
    """
    conn = mysql.connect(**db_config)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT url, target_language_code FROM queue WHERE id = %s", (queue_id,))
            item = cursor.fetchone()
    finally:
        # The connection is only needed for this single lookup.
        conn.close()
    if item:
        return item[0], item[1]
    return None, None


def index_exists(index_name, cursor):
    """Return the number of index entries called ``index_name`` in the current schema.

    The value is the COUNT reported by INFORMATION_SCHEMA.STATISTICS, so it is
    0 (falsy) when the index does not exist.

    Parameters:
    - index_name (str): name of the index to look for.
    - cursor: an open DB-API cursor on the target database.
    """
    # Parameterized to avoid building SQL from the index name.
    cursor.execute(
        "SELECT COUNT(1) AS IndexIsThere FROM INFORMATION_SCHEMA.STATISTICS "
        "WHERE table_schema=DATABASE() AND index_name=%s",
        (index_name,),
    )
    result = cursor.fetchone()
    return result[0]


def get_audio_codec(video_path):
    """Return the codec name of the first audio stream reported by ffprobe.

    Parameters:
    - video_path (str): path of the media file to inspect.

    Returns:
    - str codec name, or None when ffprobe reports no audio stream or its
      output cannot be parsed.
    """
    cmd = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_streams',
        video_path
    ]
    # stderr is discarded so stray diagnostics cannot corrupt the JSON on stdout.
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    try:
        data = json.loads(result.stdout.decode('utf-8'))
    except json.JSONDecodeError:
        return None

    for stream in data.get('streams', []):
        if stream.get('codec_type') == 'audio':
            return stream.get('codec_name')
    return None


def extractAudioFromVideo(filename, desired_samplerate=24000):
    """Decode the audio of a downloaded video to mono WAV bytes.

    Parameters:
    - filename (str): file name inside ``downloads_dir``.
    - desired_samplerate (int): output sample rate in Hz.

    Returns:
    - bytes of the WAV stream captured from ffmpeg's stdout, or None when
      ffmpeg fails (the error output is logged).
    """
    video_path = downloads_dir + '/' + filename
    logging.info("%s exists: %s", video_path, os.path.exists(video_path))
    try:
        out, _ = (
            ffmpeg.input(video_path)
            .output('pipe:', format='wav', ac=1, ar=desired_samplerate)
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        logging.error("FFmpeg Error: %s", e.stderr.decode(errors='replace'))
        return None
    return out


def normalize_text(t):
    """Replace '%' with 'o/o' so drawtext does not treat it as an expansion."""
    return t.replace("%", "o/o")


def format_drawtext_filter(captions, video):
    """Generate the comma-separated drawtext filter chain for ffmpeg.

    One drawtext filter is produced per caption line. Quotes are replaced by
    a typographic apostrophe and '%' is neutralised so the text can sit inside
    a single-quoted filter argument. The font size is 1/20 of the video width.

    Parameters:
    - captions: iterable of dicts with 'text', 'start' and 'end' keys.
    - video: dict with at least a 'width' key.

    Returns:
    - str: drawtext filters joined with ','.
    """
    filters = []
    font_size = int(video['width'] / 20)
    for caption in captions:
        text = str(caption['text']).replace("'", "\u2019").replace('"', "\u2019")
        lines = normalize_text(text).split('\n')
        start = round(float(caption['start']), 2)
        end = round(float(caption['end']), 2)
        for index, line in enumerate(lines):
            # For two-line captions the first line is placed higher (h/1.2)
            # than the second (h/1.1); everything else is drawn at h/1.1.
            divisor = "1.2" if len(lines) == 2 and index == 0 else "1.1"
            filters.append(
                f"drawtext=text='{line}':x=(w-text_w)/2:y=h/{divisor}-th:"
                f"fontsize={font_size}:fontcolor=yellow:box=1:boxcolor=black@0.5:"
                f"enable='between(t,{start},{end})'"
            )
    return ",".join(filters)


def _run_ffmpeg(args):
    """Run an ffmpeg argv list without a shell; log and return a failure code."""
    result = subprocess.run(args)
    if result.returncode != 0:
        logging.error("ffmpeg exited with status %s: %s", result.returncode, args)
    return result.returncode


def _rebase_captions(captions, offset):
    """Return copies of ``captions`` with start/end shifted back by ``offset``."""
    return [
        dict(caption,
             start=max(0.0, float(caption['start']) - offset),
             end=max(0.0, float(caption['end']) - offset))
        for caption in captions
    ]


def _caption_render_args(input_path, output_path, drawtext_filter, start=None, duration=None):
    """Build the ffmpeg argv that burns captions and the watermark into a video.

    Production encodes with libx264; every other environment decodes with
    CUDA and encodes with h264_nvenc. When ``start`` is given, only the
    ``duration`` seconds beginning at ``start`` are rendered.
    """
    args = ['ffmpeg', '-y']
    if environment == 'production':
        encoder = ['-c:v', 'libx264']
    else:
        args += ['-hwaccel', 'cuda', '-hwaccel_device', '0']
        encoder = ['-c:v', 'h264_nvenc', '-pix_fmt', 'yuv420p']
    if start is not None:
        args += ['-ss', str(round(float(start), 2)), '-t', str(round(float(duration), 2))]
    args += [
        '-i', input_path,
        '-i', home_dir + "/static/Translatizer_logo.png",
        '-filter_complex', _WATERMARK_FILTER + drawtext_filter + "[output]",
        '-map', '[output]',
        '-map', '0:a',
    ]
    return args + encoder + [output_path]


def add_subtitles_to_video(subtitles, language, video):
    """Burn ``subtitles`` into a downloaded video and write the result to ``output_dir``.

    Captions are split for display and grouped into chunks (chunk size 900,
    as interpreted by ``split_in_smaller_chunks``). A single chunk is rendered
    straight to ``<output_dir>/<url_hash>_<language>.mp4``. Several chunks are
    rendered as separate parts in ``temp_dir``, concatenated into the same
    final path, and the parts are then deleted.

    ffmpeg is invoked with argument lists (no shell), so subtitle text and
    file names cannot be interpreted by a shell. A failing ffmpeg run is
    logged but does not raise, matching the previous best-effort behaviour.

    Parameters:
    - subtitles: caption dicts accepted by ``split_captions_for_display``.
    - language (str): language code used in the output file name.
    - video (dict): needs 'filename', 'url_hash', 'id' and 'width'.
    """
    input_path = downloads_dir + '/' + video['filename']
    base_name = video['url_hash'] + "_" + language
    final_path = output_dir + '/' + base_name + ".mp4"

    subtitles_list = split_in_smaller_chunks(split_captions_for_display(subtitles), 900)
    multipart = len(subtitles_list) > 1
    if multipart:
        print_with_current_milli_time(logging, "Breaking video_id=" + str(video['id']) + " in " + str(len(subtitles_list)) + " parts")

    part_paths = []
    part_start_time = 0
    for index, chunk in enumerate(subtitles_list):
        if multipart:
            output_video = temp_dir + '/' + base_name + "_" + str(index) + ".mp4"
            part_paths.append(output_video)
        else:
            output_video = final_path
        if os.path.exists(output_video):
            os.remove(output_video)

        # Each part runs from the end of the previous part to the end of the
        # last caption in this chunk.
        video_duration = chunk[-1]['end'] - part_start_time
        if multipart:
            # "-ss" before "-i" restarts timestamps at 0 for the part, so the
            # caption windows must be expressed relative to the part start.
            # NOTE(review): assumes split_in_smaller_chunks keeps absolute
            # caption times, as the duration arithmetic above implies.
            drawtext_filter = format_drawtext_filter(
                _rebase_captions(chunk, part_start_time), video)
            cmd = _caption_render_args(input_path, output_video, drawtext_filter,
                                       part_start_time, video_duration)
        else:
            drawtext_filter = format_drawtext_filter(chunk, video)
            cmd = _caption_render_args(input_path, output_video, drawtext_filter)
        _run_ffmpeg(cmd)
        part_start_time += video_duration

    if not multipart:
        return

    # Join the parts with the concat filter, re-encoding video with libx264.
    concat_cmd = ['ffmpeg', '-y']
    for path in part_paths:
        concat_cmd += ['-i', path]
    stream_labels = ''.join(f"[{i}:v:0][{i}:a:0]" for i in range(len(part_paths)))
    concat_cmd += [
        '-filter_complex',
        stream_labels + "concat=n=" + str(len(part_paths)) + ":v=1:a=1[outv][outa]",
        '-map', '[outv]',
        '-map', '[outa]',
        '-c:v', 'libx264',
        final_path,
    ]
    _run_ffmpeg(concat_cmd)

    for path in part_paths:
        try:
            os.remove(path)
        except OSError as e:
            logging.info(e)


def generate_srt(subtitles):
    """Generate SRT content from a list of ``(start, end, text)`` subtitles.

    Timestamps are rendered with ``format_timestamp``, which is not defined in
    this file (presumably imported from ``functions``).
    """
    return "".join(
        f"{index}\n{format_timestamp(start)} --> {format_timestamp(end)}\n{text}\n\n"
        for index, (start, end, text) in enumerate(subtitles, 1)
    )


def get_wav_duration(wav_bytes, sample_rate, header_size=44, sample_width=2):
    """Estimate the duration in seconds of a mono PCM WAV byte string.

    Parameters:
    - wav_bytes (bytes): complete WAV content including its header.
    - sample_rate (int): samples per second.
    - header_size (int): bytes to subtract for the header (44 for a plain
      RIFF/PCM header).
    - sample_width (int): bytes per sample (2 for 16-bit audio).

    Returns:
    - float duration; 0.0 when the buffer is shorter than the header.
    """
    audio_data_size = max(len(wav_bytes) - header_size, 0)
    num_samples = audio_data_size / sample_width
    return num_samples / sample_rate


def _float_to_int16(wav_array):
    """Scale float samples in [-1, 1] to int16, clipping out-of-range values."""
    # Clipping prevents integer wrap-around on samples outside [-1, 1].
    clipped = np.clip(wav_array, -1.0, 1.0)
    return (clipped * np.iinfo(np.int16).max).astype(np.int16)


def save_wav_to_disk(wav_array, sampling_rate, output_file):
    """Write float samples to ``output_file`` as a 16-bit WAV file."""
    write(output_file, sampling_rate, _float_to_int16(wav_array))


def write_wav_files(wav_arrays, sampling_rate, output_dir, prefix="wav"):
    """Write each array as ``<output_dir>/<prefix>_<n>.wav`` (n starts at 1).

    Returns:
    - list of the file paths written, in input order.
    """
    file_paths = []
    for number, wav_array in enumerate(wav_arrays, 1):
        file_path = f"{output_dir}/{prefix}_{number}.wav"
        save_wav_to_disk(wav_array, sampling_rate, file_path)
        file_paths.append(file_path)
    return file_paths


def _is_twitter_url(url):
    """Return True for twitter.com / x.com URLs (with or without a subdomain)."""
    hostname = (urlparse(url).hostname or '').lower()
    return "twitter.com" in url or hostname == 'x.com' or hostname.endswith('.x.com')


def download_video(url):
    """Download ``url`` with yt-dlp into ``downloads_dir`` and describe the result.

    The file is named after the SHA-256 hash of the URL. Twitter/X videos are
    fetched at best quality; everything else is limited to 480p.

    Parameters:
    - url (str): address of the video page.

    Returns:
    - tuple ``(video, filename)`` where ``video`` is a dict with the keys
      'id', 'url', 'url_hash', 'thumbnail_url', 'filename', 'language_id',
      'metadata', 'width' and 'height', and ``filename`` is the downloaded
      file's base name.

    Raises:
    - KeyError: if yt-dlp reports no 'width' or 'height' for the video.
    """
    url_hash = hashlib.sha256(url.encode()).hexdigest()
    # Define the output template for the downloaded file
    output_template = os.path.join(downloads_dir, url_hash + '.%(ext)s')
    if _is_twitter_url(url):
        format_selector = 'bestvideo+bestaudio/best'
    else:
        format_selector = 'bestvideo[height<=480]+bestaudio/best[height<=480]'
    options = {
        'format': format_selector,
        'outtmpl': output_template,
        'writeinfojson': True,
        'quiet': True,
        'nocheckcertificate': True,
    }

    with yt_dlp.YoutubeDL(options) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        file_name_with_path = ydl.prepare_filename(info_dict)
    output_name = os.path.basename(file_name_with_path)

    video = {
        'id': None,
        'url': url,
        'url_hash': url_hash,
        'thumbnail_url': info_dict.get('thumbnail', ''),
        'filename': output_name,
        'language_id': None,
        'metadata': [],
        'width': info_dict['width'],
        'height': info_dict['height']
    }
    video['metadata'].append({
        'language_id': None,
        'title': info_dict.get('title', ''),
        'description': info_dict.get('description', ''),
        # 'tags' can be present but None, which str.join() cannot handle.
        'tags': ', '.join(info_dict.get('tags') or []),
    })
    print_with_current_milli_time(logging, "Download complete for video " + url)
    return video, output_name


def process_item(url, queue_id):
    """Prepare a queued URL: download if needed, store it, and queue its audio.

    The video is looked up in the database and downloaded when unknown. Its
    language is detected from the title, the record is saved, and 16 kHz mono
    audio is extracted and handed to ``drop_audio_array_in_queue`` for the
    GPU server.

    Parameters:
    - url (str): video URL taken from the queue row.
    - queue_id: primary key of the queue row being processed.
    """
    sample_rate = 16000

    video = fetch_video_in_db_from_url(db_config, url)
    if video['id'] is None:
        print_with_current_milli_time(logging, "Downloading video " + url)
        video, filename = download_video(video['url'])
    else:
        print_with_current_milli_time(logging, "Getting video from DB...")
        filename = video['filename']

    language_detected = None
    title = video['metadata'][0]['title']
    # detect() raises on empty text, so only non-empty titles are analysed.
    if title:
        language_detected = detect(title)
    language_id, nllb_code, language_name = fetch_language_values(db_config, language_detected)
    video['language_id'] = language_id
    video['metadata'][0]['language_id'] = language_id
    save_to_database(db_config, video)

    original_audio_array = extractAudioFromVideo(filename, sample_rate)
    if original_audio_array is None:
        # Queuing None would crash the sender when it builds the packet.
        logging.error("No audio extracted for queue id=%s; nothing sent to GPU server", queue_id)
        return
    print_with_current_milli_time(logging, "Sending audio to GPU server...")
    drop_audio_array_in_queue(original_audio_array, queue_id, video['id'])


def process_video(queue_id, video_id):
    """Render the subtitled video for a queue row and mark the row finished.

    The 'chatgpt' transcript is preferred; when it is missing or empty the
    default transcript is used. After rendering, ``time_ended`` is set to the
    current UTC time and ``status`` to 1.

    Parameters:
    - queue_id: primary key of the queue row.
    - video_id: primary key of the video to render.
    """
    _, target_language_code = fetch_queue_in_db_from_id(db_config, queue_id)
    video = fetch_video_in_db_from_id(db_config, video_id)

    print_with_current_milli_time(logging, "Creating video...")
    good_segments = get_transcript_from_db(db_config, video, target_language_code, 'chatgpt')
    if good_segments is None or len(good_segments) == 0:
        good_segments = get_transcript_from_db(db_config, video, target_language_code)
    add_subtitles_to_video(good_segments, target_language_code, video)
    print_with_current_milli_time(logging, "Video complete")

    now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    conn = mysql.connect(**db_config)
    try:
        with conn.cursor() as cursor:
            cursor.execute("UPDATE queue SET time_ended = %s, status = 1 WHERE id = %s", (now, queue_id))
        conn.commit()
    finally:
        conn.close()
import queue


def _discard_queue_id(items, queue_id):
    """Remove ``queue_id`` from ``items`` if present; never raises."""
    try:
        items.remove(queue_id)
    except ValueError:
        pass


def _mark_queue_item_failed(queue_id):
    """Set ``status = 0`` on the given queue row and close the connection."""
    conn = mysql.connect(**db_config)
    try:
        with conn.cursor() as cursor:
            cursor.execute("UPDATE queue SET status = 0 WHERE id = %s", (queue_id,))
        conn.commit()
    finally:
        conn.close()


def processVideoWorker(queue_id, video_id):
    """Thread target that renders one video and records a failure in the DB.

    On any exception from ``process_video`` the error is logged and the queue
    row's status is set to 0. In every case the queue id is removed from
    ``items_currently_in_video_queue`` afterwards.

    Parameters:
    - queue_id: primary key of the queue row.
    - video_id: primary key of the video to render.
    """
    try:
        print_with_current_milli_time(logging, "Creating video for queue_id=" + str(queue_id))
        process_video(queue_id, video_id)
        print_with_current_milli_time(logging, "Video finished for queue_id=" + str(queue_id))
    except Exception as e:
        # Top-level boundary of the worker thread: report and flag the row.
        print_with_current_milli_time(logging, "Error processing queue ID:" + str(queue_id))
        print_with_current_milli_time(logging, "An exception occurred:" + str(e))
        _mark_queue_item_failed(queue_id)
    finally:
        _discard_queue_id(items_currently_in_video_queue, queue_id)


def worker(q):
    """Forever take audio jobs from ``q`` and deliver them to the GPU server.

    Each job is an ``(audio_data, queue_id, video_id)`` tuple. Delivery is
    retried until ``send_audio_array_to_host`` reports success, then the queue
    id is removed from ``items_currently_in_audio_queue``. ``None`` items and
    jobs without audio are skipped.

    Parameters:
    - q (queue.Queue): source of jobs.
    """
    while True:
        # Get item from queue
        item = q.get()
        if item is None:
            continue

        audio_data, queue_id, video_id = item
        if audio_data is None:
            # Concatenating None to the header would raise and kill this thread.
            logging.error("Dropping queue id=%s video_id=%s: no audio data", queue_id, video_id)
            _discard_queue_id(items_currently_in_audio_queue, queue_id)
            continue

        print_with_current_milli_time(logging,
                                      "sending queue id=" + str(queue_id) + " video_id=" + str(video_id) + " to gpu")
        while not send_audio_array_to_host(audio_data, queue_id, video_id, get_destination_ip(), GPU_PORT):
            time.sleep(1)
        print_with_current_milli_time(logging,
                                      "Successfully sent queue id=" + str(queue_id) + " video_id=" + str(video_id) + " to gpu")
        _discard_queue_id(items_currently_in_audio_queue, queue_id)


def drop_audio_array_in_queue(audio_data, queue_id, video_id):
    """Enqueue an audio job for ``worker`` on the module-level queue ``q``."""
    q.put((audio_data, queue_id, video_id))


def send_audio_array_to_host(audio_data, queue_id, video_id, host, port):
    """Send one audio payload to ``host:port`` over TCP.

    The payload is prefixed with an 18-byte ASCII header made of three
    comma-terminated, 5-character fields: the literal "AUDIO", the queue id
    and the video id.
    NOTE(review): ids wider than 5 digits make the header longer than
    ``HEADER_SIZE``; confirm how the receiver handles that.

    Parameters:
    - audio_data (bytes): raw audio bytes to transmit.
    - queue_id (int): queue row id written into the header.
    - video_id (int): video id written into the header.
    - host (str): destination host name or address.
    - port (int): destination TCP port.

    Returns:
    - bool: True when everything was sent; False after a connection or send
      failure (the function waits 3 seconds before returning False).
    """
    data_type = "AUDIO"
    header = f"{data_type:<5},{queue_id:5},{video_id:5},".encode('utf-8')
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.connect((host, port))
            s.sendall(header + audio_data)
        except OSError as e:
            # Covers refused connections as well as timeouts, resets and DNS
            # failures, so the caller's retry loop survives all of them.
            logging.warning("Could not send audio to %s:%s: %s", host, port, e)
            time.sleep(3)
            return False
    print_with_current_milli_time(logging, "Sent audio to GPU server...")
    return True


# Configuration
HOST = '0.0.0.0'

HEADER_SIZE = 18
# Queue ids whose audio is being prepared / sent, and whose video is being
# rendered; used by check_queue to avoid starting the same job twice.
items_currently_in_audio_queue = []
items_currently_in_video_queue = []


def _dispatch_pending_download(conn, cursor):
    """Start ``process_item`` for the oldest queue row that has not been started."""
    # Check for an item with time_started equals to null
    cursor.execute("SELECT id, url, target_language_code FROM queue WHERE time_started IS NULL and time_ended IS NULL order by id LIMIT 1")
    item = cursor.fetchone()
    if not item:
        return

    queue_id, url, _ = item
    print_with_current_milli_time(logging, "Received queue item " + str(queue_id))
    # Set time_started to now
    now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
    cursor.execute("UPDATE queue SET time_started = %s WHERE id = %s", (now, queue_id))
    conn.commit()
    if queue_id not in items_currently_in_audio_queue:
        items_currently_in_audio_queue.append(queue_id)
        threading.Thread(target=process_item, args=(url, queue_id)).start()


def _dispatch_pending_render(cursor):
    """Start ``processVideoWorker`` for the oldest row whose content is processed."""
    cursor.execute("SELECT id, url FROM queue WHERE time_content_processed IS NOT NULL and time_ended IS NULL order by id LIMIT 1")
    item = cursor.fetchone()
    if not item:
        return

    queue_id, url = item
    if queue_id in items_currently_in_video_queue:
        return
    items_currently_in_video_queue.append(queue_id)
    video = fetch_video_in_db_from_url(db_config, url)
    if video['id'] is not None:
        threading.Thread(target=processVideoWorker, args=(queue_id, video['id'])).start()


def check_queue():
    """Poll the ``queue`` table once per second and dispatch work to threads.

    Every iteration opens a fresh connection, starts at most one download job
    (rows with no ``time_started``) and at most one render job (rows with
    ``time_content_processed`` set but no ``time_ended``), and closes the
    connection even if a query fails. The function never returns.
    """
    while True:
        conn = mysql.connect(**db_config)
        try:
            with conn.cursor() as cursor:
                _dispatch_pending_download(conn, cursor)
                _dispatch_pending_render(cursor)
        finally:
            conn.close()
        # Wait for 1 second before checking again
        time.sleep(1)


q = queue.Queue()


if __name__ == '__main__':
    # Start worker thread
    t = threading.Thread(target=worker, args=(q,))
    t.start()

    check_queue()