connectionpool.py.bak
"""
`BMConnectionPool` class definition
"""
import errno
import logging
import re
import socket
import sys
import time
import random

import asyncore_pollchoose as asyncore
import knownnodes
import protocol
import state
from bmconfigparser import config
from connectionchooser import chooseConnection
from node import Peer
from proxy import Proxy
from tcp import (
    bootstrap, Socks4aBMConnection, Socks5BMConnection,
    TCPConnection, TCPServer)
from udp import UDPSocket

logger = logging.getLogger('default')


class BMConnectionPool(object):
    """Pool of all existing connections"""
    # pylint: disable=too-many-instance-attributes
    trustedPeer = None
    """
    If the trustedpeer option is specified in keys.dat then this will
    contain a Peer which will be connected to instead of using the
    addresses advertised by other peers.

    The expected use case is where the user has a trusted server where
    they run a Bitmessage daemon permanently. If they then run a second
    instance of the client on a local machine periodically when they want
    to check for messages it will sync with the network a lot faster
    without compromising security.
    """

    def __init__(self):
        """
        Apply the configured transfer rate limits, create the empty
        connection registries and parse the optional `trustedpeer`
        setting.

        Terminates the process through `sys.exit` when `trustedpeer`
        is set but is not of the form ``<hostname>:<portnumber>``.
        """
        asyncore.set_rates(
            config.safeGetInt(
                "bitmessagesettings", "maxdownloadrate"),
            config.safeGetInt(
                "bitmessagesettings", "maxuploadrate")
        )
        # outbound connections are keyed by their destination
        self.outboundConnections = {}
        # inbound connections are keyed by destination host, or by the
        # full destination when that host is already present
        self.inboundConnections = {}
        # listening TCP sockets, keyed by their destination
        self.listeningSockets = {}
        # UDP sockets, keyed by the host they listen on
        self.udpSockets = {}
        self.streams = []
        self._lastSpawned = 0
        self._spawnWait = 2
        self._bootstrapped = False

        trustedPeer = config.safeGet(
            'bitmessagesettings', 'trustedpeer')
        try:
            if trustedPeer:
                # a missing/extra ':' or a non-numeric port both end up
                # as ValueError
                host, port = trustedPeer.split(':')
                self.trustedPeer = Peer(host, int(port))
        except ValueError:
            sys.exit(
                'Bad trustedpeer config setting! It should be set as'
                ' trustedpeer=<hostname>:<portnumber>'
            )

    def __len__(self):
        """Number of inbound plus outbound connections"""
        return len(self.outboundConnections) + len(self.inboundConnections)

    def connections(self):
        """
        Shortcut for combined list of connections from
        `inboundConnections` and `outboundConnections` dicts
        """
        # list() is required on Python 3, where dict.values() returns a
        # view that does not support `+`; on Python 2 it is a plain copy.
        return (
            list(self.inboundConnections.values())
            + list(self.outboundConnections.values())
        )

    def establishedConnections(self):
        """Shortcut for list of connections having fullyEstablished == True"""
        return [
            x for x in self.connections() if x.fullyEstablished]

    def connectToStream(self, streamNumber):
        """Connect to a bitmessage stream"""
        self.streams.append(streamNumber)

    def getConnectionByAddr(self, addr):
        """
        Return an (existing) connection object based on a `Peer` object
        (IP and port)

        The registries are probed in this order: inbound by `addr`,
        inbound by `addr.host`, outbound by `addr`, UDP sockets by
        `addr.host`.

        :raises KeyError: when no registry holds a matching connection
        """
        try:
            return self.inboundConnections[addr]
        except KeyError:
            pass
        try:
            return self.inboundConnections[addr.host]
        except (KeyError, AttributeError):
            # AttributeError: addr has no `host` attribute
            pass
        try:
            return self.outboundConnections[addr]
        except KeyError:
            pass
        try:
            return self.udpSockets[addr.host]
        except (KeyError, AttributeError):
            pass
        raise KeyError(addr)

    def isAlreadyConnected(self, nodeid):
        """Check if we're already connected to this peer"""
        for i in self.connections():
            try:
                if nodeid == i.nodeid:
                    return True
            except AttributeError:
                # this connection has no nodeid attribute
                pass
        return False

    def addConnection(self, connection):
        """
        Add a connection object to our internal dict

        UDP sockets are ignored here; they are registered by
        `startUDPSocket`.
        """
        if isinstance(connection, UDPSocket):
            return
        if connection.isOutbound:
            self.outboundConnections[connection.destination] = connection
        else:
            # the first inbound connection from a host is stored under the
            # bare host, further ones under the full destination
            if connection.destination.host in self.inboundConnections:
                self.inboundConnections[connection.destination] = connection
            else:
                self.inboundConnections[connection.destination.host] = \
                    connection

    def removeConnection(self, connection):
        """
        Remove a connection from our internal dict

        A connection which is not registered (any more) is not an error:
        the lookup is skipped and `handle_close()` is still called on it.
        """
        if isinstance(connection, UDPSocket):
            self.udpSockets.pop(connection.listening.host, None)
        elif isinstance(connection, TCPServer):
            self.listeningSockets.pop(
                Peer(
                    connection.destination.host,
                    connection.destination.port),
                None)
        elif connection.isOutbound:
            self.outboundConnections.pop(connection.destination, None)
        else:
            # inbound connections may be stored under either key,
            # see addConnection()
            try:
                del self.inboundConnections[connection.destination]
            except KeyError:
                self.inboundConnections.pop(
                    connection.destination.host, None)
        connection.handle_close()

    @staticmethod
    def getListeningIP():
        """What IP are we supposed to be listening on?"""
        if config.safeGet(
                "bitmessagesettings", "onionhostname", "").endswith(".onion"):
            host = config.safeGet(
                "bitmessagesettings", "onionbindip")
        else:
            host = '127.0.0.1'
        if (
            config.safeGetBoolean("bitmessagesettings", "sockslisten")
            or config.safeGet("bitmessagesettings", "socksproxytype")
            == "none"
        ):
            # python doesn't like bind + INADDR_ANY?
            # host = socket.INADDR_ANY
            host = config.get("network", "bind")
        return host

    def startListening(self, bind=None):
        """
        Open a listening socket and start accepting connections on it

        :param bind: host to bind to; `getListeningIP()` is used when None
        """
        if bind is None:
            bind = self.getListeningIP()
        port = config.safeGetInt("bitmessagesettings", "port")
        # correct port even if it changed
        ls = TCPServer(host=bind, port=port)
        self.listeningSockets[ls.destination] = ls

    def startUDPSocket(self, bind=None):
        """
        Open an UDP socket. Depending on settings, it can either only
        accept incoming UDP packets, or also be able to send them.

        :param bind: None - announcing socket on `getListeningIP()`;
            False - non-announcing socket without an explicit host;
            anything else - announcing socket on that host
        """
        if bind is None:
            host = self.getListeningIP()
            udpSocket = UDPSocket(host=host, announcing=True)
        elif bind is False:
            udpSocket = UDPSocket(announcing=False)
        else:
            udpSocket = UDPSocket(host=bind, announcing=True)
        self.udpSockets[udpSocket.listening.host] = udpSocket

    def startBootstrappers(self):
        """Run the process of resolving bootstrap hostnames"""
        proxy_type = config.safeGet(
            'bitmessagesettings', 'socksproxytype')
        # Plugins may be added here
        hostname = None
        if not proxy_type or proxy_type == 'none':
            connection_base = TCPConnection
        elif proxy_type == 'SOCKS5':
            connection_base = Socks5BMConnection
            # randomly use either the onion bootstrap host or (None)
            # one of the regular bootstrap hostnames chosen below
            hostname = random.choice([  # nosec B311
                'quzwelsuziwqgpt2.onion', None
            ])
        elif proxy_type == 'SOCKS4a':
            connection_base = Socks4aBMConnection  # FIXME: I cannot test
        else:
            # This should never happen because socksproxytype setting
            # is handled in bitmessagemain before starting the connectionpool
            return

        bootstrapper = bootstrap(connection_base)
        if not hostname:
            port = random.choice([8080, 8444])  # nosec B311
            hostname = 'bootstrap%s.bitmessage.org' % port
        else:
            port = 8444
        self.addConnection(bootstrapper(hostname, port))

    def loop(self):  # pylint: disable=too-many-branches,too-many-statements
        """
        Main Connectionpool's loop

        One iteration does, in order:

        1. decide from the settings whether outbound connections should
           be spawned and whether inbound ones should be accepted;
        2. spawn outbound connections (or close all of them when
           spawning is off);
        3. open (or shut down) the listening TCP and UDP sockets;
        4. run one `asyncore.loop()` pass;
        5. ping idle established connections, time out idle
           non-established ones and remove everything that is closed.
        """
        # pylint: disable=too-many-locals
        # defaults to empty loop if outbound connections are maxed
        spawnConnections = False
        acceptConnections = True
        if config.safeGetBoolean(
                'bitmessagesettings', 'dontconnect'):
            acceptConnections = False
        elif config.safeGetBoolean(
                'bitmessagesettings', 'sendoutgoingconnections'):
            spawnConnections = True
        socksproxytype = config.safeGet(
            'bitmessagesettings', 'socksproxytype', '')
        onionsocksproxytype = config.safeGet(
            'bitmessagesettings', 'onionsocksproxytype', '')
        # behind a SOCKS proxy inbound connections are only accepted
        # when sockslisten is set or an onion hostname is configured
        if (
            socksproxytype[:5] == 'SOCKS'
            and not config.safeGetBoolean(
                'bitmessagesettings', 'sockslisten')
            and '.onion' not in config.safeGet(
                'bitmessagesettings', 'onionhostname', '')
        ):
            acceptConnections = False

        # pylint: disable=too-many-nested-blocks
        if spawnConnections:
            if not knownnodes.knownNodesActual:
                self.startBootstrappers()
                knownnodes.knownNodesActual = True
            if not self._bootstrapped:
                # one-time proxy setup
                self._bootstrapped = True
                Proxy.proxy = (
                    config.safeGet(
                        'bitmessagesettings', 'sockshostname'),
                    config.safeGetInt(
                        'bitmessagesettings', 'socksport')
                )
                # TODO AUTH
                # TODO reset based on GUI settings changes
                try:
                    if not onionsocksproxytype.startswith("SOCKS"):
                        raise ValueError
                    # NOTE(review): the port is read with safeGet() (not
                    # safeGetInt()) and from the 'network' section while
                    # the proxy type comes from 'bitmessagesettings' -
                    # confirm this is intended
                    Proxy.onion_proxy = (
                        config.safeGet(
                            'network', 'onionsockshostname', None),
                        config.safeGet(
                            'network', 'onionsocksport', None)
                    )
                except ValueError:
                    Proxy.onion_proxy = None
            established = sum(
                1 for c in self.outboundConnections.values()
                if (c.connected and c.fullyEstablished))
            pending = len(self.outboundConnections) - established
            if established < config.safeGetInt(
                    'bitmessagesettings', 'maxoutboundconnections'):
                for _ in range(
                        state.maximumNumberOfHalfOpenConnections - pending):
                    try:
                        chosen = self.trustedPeer or chooseConnection(
                            random.choice(self.streams))  # nosec B311
                    except ValueError:
                        continue
                    if chosen in self.outboundConnections:
                        continue
                    if chosen.host in self.inboundConnections:
                        continue
                    # don't connect to self
                    if chosen in state.ownAddresses:
                        continue
                    # don't connect to the hosts from the same
                    # network group, defense against Sybil attacks
                    host_network_group = protocol.network_group(
                        chosen.host)
                    same_group = False
                    for j in self.outboundConnections.values():
                        if host_network_group == j.network_group:
                            same_group = True
                            if chosen.host == j.destination.host:
                                knownnodes.decreaseRating(chosen)
                            break
                    if same_group:
                        continue

                    try:
                        if chosen.host.endswith(".onion") and Proxy.onion_proxy:
                            if onionsocksproxytype == "SOCKS5":
                                self.addConnection(Socks5BMConnection(chosen))
                            elif onionsocksproxytype == "SOCKS4a":
                                self.addConnection(Socks4aBMConnection(chosen))
                        elif socksproxytype == "SOCKS5":
                            self.addConnection(Socks5BMConnection(chosen))
                        elif socksproxytype == "SOCKS4a":
                            self.addConnection(Socks4aBMConnection(chosen))
                        else:
                            self.addConnection(TCPConnection(chosen))
                    except socket.error as e:
                        if e.errno == errno.ENETUNREACH:
                            continue

                    self._lastSpawned = time.time()
        else:
            # iterate over a snapshot: closing a connection must not be
            # able to invalidate the iteration over the dict
            for i in list(self.outboundConnections.values()):
                # FIXME: rating will be increased after next connection
                i.handle_close()

        if acceptConnections:
            if not self.listeningSockets:
                if config.safeGet('network', 'bind') == '':
                    self.startListening()
                else:
                    # 'bind' may hold several addresses separated by
                    # anything but word characters and dots
                    for bind in re.sub(
                        r'[^\w.]+', ' ',
                        config.safeGet('network', 'bind')
                    ).split():
                        self.startListening(bind)
                logger.info('Listening for incoming connections.')
            if not self.udpSockets:
                if config.safeGet('network', 'bind') == '':
                    self.startUDPSocket()
                else:
                    for bind in re.sub(
                        r'[^\w.]+', ' ',
                        config.safeGet('network', 'bind')
                    ).split():
                        self.startUDPSocket(bind)
                    self.startUDPSocket(False)
                logger.info('Starting UDP socket(s).')
        else:
            # clearing all three flags makes the reaper below pick the
            # sockets up
            if self.listeningSockets:
                for i in self.listeningSockets.values():
                    i.close_reason = "Stopping listening"
                    i.accepting = i.connecting = i.connected = False
                logger.info('Stopped listening for incoming connections.')
            if self.udpSockets:
                for i in self.udpSockets.values():
                    i.close_reason = "Stopping UDP socket"
                    i.accepting = i.connecting = i.connected = False
                logger.info('Stopped udp sockets.')

        loopTime = float(self._spawnWait)
        if self._lastSpawned < time.time() - self._spawnWait:
            loopTime = 2.0
        asyncore.loop(timeout=loopTime, count=1000)

        reaper = []
        for i in self.connections():
            # nothing sent for 20 s closes a connection which is not
            # fully established; a fully established one is pinged
            # after 300 s of silence
            minTx = time.time() - 20
            if i.fullyEstablished:
                minTx -= 300 - 20
            if i.lastTx < minTx:
                if i.fullyEstablished:
                    i.append_write_buf(protocol.CreatePacket('ping'))
                else:
                    i.close_reason = "Timeout (%is)" % (
                        time.time() - i.lastTx)
                    i.set_state("close")
        # list() keeps the concatenation working on Python 3, where
        # dict.values() is a view without `+` support
        for i in (
            self.connections()
            + list(self.listeningSockets.values())
            + list(self.udpSockets.values())
        ):
            if not (i.accepting or i.connecting or i.connected):
                reaper.append(i)
            else:
                try:
                    if i.state == "close":
                        reaper.append(i)
                except AttributeError:
                    # object without a `state` attribute
                    pass
        for i in reaper:
            self.removeConnection(i)


pool = BMConnectionPool()