/ src / network / connectionpool.py.bak
connectionpool.py.bak
  1  """
  2  `BMConnectionPool` class definition
  3  """
  4  import errno
  5  import logging
  6  import re
  7  import socket
  8  import sys
  9  import time
 10  import random
 11  
 12  import asyncore_pollchoose as asyncore
 13  import knownnodes
 14  import protocol
 15  import state
 16  from bmconfigparser import config
 17  from connectionchooser import chooseConnection
 18  from node import Peer
 19  from proxy import Proxy
 20  from tcp import (
 21      bootstrap, Socks4aBMConnection, Socks5BMConnection,
 22      TCPConnection, TCPServer)
 23  from udp import UDPSocket
 24  
 25  logger = logging.getLogger('default')
 26  
 27  
 28  class BMConnectionPool(object):
 29      """Pool of all existing connections"""
 30      # pylint: disable=too-many-instance-attributes
 31      trustedPeer = None
 32      """
 33      If the trustedpeer option is specified in keys.dat then this will
 34      contain a Peer which will be connected to instead of using the
 35      addresses advertised by other peers.
 36  
 37      The expected use case is where the user has a trusted server where
 38      they run a Bitmessage daemon permanently. If they then run a second
 39      instance of the client on a local machine periodically when they want
 40      to check for messages it will sync with the network a lot faster
 41      without compromising security.
 42      """
 43  
 44      def __init__(self):
 45          asyncore.set_rates(
 46              config.safeGetInt(
 47                  "bitmessagesettings", "maxdownloadrate"),
 48              config.safeGetInt(
 49                  "bitmessagesettings", "maxuploadrate")
 50          )
 51          self.outboundConnections = {}
 52          self.inboundConnections = {}
 53          self.listeningSockets = {}
 54          self.udpSockets = {}
 55          self.streams = []
 56          self._lastSpawned = 0
 57          self._spawnWait = 2
 58          self._bootstrapped = False
 59  
 60          trustedPeer = config.safeGet(
 61              'bitmessagesettings', 'trustedpeer')
 62          try:
 63              if trustedPeer:
 64                  host, port = trustedPeer.split(':')
 65                  self.trustedPeer = Peer(host, int(port))
 66          except ValueError:
 67              sys.exit(
 68                  'Bad trustedpeer config setting! It should be set as'
 69                  ' trustedpeer=<hostname>:<portnumber>'
 70              )
 71  
 72      def __len__(self):
 73          return len(self.outboundConnections) + len(self.inboundConnections)
 74  
 75      def connections(self):
 76          """
 77          Shortcut for combined list of connections from
 78          `inboundConnections` and `outboundConnections` dicts
 79          """
 80          return self.inboundConnections.values() + self.outboundConnections.values()
 81  
 82      def establishedConnections(self):
 83          """Shortcut for list of connections having fullyEstablished == True"""
 84          return [
 85              x for x in self.connections() if x.fullyEstablished]
 86  
 87      def connectToStream(self, streamNumber):
 88          """Connect to a bitmessage stream"""
 89          self.streams.append(streamNumber)
 90  
 91      def getConnectionByAddr(self, addr):
 92          """
 93          Return an (existing) connection object based on a `Peer` object
 94          (IP and port)
 95          """
 96          try:
 97              return self.inboundConnections[addr]
 98          except KeyError:
 99              pass
100          try:
101              return self.inboundConnections[addr.host]
102          except (KeyError, AttributeError):
103              pass
104          try:
105              return self.outboundConnections[addr]
106          except KeyError:
107              pass
108          try:
109              return self.udpSockets[addr.host]
110          except (KeyError, AttributeError):
111              pass
112          raise KeyError
113  
114      def isAlreadyConnected(self, nodeid):
115          """Check if we're already connected to this peer"""
116          for i in self.connections():
117              try:
118                  if nodeid == i.nodeid:
119                      return True
120              except AttributeError:
121                  pass
122          return False
123  
124      def addConnection(self, connection):
125          """Add a connection object to our internal dict"""
126          if isinstance(connection, UDPSocket):
127              return
128          if connection.isOutbound:
129              self.outboundConnections[connection.destination] = connection
130          else:
131              if connection.destination.host in self.inboundConnections:
132                  self.inboundConnections[connection.destination] = connection
133              else:
134                  self.inboundConnections[connection.destination.host] = \
135                      connection
136  
137      def removeConnection(self, connection):
138          """Remove a connection from our internal dict"""
139          if isinstance(connection, UDPSocket):
140              del self.udpSockets[connection.listening.host]
141          elif isinstance(connection, TCPServer):
142              del self.listeningSockets[Peer(
143                  connection.destination.host, connection.destination.port)]
144          elif connection.isOutbound:
145              try:
146                  del self.outboundConnections[connection.destination]
147              except KeyError:
148                  pass
149          else:
150              try:
151                  del self.inboundConnections[connection.destination]
152              except KeyError:
153                  try:
154                      del self.inboundConnections[connection.destination.host]
155                  except KeyError:
156                      pass
157          connection.handle_close()
158  
159      @staticmethod
160      def getListeningIP():
161          """What IP are we supposed to be listening on?"""
162          if config.safeGet(
163                  "bitmessagesettings", "onionhostname", "").endswith(".onion"):
164              host = config.safeGet(
165                  "bitmessagesettings", "onionbindip")
166          else:
167              host = '127.0.0.1'
168          if (
169              config.safeGetBoolean("bitmessagesettings", "sockslisten")
170              or config.safeGet("bitmessagesettings", "socksproxytype")
171              == "none"
172          ):
173              # python doesn't like bind + INADDR_ANY?
174              # host = socket.INADDR_ANY
175              host = config.get("network", "bind")
176          return host
177  
178      def startListening(self, bind=None):
179          """Open a listening socket and start accepting connections on it"""
180          if bind is None:
181              bind = self.getListeningIP()
182          port = config.safeGetInt("bitmessagesettings", "port")
183          # correct port even if it changed
184          ls = TCPServer(host=bind, port=port)
185          self.listeningSockets[ls.destination] = ls
186  
187      def startUDPSocket(self, bind=None):
188          """
189          Open an UDP socket. Depending on settings, it can either only
190          accept incoming UDP packets, or also be able to send them.
191          """
192          if bind is None:
193              host = self.getListeningIP()
194              udpSocket = UDPSocket(host=host, announcing=True)
195          else:
196              if bind is False:
197                  udpSocket = UDPSocket(announcing=False)
198              else:
199                  udpSocket = UDPSocket(host=bind, announcing=True)
200          self.udpSockets[udpSocket.listening.host] = udpSocket
201  
202      def startBootstrappers(self):
203          """Run the process of resolving bootstrap hostnames"""
204          proxy_type = config.safeGet(
205              'bitmessagesettings', 'socksproxytype')
206          # A plugins may be added here
207          hostname = None
208          if not proxy_type or proxy_type == 'none':
209              connection_base = TCPConnection
210          elif proxy_type == 'SOCKS5':
211              connection_base = Socks5BMConnection
212              hostname = random.choice([  # nosec B311
213                  'quzwelsuziwqgpt2.onion', None
214              ])
215          elif proxy_type == 'SOCKS4a':
216              connection_base = Socks4aBMConnection  # FIXME: I cannot test
217          else:
218              # This should never happen because socksproxytype setting
219              # is handled in bitmessagemain before starting the connectionpool
220              return
221  
222          bootstrapper = bootstrap(connection_base)
223          if not hostname:
224              port = random.choice([8080, 8444])  # nosec B311
225              hostname = 'bootstrap%s.bitmessage.org' % port
226          else:
227              port = 8444
228          self.addConnection(bootstrapper(hostname, port))
229  
230      def loop(self):  # pylint: disable=too-many-branches,too-many-statements
231          """Main Connectionpool's loop"""
232          # pylint: disable=too-many-locals
233          # defaults to empty loop if outbound connections are maxed
234          spawnConnections = False
235          acceptConnections = True
236          if config.safeGetBoolean(
237                  'bitmessagesettings', 'dontconnect'):
238              acceptConnections = False
239          elif config.safeGetBoolean(
240                  'bitmessagesettings', 'sendoutgoingconnections'):
241              spawnConnections = True
242          socksproxytype = config.safeGet(
243              'bitmessagesettings', 'socksproxytype', '')
244          onionsocksproxytype = config.safeGet(
245              'bitmessagesettings', 'onionsocksproxytype', '')
246          if (
247              socksproxytype[:5] == 'SOCKS'
248              and not config.safeGetBoolean(
249                  'bitmessagesettings', 'sockslisten')
250              and '.onion' not in config.safeGet(
251                  'bitmessagesettings', 'onionhostname', '')
252          ):
253              acceptConnections = False
254  
255          # pylint: disable=too-many-nested-blocks
256          if spawnConnections:
257              if not knownnodes.knownNodesActual:
258                  self.startBootstrappers()
259                  knownnodes.knownNodesActual = True
260              if not self._bootstrapped:
261                  self._bootstrapped = True
262                  Proxy.proxy = (
263                      config.safeGet(
264                          'bitmessagesettings', 'sockshostname'),
265                      config.safeGetInt(
266                          'bitmessagesettings', 'socksport')
267                  )
268                  # TODO AUTH
269                  # TODO reset based on GUI settings changes
270                  try:
271                      if not onionsocksproxytype.startswith("SOCKS"):
272                          raise ValueError
273                      Proxy.onion_proxy = (
274                          config.safeGet(
275                              'network', 'onionsockshostname', None),
276                          config.safeGet(
277                              'network', 'onionsocksport', None)
278                      )
279                  except ValueError:
280                      Proxy.onion_proxy = None
281              established = sum(
282                  1 for c in self.outboundConnections.values()
283                  if (c.connected and c.fullyEstablished))
284              pending = len(self.outboundConnections) - established
285              if established < config.safeGetInt(
286                      'bitmessagesettings', 'maxoutboundconnections'):
287                  for i in range(
288                          state.maximumNumberOfHalfOpenConnections - pending):
289                      try:
290                          chosen = self.trustedPeer or chooseConnection(
291                              random.choice(self.streams))  # nosec B311
292                      except ValueError:
293                          continue
294                      if chosen in self.outboundConnections:
295                          continue
296                      if chosen.host in self.inboundConnections:
297                          continue
298                      # don't connect to self
299                      if chosen in state.ownAddresses:
300                          continue
301                      # don't connect to the hosts from the same
302                      # network group, defense against sibyl attacks
303                      host_network_group = protocol.network_group(
304                          chosen.host)
305                      same_group = False
306                      for j in self.outboundConnections.values():
307                          if host_network_group == j.network_group:
308                              same_group = True
309                              if chosen.host == j.destination.host:
310                                  knownnodes.decreaseRating(chosen)
311                              break
312                      if same_group:
313                          continue
314  
315                      try:
316                          if chosen.host.endswith(".onion") and Proxy.onion_proxy:
317                              if onionsocksproxytype == "SOCKS5":
318                                  self.addConnection(Socks5BMConnection(chosen))
319                              elif onionsocksproxytype == "SOCKS4a":
320                                  self.addConnection(Socks4aBMConnection(chosen))
321                          elif socksproxytype == "SOCKS5":
322                              self.addConnection(Socks5BMConnection(chosen))
323                          elif socksproxytype == "SOCKS4a":
324                              self.addConnection(Socks4aBMConnection(chosen))
325                          else:
326                              self.addConnection(TCPConnection(chosen))
327                      except socket.error as e:
328                          if e.errno == errno.ENETUNREACH:
329                              continue
330  
331                      self._lastSpawned = time.time()
332          else:
333              for i in self.outboundConnections.values():
334                  # FIXME: rating will be increased after next connection
335                  i.handle_close()
336  
337          if acceptConnections:
338              if not self.listeningSockets:
339                  if config.safeGet('network', 'bind') == '':
340                      self.startListening()
341                  else:
342                      for bind in re.sub(
343                          r'[^\w.]+', ' ',
344                          config.safeGet('network', 'bind')
345                      ).split():
346                          self.startListening(bind)
347                  logger.info('Listening for incoming connections.')
348              if not self.udpSockets:
349                  if config.safeGet('network', 'bind') == '':
350                      self.startUDPSocket()
351                  else:
352                      for bind in re.sub(
353                          r'[^\w.]+', ' ',
354                          config.safeGet('network', 'bind')
355                      ).split():
356                          self.startUDPSocket(bind)
357                      self.startUDPSocket(False)
358                  logger.info('Starting UDP socket(s).')
359          else:
360              if self.listeningSockets:
361                  for i in self.listeningSockets.values():
362                      i.close_reason = "Stopping listening"
363                      i.accepting = i.connecting = i.connected = False
364                  logger.info('Stopped listening for incoming connections.')
365              if self.udpSockets:
366                  for i in self.udpSockets.values():
367                      i.close_reason = "Stopping UDP socket"
368                      i.accepting = i.connecting = i.connected = False
369                  logger.info('Stopped udp sockets.')
370  
371          loopTime = float(self._spawnWait)
372          if self._lastSpawned < time.time() - self._spawnWait:
373              loopTime = 2.0
374          asyncore.loop(timeout=loopTime, count=1000)
375  
376          reaper = []
377          for i in self.connections():
378              minTx = time.time() - 20
379              if i.fullyEstablished:
380                  minTx -= 300 - 20
381              if i.lastTx < minTx:
382                  if i.fullyEstablished:
383                      i.append_write_buf(protocol.CreatePacket('ping'))
384                  else:
385                      i.close_reason = "Timeout (%is)" % (
386                          time.time() - i.lastTx)
387                      i.set_state("close")
388          for i in (
389              self.connections()
390              + self.listeningSockets.values() + self.udpSockets.values()
391          ):
392              if not (i.accepting or i.connecting or i.connected):
393                  reaper.append(i)
394              else:
395                  try:
396                      if i.state == "close":
397                          reaper.append(i)
398                  except AttributeError:
399                      pass
400          for i in reaper:
401              self.removeConnection(i)
402  
403  
404  pool = BMConnectionPool()