connectionpool.py
1 """ 2 `BMConnectionPool` class definition 3 """ 4 import errno 5 import logging 6 import re 7 import socket 8 import sys 9 import os 10 import time 11 import random 12 13 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) 14 15 from network import asyncore_pollchoose as asyncore 16 from network import knownnodes 17 import protocol 18 import state 19 from bmconfigparser import config 20 from connectionchooser import chooseConnection 21 from node import Peer 22 from proxy import Proxy 23 from network.tcp import ( 24 bootstrap, Socks4aBMConnection, Socks5BMConnection, 25 TCPConnection, TCPServer) 26 from udp import UDPSocket 27 28 logger = logging.getLogger('default') 29 30 31 class BMConnectionPool(object): 32 """Pool of all existing connections""" 33 # pylint: disable=too-many-instance-attributes 34 trustedPeer = None 35 """ 36 If the trustedpeer option is specified in keys.dat then this will 37 contain a Peer which will be connected to instead of using the 38 addresses advertised by other peers. 39 40 The expected use case is where the user has a trusted server where 41 they run a Bitmessage daemon permanently. If they then run a second 42 instance of the client on a local machine periodically when they want 43 to check for messages it will sync with the network a lot faster 44 without compromising security. 45 """ 46 47 def __init__(self): 48 asyncore.set_rates( 49 config.safeGetInt( 50 "bitmessagesettings", "maxdownloadrate"), 51 config.safeGetInt( 52 "bitmessagesettings", "maxuploadrate") 53 ) 54 self.outboundConnections = {} 55 self.inboundConnections = {} 56 self.listeningSockets = {} 57 self.udpSockets = {} 58 self.streams = [] 59 self._lastSpawned = 0 60 self._spawnWait = 2 61 self._bootstrapped = False 62 63 trustedPeer = config.safeGet( 64 'bitmessagesettings', 'trustedpeer') 65 try: 66 if trustedPeer: 67 host, port = trustedPeer.split(':') 68 self.trustedPeer = Peer(host, int(port)) 69 except ValueError: 70 sys.exit( 71 'Bad trustedpeer config setting! It should be set as' 72 ' trustedpeer=<hostname>:<portnumber>' 73 ) 74 75 def __len__(self): 76 return len(self.outboundConnections) + len(self.inboundConnections) 77 78 def connections(self): 79 """ 80 Shortcut for combined list of connections from 81 `inboundConnections` and `outboundConnections` dicts 82 """ 83 return list(self.inboundConnections.values()) + list(self.outboundConnections.values()) 84 85 def establishedConnections(self): 86 """Shortcut for list of connections having fullyEstablished == True""" 87 return [ 88 x for x in self.connections() if x.fullyEstablished] 89 90 def connectToStream(self, streamNumber): 91 """Connect to a bitmessage stream""" 92 self.streams.append(streamNumber) 93 94 def getConnectionByAddr(self, addr): 95 """ 96 Return an (existing) connection object based on a `Peer` object 97 (IP and port) 98 """ 99 try: 100 return self.inboundConnections[addr] 101 except KeyError: 102 pass 103 try: 104 return self.inboundConnections[addr.host] 105 except (KeyError, AttributeError): 106 pass 107 try: 108 return self.outboundConnections[addr] 109 except KeyError: 110 pass 111 try: 112 return self.udpSockets[addr.host] 113 except (KeyError, AttributeError): 114 pass 115 raise KeyError 116 117 def isAlreadyConnected(self, nodeid): 118 """Check if we're already connected to this peer""" 119 for i in self.connections(): 120 try: 121 if nodeid == i.nodeid: 122 return True 123 except AttributeError: 124 pass 125 return False 126 127 def addConnection(self, connection): 128 """Add a connection object to our internal dict""" 129 if isinstance(connection, UDPSocket): 130 return 131 if connection.isOutbound: 132 self.outboundConnections[connection.destination] = connection 133 else: 134 if connection.destination.host in self.inboundConnections: 135 self.inboundConnections[connection.destination] = connection 136 else: 137 self.inboundConnections[connection.destination.host] = \ 138 connection 139 140 def removeConnection(self, connection): 141 """Remove a connection from our internal dict""" 142 if isinstance(connection, UDPSocket): 143 del self.udpSockets[connection.listening.host] 144 elif isinstance(connection, TCPServer): 145 del self.listeningSockets[Peer( 146 connection.destination.host, connection.destination.port)] 147 elif connection.isOutbound: 148 try: 149 del self.outboundConnections[connection.destination] 150 except KeyError: 151 pass 152 else: 153 try: 154 del self.inboundConnections[connection.destination] 155 except KeyError: 156 try: 157 del self.inboundConnections[connection.destination.host] 158 except KeyError: 159 pass 160 connection.handle_close() 161 162 @staticmethod 163 def getListeningIP(): 164 """What IP are we supposed to be listening on?""" 165 if config.safeGet( 166 "bitmessagesettings", "onionhostname", "").endswith(".onion"): 167 host = config.safeGet( 168 "bitmessagesettings", "onionbindip") 169 else: 170 host = '127.0.0.1' 171 if ( 172 config.safeGetBoolean("bitmessagesettings", "sockslisten") 173 or config.safeGet("bitmessagesettings", "socksproxytype") 174 == "none" 175 ): 176 # python doesn't like bind + INADDR_ANY? 177 # host = socket.INADDR_ANY 178 host = config.get("network", "bind") 179 return host 180 181 def startListening(self, bind=None): 182 """Open a listening socket and start accepting connections on it""" 183 if bind is None: 184 bind = self.getListeningIP() 185 port = config.safeGetInt("bitmessagesettings", "port") 186 # correct port even if it changed 187 ls = TCPServer(host=bind, port=port) 188 self.listeningSockets[ls.destination] = ls 189 190 def startUDPSocket(self, bind=None): 191 """ 192 Open an UDP socket. Depending on settings, it can either only 193 accept incoming UDP packets, or also be able to send them. 194 """ 195 if bind is None: 196 host = self.getListeningIP() 197 udpSocket = UDPSocket(host=host, announcing=True) 198 else: 199 if bind is False: 200 udpSocket = UDPSocket(announcing=False) 201 else: 202 udpSocket = UDPSocket(host=bind, announcing=True) 203 self.udpSockets[udpSocket.listening.host] = udpSocket 204 205 def startBootstrappers(self): 206 """Run the process of resolving bootstrap hostnames""" 207 proxy_type = config.safeGet( 208 'bitmessagesettings', 'socksproxytype') 209 # A plugins may be added here 210 hostname = None 211 if not proxy_type or proxy_type == 'none': 212 connection_base = TCPConnection 213 elif proxy_type == 'SOCKS5': 214 connection_base = Socks5BMConnection 215 hostname = random.choice([ # nosec B311 216 'quzwelsuziwqgpt2.onion', None 217 ]) 218 elif proxy_type == 'SOCKS4a': 219 connection_base = Socks4aBMConnection # FIXME: I cannot test 220 else: 221 # This should never happen because socksproxytype setting 222 # is handled in bitmessagemain before starting the connectionpool 223 return 224 225 bootstrapper = bootstrap(connection_base) 226 if not hostname: 227 port = random.choice([8080, 8444]) # nosec B311 228 hostname = 'bootstrap%s.bitmessage.org' % port 229 else: 230 port = 8444 231 self.addConnection(bootstrapper(hostname, port)) 232 233 def loop(self): # pylint: disable=too-many-branches,too-many-statements 234 """Main Connectionpool's loop""" 235 # pylint: disable=too-many-locals 236 # defaults to empty loop if outbound connections are maxed 237 spawnConnections = False 238 acceptConnections = True 239 if config.safeGetBoolean( 240 'bitmessagesettings', 'dontconnect'): 241 acceptConnections = False 242 elif config.safeGetBoolean( 243 'bitmessagesettings', 'sendoutgoingconnections'): 244 spawnConnections = True 245 socksproxytype = config.safeGet( 246 'bitmessagesettings', 'socksproxytype', '') 247 onionsocksproxytype = config.safeGet( 248 'bitmessagesettings', 'onionsocksproxytype', '') 249 if ( 250 socksproxytype[:5] == 'SOCKS' 251 and not config.safeGetBoolean( 252 'bitmessagesettings', 'sockslisten') 253 and '.onion' not in config.safeGet( 254 'bitmessagesettings', 'onionhostname', '') 255 ): 256 acceptConnections = False 257 258 # pylint: disable=too-many-nested-blocks 259 if spawnConnections: 260 if not knownnodes.knownNodesActual: 261 self.startBootstrappers() 262 knownnodes.knownNodesActual = True 263 if not self._bootstrapped: 264 self._bootstrapped = True 265 Proxy.proxy = ( 266 config.safeGet( 267 'bitmessagesettings', 'sockshostname'), 268 config.safeGetInt( 269 'bitmessagesettings', 'socksport') 270 ) 271 # TODO AUTH 272 # TODO reset based on GUI settings changes 273 try: 274 if not onionsocksproxytype.startswith("SOCKS"): 275 raise ValueError 276 Proxy.onion_proxy = ( 277 config.safeGet( 278 'network', 'onionsockshostname', None), 279 config.safeGet( 280 'network', 'onionsocksport', None) 281 ) 282 except ValueError: 283 Proxy.onion_proxy = None 284 established = sum( 285 1 for c in list(self.outboundConnections.values()) 286 if (c.connected and c.fullyEstablished)) 287 pending = len(self.outboundConnections) - established 288 if established < config.safeGetInt( 289 'bitmessagesettings', 'maxoutboundconnections'): 290 for i in range( 291 state.maximumNumberOfHalfOpenConnections - pending): 292 try: 293 chosen = self.trustedPeer or chooseConnection( 294 random.choice(self.streams)) # nosec B311 295 except ValueError: 296 continue 297 if chosen in self.outboundConnections: 298 continue 299 if chosen.host in self.inboundConnections: 300 continue 301 # don't connect to self 302 if chosen in state.ownAddresses: 303 continue 304 # don't connect to the hosts from the same 305 # network group, defense against sibyl attacks 306 host_network_group = protocol.network_group( 307 chosen.host) 308 same_group = False 309 for j in list(self.outboundConnections.values()): 310 if host_network_group == j.network_group: 311 same_group = True 312 if chosen.host == j.destination.host: 313 knownnodes.decreaseRating(chosen) 314 break 315 if same_group: 316 continue 317 318 try: 319 if chosen.host.endswith(".onion") and Proxy.onion_proxy: 320 if onionsocksproxytype == "SOCKS5": 321 self.addConnection(Socks5BMConnection(chosen)) 322 elif onionsocksproxytype == "SOCKS4a": 323 self.addConnection(Socks4aBMConnection(chosen)) 324 elif socksproxytype == "SOCKS5": 325 self.addConnection(Socks5BMConnection(chosen)) 326 elif socksproxytype == "SOCKS4a": 327 self.addConnection(Socks4aBMConnection(chosen)) 328 else: 329 self.addConnection(TCPConnection(chosen)) 330 except socket.error as e: 331 if e.errno == errno.ENETUNREACH: 332 continue 333 334 self._lastSpawned = time.time() 335 else: 336 for i in list(self.outboundConnections.values()): 337 # FIXME: rating will be increased after next connection 338 i.handle_close() 339 340 if acceptConnections: 341 if not self.listeningSockets: 342 if config.safeGet('network', 'bind') == '': 343 self.startListening() 344 else: 345 for bind in re.sub( 346 r'[^\w.]+', ' ', 347 config.safeGet('network', 'bind') 348 ).split(): 349 self.startListening(bind) 350 logger.info('Listening for incoming connections.') 351 if not self.udpSockets: 352 if config.safeGet('network', 'bind') == '': 353 self.startUDPSocket() 354 else: 355 for bind in re.sub( 356 r'[^\w.]+', ' ', 357 config.safeGet('network', 'bind') 358 ).split(): 359 self.startUDPSocket(bind) 360 self.startUDPSocket(False) 361 logger.info('Starting UDP socket(s).') 362 else: 363 if self.listeningSockets: 364 for i in list(self.listeningSockets.values()): 365 i.close_reason = "Stopping listening" 366 i.accepting = i.connecting = i.connected = False 367 logger.info('Stopped listening for incoming connections.') 368 if self.udpSockets: 369 for i in list(self.udpSockets.values()): 370 i.close_reason = "Stopping UDP socket" 371 i.accepting = i.connecting = i.connected = False 372 logger.info('Stopped udp sockets.') 373 374 loopTime = float(self._spawnWait) 375 if self._lastSpawned < time.time() - self._spawnWait: 376 loopTime = 2.0 377 asyncore.loop(timeout=loopTime, count=1000) 378 379 reaper = [] 380 for i in self.connections(): 381 minTx = time.time() - 20 382 if i.fullyEstablished: 383 minTx -= 300 - 20 384 if i.lastTx < minTx: 385 if i.fullyEstablished: 386 i.append_write_buf(protocol.CreatePacket('ping')) 387 else: 388 i.close_reason = "Timeout (%is)" % ( 389 time.time() - i.lastTx) 390 i.set_state("close") 391 for i in ( 392 self.connections() 393 + list(self.listeningSockets.values()) + list(self.udpSockets.values()) 394 ): 395 if not (i.accepting or i.connecting or i.connected): 396 reaper.append(i) 397 else: 398 try: 399 if i.state == "close": 400 reaper.append(i) 401 except AttributeError: 402 pass 403 for i in reaper: 404 self.removeConnection(i) 405 406 407 pool = BMConnectionPool()