/ client_code / storage.py
storage.py
  1  # SPDX-License-Identifier: MIT
  2  #
  3  # Copyright (c) 2021 The Anvil Extras project team members listed at
  4  # https://github.com/anvilistas/anvil-extras/graphs/contributors
  5  #
  6  # This software is published at https://github.com/anvilistas/anvil-extras
  7  
  8  from datetime import date, datetime
  9  
 10  import anvil.js
 11  from anvil.js import ExternalError
 12  from anvil.js import window as _window
 13  
 14  __version__ = "3.1.0"
 15  __all__ = ["local_storage", "indexed_db"]
 16  
 17  try:
 18      _forage = _window.localforage
 19  except AttributeError:
 20      _ForageModule = anvil.js.import_from(
 21          "https://cdn.skypack.dev/pin/localforage@v1.10.0-vSTz1U7CF0tUryZh6xTs/mode=imports,min/optimized/localforage.js"
 22      )
 23      _forage = _ForageModule.default
 24  
 25  _forage.dropInstance()
 26  
 27  
 28  _Proxy = type(_window)
 29  _Object = _window.Object
 30  _NoneType = type(None)
 31  _Array = type(_window.Array())
 32  
 33  _SPECIAL = "$$anvil-extras$$:"
 34  
 35  
 36  def _is_str(key):
 37      if type(key) is not str:
 38          msg = f"Keys must be strings when serialzing to browser Storage. Found {type(key).__name__}"
 39          raise TypeError(msg)
 40      return True
 41  
 42  
 43  def _serialize(obj):
 44      # we won't support subclasses of builtins so just check type is
 45      ob_type = type(obj)
 46      if ob_type in (str, int, float, bool, _NoneType, bytes, _Proxy):
 47          return obj
 48      elif ob_type in (list, tuple, _Array):
 49          return [_serialize(item) for item in obj]
 50      elif ob_type is dict:
 51          return {key: _serialize(val) for key, val in obj.items() if _is_str(key)}
 52      elif ob_type is datetime:
 53          return {_SPECIAL + "datetime": obj.isoformat()}
 54      elif ob_type is date:
 55          return {_SPECIAL + "date": obj.isoformat()}
 56      else:
 57          raise TypeError(f"Cannot serialize an object of type {ob_type.__name__}")
 58  
 59  
 60  _deserializers = {"date": date.fromisoformat, "datetime": datetime.fromisoformat}
 61  
 62  
 63  def _special_deserialize(key, value):
 64      assert key.startswith(_SPECIAL), "not a special key"
 65      key = key[len(_SPECIAL) :]
 66      try:
 67          return _deserializers[key](value)
 68      except KeyError:
 69          raise ValueError(f"unknown special deserialization type {key!r}")
 70  
 71  
 72  def _deserialize(obj):
 73      """convert simple proxy objects (and nested simple proxy objects) to dictionaries"""
 74      ob_type = type(obj)
 75      if ob_type in (list, _Array):
 76          return [_deserialize(item) for item in obj]
 77      elif ob_type is _Proxy and obj.__class__ == _Object:
 78          # Then we're a simple proxy object
 79          # keys are strings so only _deserialize the values
 80          # use _Object.keys to avoid possible name conflict
 81          keys = _Object.keys(obj)
 82          if len(keys) == 1 and keys[0].startswith(_SPECIAL):
 83              key = keys[0]
 84              return _special_deserialize(key, obj[key])
 85          return {key: _deserialize(obj[key]) for key in keys}
 86      else:
 87          # we're either bytes, str, ints, floats, None, bool
 88          return obj
 89  
 90  
 91  def wrap_with_retry(fn):
 92      def wrapper(*args, **kws):
 93          try:
 94              return fn(*args, **kws)
 95          except ExternalError:
 96              try:
 97                  return fn(*args, **kws)
 98              except ExternalError:
 99                  raise
100  
101      return wrapper
102  
103  
104  class RetryStoreWrapper:
105      def __init__(self, store):
106          self._store = store
107  
108      def __getattr__(self, name):
109          maybe_method = getattr(self._store, name)
110          if callable(maybe_method):
111              return wrap_with_retry(maybe_method)
112          return maybe_method
113  
114  
115  class StorageWrapper:
116      _driver = None
117      _stores = None
118  
119      def __new__(cls, store_name):
120          if cls._driver is None:
121              raise NotImplementedError(
122                  "StorageWrapper cannot be initiated without a valid _driver"
123              )
124          if not isinstance(store_name, str):
125              raise TypeError(
126                  f"store_name should be a str, (got {store_name.__class__.__name__})"
127              )
128          known_stores = cls._stores
129          if known_stores is None:
130              # initialize the _stores cache
131              known_stores = cls._stores = {}
132          elif store_name in known_stores:
133              return known_stores[store_name]
134  
135          store = object.__new__(cls)
136          forage_store = _forage.createInstance(
137              {
138                  "storeName": store_name,
139                  "driver": [cls._driver, f"fail{cls._driver}"],
140                  "name": "anvil_extras",
141              }
142          )
143          store._store = RetryStoreWrapper(forage_store)
144          store._name = store_name
145          known_stores[store_name] = store
146          return store
147  
148      def is_available(self):
149          """check if the store object is available and accessible."""
150          # in some browsers localStorageWrapper might not be available
151          if not self._store.supports(self._driver):
152              # we cn't rely on this method - it's just for browser support
153              return False
154          try:
155              self._store.length()
156              # call a method in the store to activate the store
157              return True
158          except Exception:
159              return False
160  
161      def __getitem__(self, key):
162          if key in self._store.keys():
163              return _deserialize(self._store.getItem(key))
164          raise KeyError(key)
165  
166      def __setitem__(self, key, val):
167          self._store.setItem(key, _serialize(val))
168  
169      def __delitem__(self, key):
170          # we can't block here so do a Promise hack
171          _window.Promise(lambda res, rej: self._store.removeItem(key))
172          return None
173  
174      def __contains__(self, key):
175          return key in self._store.keys()
176  
177      def __repr__(self):
178          # we can't print the items like a dictionary since we get a SuspensionError here
179          return f"<{self.__class__.__name__} for {self._name!r} store>"
180  
181      def __iter__(self):
182          # self.keys() suspends and __iter__ can't suspend
183          return StoreIterator(self)
184  
185      def __len__(self):
186          return self._store.length()
187  
188      def keys(self):
189          """returns the keys for the store as an iterator"""
190          return self._store.keys()
191  
192      def items(self):
193          """returns the items for the store as an iterator"""
194          return (
195              (key, _deserialize(self._store.getItem(key))) for key in self._store.keys()
196          )
197  
198      def values(self):
199          """returns the values for the store as an iterator"""
200          return (_deserialize(self._store.getItem(key)) for key in self._store.keys())
201  
202      def store(self, key: str, value):
203          """store a key value pair in the store"""
204          self[key] = value
205  
206      put = store  # backward compatibility
207  
208      def get(self, key: str, default=None):
209          """get a value from the store, returns the default value if the key is not in the store"""
210          try:
211              return self[key]
212          except KeyError:
213              return default
214  
215      def pop(self, key: str, default=None):
216          """remove specified key and return the corresponding value.\n\nIf key is not found, default is returned"""
217          try:
218              return self.get(key, default)
219          finally:
220              del self[key]
221  
222      def clear(self):
223          """clear all items from the store"""
224          self._store.clear()
225  
226      def update(self, other, **kws):
227          """update the store item with key/value pairs from other"""
228          other = dict(other, **kws)
229          for key, value in other.items():
230              self[key] = value
231  
232      @classmethod
233      def create_store(cls, store_name: str):
234          """
235          Create a new storage object inside the browser's IndexedDB or localStorage.
236          e.g. todo_store = indexed_db.create_store('todos')
237          message_store = indexed_db.create_store('messages')
238          """
239          return cls(store_name)
240  
241  
242  class StoreIterator:
243      def __init__(self, store):
244          self._store = store
245          self._keys = None
246  
247      def __iter__(self):
248          return self
249  
250      def __next__(self):
251          if self._keys is None:
252              self._keys = iter(self._store.keys())
253          return next(self._keys)
254  
255  
256  # The following defines a forage driver whose job is to
257  # try to access the appropriate browser store,
258  # catch the exception and the throw a nicer exception
259  def _fail_access(fn):
260      def _do_fail(*args):
261          # see what happens when we try to access the storage object
262          msg = "Browser storage object is not available."
263          try:
264              fn()
265          except Exception as e:
266              msg += f"\nWhen trying to access the storage object got - {e}"
267          raise RuntimeError(msg)
268  
269      return _do_fail
270  
271  
272  def _fail_db(*args):
273      def check_db(res, rej):
274          req = _window.indexedDB.open("anvil_extras")
275          req.onerror = lambda e: rej(req.error.toString())
276          req.onsuccess = lambda r: res(None)
277  
278      # this is asyncronous so use the Promise api
279      anvil.js.await_promise(_window.Promise(check_db))
280  
281  
282  def _fail_ls(*args):
283      _window.get("localStorage")
284  
285  
286  def _defineFailDriver(driver: str):
287      fail_fn = _fail_db if driver == _forage.INDEXEDDB else _fail_ls
288      fail_callback = _fail_access(fail_fn)
289      _forage.defineDriver(
290          {
291              "_driver": f"fail{driver}",
292              "_initStorage": lambda options: None,
293              "clear": fail_callback,
294              "getItem": fail_callback,
295              "iterate": fail_callback,
296              "key": fail_callback,
297              "keys": fail_callback,
298              "length": fail_callback,
299              "removeItem": fail_callback,
300              "setItem": fail_callback,
301          }
302      )
303  
304  
305  _defineFailDriver(_forage.INDEXEDDB)
306  _defineFailDriver(_forage.LOCALSTORAGE)
307  
308  
309  class IndexedDBWrapper(StorageWrapper):
310      _driver = _forage.INDEXEDDB
311  
312  
313  indexed_db = IndexedDBWrapper.create_store("default")
314  
315  
316  class LocalStorageWrapper(StorageWrapper):
317      _driver = _forage.LOCALSTORAGE
318  
319  
320  local_storage = LocalStorageWrapper.create_store("default")
321  
322  
323  def __getattr__(name):
324      if name == "session_storage":
325          raise Exception("deprecated - session_storage is no longer supported")
326      raise AttributeError(name)
327  
328  
329  if __name__ == "__main__":
330      for _ in local_storage, indexed_db:
331          print(_)
332          _["foo"] = "bar"
333          assert _["foo"] == "bar"
334          del _["foo"]
335          assert _.get("foo", "sentinel") == "sentinel"
336          try:
337              _["foo"]
338          except KeyError:
339              pass
340          else:
341              raise AssertionError
342          _.put("foo", 1)
343          assert _.pop("foo") == 1
344          x = [{"a": "b"}, "foo"]
345          _["x"] = x
346          assert _["x"] == x == _.get("x") == _.pop("x")
347          _["foo"] = None
348          _["eggs"] = None
349          _.update({"foo": "bar"}, eggs="spam", x=1)
350          for i in _:  # shouldn't fail
351              pass
352          assert len(list(_.keys())) == 3 and _["eggs"] == "spam"
353          assert list(_) == list(_.keys())
354  
355          date_objs = [datetime.now(), datetime.now().astimezone(), date.today()]
356          _["d"] = date_objs
357          assert _["d"] == date_objs
358          try:
359              _["foo"] = slice(1, 2, 3)
360          except TypeError:
361              pass
362          else:
363              raise AssertionError
364  
365          _.clear()
366          assert len(_) == 0
367          print("===== Tests Passed =====")