/ client_code / persistence.py
persistence.py
  1  # SPDX-License-Identifier: MIT
  2  #
  3  # Copyright (c) 2021 The Anvil Extras project team members listed at
  4  # https://github.com/anvilistas/anvil-extras/graphs/contributors
  5  #
  6  # This software is published at https://github.com/anvilistas/anvil-extras
  7  import anvil.server
  8  
  9  from .utils._warnings import warn as _warn
 10  
 11  __version__ = "3.1.0"
 12  
 13  
 14  def _snakify(text):
 15      return "".join("_" + c.lower() if c.isupper() else c for c in text).lstrip("_")
 16  
 17  
 18  class LinkedAttribute:
 19      """A descriptor class for adding linked table items as attributes
 20  
 21      For a class backed by a data tables row, this class is used to dyamically add
 22      linked table items as attributes to the parent object.
 23      """
 24  
 25      def __init__(self, linked_column, linked_attr):
 26          """
 27          Parameters
 28          ----------
 29          linked_column: str
 30              The name of the column in the row object which links to another table
 31          linked_attr: str
 32              The name of the column in the linked table which contains the required
 33              value
 34          """
 35          _warn(
 36              "persistence.LinkedAttribute",
 37              "LinkedAttribute is deprecated and will be removed in future versions. Use a LinkedClass instead.",
 38              "DEPRECATION_WARNING",
 39          )
 40          self._linked_column = linked_column
 41          self._linked_attr = linked_attr
 42  
 43      def __set_name__(self, owner, name):
 44          if name == self._linked_column:
 45              raise ValueError(
 46                  "Attribute name cannot be the same as the linked column name"
 47              )
 48          self._name = name
 49  
 50      def __get__(self, instance, objtype=None):
 51          if instance is None:
 52              return self
 53  
 54          if instance._delta:
 55              return instance._delta[self._name]
 56  
 57          if not instance._store:
 58              return None
 59  
 60          if not instance._store[self._linked_column]:
 61              return None
 62  
 63          return instance._store[self._linked_column][self._linked_attr]
 64  
 65      def __set__(self, instance, value):
 66          instance._delta[self._name] = value
 67  
 68  
 69  class LinkedClass:
 70      "A descriptor class for adding objects based on linked tables as attributes"
 71  
 72      def __init__(self, cls, *args, linked_column=None, **kwargs):
 73          self._linked_column = linked_column
 74          self._cls = cls
 75          self._args = args or []
 76          self._kwargs = kwargs or {}
 77  
 78      def __get__(self, instance, objtype=None):
 79          if instance is None:
 80              return self
 81  
 82          store = (
 83              instance._delta
 84              if instance._delta and self._linked_column in instance._delta
 85              else instance._store
 86          )
 87          if not store or store[self._linked_column] is None:
 88              return None
 89  
 90          return self._cls(store[self._linked_column], *self._args, **self._kwargs)
 91  
 92      def __set__(self, instance, value):
 93          value = self._cls(value._store) if value is not None else None
 94          instance._delta[self._linked_column] = value
 95  
 96  
 97  class PersistedClass:
 98      key = None
 99  
100      @classmethod
101      def __init_subclass__(cls, **kwargs):
102          super().__init_subclass__(**kwargs)
103          cls._snake_name = _snakify(cls.__name__)
104          cls._cache = {}
105          for attr, value in cls.__dict__.items():
106              try:
107                  is_persisted_class = issubclass(value, PersistedClass)
108              except TypeError:
109                  is_persisted_class = False
110  
111              if is_persisted_class:
112                  setattr(cls, attr, LinkedClass(cls=value, linked_column=attr))
113  
114      @classmethod
115      def search(cls, lazy=False, *args, **kwargs):
116          rows = anvil.server.call(f"search_{cls._snake_name}", *args, **kwargs)
117          if lazy:
118              return (cls(store=row) for row in rows)
119  
120          result = [cls(store=row) for row in rows]
121          cls._cache.clear()
122          for obj in result:
123              cls._cache[getattr(obj, cls.key)] = obj
124          return result
125  
126      @classmethod
127      def get(cls, key):
128          try:
129              return cls._cache[key]
130          except KeyError:
131              row = anvil.server.call(f"get_{cls._snake_name}", **{cls.key: key})
132              obj = cls(store=row)
133              cls._cache[key] = obj
134              return obj
135  
136      def __init__(self, store=None, *args, **kwargs):
137          self._store = store or {}
138          self._delta = kwargs
139  
140      def __getattr__(self, key):
141          if self._delta and key in self._delta:
142              return self._delta[key]
143  
144          # if the _store raises a KeyError
145          # we aren't yet backed by a row object
146          # so return None
147          try:
148              return self._store[key]
149          except KeyError:
150              return None
151  
152      def __getitem__(self, key):
153          return getattr(self, key)
154  
155      def __setattr__(self, key, value):
156          is_private = key.startswith("_")
157          is_descriptor = hasattr(self.__class__, key) and hasattr(
158              getattr(self.__class__, key), "__set__"
159          )
160          if is_private or is_descriptor:
161              object.__setattr__(self, key, value)
162          else:
163              self._delta[key] = value
164  
165      def __setitem__(self, key, value):
166          setattr(self, key, value)
167  
168      def __eq__(self, other):
169          if not isinstance(other, type(self)):
170              return NotImplemented
171          return getattr(self, self.key) == getattr(other, other.key)
172  
173      def add(self, *args, **kwargs):
174          self._store = anvil.server.call(
175              f"add_{self._snake_name}", _serialise_delta(self._delta), *args, **kwargs
176          )
177          self._delta.clear()
178  
179      def update(self, *args, **kwargs):
180          anvil.server.call(
181              f"update_{self._snake_name}",
182              self._store,
183              _serialise_delta(self._delta),
184              *args,
185              **kwargs,
186          )
187          self._delta.clear()
188  
189      def delete(self, *args, **kwargs):
190          anvil.server.call(f"delete_{self._snake_name}", self._store, *args, **kwargs)
191          self._delta.clear()
192  
193      def reset(self):
194          self._delta.clear()
195  
196  
def _serialise_delta(delta):
    """Return a copy of delta with PersistedClass values replaced by their store"""
    serialised = {}
    for name, item in delta.items():
        if isinstance(item, PersistedClass):
            item = item._store
        serialised[name] = item
    return serialised
202  
203  
def persisted_class(cls):
    """A decorator for a class with a persistence mechanism

    Returns a new class with the same name as the decorated class. It derives
    from both the decorated class and PersistedClass and is built from a copy
    of the decorated class's namespace.
    """
    namespace = cls.__dict__.copy()
    bases = (cls, PersistedClass)
    return type(cls.__name__, bases, namespace)