helper_sql.py
1 """ 2 SQL-related functions defined here are really pass the queries (or other SQL 3 commands) to :class:`.threads.sqlThread` through `sqlSubmitQueue` queue and check 4 or return the result got from `sqlReturnQueue`. 5 6 This is done that way because :mod:`sqlite3` is so thread-unsafe that they 7 won't even let you call it from different threads using your own locks. 8 SQLite objects can only be used from one thread. 9 10 .. note:: This actually only applies for certain deployments, and/or 11 really old version of sqlite. I haven't actually seen it anywhere. 12 Current versions do have support for threading and multiprocessing. 13 I don't see an urgent reason to refactor this, but it should be noted 14 in the comment that the problem is mostly not valid. Sadly, last time 15 I checked, there is no reliable way to check whether the library is 16 or isn't thread-safe. 17 """ 18 19 import threading 20 21 from six.moves import queue 22 23 24 sqlSubmitQueue = queue.Queue() 25 """the queue for SQL""" 26 sqlReturnQueue = queue.Queue() 27 """the queue for results""" 28 sql_lock = threading.Lock() 29 """ lock to prevent queueing a new request until the previous response 30 is available """ 31 sql_available = False 32 """set to True by `.threads.sqlThread` immediately upon start""" 33 sql_ready = threading.Event() 34 """set by `.threads.sqlThread` when ready for processing (after 35 initialization is done)""" 36 sql_timeout = 60 37 """timeout for waiting for sql_ready in seconds""" 38 39 40 def sqlQuery(sql_statement, *args): 41 """ 42 Query sqlite and return results 43 44 :param str sql_statement: SQL statement string 45 :param list args: SQL query parameters 46 :rtype: list 47 """ 48 assert sql_available 49 sql_lock.acquire() 50 sqlSubmitQueue.put(sql_statement) 51 52 if args == (): 53 sqlSubmitQueue.put('') 54 elif isinstance(args[0], (list, tuple)): 55 sqlSubmitQueue.put(args[0]) 56 else: 57 sqlSubmitQueue.put(args) 58 queryreturn, _ = sqlReturnQueue.get() 59 sql_lock.release() 60 61 return queryreturn 62 63 64 def sqlExecuteChunked(sql_statement, idCount, *args): 65 """Execute chunked SQL statement to avoid argument limit""" 66 # SQLITE_MAX_VARIABLE_NUMBER, 67 # unfortunately getting/setting isn't exposed to python 68 assert sql_available 69 sqlExecuteChunked.chunkSize = 999 70 71 if idCount == 0 or idCount > len(args): 72 return 0 73 74 total_row_count = 0 75 with sql_lock: 76 for i in range( 77 len(args) - idCount, len(args), 78 sqlExecuteChunked.chunkSize - (len(args) - idCount) 79 ): 80 chunk_slice = args[ 81 i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount) 82 ] 83 sqlSubmitQueue.put( 84 sql_statement.format(','.join('?' * len(chunk_slice))) 85 ) 86 # first static args, and then iterative chunk 87 sqlSubmitQueue.put( 88 args[0:len(args) - idCount] + chunk_slice 89 ) 90 ret_val = sqlReturnQueue.get() 91 total_row_count += ret_val[1] 92 sqlSubmitQueue.put('commit') 93 return total_row_count 94 95 96 def sqlExecute(sql_statement, *args): 97 """Execute SQL statement (optionally with arguments)""" 98 assert sql_available 99 sql_lock.acquire() 100 sqlSubmitQueue.put(sql_statement) 101 102 if args == (): 103 sqlSubmitQueue.put('') 104 else: 105 sqlSubmitQueue.put(args) 106 _, rowcount = sqlReturnQueue.get() 107 sqlSubmitQueue.put('commit') 108 sql_lock.release() 109 return rowcount 110 111 112 def sqlExecuteScript(sql_statement): 113 """Execute SQL script statement""" 114 115 statements = sql_statement.split(";") 116 with SqlBulkExecute() as sql: 117 for q in statements: 118 sql.execute("{}".format(q)) 119 120 121 def sqlStoredProcedure(procName): 122 """Schedule procName to be run""" 123 assert sql_available 124 sql_lock.acquire() 125 sqlSubmitQueue.put(procName) 126 if procName == "exit": 127 sqlSubmitQueue.task_done() 128 sqlSubmitQueue.put("terminate") 129 sql_lock.release() 130 131 132 class SqlBulkExecute(object): 133 """This is used when you have to execute the same statement in a cycle.""" 134 135 def __enter__(self): 136 sql_lock.acquire() 137 return self 138 139 def __exit__(self, exc_type, value, traceback): 140 sqlSubmitQueue.put('commit') 141 sql_lock.release() 142 143 @staticmethod 144 def execute(sql_statement, *args): 145 """Used for statements that do not return results.""" 146 assert sql_available 147 sqlSubmitQueue.put(sql_statement) 148 149 if args == (): 150 sqlSubmitQueue.put('') 151 else: 152 sqlSubmitQueue.put(args) 153 sqlReturnQueue.get()