/ server_code / serialisation.py
serialisation.py
  1  # SPDX-License-Identifier: MIT
  2  #
  3  # Copyright (c) 2021 The Anvil Extras project team members listed at
  4  # https://github.com/anvilistas/anvil-extras/graphs/contributors
  5  #
  6  # This software is published at https://github.com/anvilistas/anvil-extras
  7  
  8  from anvil.tables import app_tables
  9  
 10  from . import lazy_module_loader as lazy
 11  
 12  __version__ = "3.1.0"
 13  
 14  LINKED_COLUMN_TYPES = ("liveObject", "liveObjectArray", "link_single", "link_multiple")
 15  LO, LOA, LS, LM = LINKED_COLUMN_TYPES
 16  FIELD_TYPES = None
 17  
 18  
 19  def _get_field_types():
 20      global FIELD_TYPES
 21      if FIELD_TYPES is None:
 22          mm = lazy.marshmallow
 23          FIELD_TYPES = {
 24              "bool": mm.fields.Boolean,
 25              "date": mm.fields.Raw,
 26              "datetime": mm.fields.Raw,
 27              "number": mm.fields.Number,
 28              "string": mm.fields.Str,
 29              "simpleObject": mm.fields.Raw,
 30              "media": mm.fields.Raw,
 31          }
 32      return FIELD_TYPES
 33  
 34  
 35  def _exclusions(table_name, ignore_columns):
 36      """Generate a list of columns to exclude from serialisation for a given table name
 37  
 38      Parameters
 39      ----------
 40      table_name : str
 41          The name of a data table within the app
 42      ignore_columns :  list, tuple, dict or str
 43          A list or tuple of column names to ignore, a dict mapping
 44          table names to such lists or tuples, or a string with a single column name
 45  
 46      Returns
 47      -------
 48      list
 49        of column names
 50      """
 51      if isinstance(ignore_columns, (list, tuple)):
 52          return ignore_columns
 53      elif isinstance(ignore_columns, dict):
 54          if table_name in ignore_columns.keys():
 55              return ignore_columns[table_name]
 56          else:
 57              return []
 58      elif isinstance(ignore_columns, str):
 59          return [ignore_columns]
 60      else:
 61          return []
 62  
 63  
 64  def _link_columns(columns):
 65      """Generate a dict mapping linked column types to sets of column names
 66  
 67      Parameters
 68      ----------
 69      columns : list
 70          of the form return by table.list_columns()
 71  
 72      Returns
 73      -------
 74      dict
 75  
 76          e.g. For a table with two linked columns, 'link1' and link2' plus a multi-link
 77          column, 'multilink', this would return:
 78  
 79          {"liveObject": {"link1", "link2"}, "liveObjectArray": {"multilink"}}
 80      """
 81      rv = {
 82          field_type: {c["name"] for c in columns if c["type"] == field_type}
 83          for field_type in LINKED_COLUMN_TYPES
 84      }
 85      rv[LS] |= rv[LO]
 86      rv[LM] |= rv[LOA]
 87      return rv
 88  
 89  
 90  def _basic_schema_definition(table_name, columns, ignore_columns, with_id):
 91      """
 92      Parameters
 93      ----------
 94      table_name : str
 95          The name of a data table within the app
 96      columns : dict
 97          mapping table names to column lists as generated by _columns
 98      ignore_columns :  list, tuple, dict or str
 99          A list or tuple of column names to ignore, a dict mapping
100          table names to such lists or tuples, or a string with a single column name
101      with_id : boolean
102          whether the internal anvil id should be included in the serialised output
103      """
104      mm = lazy.marshmallow
105      field_types = _get_field_types()
106      exclusions = _exclusions(table_name, ignore_columns)
107      try:
108          result = {
109              column["name"]: field_types[column["type"]]()
110              for column in columns[table_name]
111              if column["type"] not in LINKED_COLUMN_TYPES
112              and column["name"] not in exclusions
113          }
114      except KeyError as e:
115          raise ValueError(f"{e} columns are not supported")
116      if with_id:
117          result["_id"] = mm.fields.Function(lambda row: row.get_id())
118      return result
119  
120  
def _schema_definition(table_name, columns, ignore_columns, linked_tables, with_id):
    """A recursive function to generate a dict for passing to mm.Schema.from_dict

    The basic (non-linked) fields come from _basic_schema_definition. For each
    linked column named in linked_tables[table_name], a nested schema for the
    linked table is generated by calling this function again.

    NOTE(review): there is no guard against cycles in linked_tables (e.g. two
    tables linking to each other), which would recurse without bound.

    Parameters
    ----------
    table_name : str
        The name of a data table within the app
    columns : dict
        mapping table names to column lists as generated by _columns
    ignore_columns :  list, tuple, dict or str
        A list or tuple of column names to ignore, a dict mapping
        table names to such lists or tuples, or a string with a single column name
    linked_tables : dict
        mapping a table name to a dict which, in turn, maps a column name to a linked
        table name
    with_id : boolean
        whether the internal anvil id should be included in the serialised output

    Returns
    -------
    dict
        basic fields first, then single link fields, then multiple link fields
    """
    mm = lazy.marshmallow
    definition = _basic_schema_definition(table_name, columns, ignore_columns, with_id)
    if table_name not in linked_tables:
        return definition

    def nested_field(linked_table):
        # Nested field wrapping a schema class generated for the linked table.
        nested_definition = _schema_definition(
            linked_table, columns, ignore_columns, linked_tables, with_id
        )
        return mm.fields.Nested(mm.Schema.from_dict(nested_definition))

    link_columns = _link_columns(columns[table_name])
    links = linked_tables[table_name]

    # Two passes keep all single links ahead of all multiple links in the result.
    for column, linked_table in links.items():
        if column in link_columns[LS]:
            definition[column] = nested_field(linked_table)
    for column, linked_table in links.items():
        if column in link_columns[LM]:
            definition[column] = mm.fields.List(nested_field(linked_table))
    return definition
181  
182  
183  # The following functions hit the data tables service and thus have no tests.
def _columns(table_name, linked_tables):
    """Generate a dict mapping table names to column lists

    Covers the given table plus every table that appears as a link target
    anywhere in the linked_tables dict. Each column list is fetched by calling
    list_columns() on the matching attribute of app_tables.

    Parameters
    ----------
    table_name : str
        The name of a data table within the app
    linked_tables : dict
        mapping a table name to a dict which, in turn, maps a column name to a linked
        table name

    Returns
    -------
    dict
    """
    linked_names = {
        linked_table
        for links in linked_tables.values()
        for linked_table in links.values()
    }
    result = {}
    for name in {table_name}.union(linked_names):
        result[name] = getattr(app_tables, name).list_columns()
    return result
205  
206  
def datatable_schema(
    table_name, ignore_columns=None, linked_tables=None, with_id=False
):
    """Generate a marshmallow Schema dynamically from a table name

    Parameters
    ----------
    table_name : str
        The name of a data table within the app
    ignore_columns :  list, tuple, dict or str
        A list or tuple of column names to ignore, a dict mapping
        table names to such lists or tuples, or a string with a single column name
    linked_tables : dict
        mapping a table name to a dict which, in turn, maps a column name to a linked
        table name. Treated as empty when omitted.
    with_id : boolean
        whether the internal anvil id should be included in the serialised output

    Returns
    -------
    marshmallow.Schema
        an instance of the schema class generated for the table
    """
    marshmallow = lazy.marshmallow
    links = {} if linked_tables is None else linked_tables
    table_columns = _columns(table_name, links)
    definition = _schema_definition(
        table_name, table_columns, ignore_columns, links, with_id
    )
    schema_class = marshmallow.Schema.from_dict(definition)
    return schema_class()