advanceddispatcher.py.bak
"""
Improved version of asyncore dispatcher
"""
import logging
import socket
import threading
import time

import network.asyncore_pollchoose as asyncore
import state
from threads import BusyError, nonBlocking

# Used by AdvancedDispatcher.process() when the instance has no ``logger``
# attribute of its own (nothing in this module assigns one).
_fallback_logger = logging.getLogger(__name__)


class ProcessingError(Exception):
    """General class for protocol parser exception,
    use as a base for others."""
    pass


class UnknownStateError(ProcessingError):
    """Parser points to an unknown (unimplemented) state."""
    pass


class AdvancedDispatcher(asyncore.dispatcher):
    """Improved version of asyncore dispatcher,
    with buffers and protocol state.

    Incoming data is accumulated in ``read_buf`` and outgoing data in
    ``write_buf``. The current protocol state is stored as a string in
    ``state``; :meth:`process` dispatches to the method named
    ``"state_" + state``.
    """
    # pylint: disable=too-many-instance-attributes
    _buf_len = 131072  # 128kB

    def __init__(self, sock=None):
        """Initialise buffers, counters and locks.

        :param sock: passed through to ``asyncore.dispatcher.__init__``.
        """
        # Skip the base initialiser when `_map` is already present on the
        # instance (presumably it has been initialised already, e.g. through
        # another base class -- confirm against the subclasses).
        if not hasattr(self, '_map'):
            asyncore.dispatcher.__init__(self, sock)
        self.connectedAt = 0
        self.close_reason = None
        self.read_buf = bytearray()
        self.write_buf = bytearray()
        self.state = "init"
        self.lastTx = time.time()
        self.sentBytes = 0
        self.receivedBytes = 0
        self.expectBytes = 0
        self.readLock = threading.RLock()
        self.writeLock = threading.RLock()
        self.processingLock = threading.RLock()
        self.uploadChunk = self.downloadChunk = 0

    def append_write_buf(self, data):
        """Append binary data to the end of stream write buffer.

        :param data: a single bytes-like chunk, or a ``list`` of such
            chunks which are appended in order. Falsy values are ignored.
        """
        if data:
            if isinstance(data, list):
                with self.writeLock:
                    for chunk in data:
                        self.write_buf.extend(chunk)
            else:
                with self.writeLock:
                    self.write_buf.extend(data)

    def slice_write_buf(self, length=0):
        """Cut the beginning of the stream write buffer.

        :param length: number of leading bytes to drop; values that are
            not positive leave the buffer untouched.
        """
        if length > 0:
            with self.writeLock:
                # Slice deletion clamps to the buffer size, so a length
                # beyond the end simply empties the buffer.
                del self.write_buf[:length]

    def slice_read_buf(self, length=0):
        """Cut the beginning of the stream read buffer.

        :param length: number of leading bytes to drop; values that are
            not positive leave the buffer untouched.
        """
        if length > 0:
            with self.readLock:
                # Slice deletion clamps to the buffer size, so a length
                # beyond the end simply empties the buffer.
                del self.read_buf[:length]

    def process(self):
        """Process (parse) data that's in the buffer,
        as long as there is enough data and the connection is open.

        Repeatedly calls the ``state_<state>`` handler until the handler
        returns a falsy value, the connection is no longer connected,
        ``state.shutdown`` is set, fewer than ``expectBytes`` bytes are
        buffered, or ``BusyError`` is raised while entering
        ``nonBlocking(self.processingLock)``.

        :returns: always ``False``.
        :raises UnknownStateError: when no ``state_<state>`` attribute
            exists for the current state.
        """
        while self.connected and not state.shutdown:
            try:
                with nonBlocking(self.processingLock):
                    # Re-check after entering the lock context.
                    if not self.connected or state.shutdown:
                        break
                    if len(self.read_buf) < self.expectBytes:
                        return False
                    try:
                        cmd = getattr(self, "state_" + str(self.state))
                    except AttributeError:
                        # `self.logger` is not assigned in this class; fall
                        # back to the module logger so that a missing
                        # attribute cannot mask UnknownStateError with a
                        # second AttributeError.
                        log = getattr(self, 'logger', _fallback_logger)
                        log.error(
                            'Unknown state %s', self.state, exc_info=True)
                        raise UnknownStateError(self.state)
                    if not cmd():
                        break
            except BusyError:
                return False
        return False

    def set_state(self, state_str, length=0, expectBytes=0):
        """Set the next processing state.

        :param state_str: name of the next state (without the ``state_``
            prefix).
        :param length: number of bytes to drop from the start of the read
            buffer.
        :param expectBytes: minimum number of buffered bytes
            :meth:`process` requires before calling the state handler.
        """
        self.expectBytes = expectBytes
        self.slice_read_buf(length)
        self.state = state_str

    def writable(self):
        """Is data from the write buffer ready to be sent to the network?

        As a side effect recomputes ``uploadChunk``: the smaller of the
        write buffer length and either ``_buf_len`` or, when
        ``asyncore.maxUploadRate`` is positive,
        ``int(asyncore.uploadBucket)``.
        """
        self.uploadChunk = AdvancedDispatcher._buf_len
        if asyncore.maxUploadRate > 0:
            self.uploadChunk = int(asyncore.uploadBucket)
        self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
        return asyncore.dispatcher.writable(self) and (
            self.connecting or (
                self.connected and self.uploadChunk > 0))

    def readable(self):
        """Is the read buffer ready to accept data from the network?

        As a side effect recomputes ``downloadChunk``: ``_buf_len`` or,
        when ``asyncore.maxDownloadRate`` is positive,
        ``int(asyncore.downloadBucket)``. While ``expectBytes`` is
        positive and ``fullyEstablished`` is falsy, it is further limited
        to the number of bytes still missing from the read buffer.
        """
        self.downloadChunk = AdvancedDispatcher._buf_len
        if asyncore.maxDownloadRate > 0:
            self.downloadChunk = int(asyncore.downloadBucket)
        # `fullyEstablished` is not set by this class; when the attribute is
        # absent the limit below is skipped, exactly as if it were True.
        if self.expectBytes > 0 and not getattr(
                self, 'fullyEstablished', True):
            self.downloadChunk = min(
                self.downloadChunk, self.expectBytes - len(self.read_buf))
            if self.downloadChunk < 0:
                self.downloadChunk = 0
        return asyncore.dispatcher.readable(self) and (
            self.connecting or self.accepting or (
                self.connected and self.downloadChunk > 0))

    def handle_read(self):
        """Append incoming data to the read buffer.

        Receives at most ``downloadChunk`` bytes and updates ``lastTx``,
        ``receivedBytes`` and the asyncore receive accounting.
        """
        self.lastTx = time.time()
        newData = self.recv(self.downloadChunk)
        self.receivedBytes += len(newData)
        asyncore.update_received(len(newData))
        with self.readLock:
            self.read_buf.extend(newData)

    def handle_write(self):
        """Send outgoing data from write buffer.

        Sends at most ``uploadChunk`` bytes, then removes whatever was
        actually written from the front of the write buffer and updates
        ``lastTx``, ``sentBytes`` and the asyncore send accounting.
        """
        self.lastTx = time.time()
        written = self.send(self.write_buf[0:self.uploadChunk])
        asyncore.update_sent(written)
        self.sentBytes += written
        self.slice_write_buf(written)

    def handle_connect_event(self):
        """Callback for connection established event.

        Socket errors whose first argument is listed in
        ``asyncore._DISCONNECTED`` are suppressed; any other
        ``socket.error`` is re-raised.
        """
        try:
            asyncore.dispatcher.handle_connect_event(self)
        except socket.error as e:
            # pylint: disable=protected-access
            if e.args[0] not in asyncore._DISCONNECTED:
                raise

    def handle_connect(self):
        """Method for handling connection established implementations."""
        self.lastTx = time.time()

    def state_close(self):  # pylint: disable=no-self-use
        """Signal to the processing loop to end."""
        return False

    def handle_close(self):
        """Callback for connection being closed,
        but can also be called directly when you want connection to close.

        Replaces both buffers with empty ones, switches to the ``close``
        state and closes the dispatcher.
        """
        with self.readLock:
            self.read_buf = bytearray()
        with self.writeLock:
            self.write_buf = bytearray()
        self.set_state("close")
        self.close()