/ src / network / advanceddispatcher.py.bak
advanceddispatcher.py.bak
  1  """
  2  Improved version of asyncore dispatcher
  3  """
  4  import socket
  5  import threading
  6  import time
  7  
  8  import network.asyncore_pollchoose as asyncore
  9  import state
 10  from threads import BusyError, nonBlocking
 11  
 12  
 13  class ProcessingError(Exception):
 14      """General class for protocol parser exception,
 15      use as a base for others."""
 16      pass
 17  
 18  
 19  class UnknownStateError(ProcessingError):
 20      """Parser points to an unknown (unimplemented) state."""
 21      pass
 22  
 23  
class AdvancedDispatcher(asyncore.dispatcher):
    """Improved version of asyncore dispatcher,
    with buffers and protocol state.

    Incoming data is collected in `read_buf` and outgoing data is queued in
    `write_buf`. The name of the current parser state is kept in `state`
    and is handled by the method called ``state_<name>``.
    """
    # pylint: disable=too-many-instance-attributes
    # Default size in bytes of a single upload/download chunk; it is used
    # when the corresponding asyncore rate limit is not positive.
    _buf_len = 131072  # 128kB
 29  
 30      def __init__(self, sock=None):
 31          if not hasattr(self, '_map'):
 32              asyncore.dispatcher.__init__(self, sock)
 33          self.connectedAt = 0
 34          self.close_reason = None
 35          self.read_buf = bytearray()
 36          self.write_buf = bytearray()
 37          self.state = "init"
 38          self.lastTx = time.time()
 39          self.sentBytes = 0
 40          self.receivedBytes = 0
 41          self.expectBytes = 0
 42          self.readLock = threading.RLock()
 43          self.writeLock = threading.RLock()
 44          self.processingLock = threading.RLock()
 45          self.uploadChunk = self.downloadChunk = 0
 46  
 47      def append_write_buf(self, data):
 48          """Append binary data to the end of stream write buffer."""
 49          if data:
 50              if isinstance(data, list):
 51                  with self.writeLock:
 52                      for chunk in data:
 53                          self.write_buf.extend(chunk)
 54              else:
 55                  with self.writeLock:
 56                      self.write_buf.extend(data)
 57  
 58      def slice_write_buf(self, length=0):
 59          """Cut the beginning of the stream write buffer."""
 60          if length > 0:
 61              with self.writeLock:
 62                  if length >= len(self.write_buf):
 63                      del self.write_buf[:]
 64                  else:
 65                      del self.write_buf[0:length]
 66  
 67      def slice_read_buf(self, length=0):
 68          """Cut the beginning of the stream read buffer."""
 69          if length > 0:
 70              with self.readLock:
 71                  if length >= len(self.read_buf):
 72                      del self.read_buf[:]
 73                  else:
 74                      del self.read_buf[0:length]
 75  
 76      def process(self):
 77          """Process (parse) data that's in the buffer,
 78          as long as there is enough data and the connection is open."""
 79          while self.connected and not state.shutdown:
 80              try:
 81                  with nonBlocking(self.processingLock):
 82                      if not self.connected or state.shutdown:
 83                          break
 84                      if len(self.read_buf) < self.expectBytes:
 85                          return False
 86                      try:
 87                          cmd = getattr(self, "state_" + str(self.state))
 88                      except AttributeError:
 89                          self.logger.error(
 90                              'Unknown state %s', self.state, exc_info=True)
 91                          raise UnknownStateError(self.state)
 92                      if not cmd():
 93                          break
 94              except BusyError:
 95                  return False
 96          return False
 97  
    def set_state(self, state_str, length=0, expectBytes=0):
        """Set the next processing state.

        `state_str` is the name of the next state; `process` looks up the
        handler method ``state_<state_str>`` for it. `length` is the number
        of bytes to cut from the beginning of the read buffer (nothing is
        cut unless it is positive). `expectBytes` is the number of bytes
        that `process` requires in the read buffer before it calls the
        handler.
        """
        self.expectBytes = expectBytes
        self.slice_read_buf(length)
        self.state = state_str
103  
104      def writable(self):
105          """Is data from the write buffer ready to be sent to the network?"""
106          self.uploadChunk = AdvancedDispatcher._buf_len
107          if asyncore.maxUploadRate > 0:
108              self.uploadChunk = int(asyncore.uploadBucket)
109          self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
110          return asyncore.dispatcher.writable(self) and (
111              self.connecting or (
112                  self.connected and self.uploadChunk > 0))
113  
114      def readable(self):
115          """Is the read buffer ready to accept data from the network?"""
116          self.downloadChunk = AdvancedDispatcher._buf_len
117          if asyncore.maxDownloadRate > 0:
118              self.downloadChunk = int(asyncore.downloadBucket)
119          try:
120              if self.expectBytes > 0 and not self.fullyEstablished:
121                  self.downloadChunk = min(
122                      self.downloadChunk, self.expectBytes - len(self.read_buf))
123                  if self.downloadChunk < 0:
124                      self.downloadChunk = 0
125          except AttributeError:
126              pass
127          return asyncore.dispatcher.readable(self) and (
128              self.connecting or self.accepting or (
129                  self.connected and self.downloadChunk > 0))
130  
131      def handle_read(self):
132          """Append incoming data to the read buffer."""
133          self.lastTx = time.time()
134          newData = self.recv(self.downloadChunk)
135          self.receivedBytes += len(newData)
136          asyncore.update_received(len(newData))
137          with self.readLock:
138              self.read_buf.extend(newData)
139  
140      def handle_write(self):
141          """Send outgoing data from write buffer."""
142          self.lastTx = time.time()
143          written = self.send(self.write_buf[0:self.uploadChunk])
144          asyncore.update_sent(written)
145          self.sentBytes += written
146          self.slice_write_buf(written)
147  
148      def handle_connect_event(self):
149          """Callback for connection established event."""
150          try:
151              asyncore.dispatcher.handle_connect_event(self)
152          except socket.error as e:
153              # pylint: disable=protected-access
154              if e.args[0] not in asyncore._DISCONNECTED:
155                  raise
156  
    def handle_connect(self):
        """Method for handling connection established implementations.

        This implementation only refreshes the `lastTx` timestamp.
        """
        # lastTx is refreshed by handle_read and handle_write as well
        self.lastTx = time.time()
160  
    def state_close(self):  # pylint: disable=no-self-use
        """Signal to the processing loop to end.

        This is the handler of the "close" state; `process` leaves its loop
        when a state handler returns a false value.
        """
        return False
164  
165      def handle_close(self):
166          """Callback for connection being closed,
167          but can also be called directly when you want connection to close."""
168          with self.readLock:
169              self.read_buf = bytearray()
170          with self.writeLock:
171              self.write_buf = bytearray()
172          self.set_state("close")
173          self.close()