test-tcp-server.py
1 #!/usr/bin/env python3 2 """ 3 Simple TCP test server for kabashira-efi testing. 4 5 This server: 6 1. Listens for TCP connections 7 2. Echoes back received data 8 3. Can simulate a Noise handshake responder (basic) 9 10 Usage: 11 python3 test-tcp-server.py [--port PORT] [--mode echo|noise] 12 """ 13 14 import argparse 15 import socket 16 import threading 17 import sys 18 from datetime import datetime 19 20 def log(msg): 21 timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] 22 print(f"[{timestamp}] {msg}") 23 24 def handle_echo_client(conn, addr): 25 """Simple echo handler - returns whatever it receives.""" 26 log(f"Echo client connected from {addr}") 27 try: 28 while True: 29 data = conn.recv(4096) 30 if not data: 31 break 32 log(f"Received {len(data)} bytes from {addr}: {data[:64]}...") 33 conn.sendall(data) 34 log(f"Echoed {len(data)} bytes back") 35 except Exception as e: 36 log(f"Error with {addr}: {e}") 37 finally: 38 conn.close() 39 log(f"Client {addr} disconnected") 40 41 def handle_noise_client(conn, addr): 42 """ 43 Basic Noise XX responder simulation. 44 45 Noise XX pattern: 46 -> e (32 bytes: initiator ephemeral public key) 47 <- e, ee, s, es (32 + 48 bytes: responder ephemeral + encrypted static) 48 -> s, se (48 bytes: encrypted initiator static) 49 """ 50 log(f"Noise client connected from {addr}") 51 try: 52 # Message 1: Receive initiator's ephemeral (32 bytes) 53 msg1 = conn.recv(32) 54 if len(msg1) < 32: 55 log(f"Invalid msg1 length: {len(msg1)}") 56 return 57 log(f"Received msg1 (-> e): {msg1.hex()}") 58 59 # Message 2: Send our ephemeral + encrypted static 60 # For testing, just send dummy data of correct length 61 import os 62 ephemeral = os.urandom(32) # Our ephemeral public key 63 encrypted_static = os.urandom(48) # Encrypted static + tag 64 msg2 = ephemeral + encrypted_static 65 conn.sendall(msg2) 66 log(f"Sent msg2 (<- e, ee, s, es): {len(msg2)} bytes") 67 68 # Message 3: Receive initiator's encrypted static 69 msg3 = conn.recv(48) 70 if len(msg3) < 48: 71 log(f"Invalid msg3 length: {len(msg3)}") 72 return 73 log(f"Received msg3 (-> s, se): {msg3.hex()}") 74 75 log("Handshake complete (simulated)") 76 77 # Now in transport mode - echo encrypted messages 78 while True: 79 # Read length prefix (2 bytes big-endian) 80 len_prefix = conn.recv(2) 81 if not len_prefix: 82 break 83 msg_len = int.from_bytes(len_prefix, 'big') 84 85 # Read message 86 msg = conn.recv(msg_len) 87 log(f"Received transport message: {len(msg)} bytes") 88 89 # Echo back (in real impl would decrypt, re-encrypt) 90 conn.sendall(len_prefix + msg) 91 log(f"Echoed transport message") 92 93 except Exception as e: 94 log(f"Error with {addr}: {e}") 95 finally: 96 conn.close() 97 log(f"Noise client {addr} disconnected") 98 99 def run_server(port, mode): 100 """Run the TCP server.""" 101 handler = handle_echo_client if mode == "echo" else handle_noise_client 102 103 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 104 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 105 sock.bind(('0.0.0.0', port)) 106 sock.listen(5) 107 108 log(f"TCP test server listening on port {port} (mode: {mode})") 109 log("Waiting for connections...") 110 log("") 111 log("If running QEMU with user networking, connect to 10.0.2.2:{port}") 112 log("(10.0.2.2 is the host from guest's perspective)") 113 log("") 114 115 try: 116 while True: 117 conn, addr = sock.accept() 118 thread = threading.Thread(target=handler, args=(conn, addr)) 119 thread.daemon = True 120 thread.start() 121 except KeyboardInterrupt: 122 log("Server shutting down") 123 124 def main(): 125 parser = argparse.ArgumentParser(description="TCP test server for kabashira-efi") 126 parser.add_argument("--port", "-p", type=int, default=8545, 127 help="Port to listen on (default: 8545)") 128 parser.add_argument("--mode", "-m", choices=["echo", "noise"], default="echo", 129 help="Server mode: echo or noise (default: echo)") 130 args = parser.parse_args() 131 132 run_server(args.port, args.mode) 133 134 if __name__ == "__main__": 135 main()