# tcp.py.bak
"""
TCP protocol handler
"""
# pylint: disable=too-many-ancestors

import logging
import math
import random
import socket
import time

# magic imports!
import addresses
import l10n
import protocol
import state
import connectionpool
from bmconfigparser import config
from highlevelcrypto import randomBytes
from network import dandelion_ins, invQueue, receiveDataQueue
from queues import UISignalQueue
from tr import _translate

import asyncore_pollchoose as asyncore
import knownnodes
from network.advanceddispatcher import AdvancedDispatcher
from network.bmproto import BMProto
from network.objectracker import ObjectTracker
from network.socks4a import Socks4aConnection
from network.socks5 import Socks5Connection
from network.tls import TLSDispatcher
from node import Peer


logger = logging.getLogger('default')


maximumAgeOfNodesThatIAdvertiseToOthers = 10800  #: Equals three hours
maximumTimeOffsetWrongCount = 3  #: Connections with wrong time offset


class TCPConnection(BMProto, TLSDispatcher):
    # pylint: disable=too-many-instance-attributes
    """
    Bitmessage protocol connection over TCP (BMProto + TLSDispatcher).

    .. todo:: Look to understand and/or fix the non-parent-init-called
    """

    def __init__(self, address=None, sock=None):
        """
        Set up the connection in one of three modes:

        - ``sock`` only: an accepted inbound connection;
        - ``address`` and ``sock``: an outbound connection through an
          already created (proxy) socket;
        - otherwise: a direct outbound connection to ``address``, for
          which a new socket is created and connected here.
        """
        BMProto.__init__(self, address=address, sock=sock)
        self.verackReceived = False
        self.verackSent = False
        self.streams = [0]
        self.fullyEstablished = False
        self.skipUntil = 0
        if address is None and sock is not None:
            self.destination = Peer(*sock.getpeername())
            self.isOutbound = False
            TLSDispatcher.__init__(self, sock, server_side=True)
            self.connectedAt = time.time()
            logger.debug(
                'Received connection from %s:%i',
                self.destination.host, self.destination.port)
            self.nodeid = randomBytes(8)
        elif address is not None and sock is not None:
            # NOTE(review): self.destination is not assigned in this
            # branch; presumably a base/proxy initialiser sets it - confirm
            TLSDispatcher.__init__(self, sock, server_side=False)
            self.isOutbound = True
            logger.debug(
                'Outbound proxy connection to %s:%i',
                self.destination.host, self.destination.port)
        else:
            self.destination = address
            self.isOutbound = True
            self.create_socket(
                socket.AF_INET6 if ":" in address.host else socket.AF_INET,
                socket.SOCK_STREAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            TLSDispatcher.__init__(self, sock, server_side=False)
            self.connect(self.destination)
            logger.debug(
                'Connecting to %s:%i',
                self.destination.host, self.destination.port)
        try:
            self.local = (
                protocol.checkIPAddress(
                    protocol.encodeHost(self.destination.host), True)
                and not protocol.checkSocksIP(self.destination.host)
            )
        except socket.error:
            # it's probably a hostname
            # NOTE(review): self.local is left untouched here; presumably
            # a base class provides its default - confirm
            pass
        self.network_group = protocol.network_group(self.destination.host)
        ObjectTracker.__init__(self)  # pylint: disable=non-parent-init-called
        self.bm_proto_reset()
        self.set_state("bm_header", expectBytes=protocol.Header.size)

    def _host_is_global(self):
        """
        Tell whether the peer's host is suitable for knownnodes: the
        connection is outbound, or it is a non-local inbound one while
        no SOCKS IP is set in ``state``.
        """
        return self.isOutbound or (not self.local and not state.socksIP)

    def _send_version_message(self):
        """Generate a fresh nodeid and queue our version message."""
        self.nodeid = randomBytes(8)
        self.append_write_buf(
            protocol.assembleVersionMessage(
                self.destination.host, self.destination.port,
                connectionpool.pool.streams, dandelion_ins.enabled,
                False, nodeid=self.nodeid))

    def antiIntersectionDelay(self, initial=False):
        """
        This is a defense against the so called intersection attacks.

        It is called when you notice peer is requesting non-existing
        objects, or right after the connection is established. It will
        estimate how long an object will take to propagate across the
        network, and skip processing "getdata" requests until then. This
        means an attacker only has one shot per IP to perform the attack.

        :param initial: True when called right after the connection got
            established; the delay is then counted from ``connectedAt``
            instead of from now.
        """
        # estimated time for a small object to propagate across the
        # whole network
        # take the stream with maximum amount of nodes; an empty
        # knownNodes mapping counts as zero nodes instead of making
        # max() raise ValueError
        stream_sizes = [
            len(nodes) for nodes in knownnodes.knownNodes.values()]
        max_known_nodes = max(stream_sizes) if stream_sizes else 0
        # +2 is to avoid problems with log(0) and log(1)
        # 20 is avg connected nodes count
        # 0.2 is avg message transmission time
        delay = math.ceil(math.log(max_known_nodes + 2, 20)) * (
            0.2 + invQueue.queueCount / 2.0)
        if delay > 0:
            if initial:
                self.skipUntil = self.connectedAt + delay
                if self.skipUntil > time.time():
                    logger.debug(
                        'Initial skipping processing getdata for %.2fs',
                        self.skipUntil - time.time())
            else:
                logger.debug(
                    'Skipping processing getdata due to missing object'
                    ' for %.2fs', delay)
                self.skipUntil = time.time() + delay

    def checkTimeOffsetNotification(self):
        """
        Check if we have connected to too many nodes which have too high
        time offset from us
        """
        if BMProto.timeOffsetWrongCount > \
                maximumTimeOffsetWrongCount and \
                not self.fullyEstablished:
            UISignalQueue.put((
                'updateStatusBar',
                _translate(
                    "MainWindow",
                    "The time on your computer, %1, may be wrong. "
                    "Please verify your settings."
                ).arg(l10n.formatTimestamp())))

    def state_connection_fully_established(self):
        """
        State after the bitmessage protocol handshake is completed
        (version/verack exchange, and if both side support TLS,
        the TLS handshake as well).
        """
        self.set_connection_fully_established()
        self.set_state("bm_header")
        self.bm_proto_reset()
        return True

    def set_connection_fully_established(self):
        """Initiate inventory synchronisation."""
        if not self.isOutbound and not self.local:
            state.clientHasReceivedIncomingConnections = True
            UISignalQueue.put(('setStatusIcon', 'green'))
        UISignalQueue.put((
            'updateNetworkStatusTab', (self.isOutbound, True, self.destination)
        ))
        self.antiIntersectionDelay(True)
        self.fullyEstablished = True
        # The connection having host suitable for knownnodes
        if self._host_is_global():
            knownnodes.increaseRating(self.destination)
            knownnodes.addKnownNode(
                self.streams, self.destination, time.time())
        dandelion_ins.maybeAddStem(self, invQueue)
        self.sendAddr()
        self.sendBigInv()

    def sendAddr(self):
        """Send a partial list of known addresses to peer."""
        # We are going to share a maximum number of 1000 addrs (per overlapping
        # stream) with our peer. 500 from overlapping streams, 250 from the
        # left child stream, and 250 from the right child stream.
        maxAddrCount = config.safeGetInt(
            "bitmessagesettings", "maxaddrperstreamsend", 500)
        # only nodes seen more recently than 3 hours ago are advertised;
        # the cutoff does not change inside the loops, so compute it once
        oldest_allowed = (
            int(time.time()) - maximumAgeOfNodesThatIAdvertiseToOthers)

        templist = []
        addrs = {}
        for stream in self.streams:
            with knownnodes.knownNodesLock:
                for n, s in enumerate((stream, stream * 2, stream * 2 + 1)):
                    nodes = knownnodes.knownNodes.get(s)
                    if not nodes:
                        continue
                    # only if more recent than 3 hours
                    # and having positive or neutral rating
                    filtered = [
                        (k, v) for k, v in nodes.items()
                        if v["lastseen"] > oldest_allowed
                        and v["rating"] >= 0
                        and not k.host.endswith('.onion')
                    ]
                    # sent 250 only if the remote isn't interested in it
                    # (floor division keeps the sample size an integer
                    # regardless of the division mode in effect)
                    elemCount = min(
                        len(filtered),
                        maxAddrCount // 2 if n else maxAddrCount)
                    addrs[s] = random.sample(filtered, elemCount)
        for substream in addrs:
            for peer, params in addrs[substream]:
                templist.append((substream, peer, params["lastseen"]))
        if templist:
            self.append_write_buf(protocol.assembleAddrMessage(templist))

    def sendBigInv(self):
        """
        Send hashes of all inventory objects, chunked as the protocol has
        a per-command limit.
        """
        def sendChunk(objectCount, payload):
            """Send one chunk of inv entries in one command"""
            if objectCount == 0:
                return
            logger.debug(
                'Sending huge inv message with %i objects to just this'
                ' one peer', objectCount)
            self.append_write_buf(protocol.CreatePacket(
                'inv', addresses.encodeVarint(objectCount) + payload))

        # Select all hashes for objects in this stream.
        bigInvList = set()
        for stream in self.streams:
            # may lock for a long time, but I think it's better than
            # thousands of small locks
            with self.objectsNewToThemLock:
                for objHash in state.Inventory.unexpired_hashes_by_stream(
                        stream):
                    # don't advertise stem objects on bigInv
                    if dandelion_ins.hasHash(objHash):
                        continue
                    bigInvList.add(objHash)

        # Now let us start appending all of these hashes together.
        # They will be sent out in a big inv message to our new peer.
        objectCount = 0
        payload = b''
        for objHash in bigInvList:
            payload += objHash
            objectCount += 1

            # Remove -1 below when sufficient time has passed for users to
            # upgrade to versions of PyBitmessage that accept inv with 50,000
            # items
            if objectCount >= protocol.MAX_OBJECT_COUNT - 1:
                sendChunk(objectCount, payload)
                payload = b''
                objectCount = 0

        # flush
        sendChunk(objectCount, payload)

    def handle_connect(self):
        """Callback for TCP connection being established."""
        try:
            AdvancedDispatcher.handle_connect(self)
        except socket.error as e:
            # pylint: disable=protected-access
            if e.errno in asyncore._DISCONNECTED:
                logger.debug(
                    '%s:%i: Connection failed: %s',
                    self.destination.host, self.destination.port, e)
                return
            # other socket errors are ignored and the handshake goes on
        self._send_version_message()
        self.connectedAt = time.time()
        receiveDataQueue.put(self.destination)

    def handle_read(self):
        """Callback for reading from a socket"""
        TLSDispatcher.handle_read(self)
        receiveDataQueue.put(self.destination)

    def handle_write(self):
        """Callback for writing to a socket"""
        TLSDispatcher.handle_write(self)

    def handle_close(self):
        """Callback for connection being closed."""
        host_is_global = self._host_is_global()
        if self.fullyEstablished:
            UISignalQueue.put((
                'updateNetworkStatusTab',
                (self.isOutbound, False, self.destination)
            ))
            if host_is_global:
                knownnodes.addKnownNode(
                    self.streams, self.destination, time.time())
                dandelion_ins.maybeRemoveStem(self)
        else:
            self.checkTimeOffsetNotification()
            if host_is_global:
                knownnodes.decreaseRating(self.destination)
        BMProto.handle_close(self)


class Socks5BMConnection(Socks5Connection, TCPConnection):
    """SOCKS5 wrapper for TCP connections"""

    def __init__(self, address):
        Socks5Connection.__init__(self, address=address)
        TCPConnection.__init__(self, address=address, sock=self.socket)
        self.set_state("init")

    def state_proxy_handshake_done(self):
        """
        State when SOCKS5 connection succeeds, we need to send a
        Bitmessage handshake to peer.
        """
        Socks5Connection.state_proxy_handshake_done(self)
        self._send_version_message()
        self.set_state("bm_header", expectBytes=protocol.Header.size)
        return True


class Socks4aBMConnection(Socks4aConnection, TCPConnection):
    """SOCKS4a wrapper for TCP connections"""

    def __init__(self, address):
        Socks4aConnection.__init__(self, address=address)
        TCPConnection.__init__(self, address=address, sock=self.socket)
        self.set_state("init")

    def state_proxy_handshake_done(self):
        """
        State when SOCKS4a connection succeeds, we need to send a
        Bitmessage handshake to peer.
        """
        Socks4aConnection.state_proxy_handshake_done(self)
        self._send_version_message()
        self.set_state("bm_header", expectBytes=protocol.Header.size)
        return True


def bootstrap(connection_class):
    """Make bootstrapper class for connection type (connection_class)"""
    class Bootstrapper(connection_class):
        """Base class for bootstrappers"""
        _connection_base = connection_class

        def __init__(self, host, port):
            self._connection_base.__init__(self, Peer(host, port))
            self.close_reason = self._succeed = False

        def bm_command_addr(self):
            """
            Got addr message - the bootstrap succeed.
            Let BMProto process the addr message and switch state to 'close'
            """
            BMProto.bm_command_addr(self)
            self._succeed = True
            self.close_reason = "Thanks for bootstrapping!"
            self.set_state("close")

        def set_connection_fully_established(self):
            """Only send addr here"""
            # pylint: disable=attribute-defined-outside-init
            self.fullyEstablished = True
            self.sendAddr()

        def handle_close(self):
            """
            After closing the connection switch knownnodes.knownNodesActual
            back to False if the bootstrapper failed.
            """
            BMProto.handle_close(self)
            if not self._succeed:
                knownnodes.knownNodesActual = False

    return Bootstrapper


class TCPServer(AdvancedDispatcher):
    """TCP connection server for Bitmessage protocol"""

    def __init__(self, host='127.0.0.1', port=8444):
        """
        Create the listening socket and bind it to ``host``:``port``.

        When binding fails, up to 49 further attempts are made on random
        ports from 32767-65535; a port found that way is written to the
        config. ``self.bound`` records whether any attempt succeeded and
        the socket only starts listening in that case.
        """
        if not hasattr(self, '_map'):
            AdvancedDispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        bound = False
        for attempt in range(50):
            try:
                if attempt > 0:
                    logger.warning('Failed to bind on port %s', port)
                    port = random.randint(32767, 65535)  # nosec B311
                self.bind((host, port))
            except socket.error as e:
                if e.errno in (asyncore.EADDRINUSE, asyncore.WSAEADDRINUSE):
                    continue
                # any other bind error also ends up in the next attempt
            else:
                if attempt > 0:
                    logger.warning('Setting port to %s', port)
                    config.set(
                        'bitmessagesettings', 'port', str(port))
                    config.save()
                bound = True
                break
        self.destination = Peer(host, port)
        # Previously bound was set to True and listen() was called even
        # when every bind attempt had failed, i.e. on a socket that was
        # never bound; report the real outcome instead.
        self.bound = bound
        if bound:
            self.listen(5)
        else:
            logger.error(
                'Could not bind a listening socket on %s, giving up', host)

    def is_bound(self):
        """Is the socket bound?"""
        try:
            return self.bound
        except AttributeError:
            return False

    def handle_accept(self):
        """Incoming connection callback"""
        try:
            sock = self.accept()[0]
        except (TypeError, IndexError):
            return

        state.ownAddresses[Peer(*sock.getsockname())] = True
        if (
            len(connectionpool.pool)
            > config.safeGetInt(
                'bitmessagesettings', 'maxtotalconnections')
            + config.safeGetInt(
                'bitmessagesettings', 'maxbootstrapconnections') + 10
        ):
            # 10 is a sort of buffer, in between it will go through
            # the version handshake and return an error to the peer
            logger.warning("Server full, dropping connection")
            sock.close()
            return
        try:
            connectionpool.pool.addConnection(
                TCPConnection(sock=sock))
        except socket.error as e:
            # best effort: the incoming connection is simply not added
            logger.debug('Failed to set up incoming connection: %s', e)