socks5.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2015-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Dummy Socks5 server for testing."""
  6  
  7  import select
  8  import socket
  9  import threading
 10  import queue
 11  import logging
 12  
 13  from .netutil import (
 14      format_addr_port,
 15      set_ephemeral_port_range,
 16  )
 17  
 18  logger = logging.getLogger("TestFramework.socks5")
 19  
 20  # Protocol constants
 21  class Command:
 22      CONNECT = 0x01
 23  
 24  class AddressType:
 25      IPV4 = 0x01
 26      DOMAINNAME = 0x03
 27      IPV6 = 0x04
 28  
 29  # Utility functions
 30  def recvall(s, n):
 31      """Receive n bytes from a socket, or fail."""
 32      rv = bytearray()
 33      while n > 0:
 34          d = s.recv(n)
 35          if not d:
 36              raise IOError('Unexpected end of stream')
 37          rv.extend(d)
 38          n -= len(d)
 39      return rv
 40  
 41  def sendall(s, data):
 42      """Send all data to a socket, or fail."""
 43      sent = 0
 44      while sent < len(data):
 45          _, wlist, _ = select.select([], [s], [])
 46          if len(wlist) > 0:
 47              n = s.send(data[sent:])
 48              if n == 0:
 49                  raise IOError('send() on socket returned 0')
 50              sent += n
 51  
 52  def forward_sockets(a, b, wakeup_socket, serv):
 53      """Forwards data between sockets a and b until EOF, error, or shutdown.
 54  
 55      Monitors wakeup_socket for a shutdown signal and checks serv.is_running()
 56      to exit gracefully when the server is stopping.
 57      """
 58      # Mark as non-blocking so that we do not end up in a deadlock-like situation
 59      # where we block and wait on data from `a` while there is data ready to be
 60      # received on `b` and forwarded to `a`. And at the same time the application
 61      # at `a` is not sending anything because it waits for the data from `b` to
 62      # respond.
 63      a.setblocking(False)
 64      b.setblocking(False)
 65      sockets = [a, b, wakeup_socket]
 66      done = False
 67      while not done:
 68          # Blocking select with timeout
 69          rlist, _, xlist = select.select(sockets, [], sockets, 2)
 70          if not serv.is_running():
 71              logger.debug("forward_sockets: Exit due to shutdown")
 72              return
 73          if len(xlist) > 0:
 74              raise IOError('Exceptional condition on socket')
 75          for s in rlist:
 76              data = s.recv(4096)
 77              if data is None or len(data) == 0:
 78                  done = True
 79                  break
 80              if s == a:
 81                  sendall(b, data)
 82              elif s == b:
 83                  sendall(a, data)
 84  
 85  # Implementation classes
 86  class Socks5Configuration():
 87      """Proxy configuration."""
 88      def __init__(self):
 89          self.addr = None # Bind address (must be set)
 90          self.af = socket.AF_INET # Bind address family
 91          self.unauth = False  # Support unauthenticated
 92          self.auth = False  # Support authentication
 93          self.keep_alive = False  # Do not automatically close connections
 94          # This function is called whenever a new connection arrives to the proxy
 95          # and it decides where the connection is redirected to. It is passed:
 96          # - the address the client requested to connect to
 97          # - the port the client requested to connect to
 98          # It is supposed to return an object like:
 99          # {
100          #     "actual_to_addr": "127.0.0.1"
101          #     "actual_to_port": 28276
102          # }
103          # or None.
104          # If it returns an object then the connection is redirected to actual_to_addr:actual_to_port.
105          # If it returns None, or destinations_factory itself is None then the connection is closed.
106          self.destinations_factory = None
107  
108  class Socks5Command():
109      """Information about an incoming socks5 command."""
110      def __init__(self, cmd, atyp, addr, port, username, password):
111          self.cmd = cmd # Command (one of Command.*)
112          self.atyp = atyp # Address type (one of AddressType.*)
113          self.addr = addr # Address
114          self.port = port # Port to connect to
115          self.username = username
116          self.password = password
117      def __repr__(self):
118          return 'Socks5Command(%s,%s,%s,%s,%s,%s)' % (self.cmd, self.atyp, self.addr, self.port, self.username, self.password)
119  
120  class Socks5Connection():
121      def __init__(self, serv, conn):
122          self.serv = serv
123          self.conn = conn
124          # Socket-pair used to wake up blocking forwarding select
125          # Note: a pipe could be used as well, but that does not work with select() on Windows
126          self.wakeup_socket_pair = socket.socketpair()
127          # Index of this handler (within the server)
128          self.handler_index = None
129  
130      def handle(self):
131          """Handle socks5 request according to RFC1928."""
132          try:
133              # Verify socks version
134              ver = recvall(self.conn, 1)[0]
135              if ver != 0x05:
136                  raise IOError('Invalid socks version %i' % ver)
137              # Choose authentication method
138              nmethods = recvall(self.conn, 1)[0]
139              methods = bytearray(recvall(self.conn, nmethods))
140              method = None
141              if 0x02 in methods and self.serv.conf.auth:
142                  method = 0x02 # username/password
143              elif 0x00 in methods and self.serv.conf.unauth:
144                  method = 0x00 # unauthenticated
145              if method is None:
146                  raise IOError('No supported authentication method was offered')
147              # Send response
148              self.conn.sendall(bytearray([0x05, method]))
149              # Read authentication (optional)
150              username = None
151              password = None
152              if method == 0x02:
153                  ver = recvall(self.conn, 1)[0]
154                  if ver != 0x01:
155                      raise IOError('Invalid auth packet version %i' % ver)
156                  ulen = recvall(self.conn, 1)[0]
157                  username = str(recvall(self.conn, ulen))
158                  plen = recvall(self.conn, 1)[0]
159                  password = str(recvall(self.conn, plen))
160                  # Send authentication response
161                  self.conn.sendall(bytearray([0x01, 0x00]))
162  
163              # Read connect request
164              ver, cmd, _, atyp = recvall(self.conn, 4)
165              if ver != 0x05:
166                  raise IOError('Invalid socks version %i in connect request' % ver)
167              if cmd != Command.CONNECT:
168                  raise IOError('Unhandled command %i in connect request' % cmd)
169  
170              if atyp == AddressType.IPV4:
171                  addr = recvall(self.conn, 4)
172              elif atyp == AddressType.DOMAINNAME:
173                  n = recvall(self.conn, 1)[0]
174                  addr = recvall(self.conn, n)
175              elif atyp == AddressType.IPV6:
176                  addr = recvall(self.conn, 16)
177              else:
178                  raise IOError('Unknown address type %i' % atyp)
179              port_hi,port_lo = recvall(self.conn, 2)
180              port = (port_hi << 8) | port_lo
181  
182              # Send dummy response
183              self.conn.sendall(bytearray([0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]))
184  
185              cmdin = Socks5Command(cmd, atyp, addr, port, username, password)
186              self.serv.queue.put(cmdin)
187              logger.debug('Proxy: %s', cmdin)
188  
189              requested_to_addr = addr.decode("utf-8")
190              requested_to = format_addr_port(requested_to_addr, port)
191  
192              if self.serv.is_running():
193                  if self.serv.conf.destinations_factory is not None:
194                      dest = self.serv.conf.destinations_factory(requested_to_addr, port)
195                      if dest is not None:
196                          logger.debug(f"Serving connection to {requested_to}, will redirect it to "
197                                      f"{dest['actual_to_addr']}:{dest['actual_to_port']} instead")
198                          with socket.create_connection((dest["actual_to_addr"], dest["actual_to_port"])) as conn_to:
199                              forward_sockets(self.conn, conn_to, self.wakeup_socket_pair[1], self.serv)
200                              conn_to.close()
201                      else:
202                          logger.debug(f"Can't serve the connection to {requested_to}: the destinations factory returned None")
203                  else:
204                      logger.debug(f"Can't serve the connection to {requested_to}: no destinations factory")
205  
206              # Fall through to disconnect
207          except Exception as e:
208              logger.exception(f"socks5 request handling failed (running {self.serv.is_running()})")
209              if self.serv.is_running():
210                  self.serv.queue.put(e)
211          finally:
212              if not self.serv.keep_alive:
213                  self.conn.close()
214              else:
215                  logger.debug("Keeping client connection alive")
216              s0 = self.wakeup_socket_pair[0]
217              s1 = self.wakeup_socket_pair[1]
218              self.wakeup_socket_pair = None
219              try:
220                  s0.close()
221                  s1.close()
222              except OSError:
223                  pass
224              self.serv.remove_handler(self.handler_index)
225              self.handler_index = None
226  
227      def wakeup(self):
228          # Wake up the blocking forwarding select by writing to the wake-up socket
229          try:
230              socket_pair = self.wakeup_socket_pair
231              if socket_pair is not None:
232                  socket_pair[0].send("CloseWakeup".encode())
233                  logger.debug("Waking up forwarding thread")
234          except OSError as e:
235              logger.warning(f"Error waking up forwarding thread: {e}")
236              pass
237  
238  
239  # Wrapper for thread.join(), which may throw for daemon threads (in late stages of finalization).
240  # Return True if the thread is no longer active (join succeeded), False otherwise
241  # See PR #34863 for more details on using daemon threads.
242  def try_join_daemon_thread(thread, timeout=0) -> bool:
243      try:
244          thread.join(timeout=timeout)
245          return not thread.is_alive()
246      except Exception as e:
247          logger.debug(f"Exception in thread.join, {e}")
248          return True
249  
250  class Socks5Server():
251      def __init__(self, conf):
252          self.conf = conf
253          self.s = socket.socket(conf.af)
254          self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
255          # When using dynamic port allocation (port=0), ensure we don't get a
256          # port that conflicts with the test framework's static port range.
257          if conf.addr[1] == 0:
258              set_ephemeral_port_range(self.s)
259          self.s.bind(conf.addr)
260          # When port=0, the OS assigns an available port. Update conf.addr
261          # to reflect the actual bound address so callers can use it.
262          self.conf.addr = self.s.getsockname()
263          self.s.listen(5)
264          # Set to False when stop is initiated
265          self._running = False
266          self._running_lock = threading.Lock()
267          self.thread = None
268          self.queue = queue.Queue() # report connections and exceptions to client
269          self.keep_alive = conf.keep_alive
270          # Store the background handlers, needed for clean shutdown
271          # Append-only array, completed handlers are set to None
272          self._handlers = []
273          self._handlers_lock = threading.Lock()
274  
275      def is_running(self) -> bool:
276          with self._running_lock:
277              return self._running
278  
279      def set_running(self, new_value: bool):
280          with self._running_lock:
281              self._running = new_value
282  
283      def run(self):
284          while self.is_running():
285              (sockconn, _) = self.s.accept()
286              if self.is_running():
287                  conn = Socks5Connection(self, sockconn)
288                  # Use "daemon" threads, see PR #34863 for more discussion.
289                  thread = threading.Thread(None, conn.handle, daemon=True)
290                  with self._handlers_lock:
291                      conn.handler_index = len(self._handlers)
292                      self._handlers.append((thread, conn))
293                      assert(conn.handler_index < len(self._handlers))
294                  thread.start()
295  
296      def remove_handler(self, handler_index):
297          with self._handlers_lock:
298              if handler_index < len(self._handlers):
299                  if self._handlers[handler_index] is not None:
300                      self._handlers[handler_index] = None
301                      logger.debug(f"Handler {handler_index} removed")
302  
303      def start(self):
304          assert not self.is_running()
305          self.set_running(True)
306          self.thread = threading.Thread(None, self.run, daemon=True)
307          self.thread.start()
308  
309      def stop(self):
310          self.set_running(False)
311          # connect to self to end run loop
312          s = socket.socket(self.conf.af)
313          s.connect(self.conf.addr)
314          s.close()
315          self.thread.join()
316          # if there are active handlers, close them
317          with self._handlers_lock:
318              items = list(self._handlers)
319          for i, item in enumerate(items):
320              if item is None:
321                  continue
322              thread, conn = item
323              # check if thread is still active
324              if not try_join_daemon_thread(thread, timeout=0):
325                  conn.wakeup()
326                  if try_join_daemon_thread(thread, timeout=2):
327                      logger.debug(f"Stop(): Handler {i} thread joined")
328                  else:
329                      logger.warning(f"Stop(): Handler thread {i} didn't finish after force close")