/ client_code / persistence.py
persistence.py
# SPDX-License-Identifier: MIT
#
# Copyright (c) 2021 The Anvil Extras project team members listed at
# https://github.com/anvilistas/anvil-extras/graphs/contributors
#
# This software is published at https://github.com/anvilistas/anvil-extras
import anvil.server

from .utils._warnings import warn as _warn

__version__ = "3.1.0"


def _snakify(text):
    """Convert a CamelCase name to snake_case.

    Every upper-case character is lower-cased and prefixed with an underscore;
    any leading underscores are then stripped from the result.
    """
    return "".join("_" + c.lower() if c.isupper() else c for c in text).lstrip("_")


class LinkedAttribute:
    """A descriptor class for adding linked table items as attributes

    For a class backed by a data tables row, this class is used to dynamically add
    linked table items as attributes to the parent object.
    """

    def __init__(self, linked_column, linked_attr):
        """
        Parameters
        ----------
        linked_column: str
            The name of the column in the row object which links to another table
        linked_attr: str
            The name of the column in the linked table which contains the required
            value
        """
        _warn(
            "persistence.LinkedAttribute",
            "LinkedAttribute is deprecated and will be removed in future versions. "
            "Use a LinkedClass instead.",
            "DEPRECATION_WARNING",
        )
        self._linked_column = linked_column
        self._linked_attr = linked_attr

    def __set_name__(self, owner, name):
        """Record the attribute name this descriptor is bound to.

        Raises
        ------
        ValueError
            If the attribute name is the same as the linked column name.
        """
        if name == self._linked_column:
            raise ValueError(
                "Attribute name cannot be the same as the linked column name"
            )
        self._name = name

    def __get__(self, instance, objtype=None):
        """Return the pending value if there is one, else the linked row's value.

        Returns the descriptor itself for class-level access and None when the
        instance has no store or the linked column is empty.
        """
        if instance is None:
            return self

        # Only use the delta when it actually holds a value for this attribute.
        # A pending change to some other attribute must not hide the stored
        # value (indexing the delta unconditionally raised KeyError).
        if instance._delta and self._name in instance._delta:
            return instance._delta[self._name]

        if not instance._store:
            return None

        if not instance._store[self._linked_column]:
            return None

        return instance._store[self._linked_column][self._linked_attr]

    def __set__(self, instance, value):
        """Stage the new value in the instance's delta."""
        instance._delta[self._name] = value


class LinkedClass:
    "A descriptor class for adding objects based on linked tables as attributes"

    def __init__(self, cls, *args, linked_column=None, **kwargs):
        """
        Parameters
        ----------
        cls: type
            The class used to wrap the value held in the linked column
        linked_column: str
            The name of the column in the row object which links to another table
        *args, **kwargs
            Passed on to `cls` each time a linked object is created
        """
        self._linked_column = linked_column
        self._cls = cls
        self._args = args or []
        self._kwargs = kwargs or {}

    def __get__(self, instance, objtype=None):
        """Return the linked column's value wrapped in the linked class.

        The value is read from the instance's delta when the column has a
        pending change there, otherwise from its store. Returns the descriptor
        itself for class-level access and None when there is nothing linked.
        """
        if instance is None:
            return self

        store = (
            instance._delta
            if instance._delta and self._linked_column in instance._delta
            else instance._store
        )
        if not store or store[self._linked_column] is None:
            return None

        return self._cls(store[self._linked_column], *self._args, **self._kwargs)

    def __set__(self, instance, value):
        """Stage a new linked object (or None) in the instance's delta."""
        value = self._cls(value._store) if value is not None else None
        instance._delta[self._linked_column] = value


class PersistedClass:
    """Base class for objects backed by a store, with pending changes in a delta.

    Reads look in the delta first and then in the store. Assignments to public,
    non-descriptor attributes are staged in the delta until `add` or `update`
    sends them to the server, or `reset` discards them.
    """

    # Name of the attribute which identifies an instance. Used as the cache key
    # and for equality comparison.
    key = None

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """Set up a subclass: snake-case name, cache and linked classes.

        Any class attribute whose value is itself a PersistedClass subclass is
        replaced by a LinkedClass descriptor for the column of the same name.
        """
        super().__init_subclass__(**kwargs)
        cls._snake_name = _snakify(cls.__name__)
        cls._cache = {}
        # Iterate over a snapshot because the loop body rebinds class attributes.
        for attr, value in list(cls.__dict__.items()):
            try:
                is_persisted_class = issubclass(value, PersistedClass)
            except TypeError:
                # issubclass rejects values which are not classes
                is_persisted_class = False

            if is_persisted_class:
                setattr(cls, attr, LinkedClass(cls=value, linked_column=attr))

    @classmethod
    def search(cls, lazy=False, *args, **kwargs):
        """Call the server function `search_<snake_name>` and wrap each row.

        Parameters
        ----------
        lazy: bool
            If truthy, return a generator of instances and leave the cache
            untouched. Otherwise return a list and rebuild the cache from it.
            Note this is the first positional parameter, so any extra
            positional arguments must come after it.
        *args, **kwargs
            Passed on to the server function
        """
        rows = anvil.server.call(f"search_{cls._snake_name}", *args, **kwargs)
        if lazy:
            return (cls(store=row) for row in rows)

        result = [cls(store=row) for row in rows]
        cls._cache.clear()
        for obj in result:
            cls._cache[getattr(obj, cls.key)] = obj
        return result

    @classmethod
    def get(cls, key):
        """Return the instance for `key`, fetching from the server on a cache miss.

        On a miss, the server function `get_<snake_name>` is called with the
        key passed as a keyword argument named after `cls.key`, and the result
        is cached.
        """
        try:
            return cls._cache[key]
        except KeyError:
            row = anvil.server.call(f"get_{cls._snake_name}", **{cls.key: key})
            obj = cls(store=row)
            cls._cache[key] = obj
            return obj

    def __init__(self, store=None, *args, **kwargs):
        """
        Parameters
        ----------
        store
            The backing object (an empty dict is used if this is falsy)
        *args
            Accepted but unused
        **kwargs
            Initial attribute values, held in the delta as pending changes
        """
        self._store = store or {}
        self._delta = kwargs

    def __getattr__(self, key):
        """Look up `key` in the delta, then the store, defaulting to None.

        Raises
        ------
        AttributeError
            If `key` is `_store` or `_delta` and that attribute is not set.
        """
        # This method is only reached when normal lookup fails. If the backing
        # attributes themselves are missing (e.g. __init__ never ran), reading
        # them below would re-enter __getattr__ until a RecursionError, so fail
        # fast instead.
        if key in ("_store", "_delta"):
            raise AttributeError(key)

        if self._delta and key in self._delta:
            return self._delta[key]

        # if the _store raises a KeyError
        # we aren't yet backed by a row object
        # so return None
        try:
            return self._store[key]
        except KeyError:
            return None

    def __getitem__(self, key):
        """Support `obj[key]` as an alias for attribute access."""
        return getattr(self, key)

    def __setattr__(self, key, value):
        """Set private and descriptor attributes normally; stage the rest."""
        is_private = key.startswith("_")
        is_descriptor = hasattr(self.__class__, key) and hasattr(
            getattr(self.__class__, key), "__set__"
        )
        if is_private or is_descriptor:
            object.__setattr__(self, key, value)
        else:
            self._delta[key] = value

    def __setitem__(self, key, value):
        """Support `obj[key] = value` as an alias for attribute assignment."""
        setattr(self, key, value)

    def __eq__(self, other):
        """Compare by key value; only instances of the same class are comparable."""
        if not isinstance(other, type(self)):
            return NotImplemented
        return getattr(self, self.key) == getattr(other, other.key)

    def add(self, *args, **kwargs):
        """Send the delta to `add_<snake_name>` and keep its result as the store."""
        self._store = anvil.server.call(
            f"add_{self._snake_name}", _serialise_delta(self._delta), *args, **kwargs
        )
        self._delta.clear()

    def update(self, *args, **kwargs):
        """Send the store and delta to `update_<snake_name>`, then clear the delta."""
        anvil.server.call(
            f"update_{self._snake_name}",
            self._store,
            _serialise_delta(self._delta),
            *args,
            **kwargs,
        )
        self._delta.clear()

    def delete(self, *args, **kwargs):
        """Send the store to `delete_<snake_name>`, then clear the delta."""
        anvil.server.call(f"delete_{self._snake_name}", self._store, *args, **kwargs)
        self._delta.clear()

    def reset(self):
        """Discard all pending changes."""
        self._delta.clear()


def _serialise_delta(delta):
    """Return a copy of `delta` with PersistedClass values replaced by their stores."""
    return {
        key: value._store if isinstance(value, PersistedClass) else value
        for key, value in delta.items()
    }


def persisted_class(cls):
    """A decorator for a class with a persistence mechanism

    Returns a new class with the same name, with `cls` and PersistedClass as its
    bases and a copy of the namespace of `cls`.
    """
    return type(cls.__name__, (cls, PersistedClass), cls.__dict__.copy())