socks5.py
#!/usr/bin/env python3
# Copyright (c) 2015-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Dummy Socks5 server for testing."""

import select
import socket
import threading
import queue
import logging

from .netutil import (
    format_addr_port,
    set_ephemeral_port_range,
)

logger = logging.getLogger("TestFramework.socks5")

# Protocol constants
class Command:
    """SOCKS5 request command codes handled by this server."""
    CONNECT = 0x01

class AddressType:
    """SOCKS5 address type (ATYP) codes."""
    IPV4 = 0x01
    DOMAINNAME = 0x03
    IPV6 = 0x04

# Utility functions
def recvall(s, n):
    """Receive n bytes from a socket, or fail.

    Returns a bytearray of exactly n bytes. Raises IOError if the peer
    closes the stream before n bytes have arrived.
    """
    rv = bytearray()
    while n > 0:
        d = s.recv(n)
        if not d:
            raise IOError('Unexpected end of stream')
        rv.extend(d)
        n -= len(d)
    return rv

def sendall(s, data):
    """Send all data to a socket, or fail.

    Waits with select() until the socket is writable before each send(), so
    it can be used on the non-blocking sockets set up by forward_sockets().
    Raises IOError if send() reports that zero bytes were written.
    """
    sent = 0
    while sent < len(data):
        _, wlist, _ = select.select([], [s], [])
        if len(wlist) > 0:
            n = s.send(data[sent:])
            if n == 0:
                raise IOError('send() on socket returned 0')
            sent += n

def forward_sockets(a, b, wakeup_socket, serv):
    """Forwards data between sockets a and b until EOF, error, or shutdown.

    Monitors wakeup_socket for a shutdown signal and checks serv.is_running()
    to exit gracefully when the server is stopping.

    Raises IOError if select() reports an exceptional condition on any of the
    monitored sockets.
    """
    # Mark as non-blocking so that we do not end up in a deadlock-like situation
    # where we block and wait on data from `a` while there is data ready to be
    # received on `b` and forwarded to `a`. And at the same time the application
    # at `a` is not sending anything because it waits for the data from `b` to
    # respond.
    a.setblocking(False)
    b.setblocking(False)
    sockets = [a, b, wakeup_socket]
    done = False
    while not done:
        # Blocking select with timeout, so that the running flag is re-checked
        # at least every 2 seconds even if nothing wakes us up.
        rlist, _, xlist = select.select(sockets, [], sockets, 2)
        if not serv.is_running():
            logger.debug("forward_sockets: Exit due to shutdown")
            return
        if len(xlist) > 0:
            raise IOError('Exceptional condition on socket')
        for s in rlist:
            data = s.recv(4096)
            if data is None or len(data) == 0:
                done = True
                break
            # Data read from wakeup_socket matches neither branch and is
            # dropped; its only purpose is to interrupt the select() above.
            if s == a:
                sendall(b, data)
            elif s == b:
                sendall(a, data)

def _addr_to_str(atyp, addr):
    """Return a printable host string for a raw SOCKS5 destination address.

    IPv4 and IPv6 destinations arrive as 4 or 16 packed bytes, which are
    converted to their textual form. Domain names are decoded as UTF-8.
    """
    if atyp == AddressType.IPV4:
        return socket.inet_ntop(socket.AF_INET, bytes(addr))
    if atyp == AddressType.IPV6:
        return socket.inet_ntop(socket.AF_INET6, bytes(addr))
    return addr.decode("utf-8")

# Implementation classes
class Socks5Configuration():
    """Proxy configuration."""
    def __init__(self):
        self.addr = None # Bind address (must be set)
        self.af = socket.AF_INET # Bind address family
        self.unauth = False # Support unauthenticated
        self.auth = False # Support authentication
        self.keep_alive = False # Do not automatically close connections
        # This function is called whenever a new connection arrives to the proxy
        # and it decides where the connection is redirected to. It is passed:
        # - the address the client requested to connect to
        # - the port the client requested to connect to
        # It is supposed to return an object like:
        # {
        #     "actual_to_addr": "127.0.0.1",
        #     "actual_to_port": 28276
        # }
        # or None.
        # If it returns an object then the connection is redirected to actual_to_addr:actual_to_port.
        # If it returns None, or destinations_factory itself is None then the connection is closed.
        self.destinations_factory = None

class Socks5Command():
    """Information about an incoming socks5 command."""
    def __init__(self, cmd, atyp, addr, port, username, password):
        self.cmd = cmd # Command (one of Command.*)
        self.atyp = atyp # Address type (one of AddressType.*)
        self.addr = addr # Address
        self.port = port # Port to connect to
        self.username = username
        self.password = password
    def __repr__(self):
        return 'Socks5Command(%s,%s,%s,%s,%s,%s)' % (self.cmd, self.atyp, self.addr, self.port, self.username, self.password)

class Socks5Connection():
    """Handler for a single client connection accepted by a Socks5Server."""
    def __init__(self, serv, conn):
        self.serv = serv
        self.conn = conn
        # Socket-pair used to wake up blocking forwarding select
        # Note: a pipe could be used as well, but that does not work with select() on Windows
        self.wakeup_socket_pair = socket.socketpair()
        # Index of this handler (within the server)
        self.handler_index = None

    def handle(self):
        """Handle socks5 request according to RFC1928.

        Performs method negotiation and optional username/password
        authentication, reads the connect request, reports it on the server's
        queue as a Socks5Command and, if the configured destinations factory
        returns a destination, forwards traffic to it. Exceptions are logged
        and, while the server is running, also put on the server's queue.
        """
        try:
            # Verify socks version
            ver = recvall(self.conn, 1)[0]
            if ver != 0x05:
                raise IOError('Invalid socks version %i' % ver)
            # Choose authentication method
            nmethods = recvall(self.conn, 1)[0]
            methods = bytearray(recvall(self.conn, nmethods))
            method = None
            if 0x02 in methods and self.serv.conf.auth:
                method = 0x02 # username/password
            elif 0x00 in methods and self.serv.conf.unauth:
                method = 0x00 # unauthenticated
            if method is None:
                raise IOError('No supported authentication method was offered')
            # Send response
            self.conn.sendall(bytearray([0x05, method]))
            # Read authentication (optional)
            username = None
            password = None
            if method == 0x02:
                ver = recvall(self.conn, 1)[0]
                if ver != 0x01:
                    raise IOError('Invalid auth packet version %i' % ver)
                # NOTE(review): str() of a bytearray yields its repr
                # ("bytearray(b'...')"), not the decoded text. Kept as-is
                # because consumers of the queue may rely on this form.
                ulen = recvall(self.conn, 1)[0]
                username = str(recvall(self.conn, ulen))
                plen = recvall(self.conn, 1)[0]
                password = str(recvall(self.conn, plen))
                # Send authentication response
                self.conn.sendall(bytearray([0x01, 0x00]))

            # Read connect request
            ver, cmd, _, atyp = recvall(self.conn, 4)
            if ver != 0x05:
                raise IOError('Invalid socks version %i in connect request' % ver)
            if cmd != Command.CONNECT:
                raise IOError('Unhandled command %i in connect request' % cmd)

            if atyp == AddressType.IPV4:
                addr = recvall(self.conn, 4)
            elif atyp == AddressType.DOMAINNAME:
                n = recvall(self.conn, 1)[0]
                addr = recvall(self.conn, n)
            elif atyp == AddressType.IPV6:
                addr = recvall(self.conn, 16)
            else:
                raise IOError('Unknown address type %i' % atyp)
            port_hi,port_lo = recvall(self.conn, 2)
            port = (port_hi << 8) | port_lo

            # Send dummy response
            self.conn.sendall(bytearray([0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]))

            # The reported command keeps the raw address bytes as received.
            cmdin = Socks5Command(cmd, atyp, addr, port, username, password)
            self.serv.queue.put(cmdin)
            logger.debug('Proxy: %s', cmdin)

            # Packed IPv4/IPv6 addresses are not text: decoding them as UTF-8
            # fails for any octet >= 0x80, so convert according to atyp.
            requested_to_addr = _addr_to_str(atyp, addr)
            requested_to = format_addr_port(requested_to_addr, port)

            if self.serv.is_running():
                if self.serv.conf.destinations_factory is not None:
                    dest = self.serv.conf.destinations_factory(requested_to_addr, port)
                    if dest is not None:
                        logger.debug(f"Serving connection to {requested_to}, will redirect it to "
                                     f"{dest['actual_to_addr']}:{dest['actual_to_port']} instead")
                        # The context manager closes conn_to on exit.
                        with socket.create_connection((dest["actual_to_addr"], dest["actual_to_port"])) as conn_to:
                            forward_sockets(self.conn, conn_to, self.wakeup_socket_pair[1], self.serv)
                    else:
                        logger.debug(f"Can't serve the connection to {requested_to}: the destinations factory returned None")
                else:
                    logger.debug(f"Can't serve the connection to {requested_to}: no destinations factory")

            # Fall through to disconnect
        except Exception as e:
            logger.exception(f"socks5 request handling failed (running {self.serv.is_running()})")
            if self.serv.is_running():
                self.serv.queue.put(e)
        finally:
            if not self.serv.keep_alive:
                self.conn.close()
            else:
                logger.debug("Keeping client connection alive")
            # Detach the pair first so that a concurrent wakeup() sees None
            # instead of sockets that are about to be closed.
            wakeup_sockets = self.wakeup_socket_pair
            self.wakeup_socket_pair = None
            # Close each end on its own, so a failure on one does not leak the other.
            for wakeup_sock in wakeup_sockets:
                try:
                    wakeup_sock.close()
                except OSError:
                    pass
            self.serv.remove_handler(self.handler_index)
            self.handler_index = None

    def wakeup(self):
        """Wake up the blocking forwarding select by writing to the wake-up socket."""
        try:
            socket_pair = self.wakeup_socket_pair
            if socket_pair is not None:
                socket_pair[0].send("CloseWakeup".encode())
                logger.debug("Waking up forwarding thread")
        except OSError as e:
            logger.warning(f"Error waking up forwarding thread: {e}")


def try_join_daemon_thread(thread, timeout=0) -> bool:
    """Wrapper for thread.join(), which may throw for daemon threads (in late stages of finalization).

    Return True if the thread is no longer active (join succeeded), False otherwise.
    An exception raised by join() is logged and also reported as True.
    See PR #34863 for more details on using daemon threads.
    """
    try:
        thread.join(timeout=timeout)
        return not thread.is_alive()
    except Exception as e:
        logger.debug(f"Exception in thread.join, {e}")
        return True

class Socks5Server():
    """Listening SOCKS5 server that runs each accepted connection in its own thread."""
    def __init__(self, conf):
        self.conf = conf
        self.s = socket.socket(conf.af)
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # When using dynamic port allocation (port=0), ensure we don't get a
        # port that conflicts with the test framework's static port range.
        if conf.addr[1] == 0:
            set_ephemeral_port_range(self.s)
        self.s.bind(conf.addr)
        # When port=0, the OS assigns an available port. Update conf.addr
        # to reflect the actual bound address so callers can use it.
        self.conf.addr = self.s.getsockname()
        self.s.listen(5)
        # Set to False when stop is initiated
        self._running = False
        self._running_lock = threading.Lock()
        self.thread = None
        self.queue = queue.Queue() # report connections and exceptions to client
        self.keep_alive = conf.keep_alive
        # Store the background handlers, needed for clean shutdown
        # Append-only array, completed handlers are set to None
        self._handlers = []
        self._handlers_lock = threading.Lock()

    def is_running(self) -> bool:
        """Return the running flag (read under its lock)."""
        with self._running_lock:
            return self._running

    def set_running(self, new_value: bool):
        """Set the running flag (written under its lock)."""
        with self._running_lock:
            self._running = new_value

    def run(self):
        """Accept connections and start a handler thread for each, until stopped."""
        while self.is_running():
            (sockconn, _) = self.s.accept()
            if self.is_running():
                conn = Socks5Connection(self, sockconn)
                # Use "daemon" threads, see PR #34863 for more discussion.
                thread = threading.Thread(None, conn.handle, daemon=True)
                with self._handlers_lock:
                    conn.handler_index = len(self._handlers)
                    self._handlers.append((thread, conn))
                    assert(conn.handler_index < len(self._handlers))
                thread.start()
            else:
                # Accepted after stop was initiated (e.g. the self-connect
                # made by stop()); nobody will handle it, so close it here.
                sockconn.close()

    def remove_handler(self, handler_index):
        """Mark the handler slot at handler_index as completed (set it to None)."""
        with self._handlers_lock:
            # handler_index is None for a connection that was never registered.
            if handler_index is not None and handler_index < len(self._handlers):
                if self._handlers[handler_index] is not None:
                    self._handlers[handler_index] = None
                    logger.debug(f"Handler {handler_index} removed")

    def start(self):
        """Start the accept loop in a background daemon thread."""
        assert not self.is_running()
        self.set_running(True)
        self.thread = threading.Thread(None, self.run, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the accept loop, then wake up and join handlers that are still active."""
        self.set_running(False)
        # connect to self to end run loop
        with socket.socket(self.conf.af) as s:
            s.connect(self.conf.addr)
        self.thread.join()
        # if there are active handlers, close them
        # Work on a snapshot: finishing handlers call remove_handler(), which
        # needs the lock, so it must not be held while joining them.
        with self._handlers_lock:
            items = list(self._handlers)
        for i, item in enumerate(items):
            if item is None:
                continue
            thread, conn = item
            # check if thread is still active
            if not try_join_daemon_thread(thread, timeout=0):
                conn.wakeup()
                if try_join_daemon_thread(thread, timeout=2):
                    logger.debug(f"Stop(): Handler {i} thread joined")
                else:
                    logger.warning(f"Stop(): Handler thread {i} didn't finish after force close")