/ scripts / test-tcp-server.py
test-tcp-server.py
  1  #!/usr/bin/env python3
  2  """
  3  Simple TCP test server for kabashira-efi testing.
  4  
  5  This server:
  6  1. Listens for TCP connections
  7  2. Echoes back received data
  8  3. Can simulate a Noise handshake responder (basic)
  9  
 10  Usage:
 11      python3 test-tcp-server.py [--port PORT] [--mode echo|noise]
 12  """
 13  
 14  import argparse
 15  import socket
 16  import threading
 17  import sys
 18  from datetime import datetime
 19  
 20  def log(msg):
 21      timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
 22      print(f"[{timestamp}] {msg}")
 23  
 24  def handle_echo_client(conn, addr):
 25      """Simple echo handler - returns whatever it receives."""
 26      log(f"Echo client connected from {addr}")
 27      try:
 28          while True:
 29              data = conn.recv(4096)
 30              if not data:
 31                  break
 32              log(f"Received {len(data)} bytes from {addr}: {data[:64]}...")
 33              conn.sendall(data)
 34              log(f"Echoed {len(data)} bytes back")
 35      except Exception as e:
 36          log(f"Error with {addr}: {e}")
 37      finally:
 38          conn.close()
 39          log(f"Client {addr} disconnected")
 40  
 41  def handle_noise_client(conn, addr):
 42      """
 43      Basic Noise XX responder simulation.
 44  
 45      Noise XX pattern:
 46        -> e           (32 bytes: initiator ephemeral public key)
 47        <- e, ee, s, es (32 + 48 bytes: responder ephemeral + encrypted static)
 48        -> s, se       (48 bytes: encrypted initiator static)
 49      """
 50      log(f"Noise client connected from {addr}")
 51      try:
 52          # Message 1: Receive initiator's ephemeral (32 bytes)
 53          msg1 = conn.recv(32)
 54          if len(msg1) < 32:
 55              log(f"Invalid msg1 length: {len(msg1)}")
 56              return
 57          log(f"Received msg1 (-> e): {msg1.hex()}")
 58  
 59          # Message 2: Send our ephemeral + encrypted static
 60          # For testing, just send dummy data of correct length
 61          import os
 62          ephemeral = os.urandom(32)  # Our ephemeral public key
 63          encrypted_static = os.urandom(48)  # Encrypted static + tag
 64          msg2 = ephemeral + encrypted_static
 65          conn.sendall(msg2)
 66          log(f"Sent msg2 (<- e, ee, s, es): {len(msg2)} bytes")
 67  
 68          # Message 3: Receive initiator's encrypted static
 69          msg3 = conn.recv(48)
 70          if len(msg3) < 48:
 71              log(f"Invalid msg3 length: {len(msg3)}")
 72              return
 73          log(f"Received msg3 (-> s, se): {msg3.hex()}")
 74  
 75          log("Handshake complete (simulated)")
 76  
 77          # Now in transport mode - echo encrypted messages
 78          while True:
 79              # Read length prefix (2 bytes big-endian)
 80              len_prefix = conn.recv(2)
 81              if not len_prefix:
 82                  break
 83              msg_len = int.from_bytes(len_prefix, 'big')
 84  
 85              # Read message
 86              msg = conn.recv(msg_len)
 87              log(f"Received transport message: {len(msg)} bytes")
 88  
 89              # Echo back (in real impl would decrypt, re-encrypt)
 90              conn.sendall(len_prefix + msg)
 91              log(f"Echoed transport message")
 92  
 93      except Exception as e:
 94          log(f"Error with {addr}: {e}")
 95      finally:
 96          conn.close()
 97          log(f"Noise client {addr} disconnected")
 98  
 99  def run_server(port, mode):
100      """Run the TCP server."""
101      handler = handle_echo_client if mode == "echo" else handle_noise_client
102  
103      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
104          sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
105          sock.bind(('0.0.0.0', port))
106          sock.listen(5)
107  
108          log(f"TCP test server listening on port {port} (mode: {mode})")
109          log("Waiting for connections...")
110          log("")
111          log("If running QEMU with user networking, connect to 10.0.2.2:{port}")
112          log("(10.0.2.2 is the host from guest's perspective)")
113          log("")
114  
115          try:
116              while True:
117                  conn, addr = sock.accept()
118                  thread = threading.Thread(target=handler, args=(conn, addr))
119                  thread.daemon = True
120                  thread.start()
121          except KeyboardInterrupt:
122              log("Server shutting down")
123  
def main():
    """Parse command-line options and start the server with them."""
    cli = argparse.ArgumentParser(
        description="TCP test server for kabashira-efi",
    )
    cli.add_argument(
        "--port", "-p",
        type=int,
        default=8545,
        help="Port to listen on (default: 8545)",
    )
    cli.add_argument(
        "--mode", "-m",
        choices=["echo", "noise"],
        default="echo",
        help="Server mode: echo or noise (default: echo)",
    )
    options = cli.parse_args()

    run_server(options.port, options.mode)


# Start the server only when run as a script, not when imported.
if __name__ == "__main__":
    main()