/ client_code / routing / _router.py
_router.py
  1  # SPDX-License-Identifier: MIT
  2  #
  3  # Copyright (c) 2021 The Anvil Extras project team members listed at
  4  # https://github.com/anvilistas/anvil-extras/graphs/contributors
  5  #
  6  # This software is published at https://github.com/anvilistas/anvil-extras
  7  
  8  from functools import wraps
  9  from itertools import chain
 10  
 11  from anvil import get_open_form, open_form
 12  from anvil.js.window import document
 13  
 14  from ..utils._view_transition import ViewTransition
 15  from ._alert import handle_alert_unload as _handle_alert_unload
 16  from ._logging import logger
 17  from ._utils import ANY, TemplateInfo, get_url_components
 18  
 19  __version__ = "3.1.0"
 20  
 21  
 22  class NavigationExit(Exception):
 23      pass
 24  
 25  
 26  class navigation_context:
 27      contexts = []
 28  
 29      def __init__(self, url_hash):
 30          self.is_stale = False
 31          self.url_hash = url_hash
 32  
 33      def check_stale(self):
 34          if self.is_stale:
 35              raise NavigationExit
 36  
 37      @classmethod
 38      def matches_current_context(cls, url_hash):
 39          current = cls.contexts and cls.contexts[-1]
 40          return current and current.url_hash == url_hash and not current.is_stale
 41  
 42      @classmethod
 43      def mark_all_stale(cls):
 44          for context in cls.contexts:
 45              context.is_stale = True
 46  
 47      def __enter__(self):
 48          num_contexts = len(self.contexts)
 49          logger.debug(f"entering navigation level: {num_contexts}")
 50          self.mark_all_stale()
 51          self.contexts.append(self)
 52          if num_contexts >= 10:
 53              logger.debug(
 54                  "**WARNING**"
 55                  "\nurl_hash redirected too many times without a form load, getting out\ntry setting redirect=False"
 56              )
 57              self.is_stale = True
 58          return self
 59  
 60      def __exit__(self, exc_type, *args):
 61          self.contexts.pop()
 62          num_contexts = len(self.contexts)
 63          logger.debug(f"exiting navigation level: {num_contexts}")
 64          if not num_contexts:
 65              logger.debug("navigation complete\n")
 66          if exc_type is NavigationExit:
 67              return True
 68  
 69  
 70  def _update_key(key):
 71      if type(key) is str:
 72          key = (key, type(get_open_form()).__name__)
 73      return key
 74  
 75  
 76  def _wrap_method(method):
 77      @wraps(method)
 78      def wrapped(self, key, *args):
 79          return method(self, _update_key(key), *args)
 80  
 81      return wrapped
 82  
 83  
 84  class _Cache(dict):
 85      __getitem__ = _wrap_method(dict.__getitem__)
 86      __setitem__ = _wrap_method(dict.__setitem__)
 87      __delitem__ = _wrap_method(dict.__delitem__)
 88      __contains__ = _wrap_method(dict.__contains__)
 89      get = _wrap_method(dict.get)
 90      pop = _wrap_method(dict.pop)
 91      setdefault = _wrap_method(dict.setdefault)
 92  
 93  
 94  default_title = document.title
 95  
 96  _current_form = None
 97  _cache = _Cache()
 98  _routes = {}
 99  _templates = set()
100  _ordered_info = {}
101  _error_form = None
102  _ready = False
103  _queued = []
104  _force_launch = False
105  
106  
def launch():
    """Mark routing as ready and run the most recently queued navigation.

    If nothing was queued, ``navigate()`` is called with no arguments.
    ``_force_launch`` is set for the duration of that call when there is no
    open form or the open form's class is not a registered template, and is
    always reset afterwards.
    """
    global _force_launch, _ready
    _ready = True

    open_template = get_open_form()
    _force_launch = open_template is None or type(open_template) not in _templates

    # only the last queued navigation is run; earlier ones are discarded
    url_args, properties = _queued.pop() if _queued else ((), {})
    _queued.clear()

    try:
        navigate(*url_args, **properties)
    finally:
        _force_launch = False
127  
128  
def navigate(url_hash=None, url_pattern=None, url_dict=None, **properties):
    """Navigate to ``url_hash`` and show the matching form in the open template.

    Before ``launch()`` has run the call is only queued. When ``url_hash`` is
    None the three url components are taken from ``get_url_components()``.
    Nothing happens if the innermost live navigation already targets
    ``url_hash``. Any ``NavigationExit`` raised along the way is swallowed by
    the surrounding ``navigation_context``.
    """
    global _current_form

    if not _ready:
        logger.debug(
            f"routing is not ready or the template has not finished loading: queuing the call {url_hash!r}"
        )
        _queued.append([(url_hash, url_pattern, url_dict), properties])
        return

    if url_hash is None:
        url_hash, url_pattern, url_dict = get_url_components()

    if navigation_context.matches_current_context(url_hash):
        return

    logger.debug(
        f"navigation triggered: url_hash={url_hash!r}, url_pattern={url_pattern!r}, url_dict={url_dict}"
    )

    with navigation_context(url_hash) as nav_context:
        # the context starts out stale when there are 10+ active contexts
        nav_context.check_stale()
        handle_alert_unload()
        handle_form_unload()
        nav_context.check_stale()

        template_info, init_path = load_template_or_redirect(
            url_hash, url_pattern, url_dict, properties, nav_context
        )

        url_args = dict(url_hash=url_hash, url_pattern=url_pattern, url_dict=url_dict)
        alert_on_navigation(**url_args)
        nav_context.check_stale()

        form = _cache.get(url_hash)
        if form is not None:
            logger.debug(f"loading route: {form.__class__.__name__!r} from cache")
        else:
            form = get_form_to_add(
                template_info, init_path, url_hash, url_pattern, url_dict, properties
            )

        with ViewTransition(form):
            clear_container()
            nav_context.check_stale()
            _current_form = form
            update_form_attrs(form)
            add_form_to_container(form)

        # a slow form show may have been overtaken by another navigation;
        # in that case skip the on_form_load callback
        nav_context.check_stale()
        alert_form_loaded(form=form, **url_args)
176  
177  
def handle_alert_unload():
    """Raise ``NavigationExit`` when ``_handle_alert_unload()`` returns a truthy value."""
    unload_prevented = _handle_alert_unload()
    if not unload_prevented:
        return
    logger.debug("unload prevented by active alert")
    raise NavigationExit
182  
183  
def handle_form_unload():
    """Give the current form a chance to veto navigation away from it.

    If the current form has a ``before_unload`` attribute it is called inside
    ``_navigation.PreventUnloading()``; a truthy result calls
    ``_navigation.stopUnload()`` and raises ``NavigationExit``.
    """
    unload_hook = getattr(_current_form, "before_unload", None)
    if unload_hook is None:
        return

    from . import _navigation

    with _navigation.PreventUnloading():
        if not unload_hook():
            return
        logger.debug(
            f"stop unload called from route: {_current_form.__class__.__name__}"
        )
        _navigation.stopUnload()
        raise NavigationExit
196  
197  
def load_template_or_redirect(url_hash, url_pattern, url_dict, properties, nav_context):
    """Resolve the template for ``url_hash``, running any matching redirects first.

    Registered infos are visited in ``_ordered_info`` order. An info applies
    when one of its paths is a prefix of ``url_hash`` and its condition (if
    any) returns a truthy value. The first applicable ``TemplateInfo`` ends
    the search; any other applicable info has its callable invoked, and a
    ``str`` result is applied as a redirect through ``set_url_hash``.

    Returns:
        ``(template_info, path)`` when the open form is already an instance
        of the selected template.

    Raises:
        NavigationExit: when templates are not in use, when the context goes
            stale, or after a different template has been opened (the
            navigation is re-queued first).
        LookupError: via ``load_error_or_raise`` when no template matches and
            no error form is set.
    """
    global _current_form, _force_launch
    open_template = get_open_form()
    current_template = type(open_template)

    templates_in_use = (
        open_template is None or current_template in _templates or _force_launch
    )
    if not templates_in_use:
        raise NavigationExit  # not using templates

    logger.debug("checking templates and redirects")
    for info in chain.from_iterable(_ordered_info.values()):
        callable_, paths, condition = info

        path = next((p for p in paths if url_hash.startswith(p)), None)
        if path is None:
            continue

        if condition is not None and not condition():
            continue

        if type(info) is TemplateInfo:
            break

        redirect_hash = callable_()

        if isinstance(redirect_hash, str):
            if navigation_context.matches_current_context(redirect_hash):
                # following this redirect would loop forever
                logger.debug("redirect returned current url_hash, ignoring")
                continue

            from . import set_url_hash

            logger.debug(f"redirecting to url_hash: {redirect_hash!r}")

            set_url_hash(
                redirect_hash,
                set_in_history=False,
                redirect=True,
                replace_current_url=True,
            )
        nav_context.check_stale()

    else:
        # the loop finished without finding a template
        load_error_or_raise(f"no template for url_hash={url_hash!r}")

    if current_template is callable_:
        logger.debug(f"unchanged template: {callable_.__name__!r}")
        return info, path

    logger.debug(
        f"changing template: {current_template.__name__!r} -> {callable_.__name__!r}"
    )
    _current_form = None
    # mark every context stale so this one is no longer treated as current
    navigation_context.mark_all_stale()
    new_template = callable_()
    logger.debug(f"loaded template: {callable_.__name__!r}, re-navigating")
    _queued.append([(url_hash, url_pattern, url_dict), properties])
    open_form(new_template)
    raise NavigationExit
263  
264  
def alert_on_navigation(**url_args):
    """Call the open form's ``on_navigation`` hook, if it defines one.

    The hook receives ``unload_form`` (the current routed form) together with
    the given ``url_args``.
    """
    template = get_open_form()
    hook = getattr(template, "on_navigation", None)
    if hook is None:
        return
    logger.debug(f"{template.__class__.__name__}.on_navigation() called")
    hook(unload_form=_current_form, **url_args)
271  
272  
def clear_container():
    """Clear the ``content_panel`` of the currently open form."""
    container = get_open_form().content_panel
    container.clear()
275  
276  
def check_cached_templates(route_info, url_hash):
    """Look for ``url_hash`` cached under any of the route's templates.

    Only routes registered with more than one template are searched.

    Returns:
        The first cached form found under a ``(url_hash, template)`` key,
        otherwise ``None``.
    """
    candidates = route_info.template
    if len(candidates) <= 1:
        return None

    for template in candidates:
        cached = _cache.get((url_hash, template), None)
        if cached is None:
            continue
        logger.debug(
            f"loading route: {cached.__class__.__name__!r} from cache - cached with {template!r}"
        )
        return cached

    return None
287  
288  
def get_form_to_add(
    template_info, init_path, url_hash, url_pattern, url_dict, properties
):
    """Return the form to show for ``url_hash``, creating and caching it if needed.

    The route is resolved with ``path_matcher``. If the route is shared by
    several templates and one of them already cached this ``url_hash``, that
    form is reused. Otherwise a new instance of the route's form is created
    with ``__new__``, cached, given its routing attributes, and only then
    initialised, so those attributes are available inside ``__init__``.

    Raises:
        NavigationExit: if ``_current_form`` changed while ``__init__`` was
            running, i.e. another form took over in the meantime.
    """
    global _current_form
    route_info, dynamic_vars = path_matcher(
        template_info, init_path, url_hash, url_pattern, url_dict
    )

    # check if path is cached with another template
    form = check_cached_templates(route_info, url_hash)
    if form is not None:
        return form

    form = route_info.form.__new__(route_info.form, **properties)
    logger.debug(f"adding route: {form.__class__.__name__!r} to cache")
    # register the form as current before __init__ so a change can be detected below
    _current_form = _cache[url_hash] = form
    form._routing_props = {
        "title": route_info.title,
        "layout_props": {"full_width_row": route_info.fwr},
    }
    form.url_keys = route_info.url_keys
    form.url_pattern = url_pattern
    form.url_dict = url_dict
    form.url_hash = url_hash
    form.dynamic_vars = dynamic_vars
    form.__init__(**properties)  # this might be slow if it does a bunch of server calls
    if _current_form is not form:
        msg = f"problem loading route: {form.__class__.__name__!r}. Another form was loaded during the call to __init__. exiting this navigation"
        logger.debug(msg)
        # and if it was slow, and some navigation happened we should end now
        raise NavigationExit
    return form
321  
322  
def load_error_or_raise(msg):
    """Show the error form if one is set, otherwise raise ``LookupError(msg)``.

    This function never returns normally: after loading the error form it
    raises ``NavigationExit``.
    """
    if _error_form is None:
        raise LookupError(msg)
    load_error_form()
    raise NavigationExit
329  
330  
def path_matcher(template_info, init_path, url_hash, url_pattern, url_dict):
    """Find the registered route that matches ``url_pattern`` and ``url_dict``.

    Candidates are the routes registered under the template form's name,
    followed by those registered under ``None``. A candidate matches when it
    has the same number of ``/``-separated parts as ``url_pattern``, every
    static part is equal, and its url keys either equal the keys of
    ``url_dict`` or contain ``ANY`` with all remaining keys present.

    Returns:
        ``(route_info, dynamic_vars)``, where ``dynamic_vars`` maps each
        dynamic part name to the value given in ``url_pattern``. The returned
        ``route_info`` is a ``_replace``d copy when ``init_path`` had to be
        prepended to its pattern.

    Raises:
        NavigationExit or LookupError: via ``load_error_or_raise`` when no
            route matches.
    """
    given_parts = url_pattern.split("/")
    num_given_parts = len(given_parts)

    valid_routes = _routes.get(template_info.form.__name__, []) + _routes.get(None, [])

    for route_info in valid_routes:
        if not route_info.url_pattern.startswith(init_path):
            route_info = route_info._replace(
                url_pattern=init_path + route_info.url_pattern
            )
        if num_given_parts != len(route_info.url_parts):
            # url pattern CANNOT fit, skip deformatting
            continue

        dynamic_vars = {}
        for given, (url_part, is_dynamic) in zip(given_parts, route_info.url_parts):
            if is_dynamic:
                dynamic_vars[url_part] = given
            elif url_part != given:
                break
        else:  # no break: every part matched
            # rely on dict.keys() being set like
            url_keys = url_dict.keys()
            route_keys = route_info.url_keys
            if url_keys == route_keys:
                return route_info, dynamic_vars
            if ANY not in route_keys:
                continue
            # non-mutating difference: never alters the key set stored on the route
            required_keys = route_keys - {ANY}
            if required_keys.issubset(url_keys):
                return route_info, dynamic_vars

    logger.debug(
        f"no route form with: url_pattern={url_pattern!r} url_keys={list(url_dict.keys())} "
        f"template={template_info.form.__name__!r}\n"
        "If this is unexpected perhaps you haven't imported the form correctly"
    )
    load_error_or_raise(f"{url_hash!r} does not exist")
370  
371  
def update_form_attrs(form):
    """Refresh the form's url attributes and set the document title.

    The url attributes are re-read from ``get_url_components()``. The title
    comes from ``form._routing_props["title"]`` formatted with ``url_dict``
    and the form's ``dynamic_vars``; when there is no title the default
    document title is restored.

    Raises:
        ValueError: if formatting the title fails.
    """
    # TODO we should probably update dynamic_vars as well
    url_hash, url_pattern, url_dict = get_url_components()

    # reapply these before the show event
    for attr_name, value in (
        ("url_hash", url_hash),
        ("url_pattern", url_pattern),
        ("url_dict", url_dict),
    ):
        setattr(form, attr_name, value)

    routing_props = getattr(form, "_routing_props", {})
    title_template = routing_props.get("title")
    if title_template is None:
        document.title = default_title
        return

    try:
        document.title = title_template.format(
            **url_dict, **getattr(form, "dynamic_vars", {})
        )
    except Exception:
        raise ValueError(
            f"error generating the page title - check the title argument in {type(form).__name__!r} template decorator."
        )
388  
389  
def add_form_to_container(form):
    """Place ``form`` in the open form's ``content_panel``, replacing its contents.

    Layout properties are taken from ``form._routing_props["layout_props"]``
    when present.
    """
    if form.parent:
        # the form may still be attached elsewhere (e.g. another template)
        form.remove_from_parent()

    routing_props = getattr(form, "_routing_props", {})
    layout_props = routing_props.get("layout_props", {})

    container = get_open_form().content_panel
    container.clear()  # clear it again
    container.add_component(form, **layout_props)
398  
399  
def alert_form_loaded(**url_args):
    """Call the open form's ``on_form_load`` hook with ``url_args``, if it defines one."""
    template = get_open_form()
    hook = getattr(template, "on_form_load", None)
    if hook is None:
        return
    logger.debug(f"{template.__class__.__name__}.on_form_load() called")
    hook(**url_args)
406  
407  
def load_error_form():
    """Create the error form, cache it under the current url_hash and show it.

    The form is added to the open form's container, or opened as the
    top-level form when no form is open.
    """
    global _error_form, _current_form
    logger.debug(f"loading error form: {_error_form!r}")

    url_hash, _, _ = get_url_components()
    _cache[url_hash] = _error_form()
    _current_form = _cache[url_hash]

    if get_open_form() is None:
        # just in case we somehow don't have a valid template!
        open_form(_current_form)
    else:
        add_form_to_container(_current_form)
419  
420  
def add_route_info(route_info):
    """Register ``route_info`` under each template listed in ``route_info.template``."""
    log_template = "   route registered: (form={form.__name__!r}, url_pattern={url_pattern!r}, url_keys={url_keys}, title={title!r}, template={template!r})"
    logger.debug(log_template.format(**route_info._asdict()))

    for template in route_info.template:
        registered = _routes.setdefault(template, [])
        registered.append(route_info)
426  
427  
def add_info(info_type, callable_, priority, info):
    """Register ``info`` at ``priority`` and keep ``_ordered_info`` sorted.

    When ``info_type`` is ``"template"`` the callable is also recorded in
    ``_templates``. ``_ordered_info`` is rebuilt as a new dict whose keys run
    from highest to lowest priority.
    """
    global _ordered_info, _templates
    logger.debug(
        f"{info_type} registered: {repr(info).replace(type(info).__name__, '')}"
    )

    if info_type == "template":
        _templates.add(callable_)

    existing = _ordered_info
    existing.setdefault(priority, []).append(info)

    # rely on dict insertion order to keep the priorities sorted
    _ordered_info = {
        level: existing[level] for level in sorted(existing, reverse=True)
    }