/ src / helper_sql.py
helper_sql.py
  1  """
  2  SQL-related functions defined here simply pass the queries (or other SQL
  3  commands) to :class:`.threads.sqlThread` through the `sqlSubmitQueue` queue and
  4  check or return the result received from `sqlReturnQueue`.
  5  
  6  It is done this way because :mod:`sqlite3` is so thread-unsafe that it
  7  won't even let you call it from different threads using your own locks.
  8  SQLite objects can only be used from one thread.
  9  
 10  .. note:: This actually only applies to certain deployments, and/or
 11     really old versions of sqlite. I haven't actually seen it anywhere.
 12     Current versions do have support for threading and multiprocessing.
 13     I don't see an urgent reason to refactor this, but it should be noted
 14     in the comment that the problem is mostly not valid. Sadly, last time
 15     I checked, there is no reliable way to check whether the library is
 16     or isn't thread-safe.
 17  """
 18  
 19  import threading
 20  
 21  from six.moves import queue
 22  
 23  
# Module-level state shared by the helpers in this module. The query/execute
# helpers put a statement and then its parameters on sqlSubmitQueue and read
# one two-element reply per statement from sqlReturnQueue, all while holding
# sql_lock.
sqlSubmitQueue = queue.Queue()
"""the queue for SQL"""
sqlReturnQueue = queue.Queue()
"""the queue for results"""
sql_lock = threading.Lock()
""" lock to prevent queueing a new request until the previous response
    is available """
# Asserted by the helpers before they submit anything to the queues.
sql_available = False
"""set to True by `.threads.sqlThread` immediately upon start"""
# NOTE(review): sql_ready and sql_timeout are not referenced by the helpers
# visible in this module -- presumably consumed by callers elsewhere.
sql_ready = threading.Event()
"""set by `.threads.sqlThread` when ready for processing (after
   initialization is done)"""
sql_timeout = 60
"""timeout for waiting for sql_ready in seconds"""
 38  
 39  
 40  def sqlQuery(sql_statement, *args):
 41      """
 42      Query sqlite and return results
 43  
 44      :param str sql_statement: SQL statement string
 45      :param list args: SQL query parameters
 46      :rtype: list
 47      """
 48      assert sql_available
 49      sql_lock.acquire()
 50      sqlSubmitQueue.put(sql_statement)
 51  
 52      if args == ():
 53          sqlSubmitQueue.put('')
 54      elif isinstance(args[0], (list, tuple)):
 55          sqlSubmitQueue.put(args[0])
 56      else:
 57          sqlSubmitQueue.put(args)
 58      queryreturn, _ = sqlReturnQueue.get()
 59      sql_lock.release()
 60  
 61      return queryreturn
 62  
 63  
 64  def sqlExecuteChunked(sql_statement, idCount, *args):
 65      """Execute chunked SQL statement to avoid argument limit"""
 66      # SQLITE_MAX_VARIABLE_NUMBER,
 67      # unfortunately getting/setting isn't exposed to python
 68      assert sql_available
 69      sqlExecuteChunked.chunkSize = 999
 70  
 71      if idCount == 0 or idCount > len(args):
 72          return 0
 73  
 74      total_row_count = 0
 75      with sql_lock:
 76          for i in range(
 77                  len(args) - idCount, len(args),
 78                  sqlExecuteChunked.chunkSize - (len(args) - idCount)
 79          ):
 80              chunk_slice = args[
 81                  i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount)
 82              ]
 83              sqlSubmitQueue.put(
 84                  sql_statement.format(','.join('?' * len(chunk_slice)))
 85              )
 86              # first static args, and then iterative chunk
 87              sqlSubmitQueue.put(
 88                  args[0:len(args) - idCount] + chunk_slice
 89              )
 90              ret_val = sqlReturnQueue.get()
 91              total_row_count += ret_val[1]
 92          sqlSubmitQueue.put('commit')
 93      return total_row_count
 94  
 95  
 96  def sqlExecute(sql_statement, *args):
 97      """Execute SQL statement (optionally with arguments)"""
 98      assert sql_available
 99      sql_lock.acquire()
100      sqlSubmitQueue.put(sql_statement)
101  
102      if args == ():
103          sqlSubmitQueue.put('')
104      else:
105          sqlSubmitQueue.put(args)
106      _, rowcount = sqlReturnQueue.get()
107      sqlSubmitQueue.put('commit')
108      sql_lock.release()
109      return rowcount
110  
111  
def sqlExecuteScript(sql_statement):
    """
    Execute a multi-statement SQL script

    The script is split on every ``;`` and the pieces are executed in order
    inside a single :class:`SqlBulkExecute` context.

    .. note:: The split is purely textual: a ``;`` inside a string literal
       also splits the script, and the text after the last ``;`` is
       submitted as a (possibly empty) statement of its own.

    :param str sql_statement: SQL statements separated by ``;``
    """
    # split before entering the context so a bad argument fails up front
    pieces = sql_statement.split(";")
    with SqlBulkExecute() as bulk:
        for piece in pieces:
            bulk.execute("{}".format(piece))
119  
120  
def sqlStoredProcedure(procName):
    """
    Schedule procName to be run

    Puts `procName` on `sqlSubmitQueue` without waiting for any reply. For
    ``"exit"`` a ``"terminate"`` command is queued right after it.

    :param str procName: name of the procedure/command to submit
    :raises AssertionError: if `sql_available` is not set
    """
    assert sql_available

    # ``with`` releases the lock even if task_done() raises ValueError;
    # a bare acquire/release pair would leave it held and block every
    # later SQL call.
    with sql_lock:
        sqlSubmitQueue.put(procName)
        if procName == "exit":
            # NOTE(review): task_done() is called from the submitting side
            # here; presumably this balances the queue's unfinished-task
            # count for a join() elsewhere -- confirm against sqlThread.
            sqlSubmitQueue.task_done()
            sqlSubmitQueue.put("terminate")
130  
131  
class SqlBulkExecute(object):
    """
    Context manager for running many statements under one lock and commit

    Entering the context acquires `sql_lock`; leaving it queues a single
    ``'commit'`` command and releases the lock, whether or not the body
    raised. Use :meth:`execute` inside the ``with`` body.
    """

    def __enter__(self):
        # keep other SQL helpers out until the whole batch is submitted
        sql_lock.acquire()
        return self

    def __exit__(self, exc_type, value, traceback):
        # one commit for the whole batch; exceptions are not suppressed
        sqlSubmitQueue.put('commit')
        sql_lock.release()

    @staticmethod
    def execute(sql_statement, *args):
        """
        Submit one statement and wait for its reply, discarding the result

        Does not take `sql_lock` itself; the lock is acquired in
        ``__enter__``.

        :param str sql_statement: SQL statement string
        :param args: SQL parameters, forwarded as a tuple
        """
        assert sql_available
        sqlSubmitQueue.put(sql_statement)
        # an empty string is this module's marker for "no parameters"
        sqlSubmitQueue.put(args or '')
        sqlReturnQueue.get()