/ src / network / connectionpool.py
connectionpool.py
  1  """
  2  `BMConnectionPool` class definition
  3  """
  4  import errno
  5  import logging
  6  import re
  7  import socket
  8  import sys
  9  import os
 10  import time
 11  import random
 12  
 13  sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
 14  
 15  from network import asyncore_pollchoose as asyncore
 16  from network import knownnodes
 17  import protocol
 18  import state
 19  from bmconfigparser import config
 20  from connectionchooser import chooseConnection
 21  from node import Peer
 22  from proxy import Proxy
 23  from network.tcp import (
 24      bootstrap, Socks4aBMConnection, Socks5BMConnection,
 25      TCPConnection, TCPServer)
 26  from udp import UDPSocket
 27  
 28  logger = logging.getLogger('default')
 29  
 30  
 31  class BMConnectionPool(object):
 32      """Pool of all existing connections"""
 33      # pylint: disable=too-many-instance-attributes
 34      trustedPeer = None
 35      """
 36      If the trustedpeer option is specified in keys.dat then this will
 37      contain a Peer which will be connected to instead of using the
 38      addresses advertised by other peers.
 39  
 40      The expected use case is where the user has a trusted server where
 41      they run a Bitmessage daemon permanently. If they then run a second
 42      instance of the client on a local machine periodically when they want
 43      to check for messages it will sync with the network a lot faster
 44      without compromising security.
 45      """
 46  
 47      def __init__(self):
 48          asyncore.set_rates(
 49              config.safeGetInt(
 50                  "bitmessagesettings", "maxdownloadrate"),
 51              config.safeGetInt(
 52                  "bitmessagesettings", "maxuploadrate")
 53          )
 54          self.outboundConnections = {}
 55          self.inboundConnections = {}
 56          self.listeningSockets = {}
 57          self.udpSockets = {}
 58          self.streams = []
 59          self._lastSpawned = 0
 60          self._spawnWait = 2
 61          self._bootstrapped = False
 62  
 63          trustedPeer = config.safeGet(
 64              'bitmessagesettings', 'trustedpeer')
 65          try:
 66              if trustedPeer:
 67                  host, port = trustedPeer.split(':')
 68                  self.trustedPeer = Peer(host, int(port))
 69          except ValueError:
 70              sys.exit(
 71                  'Bad trustedpeer config setting! It should be set as'
 72                  ' trustedpeer=<hostname>:<portnumber>'
 73              )
 74  
 75      def __len__(self):
 76          return len(self.outboundConnections) + len(self.inboundConnections)
 77  
 78      def connections(self):
 79          """
 80          Shortcut for combined list of connections from
 81          `inboundConnections` and `outboundConnections` dicts
 82          """
 83          return list(self.inboundConnections.values()) + list(self.outboundConnections.values())
 84  
 85      def establishedConnections(self):
 86          """Shortcut for list of connections having fullyEstablished == True"""
 87          return [
 88              x for x in self.connections() if x.fullyEstablished]
 89  
 90      def connectToStream(self, streamNumber):
 91          """Connect to a bitmessage stream"""
 92          self.streams.append(streamNumber)
 93  
 94      def getConnectionByAddr(self, addr):
 95          """
 96          Return an (existing) connection object based on a `Peer` object
 97          (IP and port)
 98          """
 99          try:
100              return self.inboundConnections[addr]
101          except KeyError:
102              pass
103          try:
104              return self.inboundConnections[addr.host]
105          except (KeyError, AttributeError):
106              pass
107          try:
108              return self.outboundConnections[addr]
109          except KeyError:
110              pass
111          try:
112              return self.udpSockets[addr.host]
113          except (KeyError, AttributeError):
114              pass
115          raise KeyError
116  
117      def isAlreadyConnected(self, nodeid):
118          """Check if we're already connected to this peer"""
119          for i in self.connections():
120              try:
121                  if nodeid == i.nodeid:
122                      return True
123              except AttributeError:
124                  pass
125          return False
126  
127      def addConnection(self, connection):
128          """Add a connection object to our internal dict"""
129          if isinstance(connection, UDPSocket):
130              return
131          if connection.isOutbound:
132              self.outboundConnections[connection.destination] = connection
133          else:
134              if connection.destination.host in self.inboundConnections:
135                  self.inboundConnections[connection.destination] = connection
136              else:
137                  self.inboundConnections[connection.destination.host] = \
138                      connection
139  
140      def removeConnection(self, connection):
141          """Remove a connection from our internal dict"""
142          if isinstance(connection, UDPSocket):
143              del self.udpSockets[connection.listening.host]
144          elif isinstance(connection, TCPServer):
145              del self.listeningSockets[Peer(
146                  connection.destination.host, connection.destination.port)]
147          elif connection.isOutbound:
148              try:
149                  del self.outboundConnections[connection.destination]
150              except KeyError:
151                  pass
152          else:
153              try:
154                  del self.inboundConnections[connection.destination]
155              except KeyError:
156                  try:
157                      del self.inboundConnections[connection.destination.host]
158                  except KeyError:
159                      pass
160          connection.handle_close()
161  
162      @staticmethod
163      def getListeningIP():
164          """What IP are we supposed to be listening on?"""
165          if config.safeGet(
166                  "bitmessagesettings", "onionhostname", "").endswith(".onion"):
167              host = config.safeGet(
168                  "bitmessagesettings", "onionbindip")
169          else:
170              host = '127.0.0.1'
171          if (
172              config.safeGetBoolean("bitmessagesettings", "sockslisten")
173              or config.safeGet("bitmessagesettings", "socksproxytype")
174              == "none"
175          ):
176              # python doesn't like bind + INADDR_ANY?
177              # host = socket.INADDR_ANY
178              host = config.get("network", "bind")
179          return host
180  
181      def startListening(self, bind=None):
182          """Open a listening socket and start accepting connections on it"""
183          if bind is None:
184              bind = self.getListeningIP()
185          port = config.safeGetInt("bitmessagesettings", "port")
186          # correct port even if it changed
187          ls = TCPServer(host=bind, port=port)
188          self.listeningSockets[ls.destination] = ls
189  
190      def startUDPSocket(self, bind=None):
191          """
192          Open an UDP socket. Depending on settings, it can either only
193          accept incoming UDP packets, or also be able to send them.
194          """
195          if bind is None:
196              host = self.getListeningIP()
197              udpSocket = UDPSocket(host=host, announcing=True)
198          else:
199              if bind is False:
200                  udpSocket = UDPSocket(announcing=False)
201              else:
202                  udpSocket = UDPSocket(host=bind, announcing=True)
203          self.udpSockets[udpSocket.listening.host] = udpSocket
204  
205      def startBootstrappers(self):
206          """Run the process of resolving bootstrap hostnames"""
207          proxy_type = config.safeGet(
208              'bitmessagesettings', 'socksproxytype')
209          # A plugins may be added here
210          hostname = None
211          if not proxy_type or proxy_type == 'none':
212              connection_base = TCPConnection
213          elif proxy_type == 'SOCKS5':
214              connection_base = Socks5BMConnection
215              hostname = random.choice([  # nosec B311
216                  'quzwelsuziwqgpt2.onion', None
217              ])
218          elif proxy_type == 'SOCKS4a':
219              connection_base = Socks4aBMConnection  # FIXME: I cannot test
220          else:
221              # This should never happen because socksproxytype setting
222              # is handled in bitmessagemain before starting the connectionpool
223              return
224  
225          bootstrapper = bootstrap(connection_base)
226          if not hostname:
227              port = random.choice([8080, 8444])  # nosec B311
228              hostname = 'bootstrap%s.bitmessage.org' % port
229          else:
230              port = 8444
231          self.addConnection(bootstrapper(hostname, port))
232  
233      def loop(self):  # pylint: disable=too-many-branches,too-many-statements
234          """Main Connectionpool's loop"""
235          # pylint: disable=too-many-locals
236          # defaults to empty loop if outbound connections are maxed
237          spawnConnections = False
238          acceptConnections = True
239          if config.safeGetBoolean(
240                  'bitmessagesettings', 'dontconnect'):
241              acceptConnections = False
242          elif config.safeGetBoolean(
243                  'bitmessagesettings', 'sendoutgoingconnections'):
244              spawnConnections = True
245          socksproxytype = config.safeGet(
246              'bitmessagesettings', 'socksproxytype', '')
247          onionsocksproxytype = config.safeGet(
248              'bitmessagesettings', 'onionsocksproxytype', '')
249          if (
250              socksproxytype[:5] == 'SOCKS'
251              and not config.safeGetBoolean(
252                  'bitmessagesettings', 'sockslisten')
253              and '.onion' not in config.safeGet(
254                  'bitmessagesettings', 'onionhostname', '')
255          ):
256              acceptConnections = False
257  
258          # pylint: disable=too-many-nested-blocks
259          if spawnConnections:
260              if not knownnodes.knownNodesActual:
261                  self.startBootstrappers()
262                  knownnodes.knownNodesActual = True
263              if not self._bootstrapped:
264                  self._bootstrapped = True
265                  Proxy.proxy = (
266                      config.safeGet(
267                          'bitmessagesettings', 'sockshostname'),
268                      config.safeGetInt(
269                          'bitmessagesettings', 'socksport')
270                  )
271                  # TODO AUTH
272                  # TODO reset based on GUI settings changes
273                  try:
274                      if not onionsocksproxytype.startswith("SOCKS"):
275                          raise ValueError
276                      Proxy.onion_proxy = (
277                          config.safeGet(
278                              'network', 'onionsockshostname', None),
279                          config.safeGet(
280                              'network', 'onionsocksport', None)
281                      )
282                  except ValueError:
283                      Proxy.onion_proxy = None
284              established = sum(
285                  1 for c in list(self.outboundConnections.values())
286                  if (c.connected and c.fullyEstablished))
287              pending = len(self.outboundConnections) - established
288              if established < config.safeGetInt(
289                      'bitmessagesettings', 'maxoutboundconnections'):
290                  for i in range(
291                          state.maximumNumberOfHalfOpenConnections - pending):
292                      try:
293                          chosen = self.trustedPeer or chooseConnection(
294                              random.choice(self.streams))  # nosec B311
295                      except ValueError:
296                          continue
297                      if chosen in self.outboundConnections:
298                          continue
299                      if chosen.host in self.inboundConnections:
300                          continue
301                      # don't connect to self
302                      if chosen in state.ownAddresses:
303                          continue
304                      # don't connect to the hosts from the same
305                      # network group, defense against sibyl attacks
306                      host_network_group = protocol.network_group(
307                          chosen.host)
308                      same_group = False
309                      for j in list(self.outboundConnections.values()):
310                          if host_network_group == j.network_group:
311                              same_group = True
312                              if chosen.host == j.destination.host:
313                                  knownnodes.decreaseRating(chosen)
314                              break
315                      if same_group:
316                          continue
317  
318                      try:
319                          if chosen.host.endswith(".onion") and Proxy.onion_proxy:
320                              if onionsocksproxytype == "SOCKS5":
321                                  self.addConnection(Socks5BMConnection(chosen))
322                              elif onionsocksproxytype == "SOCKS4a":
323                                  self.addConnection(Socks4aBMConnection(chosen))
324                          elif socksproxytype == "SOCKS5":
325                              self.addConnection(Socks5BMConnection(chosen))
326                          elif socksproxytype == "SOCKS4a":
327                              self.addConnection(Socks4aBMConnection(chosen))
328                          else:
329                              self.addConnection(TCPConnection(chosen))
330                      except socket.error as e:
331                          if e.errno == errno.ENETUNREACH:
332                              continue
333  
334                      self._lastSpawned = time.time()
335          else:
336              for i in list(self.outboundConnections.values()):
337                  # FIXME: rating will be increased after next connection
338                  i.handle_close()
339  
340          if acceptConnections:
341              if not self.listeningSockets:
342                  if config.safeGet('network', 'bind') == '':
343                      self.startListening()
344                  else:
345                      for bind in re.sub(
346                          r'[^\w.]+', ' ',
347                          config.safeGet('network', 'bind')
348                      ).split():
349                          self.startListening(bind)
350                  logger.info('Listening for incoming connections.')
351              if not self.udpSockets:
352                  if config.safeGet('network', 'bind') == '':
353                      self.startUDPSocket()
354                  else:
355                      for bind in re.sub(
356                          r'[^\w.]+', ' ',
357                          config.safeGet('network', 'bind')
358                      ).split():
359                          self.startUDPSocket(bind)
360                      self.startUDPSocket(False)
361                  logger.info('Starting UDP socket(s).')
362          else:
363              if self.listeningSockets:
364                  for i in list(self.listeningSockets.values()):
365                      i.close_reason = "Stopping listening"
366                      i.accepting = i.connecting = i.connected = False
367                  logger.info('Stopped listening for incoming connections.')
368              if self.udpSockets:
369                  for i in list(self.udpSockets.values()):
370                      i.close_reason = "Stopping UDP socket"
371                      i.accepting = i.connecting = i.connected = False
372                  logger.info('Stopped udp sockets.')
373  
374          loopTime = float(self._spawnWait)
375          if self._lastSpawned < time.time() - self._spawnWait:
376              loopTime = 2.0
377          asyncore.loop(timeout=loopTime, count=1000)
378  
379          reaper = []
380          for i in self.connections():
381              minTx = time.time() - 20
382              if i.fullyEstablished:
383                  minTx -= 300 - 20
384              if i.lastTx < minTx:
385                  if i.fullyEstablished:
386                      i.append_write_buf(protocol.CreatePacket('ping'))
387                  else:
388                      i.close_reason = "Timeout (%is)" % (
389                          time.time() - i.lastTx)
390                      i.set_state("close")
391          for i in (
392              self.connections()
393              + list(self.listeningSockets.values()) + list(self.udpSockets.values())
394          ):
395              if not (i.accepting or i.connecting or i.connected):
396                  reaper.append(i)
397              else:
398                  try:
399                      if i.state == "close":
400                          reaper.append(i)
401                  except AttributeError:
402                      pass
403          for i in reaper:
404              self.removeConnection(i)
405  
406  
407  pool = BMConnectionPool()