/ tea2adt_source / mmrx.py
mmrx.py
1 import sys 2 import subprocess 3 import logging 4 from getpass import getpass 5 from io import StringIO 6 import time 7 from time import sleep 8 import select 9 import threading 10 from pathlib import Path 11 import os 12 from time import sleep 13 import queue 14 import datetime 15 import random 16 import getopt 17 import re 18 import string 19 20 21 22 # message types 23 ############### 24 ''' 25 [init] 26 [init_ack_chat] 27 [init_ack_shell] 28 [init_ack_llm] 29 [init_ack_file] 30 [keepalive] 31 <probe> 32 <start_msg> 33 <end_msg> 34 <preamble> <seq_tx><seq_rx>[ack] <trailer> 35 <preamble> <seq_tx><seq_rx>[data]<input_data> <trailer> 36 <preamble> <seq_tx><seq_rx>[file_name]<file_name>[file]<file_data> <trailer> 37 <preamble> <seq_tx><seq_rx>[file_name]<file_name>[file_end]<file_data> <trailer> 38 \\_______________ _______________________________________// 39 V 40 encrypted 41 ''' 42 43 44 45 # module variables 46 ################## 47 HALF_DUPLEX = False 48 TX_SENDING = False 49 TTS_OUT = False 50 SESSION_ESTABLISHED = False 51 SHELL_OUTPUT_READ_TIMEOUT_SEC = 1 52 LLM_OUTPUT_READ_TIMEOUT_SEC = 1 53 NEED_ACK = False 54 WAITING_FOR_COMMAND_OUTPUT = False 55 WAITING_FOR_LLM_OUTPUT = False 56 START_MSG = "" 57 SPEAKING = False 58 59 60 61 # main 62 ###### 63 def main(): 64 global HALF_DUPLEX 65 global TX_SENDING 66 global TTS_OUT 67 global SESSION_ESTABLISHED 68 global WAITING_FOR_COMMAND_OUTPUT 69 global WAITING_FOR_LLM_OUTPUT 70 global NEED_ACK 71 global START_MSG 72 global SPEAKING 73 74 # local variables 75 ################# 76 PASSWORD = "" 77 run_thread = True 78 stdin_err_cnt = 0 79 file_name = "" 80 RND_MAX_DELAY_MS = 1000 81 AVG_RND_DELAY_SEC = (RND_MAX_DELAY_MS/2000.0) 82 pattern = r'(?<=\?\ |\!\ |\:\ |\.\ )' # |\...\ )' ? 83 84 # definitions 85 ############# 86 INCOMPLETE_FILE_OUT_64 = 0 87 WRITE_FILE_OUT_64 = 1 88 WRITE_AND_DECODE_FILE_OUT_64 = 2 89 APPEND_FILE_OUT_64 = 3 90 DECODE_FILE_OUT_64 = 4 91 APPEND_AND_DECODE_FILE_OUT_64 = 5 92 state_file_out = INCOMPLETE_FILE_OUT_64 93 94 # read tmp path 95 ############### 96 f = open("cfg/tmp_path", "r") 97 TMP_PATH = f.read().splitlines()[0] 98 f.close() 99 HOME = str(Path.home()) 100 101 # default configuration 102 ####################### 103 HALF_DUPLEX = False 104 REMOTE_SHELL = False 105 LLM = False 106 FILE_TRANSFER = False 107 RETRANSMISSION_TIMEOUT_SEC = 3.5 108 SHELL_OUT_MAX_DELAY = 2.75 109 LLM_OUT_MAX_DELAY = 2.75 110 SEND_DELAY_SEC = 0.1 111 ARMOR = "" # "--armor" 112 SHOW_RX_PROMPT = False 113 SHOW_TX_PROMPT = False 114 VERBOSE = False 115 TTS = False 116 TEMP_GPG_FILE = HOME+TMP_PATH+"/tmp/msgrx.gpg" 117 SEQ_TX_FILE = HOME+TMP_PATH+"/state/seq_tx" 118 SEQ_RX_FILE = HOME+TMP_PATH+"/state/seq_rx" 119 SEQ_TX_ACKED_FILE = HOME+TMP_PATH+"/state/seq_tx_acked" 120 SESSION_ESTABLISHED_FILE = HOME+TMP_PATH+"/state/session_established" 121 LOGGING_LEVEL = logging.WARNING 122 INVALID_SEQ_NR = "200" # (outside 0-93 range!) 123 TIMEOUT_POLL_SEC = float(0.01) 124 SPLIT_TX_LINES = int(25) 125 LOG_TO_FILE = False 126 PIPE_SHELL_IN = HOME+TMP_PATH+"/tmp/pipe_shell_in" 127 PIPE_SHELL_OUT = HOME+TMP_PATH+"/tmp/pipe_shell_out" 128 PIPE_LLM_OUT = HOME+TMP_PATH+"/tmp/pipe_llm_out" 129 PIPE_FILE_IN = HOME+TMP_PATH+"/tmp/pipe_file_in" 130 TMPFILE_BASE64_IN=HOME+TMP_PATH+"/tmp/in.64" 131 132 # default state 133 ############### 134 TRANSMITTER_STARTED_FILE = HOME+TMP_PATH+"/state/transmitter_started" 135 TRANSMITTER_STARTED = False 136 TX_SENDING_FILE_FILE = HOME+TMP_PATH+"/state/tx_sending_file" 137 TX_SENDING_FILE = False 138 RX_RECEIVING_FILE = False 139 SESSION_ESTABLISHED = False 140 TTS_OUT_FILE = HOME+TMP_PATH+"/state/tts_out" 141 142 # parse arguments 143 ################# 144 if len(sys.argv) > 1: 145 if sys.argv[1] != "": 146 PASSWORD = sys.argv[1] 147 148 # current configuration 149 ####################### 150 # half_duplex 151 f = open(HOME+TMP_PATH+"/cfg/half_duplex", "r") 152 if f.read().splitlines()[0] == "true": 153 HALF_DUPLEX = True 154 f.close() 155 print("half_duplex = " + str(HALF_DUPLEX)) 156 # llm, remote shell or file transfer? otherwise we use chat per default 157 if len(sys.argv) > 2: 158 if sys.argv[2] == "-l" or sys.argv[2] == "--llm" or sys.argv[2] == "--llm-chat" or sys.argv[2] == "--llm-prompt": 159 LLM = True 160 elif sys.argv[2] == "-s" or sys.argv[2] == "--rs" or sys.argv[2] == "--remote-shell" or sys.argv[2] == "--reverse-shell": 161 REMOTE_SHELL = True 162 elif sys.argv[2] == "-f" or sys.argv[2] == "--file" or sys.argv[2] == "--file-transfer": 163 FILE_TRANSFER = True 164 # keepalive_time_sec 165 f = open(HOME+TMP_PATH+"/cfg/keepalive_time_sec", "r") 166 KEEPALIVE_TIME_SEC = f.read().splitlines()[0] 167 KEEPALIVE_TIME_SEC_F = float(KEEPALIVE_TIME_SEC) 168 if KEEPALIVE_TIME_SEC_F <= AVG_RND_DELAY_SEC: 169 AVG_RND_DELAY_SEC = 0.0 170 print("AVG_RND_DELAY_SEC changed to 0") 171 f.close() 172 print("KEEPALIVE_TIME_SEC = " + str(KEEPALIVE_TIME_SEC)) 173 # retransmission_timeout_sec 174 f = open(HOME+TMP_PATH+"/cfg/retransmission_timeout_sec", "r") 175 RETRANSMISSION_TIMEOUT_SEC = float(f.read().splitlines()[0]) 176 print("retransmission_timeout_sec = " + str(RETRANSMISSION_TIMEOUT_SEC)) 177 # send_delay_sec 178 f = open(HOME+TMP_PATH+"/cfg/send_delay_sec", "r") 179 SEND_DELAY_SEC = f.read().splitlines()[0] 180 SEND_DELAY_SEC_F = float(SEND_DELAY_SEC) 181 f.close() 182 print("send_delay_sec = " + str(SEND_DELAY_SEC)) 183 # armor 184 f = open(HOME+TMP_PATH+"/cfg/armor", "r") 185 ARMOR = f.read().splitlines()[0] 186 f.close() 187 print("armor = " + ARMOR) 188 # need_ack 189 f = open(HOME+TMP_PATH+"/cfg/need_ack", "r") 190 if f.read().splitlines()[0] == "true": 191 NEED_ACK = True 192 f.close() 193 print("need_ack = " + str(NEED_ACK)) 194 # start_msg 195 f = open(HOME+TMP_PATH+"/cfg/start_msg", "r") 196 c1 = f.read(1) 197 f.close() 198 if c1: 199 f = open(HOME+TMP_PATH+"/cfg/start_msg", "r") 200 START_MSG = f.read().splitlines()[0] 201 f.close() 202 print("start_msg = " + START_MSG) 203 # show_rx_prompt 204 f = open(HOME+TMP_PATH+"/cfg/show_rx_prompt", "r") 205 if f.read().splitlines()[0] == "true": 206 SHOW_RX_PROMPT = True 207 f.close() 208 print("show_rx_prompt = " + str(SHOW_RX_PROMPT)) 209 # show_tx_prompt 210 f = open(HOME+TMP_PATH+"/cfg/show_tx_prompt", "r") 211 if f.read().splitlines()[0] == "true": 212 SHOW_TX_PROMPT = True 213 f.close() 214 print("show_tx_prompt = " + str(SHOW_TX_PROMPT)) 215 # syncbyte 216 f = open(HOME+TMP_PATH+"/cfg/syncbyte", "r") 217 SYNC_BYTE = str(f.read().splitlines()[0]) 218 f.close() 219 # print("syncbyte = " + SYNC_BYTE) # already printed in bash 220 # baud 221 f = open(HOME+TMP_PATH+"/cfg/baud", "r") 222 BAUD = str(f.read().splitlines()[0]) 223 f.close() 224 # print("baud = " + BAUD) # already printed in bash 225 # timeout_poll_sec 226 f = open(HOME+TMP_PATH+"/cfg/timeout_poll_sec", "r") 227 TIMEOUT_POLL_SEC = float(f.read().splitlines()[0]) 228 f.close() 229 print("timeout_poll_sec = " + str(TIMEOUT_POLL_SEC)) 230 # split_tx_lines 231 f = open(HOME+TMP_PATH+"/cfg/split_tx_lines", "r") 232 SPLIT_TX_LINES = int(f.read().splitlines()[0]) 233 f.close() 234 # log_to_file 235 f = open(HOME+TMP_PATH+"/cfg/log_to_file", "r") 236 if f.read().splitlines()[0] == "true": 237 LOG_TO_FILE = True 238 date_time = datetime.datetime.now().strftime("%d/%m/%Y, %H:%M:%S") 239 f = open("out/log.txt", "w") 240 f.write("tea2adt started: " + date_time + "\n") 241 f.close() 242 f.close() 243 print("log_to_file = " + str(LOG_TO_FILE)) 244 # logging_level 245 f = open(HOME+TMP_PATH+"/cfg/logging_level", "r") 246 LOGGING_LEVEL = f.read().splitlines()[0] 247 f.close() 248 # evaluate LOGGING_LEVEL as string 249 if LOGGING_LEVEL == "logging.NOTSET": 250 LOGGING_LEVEL = logging.NOTSET 251 elif LOGGING_LEVEL == "logging.DEBUG": 252 LOGGING_LEVEL = logging.DEBUG 253 elif LOGGING_LEVEL == "logging.INFO": 254 LOGGING_LEVEL = logging.INFO 255 elif LOGGING_LEVEL == "logging.WARNING": 256 LOGGING_LEVEL = logging.WARNING 257 elif LOGGING_LEVEL == "logging.ERROR": 258 LOGGING_LEVEL = logging.ERROR 259 elif LOGGING_LEVEL == "logging.CRITICAL": 260 LOGGING_LEVEL = logging.CRITICAL 261 else: 262 LOGGING_LEVEL = logging.WARNING 263 # verbose 264 f = open(HOME+TMP_PATH+"/cfg/verbose", "r") 265 if f.read().splitlines()[0] == "true": 266 VERBOSE = True 267 f.close() 268 if VERBOSE and LOGGING_LEVEL > logging.INFO: 269 print("verbose = true (logging_level forced to logging.INFO)") 270 else: 271 print("verbose = false") 272 # tts (text to speech) 273 f = open(HOME+TMP_PATH+"/cfg/text_to_speech", "r") 274 tts = f.read().splitlines()[0] 275 if tts != "": 276 TTS = True 277 else: 278 TTS = False 279 f.close() 280 281 # logging 282 ######### 283 if VERBOSE: 284 # force logging level to info at least 285 # which is the level to output VERBOSE infos 286 if LOGGING_LEVEL > logging.INFO: 287 LOGGING_LEVEL = logging.INFO 288 if LOG_TO_FILE: 289 logging.basicConfig(filename="out/log.txt", filemode='a', format='%(message)s', level=LOGGING_LEVEL, force=True) 290 else: 291 logging.basicConfig(format='%(message)s', level=LOGGING_LEVEL, force=True) 292 else: 293 if LOG_TO_FILE: 294 logging.basicConfig(filename="out/log.txt", filemode='a', 295 format='%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s', 296 datefmt='%H:%M:%S', level=LOGGING_LEVEL, force=True) 297 else: 298 logging.basicConfig(format='%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s', 299 datefmt='%H:%M:%S', level=LOGGING_LEVEL, force=True) 300 # make sure we also see stderr when logging level is debug 301 if LOGGING_LEVEL == logging.DEBUG: 302 SENT_OUT_TO_DEV_NULL = "" 303 else: 304 SENT_OUT_TO_DEV_NULL = " 2>/dev/null" 305 306 # print logging_level 307 ##################### 308 # evaluate LOGGING_LEVEL as integer value 309 if LOGGING_LEVEL == logging.NOTSET: 310 print("logging_level = logging.NOTSET") 311 elif LOGGING_LEVEL == logging.DEBUG: 312 print("logging_level = logging.DEBUG") 313 elif LOGGING_LEVEL == logging.INFO: 314 print("logging_level = logging.INFO") 315 elif LOGGING_LEVEL == logging.WARNING: 316 print("logging_level = logging.WARNING") 317 elif LOGGING_LEVEL == logging.ERROR: 318 print("logging_level = logging.ERROR") 319 elif LOGGING_LEVEL == logging.CRITICAL: 320 print("logging_level = logging.CRITICAL") 321 else: 322 print("logging_level = logging.WARNING") 323 324 # banner 325 ######## 326 if LLM: 327 print("******************************") 328 print("*** tea2adt LLM prompt *******") 329 print("******************************") 330 elif REMOTE_SHELL: 331 print("******************************") 332 print("*** tea2adt remote shell *****") 333 print("******************************") 334 elif FILE_TRANSFER: 335 print("******************************************************") 336 print("*** tea2adt file transfer receiver,") 337 print("*** the received files can be found in folder rx_files") 338 print("******************************************************") 339 else: 340 print("******************************") 341 print("*** tea2adt chat receiver ****") 342 print("******************************") 343 344 # wait for transmitter to start? 345 ################################ 346 if (TRANSMITTER_STARTED == True) or (REMOTE_SHELL == True) or (LLM == True): 347 pass 348 else: 349 print("Waiting for transmitter to start...") 350 while TRANSMITTER_STARTED == False: 351 f = open(TRANSMITTER_STARTED_FILE, "r") 352 if f.read().splitlines()[0] == "true": 353 TRANSMITTER_STARTED = True 354 f.close() 355 sleep(0.1) 356 357 # prompt message 358 ################ 359 if REMOTE_SHELL: 360 print("Remote shell started...") 361 elif LLM: 362 print("LLM prompt started...") 363 print("Waiting for session to be established...") 364 365 # SET_COMM_IFS 366 ''' 367 # initialize audio interfaces 368 ############################# 369 # NOTE: at this point minimodem --tx and --rx have been started, 370 # thus we can get the sink-input and source-output to then link them with the configured audio interface 371 command = "./set_interfaces.sh" 372 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 373 out, err = p1.communicate() 374 if p1.returncode == 0: 375 p1.terminate() 376 p1.kill() 377 else: 378 logging.warning("Could not initialize audio intefaces!") 379 logging.debug("Audio interfaces set as configured.") 380 ''' 381 382 # current state 383 ############### 384 # session_established 385 f = open(SESSION_ESTABLISHED_FILE, "r") 386 if f.read().splitlines()[0] == "true": 387 SESSION_ESTABLISHED = True 388 f.close() 389 if REMOTE_SHELL or LLM: 390 # force session not yet established 391 SESSION_ESTABLISHED = False 392 f = open(SESSION_ESTABLISHED_FILE, "w") 393 f.write("false") 394 f.close() 395 else: 396 # transmitter_started 397 f = open(TRANSMITTER_STARTED_FILE, "r") 398 if f.read().splitlines()[0] == "true": 399 TRANSMITTER_STARTED = True 400 f.close() 401 # tx_sending_file 402 f = open(TX_SENDING_FILE_FILE, "r") 403 if f.read().splitlines()[0] == "true": 404 TX_SENDING_FILE = True 405 f.close() 406 407 # helper functions 408 ################## 409 def clear_seq_nrs(): 410 f = open(SEQ_TX_FILE, "w") 411 f.write("0") 412 f.close() 413 logging.debug("set seq_tx to 0") 414 f = open(SEQ_RX_FILE, "w") 415 f.write(INVALID_SEQ_NR) 416 f.close() 417 logging.debug("set seq_rx to INVALID_SEQ_NR (outside 0-93 range!)") 418 f = open(SEQ_TX_ACKED_FILE, "w") 419 f.write(INVALID_SEQ_NR) 420 f.close() 421 logging.debug("set seq_tx_acked to INVALID_SEQ_NR (outside 0-93 range!)") 422 423 def init_session(): 424 global SESSION_ESTABLISHED 425 clear_seq_nrs() 426 SESSION_ESTABLISHED = True 427 f = open(SESSION_ESTABLISHED_FILE, "w") 428 f.write("true") 429 f.close() 430 logging.debug("set session_established to true") 431 432 def end_session(): 433 global SESSION_ESTABLISHED 434 clear_seq_nrs() 435 SESSION_ESTABLISHED = False 436 f = open(SESSION_ESTABLISHED_FILE, "w") 437 f.write("false") 438 f.close() 439 logging.debug("set session_established to false") 440 441 def minimodem_tx(message): 442 global START_MSG 443 if START_MSG != "": 444 messageQueue.put(START_MSG) 445 messageQueue.put(message) 446 447 def llm_input(prompt_input): 448 promptInputQueue.put(prompt_input) 449 450 def check_tts(): 451 f = open(TTS_OUT_FILE, "r") 452 tts_out = f.read().splitlines()[0] 453 if tts_out == "true": 454 TTS_OUT = True 455 elif tts_out == "false": 456 TTS_OUT = False 457 f.close() 458 459 # thread to pass prompt input to LLM 460 #################################### 461 def PassPromptInputToLlmThread(): 462 global SPEAKING 463 # main thread loop 464 ################## 465 while run_thread: 466 # block until a prompt input is written to the queue 467 prompt_input = promptInputQueue.get(block=True) 468 # wait first for prompt input to be spoken 469 if TTS: 470 while SPEAKING: 471 sleep(TIMEOUT_POLL_SEC) 472 # WORKAROUND: works with ; but not with "*< ...why? 473 prompt_input = prompt_input.replace(';', '\\;') # 474 # write input to LLM via tmux in order to maintain the session 475 command = "".join(["tmux send-keys -t session_llm \"", prompt_input, "\" Enter"]) 476 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 477 out, err = p1.communicate() 478 if p1.returncode == 0: 479 p1.terminate() 480 p1.kill() 481 else: 482 logging.warning("could not send input to LLM!") 483 logging.debug("Input for LLM sent to model.") 484 485 # thread to transmit minimodem message 486 # (without encryption and seq. nr.) 487 ###################################### 488 def TransmitMinimodemThread(): 489 global TX_SENDING 490 global HALF_DUPLEX 491 # main thread loop 492 ################## 493 while run_thread: 494 # block until a message is input to the queue 495 message = messageQueue.get(block=True) 496 # avoid collissions with tranmissions triggered somewhere else 497 while TX_SENDING: 498 sleep(TIMEOUT_POLL_SEC) 499 # send message 500 TX_SENDING = True 501 # mute mic? 502 if HALF_DUPLEX: 503 command = "./mute_mic.sh" 504 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 505 out, err = p1.communicate() 506 if p1.returncode == 0: 507 p1.terminate() 508 p1.kill() 509 # minimodem --tx 510 command = "".join(["echo \"", message, "\" | minimodem --tx --ascii --quiet --startbits 1 --stopbits 1.0 --sync-byte ", SYNC_BYTE, " --volume 1.0 ", BAUD]) 511 # SET_COMM_IFS 512 # command = "".join(["tmux send-keys -t session_mmtx \"", message, "\" Enter"]) 513 # alternative: 514 # command = "".join(["screen -S session_mmtx -X stuff \"", message, "^M\" Enter"]) 515 if VERBOSE: 516 # stdout to see VERBOSE text from minimodem 517 p1 = subprocess.Popen(command, shell=True, stdout=None, text=True) 518 else: 519 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 520 out, err = p1.communicate() 521 if p1.returncode == 0: 522 p1.terminate() 523 p1.kill() 524 logging.info("> " + message) 525 # unmute mic? 526 if HALF_DUPLEX: 527 command = "./unmute_mic.sh" 528 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 529 out, err = p1.communicate() 530 if p1.returncode == 0: 531 p1.terminate() 532 p1.kill() 533 TX_SENDING = False 534 535 if REMOTE_SHELL or FILE_TRANSFER or LLM: 536 # shell, llm: need to send ACKs here because a user input does not always produce a session output 537 # thus no mmsessionout.sh gets called and thus no AKCs are sent 538 # file: for file transfers the RX process is responsible to send ACKs 539 def transmit_ack(): 540 ackQueue.put("*") 541 542 # thread to transmit ACKs 543 ######################### 544 def TransmitAckThread(): 545 global TX_SENDING 546 # main thread loop 547 ################## 548 while run_thread: 549 # block until something is input to the queue 550 # we don't even check it, any input will trigger an ack 551 ackCmd = ackQueue.get(block=True) 552 # TODO: use delay also with chat and remote-shell? 553 if FILE_TRANSFER: 554 sleep(SEND_DELAY_SEC_F) 555 # avoid collissions with tranmissions triggered somewhere else 556 while TX_SENDING: 557 sleep(TIMEOUT_POLL_SEC) 558 # send ack 559 TX_SENDING = True 560 # add single quotes around the password in case it contains spaces 561 # note: we may instead send something in data with ./mmdata.sh, i.o. just [ack], in order to flush the shell command output 562 command = "".join(["./mmack.sh '", PASSWORD, "'"]) 563 if VERBOSE: 564 # stdout to see VERBOSE text from mmack.sh 565 p1 = subprocess.Popen(command, shell=True, stdout=None, text=True) 566 else: 567 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 568 out, err = p1.communicate() 569 if p1.returncode == 0: 570 p1.terminate() 571 p1.kill() 572 logging.info("> [ack]") 573 TX_SENDING = False 574 575 # TTS only for chat or LLM 576 if not REMOTE_SHELL and not FILE_TRANSFER: 577 578 def tts(data): 579 ttsQueue.put(data) 580 581 def tts_speak(): 582 text = ttsQueue.get(block=True) 583 # TTS out 584 # add quotes around the data in case it contains spaces 585 # TODO: catch if odd number of " characters in text and do something (remove/replace or supress error output) 586 command = "".join(["./tts.sh \"", text, "\""]) 587 if VERBOSE: 588 # stdout to see VERBOSE text from tts.sh 589 p1 = subprocess.Popen(command, shell=True, stdout=None, text=True) 590 else: 591 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, text=True) 592 out, err = p1.communicate() 593 if p1.returncode == 0: 594 p1.terminate() 595 p1.kill() 596 # make a pause after we speak 597 # pause is proportial to the estimated time needed to speak to the end 598 # TODO: adapt if required e.g. when calling espeak -s 110 which is slower than default 150 and thus takes longer 599 sleep(0.5 + 0.1*len(text)) 600 601 # thread to output text-to-speech 602 ################################# 603 def TtsThread(): 604 global TTS_OUT 605 global SPEAKING 606 # main thread loop 607 ################## 608 while run_thread: 609 # block until something is input to the queue 610 #text = ttsQueue.get(block=True) 611 while ttsQueue.empty(): 612 sleep(TIMEOUT_POLL_SEC) 613 # check flag 614 check_tts() 615 # avoid collissions when speaking a split sentence 616 while TTS_OUT: 617 check_tts() 618 sleep(0.1) 619 # set flag 620 SPEAKING = True 621 # speak all inputs in queue 622 while ttsQueue.empty() == False: 623 tts_speak() 624 # clear flag 625 SPEAKING = False 626 627 if LLM: 628 # thread to gather and transmit output from LLM 629 ############################################### 630 def LLMTransmitThread(): 631 global TX_SENDING 632 global SESSION_ESTABLISHED 633 global WAITING_FOR_LLM_OUTPUT 634 global NEED_ACK 635 # main thread loop 636 ################## 637 while run_thread: 638 # block until data in queue 639 while llmOutputQueue.empty(): 640 sleep(TIMEOUT_POLL_SEC) 641 # initialize buffer, flags and counters 642 llm_output = "" 643 llm_timeout = False 644 read_lines = 0 645 start_time = datetime.datetime.now() 646 # consume all LLM output lines up to SPLIT_TX_LINES or timeout LLM_OUTPUT_READ_TIMEOUT_SEC 647 # exit when SPLIT_TX_LINES or LLM_OUTPUT_READ_TIMEOUT_SEC even if there are still data in the queue 648 # this makes sure we send for now at least a partial response to the remote caller 649 while (llmOutputQueue.empty() == False) and (read_lines < SPLIT_TX_LINES) and (llm_timeout == False): 650 try: 651 llm_output_part = llmOutputQueue.get(block=True, timeout=LLM_OUTPUT_READ_TIMEOUT_SEC) 652 # append data 653 if llm_output_part != None: 654 llm_output = "".join([llm_output, llm_output_part]) 655 read_lines = read_lines + 1 656 # timeout? 657 end_time = datetime.datetime.now() 658 delta_time = (end_time - start_time).total_seconds() 659 if delta_time > LLM_OUTPUT_READ_TIMEOUT_SEC: 660 llm_timeout = True 661 except Exception as e: 662 logging.exception("Exception in llmOutputQueue: " + str(e)) 663 # some checks 664 if SESSION_ESTABLISHED and llm_output != "": 665 # reset flag 666 if NEED_ACK: 667 WAITING_FOR_LLM_OUTPUT = False 668 # escape special characters: !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ 669 # TODO: check why we get /bin/sh: 2: Syntax error: Unterminated quoted string when we use this code 670 # create a translation table 671 ## translation_table = str.maketrans({char: f'\\\\{char}' for char in string.punctuation}) 672 # apply the translation to the input string 673 ## llm_output = llm_output.translate(translation_table) 674 # use utf-8 in LLMOutputThread() instead? 675 # for now replace problematic characters explicitely 676 llm_output_printable = llm_output 677 llm_output = llm_output.replace("\n", "\r") 678 llm_output = llm_output.replace("(", "\\(") 679 llm_output = llm_output.replace(")", "\\)") 680 llm_output = llm_output.replace("�", "\\�") 681 llm_output = llm_output.replace("'", "\\'") 682 # llm_output = llm_output.replace('"', '\\"') # ? 683 llm_output = llm_output.replace('|', '\\|') 684 llm_output = llm_output.replace('#', '\\#') 685 llm_output = llm_output.replace('&', '\\&') 686 # llm_output = llm_output.replace('*', '\\*') # ? 687 llm_output = llm_output.replace(';', '\\;') # ? 688 llm_output = llm_output.replace('<', '\\<') # ? 689 llm_output = llm_output.replace('>', '\\>') 690 # llm_output = llm_output.replace('\', '\\\') # ? 691 # avoid collissions 692 while TX_SENDING: 693 sleep(TIMEOUT_POLL_SEC) 694 # send data 695 TX_SENDING = True 696 # add quotes around the password in case it contains spaces 697 command = "".join(["./mmsessionout.sh \"", PASSWORD, "\" ", llm_output]) 698 if VERBOSE: 699 # stdout to see VERBOSE text from mmsessionout.sh 700 p1 = subprocess.Popen(command, shell=True, stdout=None, text=True) 701 else: 702 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 703 out, err = p1.communicate() 704 if p1.returncode == 0: 705 p1.terminate() 706 p1.kill() 707 # show LLM output in prompt 708 ########################### 709 if SHOW_TX_PROMPT: 710 print("> " + llm_output_printable, end='') 711 else: 712 print(llm_output_printable, end='') 713 TX_SENDING = False 714 715 # thread to get output from LLM 716 ############################### 717 def LLMOutputThread(): 718 # llm output fifo 719 ################# 720 # Open read end of pipe. Open this in non-blocking mode since otherwise it 721 # may block until another process/threads opens the pipe for writing. 722 pipe_llm_out = os.open(PIPE_LLM_OUT, os.O_RDONLY | os.O_NONBLOCK) 723 # read pipe_llm_out 724 ################### 725 # TODO: check why this is not working, 726 # with grep -v we get only the responses from the LLM: 727 # cmd_to_read_pipe_llm_out = "".join(["cat ", PIPE_LLM_OUT, " | grep -v '>>>'"]) 728 cmd_to_read_pipe_llm_out = "".join(["cat ", PIPE_LLM_OUT]) 729 proc = subprocess.Popen(cmd_to_read_pipe_llm_out, 730 shell=True, 731 stdin=pipe_llm_out, 732 stderr=subprocess.PIPE, # stderr=subprocess.DEVNULL, 733 stdout=subprocess.PIPE, 734 text=True) 735 # main loop 736 ########### 737 while run_thread: 738 llm_output_part = proc.stdout.readline() 739 # TODO: check cmd_to_read_pipe_llm_out using grep above and then use this line 740 # if (llm_output_part != "") and (llm_output_part != "\n"): 741 if (llm_output_part != "") and (llm_output_part != "\n") and (">>>" not in llm_output_part): 742 llmOutputQueue.put(llm_output_part) 743 # clean up 744 ########## 745 proc.terminate() 746 proc.wait(timeout=0.2) 747 # Close read end of pipe since it is not used in the parent process. 748 os.close(pipe_llm_out) 749 os.unlink(PIPE_LLM_OUT) 750 751 elif REMOTE_SHELL: 752 # thread to gather and transmit output from shell 753 ################################################# 754 def ShellTransmitThread(): 755 global TX_SENDING 756 global SESSION_ESTABLISHED 757 global WAITING_FOR_COMMAND_OUTPUT 758 global NEED_ACK 759 # main thread loop 760 ################## 761 while run_thread: 762 # block until data in queue 763 while shellOutputQueue.empty() == True: 764 sleep(TIMEOUT_POLL_SEC) 765 # initialize buffer, flags and counters 766 shell_output = "" 767 shell_timeout = False 768 read_lines = 0 769 start_time = datetime.datetime.now() 770 # consume all shell output lines up to SPLIT_TX_LINES or timeout SHELL_OUTPUT_READ_TIMEOUT_SEC 771 # exit when SPLIT_TX_LINES or SHELL_OUTPUT_READ_TIMEOUT_SEC even if there are still data in the queue 772 # this makes sure we send for now at least a partial response to the remote caller 773 while (shellOutputQueue.empty() == False) and (read_lines < SPLIT_TX_LINES) and (shell_timeout == False): 774 try: 775 shell_output_part = shellOutputQueue.get(block=True, timeout=SHELL_OUTPUT_READ_TIMEOUT_SEC) 776 # append data 777 if shell_output_part != None: 778 shell_output = "".join([shell_output, shell_output_part]) 779 read_lines = read_lines + 1 780 # timeout? 781 end_time = datetime.datetime.now() 782 delta_time = (end_time - start_time).total_seconds() 783 if delta_time > SHELL_OUTPUT_READ_TIMEOUT_SEC: 784 shell_timeout = True 785 except Exception as e: 786 logging.exception("Exception in shellOutputQueue: " + str(e)) 787 # some checks 788 if SESSION_ESTABLISHED and shell_output != "": 789 # reset flag 790 if NEED_ACK: 791 WAITING_FOR_COMMAND_OUTPUT = False 792 # prepare data for transmission as a "block" (otherwise we would send each line separately) 793 shell_output_printable = shell_output 794 shell_output = shell_output.replace("\n", "\r") 795 # TODO: find out why we get characters that are not allowed in file names 796 # and extend this workaround to further characters like *, ?, >, <, :, |, ... 797 # check e.g. https://stackoverflow.com/questions/4814040/allowed-characters-in-filename 798 shell_output = shell_output.replace("(", "\\(") 799 shell_output = shell_output.replace(")", "\\)") 800 shell_output = shell_output.replace("�", "\\�") 801 shell_output = shell_output.replace("'", "\\'") 802 # avoid collissions 803 while TX_SENDING: 804 sleep(TIMEOUT_POLL_SEC) 805 # send data 806 TX_SENDING = True 807 # add quotes around the password in case it contains spaces 808 command = "".join(["./mmsessionout.sh \"", PASSWORD, "\" ", shell_output]) 809 if VERBOSE: 810 # stdout to see VERBOSE text from mmsessionout.sh 811 p1 = subprocess.Popen(command, shell=True, stdout=None, text=True) 812 else: 813 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 814 out, err = p1.communicate() 815 if p1.returncode == 0: 816 p1.terminate() 817 p1.kill() 818 # show shell output in prompt 819 ############################# 820 if SHOW_TX_PROMPT: 821 count = shell_output_printable.count("\n") - 1 822 shell_output_printable = shell_output_printable.replace("\n", "\n ", count) 823 print("> " + shell_output_printable, end='') 824 else: 825 print(shell_output_printable, end='') 826 TX_SENDING = False 827 828 # thread to get output from shell 829 ################################# 830 def ShellOutputThread(): 831 # shell output fifo 832 ################### 833 # Open read end of pipe. Open this in non-blocking mode since otherwise it 834 # may block until another process/threads opens the pipe for writing. 835 pipe_shell_out = os.open(PIPE_SHELL_OUT, os.O_RDONLY | os.O_NONBLOCK) 836 # execute cat pipe_shell_out 837 ############################ 838 command = "".join(["cat ", PIPE_SHELL_OUT]) 839 proc = subprocess.Popen(command, 840 shell=True, 841 stdin=pipe_shell_out, 842 stderr=subprocess.PIPE, 843 stdout=subprocess.PIPE, 844 text=True) 845 # main loop 846 ########### 847 while run_thread: 848 shell_output_part = proc.stdout.readline() 849 if shell_output_part != "": 850 shellOutputQueue.put(shell_output_part) 851 # clean up 852 ########## 853 proc.terminate() 854 proc.wait(timeout=0.2) 855 # Close read end of pipe since it is not used in the parent process. 856 os.close(pipe_shell_out) 857 os.unlink(PIPE_SHELL_OUT) 858 859 # queues and threads 860 #################### 861 messageQueue = queue.Queue() 862 # start thread to transmit data 863 transmitMinimodemThread = threading.Thread(name="TransmitMinimodemThread", target=TransmitMinimodemThread) 864 transmitMinimodemThread.start() 865 promptInputQueue = queue.Queue() 866 # start thread to pass prompt input to LLM 867 passPromptInputToLlmThread = threading.Thread(name="PassPromptInputToLlmThread", target=PassPromptInputToLlmThread) 868 passPromptInputToLlmThread.start() 869 870 # start threads for LLM 871 ####################### 872 if LLM: 873 # queue for LLM output 874 llmOutputQueue = queue.Queue() 875 # start thread to gather and transmit output from LLM 876 llmTransmitThread = threading.Thread(name="LLMTransmitThread", target=LLMTransmitThread) 877 llmTransmitThread.start() 878 # start thread to get output from LLM 879 llmOutputThread = threading.Thread(name="LLMOutputThread", target=LLMOutputThread) 880 llmOutputThread.start() 881 # start threads for remote shell 882 ################################ 883 elif REMOTE_SHELL: 884 # queue for shell output 885 shellOutputQueue = queue.Queue() 886 # pipe for shell in 887 pipe_shell_in = os.open(PIPE_SHELL_IN, os.O_WRONLY) 888 # start thread to gather and transmit output from shell 889 shellTransmitThread = threading.Thread(name="ShellTransmitThread", target=ShellTransmitThread) 890 shellTransmitThread.start() 891 # start thread to get output from shell 892 shellOutputThread = threading.Thread(name="ShellOutputThread", target=ShellOutputThread) 893 shellOutputThread.start() 894 elif FILE_TRANSFER: 895 # pipe for file in 896 pipe_file_in = os.open(PIPE_FILE_IN, os.O_WRONLY) 897 # common ressources 898 if REMOTE_SHELL or FILE_TRANSFER or LLM: 899 # queue for ACKs 900 ackQueue = queue.Queue() 901 # start thread to transmit ack 902 transmitAckThread = threading.Thread(name="TransmitAckThread", target=TransmitAckThread) 903 transmitAckThread.start() 904 # TTS only for chat and LLM 905 if not REMOTE_SHELL and not FILE_TRANSFER: 906 # queue for text 907 ttsQueue = queue.Queue() 908 # start thread to speak text 909 ttsThread = threading.Thread(name="TtsThread", target=TtsThread) 910 ttsThread.start() 911 912 # clear session flags and counters 913 ################################## 914 end_session() 915 916 # initialize buffer, flags and counters 917 got_message = False 918 data_buffer = "" 919 start = -1 920 end = -1 921 # first value to be received will be 1 922 seq_tx_prev = 0 923 # default value shall be different to 1 (to be received) and different to INVALID_SEQ_NR (default value) 924 seq_tx_acked_prev = 250 925 926 # main loop 927 ########### 928 while True: 929 try: 930 # Read string data from stdin 931 # and send [keepalive] or [ack] when needed 932 ########################################### 933 data = "" 934 data_encoded = "" 935 while data == "": 936 try: 937 if LLM and NEED_ACK and WAITING_FOR_LLM_OUTPUT: 938 if select.select([sys.stdin,],[],[],LLM_OUT_MAX_DELAY)[0]: 939 data = sys.stdin.readline() 940 else: 941 # send [ack] 942 ############ 943 # when remote shell has no output we need to send the ack here 944 if SESSION_ESTABLISHED and WAITING_FOR_LLM_OUTPUT: 945 WAITING_FOR_LLM_OUTPUT = False 946 transmit_ack() 947 elif REMOTE_SHELL and NEED_ACK and WAITING_FOR_COMMAND_OUTPUT: 948 if select.select([sys.stdin,],[],[],SHELL_OUT_MAX_DELAY)[0]: 949 data = sys.stdin.readline() 950 else: 951 # send [ack] 952 ############ 953 # when remote shell has no output we need to send the ack here 954 if SESSION_ESTABLISHED and WAITING_FOR_COMMAND_OUTPUT: 955 WAITING_FOR_COMMAND_OUTPUT = False 956 transmit_ack() 957 elif (REMOTE_SHELL or FILE_TRANSFER or LLM) and (KEEPALIVE_TIME_SEC_F != 0.0): 958 # note: we subtract the average random delay from KEEPALIVE_TIME_SEC_F 959 if select.select([sys.stdin,],[],[],(KEEPALIVE_TIME_SEC_F - AVG_RND_DELAY_SEC))[0]: 960 data = sys.stdin.readline() 961 else: 962 if FILE_TRANSFER: 963 # read tx_sending_file 964 f = open(TX_SENDING_FILE_FILE, "r") 965 if f.read().splitlines()[0] == "true": 966 TX_SENDING_FILE = True 967 else: 968 TX_SENDING_FILE = False 969 f.close() 970 # send [keepalive] 971 ################## 972 # by remote shell or file transfer we need to do this here 973 # in case of TX_SENDING we skip the transmission of the [keepalive] message 974 if (SESSION_ESTABLISHED == True) and (TX_SENDING == False) and (TX_SENDING_FILE == False) and (RX_RECEIVING_FILE == False): 975 # random delay to prevent collission of [keepalive] messages sent by both devices 976 if FILE_TRANSFER: 977 sleep(float(random.randint(0,RND_MAX_DELAY_MS)/1000.0)) 978 minimodem_tx("[keepalive]") 979 else: 980 data = sys.stdin.readline() 981 except Exception as e: 982 # exit app? 983 ########### 984 if run_thread == False: 985 exit(0) 986 # noise detected? 987 ################# 988 # TODO: need this? 989 ''' 990 # flush stdin 991 sys.stdin.flush() 992 # clean buffer 993 data = "" 994 # ''' 995 stdin_err_cnt = stdin_err_cnt + 1 996 logging.info("".join(["< binary data! count = ", str(stdin_err_cnt)])) 997 # if we receive too many consecutive bytes the program hangs! 998 # by introducing this small delay we seem to prevent that 999 sleep(0.01) 1000 1001 # copy read line and remove invalid characters, 1002 # removed characters may produce a corrupt message 1003 ################################################## 1004 if data != "": 1005 try: 1006 position = 0 1007 for char in data: 1008 try: 1009 # test if the character is valid, 1010 # we catch the exception otherwise 1011 test_char = char.encode('ascii') 1012 # add valid character to encoded buffer 1013 data_encoded = "".join([data_encoded, char]) 1014 except UnicodeEncodeError as e: 1015 # add replacement character to encoded data 1016 # data_encoded = "".join([data_encoded, '?']) 1017 logging.debug("Invalid character at position " + str(position)) 1018 position = position + 1 1019 except Exception as e: 1020 logging.exception("Exception in main loop: " + str(e)) 1021 logging.debug("data_encoded (i.e. with invalid characters removed) = " + data_encoded) 1022 1023 # process data 1024 ############## 1025 # transmit [init_ack_chat], [init_ack_shell] or [init_ack_file] when [init] is received 1026 # seq_tx, seq_rx, seq_tx_acked are initialized and the session is established 1027 # note1: if the session was already established, then it is re-initialized! 1028 # note2: [init] may never come as it may be that our PC initiated the session instead 1029 # note3: this shall be done here e.g. because in shell mode there is no TX process running 1030 if data_encoded != "": 1031 if "[init]" in data_encoded: 1032 if SESSION_ESTABLISHED == False: 1033 if LLM: 1034 print("LLM prompt session initiated by communication partner is now established!") 1035 elif REMOTE_SHELL: 1036 print("Remote shell session initiated by communication partner is now established!") 1037 elif FILE_TRANSFER: 1038 print("File transfer session initiated by communication partner is now established!") 1039 else: 1040 print("Chat session initiated by communication partner is now established!") 1041 init_session() 1042 logging.info("< [init]") 1043 # send init_ack 1044 if LLM: 1045 minimodem_tx("[init_ack_llm]") 1046 elif REMOTE_SHELL: 1047 minimodem_tx("[init_ack_shell]") 1048 elif FILE_TRANSFER: 1049 minimodem_tx("[init_ack_file]") 1050 else: 1051 minimodem_tx("[init_ack_chat]") 1052 elif "[init_ack_chat]" in data_encoded: 1053 if SESSION_ESTABLISHED == False: 1054 print("Chat session initiated locally is now established!") 1055 init_session() 1056 logging.info("< [init_ack_chat]") 1057 elif "[init_ack_file]" in data_encoded: 1058 if SESSION_ESTABLISHED == False: 1059 print("File transfer session initiated locally is now established!") 1060 init_session() 1061 logging.info("< [init_ack_file]") 1062 elif "[init_ack_shell]" in data_encoded: 1063 if SESSION_ESTABLISHED == False: 1064 print("Remote shell session initiated locally is now established!") 1065 init_session() 1066 logging.info("< [init_ack_shell]") 1067 elif "[init_ack_llm]" in data_encoded: 1068 if SESSION_ESTABLISHED == False: 1069 print("LLM session initiated locally is now established!") 1070 init_session() 1071 logging.info("< [init_ack_llm]") 1072 elif "[keepalive]" in data_encoded: 1073 logging.info("< [keepalive]") 1074 elif "[test]" in data_encoded: 1075 logging.info("< [test]") 1076 # check if we got data inside encrypted message 1077 ############################################### 1078 else: 1079 # detect begin and end of message 1080 ################################# 1081 if start == -1: 1082 start = data_encoded.find("-----BEGIN PGP MESSAGE-----") 1083 if start >= 0: 1084 end = data_encoded.find("-----END PGP MESSAGE-----") 1085 if end >= 0: 1086 # got a complete message inside data! 1087 data_buffer = data_encoded[start:end+24] # remove data before begin message and after end message 1088 got_message = True 1089 else: 1090 # got only start message inside data 1091 data_buffer = data_encoded[start:] # remove data before begin message 1092 1093 # detect end of message 1094 ####################### 1095 elif end == -1: 1096 end = data_encoded.find("-----END PGP MESSAGE-----") 1097 if end >= 0: 1098 # now we have the complete message 1099 data_buffer = "".join([data_buffer, data_encoded[:end+24]]) # remove data after end message 1100 got_message = True 1101 # gather intermediate data 1102 else: 1103 data_buffer = "".join([data_buffer, data_encoded]) 1104 1105 # decrypt input message 1106 ####################### 1107 if got_message: 1108 num_bytes_written = 0 1109 # store input data in a temporary file 1110 with open(TEMP_GPG_FILE, "w") as data_file: 1111 try: 1112 num_bytes_written = data_file.write(data_buffer) 1113 logging.debug("Wrote %d bytes to temporal .gpg file." % num_bytes_written) 1114 except Exception as e: 1115 logging.exception("Exception in main loop: " + str(e)) 1116 # cleanup data, flags and indexes for next use 1117 got_message = False 1118 data_buffer = "" 1119 start = -1 1120 end = -1 1121 if num_bytes_written > 0: 1122 try: 1123 # call GPG to decrypt input data 1124 # add single quotes around the password in case it contains spaces 1125 command = "".join(["gpg --batch --passphrase '", PASSWORD, "' ", ARMOR, " -d ", TEMP_GPG_FILE, SENT_OUT_TO_DEV_NULL]) 1126 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, text=True) 1127 out, err = p1.communicate() 1128 if p1.returncode == 0: 1129 p1.terminate() 1130 p1.kill() 1131 seq_tx = ord(out[0])-33 1132 seq_tx_str = str(seq_tx) 1133 # seq_tx_acked is the field seq_rx in the message, 1134 # but from our point of view we store it as the "last acknowledged seq_tx that we sent" 1135 # note: seq_tx in our context is used to store the current seq_tx inside the TX process 1136 seq_tx_acked = ord(out[1])-33 1137 seq_tx_acked_str = str(seq_tx_acked) 1138 # store seq_tx_acked that will be checked by our TX process 1139 if seq_tx_acked != seq_tx_acked_prev: 1140 f = open(SEQ_TX_ACKED_FILE, "w") 1141 f.write(seq_tx_acked_str) 1142 f.close() 1143 seq_tx_acked_prev = seq_tx_acked 1144 logging.debug("received seq_tx_acked = "+seq_tx_acked_str) 1145 1146 # type of message? 1147 ################## 1148 # ACK 1149 ##### 1150 if out[2:7] == "[ack]": 1151 logging.info("< ack["+seq_tx_str+","+seq_tx_acked_str+"]") 1152 # DATA 1153 ###### 1154 elif out[2:8] == "[data]": 1155 decrypted_data = out[8:-1] 1156 logging.debug("The decrypted data in the message is: " + decrypted_data) 1157 # store seq_tx as seq_rx (from the perspective of the receiver) that will be sent/acknowledged next time, 1158 # this always triggers an ACK in TX process, even by repeated messages, 1159 # this is necessary because we don't know if the ACKs actually arrive! 1160 f = open(SEQ_RX_FILE, "w") 1161 f.write(seq_tx_str) 1162 f.close() 1163 logging.debug("received seq_tx = "+seq_tx_str+", now stored as seq_rx to be transmitted/acknowledged next time") 1164 # in case of remote shell we need to send ACK here (just in case the shell has no output, thus no mmsessionout.sh is called) 1165 1166 # new [data] message? 1167 ##################### 1168 # when ACK is enabled, we process data only when seq_tx was increased (considering wraparound to zero) 1169 # when ACK is disabled, we process data only when seq_tx was changed 1170 if (seq_tx == (seq_tx_prev + 1)%94) or ((NEED_ACK == False) and (seq_tx != seq_tx_prev)): 1171 seq_tx_prev = seq_tx 1172 # process only when data is not empty 1173 if decrypted_data != "": 1174 1175 # show input data with RX prompt 1176 ################################ 1177 if SHOW_RX_PROMPT: 1178 if VERBOSE: 1179 print("< [" + f"{seq_tx:02d}" + "," + f"{seq_tx_acked:02d}" + "] " + decrypted_data, flush=True) 1180 else: 1181 print("".join(["< ", decrypted_data]), flush=True) 1182 else: 1183 print(decrypted_data) 1184 1185 # LLM? 1186 ###### 1187 if LLM: 1188 # set flag to transmit ACK on timeout 1189 if NEED_ACK: 1190 WAITING_FOR_LLM_OUTPUT = True 1191 # set flag in advance to know we need prompt input to be spoken first 1192 if TTS and not SPEAKING: 1193 SPEAKING = True 1194 # leave session? 1195 # TODO: this is ollama-specific 1196 # implement here something generic, for now we use this as a general special code to leave 1197 # leave LLM? 1198 if decrypted_data == "/bye": 1199 # generate Ctrl+C 1200 raise KeyboardInterrupt() 1201 else: 1202 # pass prompt input to LLM 1203 llm_input(decrypted_data) 1204 1205 # remote shell? 1206 ############### 1207 elif REMOTE_SHELL: 1208 # set flag to transmit ACK on timeout 1209 if NEED_ACK: 1210 WAITING_FOR_COMMAND_OUTPUT = True 1211 # write input command to pipe_shell_in 1212 os.write(pipe_shell_in, bytes("".join([decrypted_data, '\n']), 'utf-8')) 1213 logging.debug("Shell output sent.") 1214 1215 # text to speech? 1216 ################# 1217 if TTS: 1218 # NOTE: if the communication parter is running a shell then our REMOTE_SHELL will still be False (deactivate TTS in cfg/text_to_speech if required) 1219 if not REMOTE_SHELL and not FILE_TRANSFER: 1220 # NOTE: text with pauses as defined in pattern requires a separate call each time 1221 decrypted_data_list = re.split(pattern, decrypted_data) 1222 for index, sentence in enumerate(decrypted_data_list, 1): 1223 tts(f"{sentence.strip()}") 1224 1225 # empty data 1226 else: 1227 logging.info("< empty_data["+seq_tx_str+","+seq_tx_acked_str+"]") 1228 # repeated data 1229 else: 1230 str_tmp = decrypted_data.replace("\n", "\n ") 1231 logging.info("< repeated_data["+seq_tx_str+","+seq_tx_acked_str+"] = " + str_tmp) 1232 # FILE 1233 ###### 1234 elif out[2:13] == "[file_name]": 1235 # store seq_tx as seq_rx (from the perspective of the receiver) that will be sent/acknowledged next time, 1236 # this always triggers an ACK in TX process, even by repeated messages, 1237 # this is necessary because we don't know if the ACKs actually arrive! 1238 f = open(SEQ_RX_FILE, "w") 1239 f.write(seq_tx_str) 1240 f.close() 1241 logging.debug("received seq_tx = "+seq_tx_str+", now stored as seq_rx to be transmitted/acknowledged next time") 1242 1243 # new [file] data? 1244 ################## 1245 # when ACK is enabled, we process data only when seq_tx was increased (considering wraparound to zero) 1246 # when ACK is disabled, we process data only when seq_tx was changed 1247 if (seq_tx == (seq_tx_prev + 1)%94) or ((NEED_ACK == False) and (seq_tx != seq_tx_prev)): 1248 seq_tx_prev = seq_tx 1249 logging.info("< file_data["+seq_tx_str+","+seq_tx_acked_str+"]") 1250 1251 # classify file data (state machine) 1252 #################################### 1253 end = out.find("[file]") 1254 if end == -1: 1255 end = out.find("[file_end]") 1256 if end != -1: 1257 # new file (complete, incl. file end) 1258 if file_name == "": 1259 RX_RECEIVING_FILE = True 1260 file_name = out[13:end] 1261 state_file_out = WRITE_AND_DECODE_FILE_OUT_64 1262 # file end 1263 else: 1264 state_file_out = APPEND_AND_DECODE_FILE_OUT_64 1265 # move end to beginning of file data 1266 end = end + len("[file_end]") 1267 else: 1268 # incomplete file 1269 state_file_out = INCOMPLETE_FILE_OUT_64 1270 else: 1271 # new file (beginn) 1272 if file_name == "": 1273 RX_RECEIVING_FILE = True 1274 file_name = out[13:end] 1275 state_file_out = WRITE_FILE_OUT_64 1276 # file part 1277 else: 1278 state_file_out = APPEND_FILE_OUT_64 1279 # move end to beginning of file data 1280 end = end + len("[file]") 1281 1282 # process file data 1283 ################### 1284 if state_file_out != INCOMPLETE_FILE_OUT_64: 1285 # TODO: optimize by using out[end:-1] directly...so we don't copy the data, which may be too big 1286 # first check: id(decrypted_file_data) == id(out[end:-1]) ???...then it is just a reference, so it is ok like it is now... 1287 decrypted_file_data = out[end:-1] 1288 # new file 1289 if state_file_out == WRITE_FILE_OUT_64: 1290 f = open(TMPFILE_BASE64_IN, "w") 1291 f.write(decrypted_file_data) 1292 f.close() 1293 # new complete file or last part 1294 elif (state_file_out == WRITE_AND_DECODE_FILE_OUT_64) or (state_file_out == APPEND_AND_DECODE_FILE_OUT_64): 1295 if state_file_out == WRITE_AND_DECODE_FILE_OUT_64: 1296 f = open(TMPFILE_BASE64_IN, "w") 1297 else: 1298 f = open(TMPFILE_BASE64_IN, "a") 1299 f.write(decrypted_file_data) 1300 f.close() 1301 command = "".join(["base64 -d < '", TMPFILE_BASE64_IN, "' > 'rx_files/", file_name, "'"]) 1302 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, text=True) 1303 out, err = p1.communicate() 1304 if p1.returncode == 0: 1305 p1.terminate() 1306 p1.kill() 1307 else: 1308 logging.warning("could not decode received file!") 1309 file_name = "" 1310 RX_RECEIVING_FILE = False 1311 # new begin or part of file 1312 elif (state_file_out == WRITE_FILE_OUT_64) or (state_file_out == APPEND_FILE_OUT_64): 1313 if state_file_out == WRITE_FILE_OUT_64: 1314 f = open(TMPFILE_BASE64_IN, "w") 1315 else: 1316 f = open(TMPFILE_BASE64_IN, "a") 1317 f.write(decrypted_file_data) 1318 f.close() 1319 logging.debug("The decrypted file data in the message is: " + decrypted_file_data) 1320 1321 # write file data to pipe_file_in 1322 ################################# 1323 os.write(pipe_file_in, bytes(decrypted_file_data, 'utf-8')) 1324 logging.debug("File data received.") 1325 1326 # transmit ACK 1327 ############## 1328 if NEED_ACK: 1329 transmit_ack() 1330 # incomplete message 1331 else: 1332 logging.info("< incomplete_file_data["+seq_tx_str+","+seq_tx_acked_str+"]") 1333 # repeated file message 1334 else: 1335 logging.info("< repeated_file_data["+seq_tx_str+","+seq_tx_acked_str+"]") 1336 # transmit ACK 1337 ############## 1338 if NEED_ACK: 1339 transmit_ack() 1340 # unknown message type 1341 else: 1342 logging.info("< unknown_message["+seq_tx_str+","+seq_tx_acked_str+"]") 1343 # handle exceptions from decryption onwards 1344 except Exception as e: 1345 logging.exception(str(e)) 1346 # no message detected 1347 else: 1348 if VERBOSE: 1349 # logging.info("< no_message, start="+str(start)+", end="+str(end)) 1350 # logging.info(" data="+data_encoded+", data_buffer="+data_buffer) 1351 pass 1352 else: 1353 logging.debug("< no_message, start="+str(start)+", end="+str(end)) 1354 logging.debug(" data="+data_encoded+", data_buffer="+data_buffer) 1355 1356 # handle this exception nicely to catch Ctrl+C, otherwise error seen on terminal 1357 except KeyboardInterrupt: 1358 # kill threads 1359 run_thread=False 1360 sleep(0.5) 1361 # output new line, otherwise terminal prompt glued to last text 1362 print("") 1363 # restore mic volume 1364 command = "./restore_audio_settings.sh &" 1365 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 1366 out, err = p1.communicate() 1367 if p1.returncode == 0: 1368 p1.terminate() 1369 p1.kill() 1370 # brute-force cleanup 1371 # we could remove this code, as we terminate processes with SIGTERM in tea2adt.py 1372 # but then running ./tea2adt directly would not cleanup anymore 1373 command = "./killtea2adt.sh 2> /dev/null &" 1374 p1 = subprocess.Popen(command, shell=True, stdout=subprocess.DEVNULL, text=True) 1375 out, err = p1.communicate() 1376 if p1.returncode == 0: 1377 p1.terminate() 1378 p1.kill() 1379 exit(0) 1380 except Exception as e: 1381 logging.exception(str(e)) 1382 1383 # main loop was exit 1384 #################### 1385 # clean up 1386 ########## 1387 os.close(pipe_shell_in) 1388 os.unlink(PIPE_SHELL_IN) 1389 1390 # call main() 1391 ############# 1392 if __name__ == '__main__': 1393 main()