/ src / network / tcp.py.bak
tcp.py.bak
  1  """
  2  TCP protocol handler
  3  """
  4  # pylint: disable=too-many-ancestors
  5  
  6  import logging
  7  import math
  8  import random
  9  import socket
 10  import time
 11  
 12  # magic imports!
 13  import addresses
 14  import l10n
 15  import protocol
 16  import state
 17  import connectionpool
 18  from bmconfigparser import config
 19  from highlevelcrypto import randomBytes
 20  from network import dandelion_ins, invQueue, receiveDataQueue
 21  from queues import UISignalQueue
 22  from tr import _translate
 23  
 24  import asyncore_pollchoose as asyncore
 25  import knownnodes
 26  from network.advanceddispatcher import AdvancedDispatcher
 27  from network.bmproto import BMProto
 28  from network.objectracker import ObjectTracker
 29  from network.socks4a import Socks4aConnection
 30  from network.socks5 import Socks5Connection
 31  from network.tls import TLSDispatcher
 32  from node import Peer
 33  
 34  
# Module-level logger, obtained under the name 'default'.
logger = logging.getLogger('default')


# Maximum age, in seconds, of a known node's "lastseen" timestamp for the
# node to be included in the addr message built by TCPConnection.sendAddr.
maximumAgeOfNodesThatIAdvertiseToOthers = 10800  #: Equals three hours
# Threshold for BMProto.timeOffsetWrongCount: once the counter exceeds this
# value, checkTimeOffsetNotification puts a status bar warning about the
# local clock on the UI queue.
maximumTimeOffsetWrongCount = 3  #: Connections with wrong time offset
 40  
 41  
 42  class TCPConnection(BMProto, TLSDispatcher):
 43      # pylint: disable=too-many-instance-attributes
 44      """
 45      .. todo:: Look to understand and/or fix the non-parent-init-called
 46      """
 47  
 48      def __init__(self, address=None, sock=None):
 49          BMProto.__init__(self, address=address, sock=sock)
 50          self.verackReceived = False
 51          self.verackSent = False
 52          self.streams = [0]
 53          self.fullyEstablished = False
 54          self.skipUntil = 0
 55          if address is None and sock is not None:
 56              self.destination = Peer(*sock.getpeername())
 57              self.isOutbound = False
 58              TLSDispatcher.__init__(self, sock, server_side=True)
 59              self.connectedAt = time.time()
 60              logger.debug(
 61                  'Received connection from %s:%i',
 62                  self.destination.host, self.destination.port)
 63              self.nodeid = randomBytes(8)
 64          elif address is not None and sock is not None:
 65              TLSDispatcher.__init__(self, sock, server_side=False)
 66              self.isOutbound = True
 67              logger.debug(
 68                  'Outbound proxy connection to %s:%i',
 69                  self.destination.host, self.destination.port)
 70          else:
 71              self.destination = address
 72              self.isOutbound = True
 73              self.create_socket(
 74                  socket.AF_INET6 if ":" in address.host else socket.AF_INET,
 75                  socket.SOCK_STREAM)
 76              self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 77              TLSDispatcher.__init__(self, sock, server_side=False)
 78              self.connect(self.destination)
 79              logger.debug(
 80                  'Connecting to %s:%i',
 81                  self.destination.host, self.destination.port)
 82          try:
 83              self.local = (
 84                  protocol.checkIPAddress(
 85                      protocol.encodeHost(self.destination.host), True)
 86                  and not protocol.checkSocksIP(self.destination.host)
 87              )
 88          except socket.error:
 89              # it's probably a hostname
 90              pass
 91          self.network_group = protocol.network_group(self.destination.host)
 92          ObjectTracker.__init__(self)  # pylint: disable=non-parent-init-called
 93          self.bm_proto_reset()
 94          self.set_state("bm_header", expectBytes=protocol.Header.size)
 95  
 96      def antiIntersectionDelay(self, initial=False):
 97          """
 98          This is a defense against the so called intersection attacks.
 99  
100          It is called when you notice peer is requesting non-existing
101          objects, or right after the connection is established. It will
102          estimate how long an object will take to propagate across the
103          network, and skip processing "getdata" requests until then. This
104          means an attacker only has one shot per IP to perform the attack.
105          """
106          # estimated time for a small object to propagate across the
107          # whole network
108          max_known_nodes = max(
109              len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes)
110          delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (
111              0.2 + invQueue.queueCount / 2.0)
112          # take the stream with maximum amount of nodes
113          # +2 is to avoid problems with log(0) and log(1)
114          # 20 is avg connected nodes count
115          # 0.2 is avg message transmission time
116          if delay > 0:
117              if initial:
118                  self.skipUntil = self.connectedAt + delay
119                  if self.skipUntil > time.time():
120                      logger.debug(
121                          'Initial skipping processing getdata for %.2fs',
122                          self.skipUntil - time.time())
123              else:
124                  logger.debug(
125                      'Skipping processing getdata due to missing object'
126                      ' for %.2fs', delay)
127                  self.skipUntil = time.time() + delay
128  
129      def checkTimeOffsetNotification(self):
130          """
131          Check if we have connected to too many nodes which have too high
132          time offset from us
133          """
134          if BMProto.timeOffsetWrongCount > \
135                  maximumTimeOffsetWrongCount and \
136                  not self.fullyEstablished:
137              UISignalQueue.put((
138                  'updateStatusBar',
139                  _translate(
140                      "MainWindow",
141                      "The time on your computer, %1, may be wrong. "
142                      "Please verify your settings."
143                  ).arg(l10n.formatTimestamp())))
144  
145      def state_connection_fully_established(self):
146          """
147          State after the bitmessage protocol handshake is completed
148          (version/verack exchange, and if both side support TLS,
149          the TLS handshake as well).
150          """
151          self.set_connection_fully_established()
152          self.set_state("bm_header")
153          self.bm_proto_reset()
154          return True
155  
156      def set_connection_fully_established(self):
157          """Initiate inventory synchronisation."""
158          if not self.isOutbound and not self.local:
159              state.clientHasReceivedIncomingConnections = True
160              UISignalQueue.put(('setStatusIcon', 'green'))
161          UISignalQueue.put((
162              'updateNetworkStatusTab', (self.isOutbound, True, self.destination)
163          ))
164          self.antiIntersectionDelay(True)
165          self.fullyEstablished = True
166          # The connection having host suitable for knownnodes
167          if self.isOutbound or not self.local and not state.socksIP:
168              knownnodes.increaseRating(self.destination)
169              knownnodes.addKnownNode(
170                  self.streams, self.destination, time.time())
171              dandelion_ins.maybeAddStem(self, invQueue)
172          self.sendAddr()
173          self.sendBigInv()
174  
175      def sendAddr(self):
176          """Send a partial list of known addresses to peer."""
177          # We are going to share a maximum number of 1000 addrs (per overlapping
178          # stream) with our peer. 500 from overlapping streams, 250 from the
179          # left child stream, and 250 from the right child stream.
180          maxAddrCount = config.safeGetInt(
181              "bitmessagesettings", "maxaddrperstreamsend", 500)
182  
183          templist = []
184          addrs = {}
185          for stream in self.streams:
186              with knownnodes.knownNodesLock:
187                  for n, s in enumerate((stream, stream * 2, stream * 2 + 1)):
188                      nodes = knownnodes.knownNodes.get(s)
189                      if not nodes:
190                          continue
191                      # only if more recent than 3 hours
192                      # and having positive or neutral rating
193                      filtered = [
194                          (k, v) for k, v in nodes.iteritems()
195                          if v["lastseen"] > int(time.time())
196                          - maximumAgeOfNodesThatIAdvertiseToOthers
197                          and v["rating"] >= 0 and not k.host.endswith('.onion')
198                      ]
199                      # sent 250 only if the remote isn't interested in it
200                      elemCount = min(
201                          len(filtered),
202                          maxAddrCount / 2 if n else maxAddrCount)
203                      addrs[s] = random.sample(filtered, elemCount)
204          for substream in addrs:
205              for peer, params in addrs[substream]:
206                  templist.append((substream, peer, params["lastseen"]))
207          if templist:
208              self.append_write_buf(protocol.assembleAddrMessage(templist))
209  
210      def sendBigInv(self):
211          """
212          Send hashes of all inventory objects, chunked as the protocol has
213          a per-command limit.
214          """
215          def sendChunk():
216              """Send one chunk of inv entries in one command"""
217              if objectCount == 0:
218                  return
219              logger.debug(
220                  'Sending huge inv message with %i objects to just this'
221                  ' one peer', objectCount)
222              self.append_write_buf(protocol.CreatePacket(
223                  'inv', addresses.encodeVarint(objectCount) + payload))
224  
225          # Select all hashes for objects in this stream.
226          bigInvList = {}
227          for stream in self.streams:
228              # may lock for a long time, but I think it's better than
229              # thousands of small locks
230              with self.objectsNewToThemLock:
231                  for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
232                      # don't advertise stem objects on bigInv
233                      if dandelion_ins.hasHash(objHash):
234                          continue
235                      bigInvList[objHash] = 0
236          objectCount = 0
237          payload = b''
238          # Now let us start appending all of these hashes together.
239          # They will be sent out in a big inv message to our new peer.
240          for obj_hash, _ in bigInvList.items():
241              payload += obj_hash
242              objectCount += 1
243  
244              # Remove -1 below when sufficient time has passed for users to
245              # upgrade to versions of PyBitmessage that accept inv with 50,000
246              # items
247              if objectCount >= protocol.MAX_OBJECT_COUNT - 1:
248                  sendChunk()
249                  payload = b''
250                  objectCount = 0
251  
252          # flush
253          sendChunk()
254  
255      def handle_connect(self):
256          """Callback for TCP connection being established."""
257          try:
258              AdvancedDispatcher.handle_connect(self)
259          except socket.error as e:
260              # pylint: disable=protected-access
261              if e.errno in asyncore._DISCONNECTED:
262                  logger.debug(
263                      '%s:%i: Connection failed: %s',
264                      self.destination.host, self.destination.port, e)
265                  return
266          self.nodeid = randomBytes(8)
267          self.append_write_buf(
268              protocol.assembleVersionMessage(
269                  self.destination.host, self.destination.port,
270                  connectionpool.pool.streams, dandelion_ins.enabled,
271                  False, nodeid=self.nodeid))
272          self.connectedAt = time.time()
273          receiveDataQueue.put(self.destination)
274  
275      def handle_read(self):
276          """Callback for reading from a socket"""
277          TLSDispatcher.handle_read(self)
278          receiveDataQueue.put(self.destination)
279  
280      def handle_write(self):
281          """Callback for writing to a socket"""
282          TLSDispatcher.handle_write(self)
283  
284      def handle_close(self):
285          """Callback for connection being closed."""
286          host_is_global = self.isOutbound or not self.local and not state.socksIP
287          if self.fullyEstablished:
288              UISignalQueue.put((
289                  'updateNetworkStatusTab',
290                  (self.isOutbound, False, self.destination)
291              ))
292              if host_is_global:
293                  knownnodes.addKnownNode(
294                      self.streams, self.destination, time.time())
295                  dandelion_ins.maybeRemoveStem(self)
296          else:
297              self.checkTimeOffsetNotification()
298              if host_is_global:
299                  knownnodes.decreaseRating(self.destination)
300          BMProto.handle_close(self)
301  
302  
class Socks5BMConnection(Socks5Connection, TCPConnection):
    """Bitmessage TCP connection tunnelled through a SOCKS5 proxy"""

    def __init__(self, address):
        # The proxy base is initialised first; its socket is then handed
        # over to the TCP connection setup.
        Socks5Connection.__init__(self, address=address)
        TCPConnection.__init__(self, address=address, sock=self.socket)
        self.set_state("init")

    def state_proxy_handshake_done(self):
        """
        Called once the SOCKS5 handshake has succeeded: queue our
        Bitmessage version message for the peer and start waiting for
        a protocol header.

        :return: always True
        """
        Socks5Connection.state_proxy_handshake_done(self)
        self.nodeid = randomBytes(8)
        peer = self.destination
        version_message = protocol.assembleVersionMessage(
            peer.host, peer.port, connectionpool.pool.streams,
            dandelion_ins.enabled, False, nodeid=self.nodeid)
        self.append_write_buf(version_message)
        self.set_state("bm_header", expectBytes=protocol.Header.size)
        return True
325  
326  
class Socks4aBMConnection(Socks4aConnection, TCPConnection):
    """Bitmessage TCP connection tunnelled through a SOCKS4a proxy"""

    def __init__(self, address):
        # The proxy base is initialised first; its socket is then handed
        # over to the TCP connection setup.
        Socks4aConnection.__init__(self, address=address)
        TCPConnection.__init__(self, address=address, sock=self.socket)
        self.set_state("init")

    def state_proxy_handshake_done(self):
        """
        Called once the SOCKS4a handshake has succeeded: queue our
        Bitmessage version message for the peer and start waiting for
        a protocol header.

        :return: always True
        """
        Socks4aConnection.state_proxy_handshake_done(self)
        self.nodeid = randomBytes(8)
        peer = self.destination
        version_message = protocol.assembleVersionMessage(
            peer.host, peer.port, connectionpool.pool.streams,
            dandelion_ins.enabled, False, nodeid=self.nodeid)
        self.append_write_buf(version_message)
        self.set_state("bm_header", expectBytes=protocol.Header.size)
        return True
349  
350  
def bootstrap(connection_class):
    """
    Build a bootstrapper class on top of the given connection type.

    :param connection_class: connection class to derive from
    :return: a new ``Bootstrapper`` class, subclass of *connection_class*
    """
    class Bootstrapper(connection_class):
        """Connection that only fetches addresses and then closes"""
        _connection_base = connection_class

        def __init__(self, host, port):
            peer = Peer(host, port)
            self._connection_base.__init__(self, peer)
            # both flags start out False; bm_command_addr() sets them
            self.close_reason = False
            self._succeed = False

        def bm_command_addr(self):
            """
            An addr message arrived, so bootstrapping worked: have
            BMProto process it, then move to the 'close' state.
            """
            BMProto.bm_command_addr(self)
            self._succeed = True
            self.close_reason = "Thanks for bootstrapping!"
            self.set_state("close")

        def set_connection_fully_established(self):
            """Mark the connection established and send addr, nothing else"""
            # pylint: disable=attribute-defined-outside-init
            self.fullyEstablished = True
            self.sendAddr()

        def handle_close(self):
            """
            Close via BMProto and, when no addr message was received,
            reset knownnodes.knownNodesActual to False.
            """
            BMProto.handle_close(self)
            if self._succeed:
                return
            knownnodes.knownNodesActual = False

    return Bootstrapper
387  
388  
class TCPServer(AdvancedDispatcher):
    """TCP connection server for Bitmessage protocol"""

    def __init__(self, host='127.0.0.1', port=8444):
        """
        Create the listening socket and bind it.

        When binding fails, up to 49 further attempts are made on
        random ports between 32767 and 65535; a port found that way is
        saved to the config as the new 'port' setting.

        :param host: address to listen on
        :param port: preferred port to listen on
        """
        if not hasattr(self, '_map'):
            AdvancedDispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        bound = False
        for attempt in range(50):
            if attempt > 0:
                logger.warning('Failed to bind on port %s', port)
                port = random.randint(32767, 65535)  # nosec B311
            try:
                self.bind((host, port))
            except socket.error as e:
                # "address in use" is the expected reason to retry; any
                # other error is retried as before but is now reported
                if e.errno not in (
                        asyncore.EADDRINUSE, asyncore.WSAEADDRINUSE):
                    logger.warning(
                        'Unexpected error binding to %s:%s: %s',
                        host, port, e)
                continue
            bound = True
            if attempt > 0:
                logger.warning('Setting port to %s', port)
                config.set(
                    'bitmessagesettings', 'port', str(port))
                config.save()
            break
        else:
            logger.error(
                'Failed to bind on port %s, giving up binding', port)
        self.destination = Peer(host, port)
        # reflect the real outcome instead of unconditionally True
        self.bound = bound
        self.listen(5)

    def is_bound(self):
        """
        Is the socket bound?

        :return: the ``bound`` flag, or False when it was never set
        """
        try:
            return self.bound
        except AttributeError:
            return False

    def handle_accept(self):
        """
        Incoming connection callback

        Records the local address the peer connected to as one of our
        own, drops the connection when the pool is over its limit and
        otherwise adds a new TCPConnection to the pool.
        """
        try:
            sock = self.accept()[0]
        except (TypeError, IndexError):
            # accept() gave nothing usable to index into
            return

        state.ownAddresses[Peer(*sock.getsockname())] = True
        if (
            len(connectionpool.pool)
            > config.safeGetInt(
                'bitmessagesettings', 'maxtotalconnections')
                + config.safeGetInt(
                    'bitmessagesettings', 'maxbootstrapconnections') + 10
        ):
            # 10 is a sort of buffer, in between it will go through
            # the version handshake and return an error to the peer
            logger.warning("Server full, dropping connection")
            sock.close()
            return
        try:
            connectionpool.pool.addConnection(
                TCPConnection(sock=sock))
        except socket.error as e:
            # best effort: the connection is dropped, but leave a trace
            logger.debug('Failed to set up incoming connection: %s', e)