/ asyncio / funcs.py
funcs.py
  1  # SPDX-FileCopyrightText: 2019-2020 Damien P. George
  2  #
  3  # SPDX-License-Identifier: MIT
  4  #
  5  # MicroPython uasyncio module
  6  # MIT license; Copyright (c) 2019-2020 Damien P. George
  7  #
  8  # This code comes from MicroPython, and has not been run through black or pylint there.
  9  # Altering these files significantly would make merging difficult, so we will not use
 10  # pylint or black.
 11  # pylint: skip-file
 12  # fmt: off
 13  """
 14  Functions
 15  =========
 16  """
 17  
 18  
 19  from . import core
 20  
 21  
 22  async def wait_for(aw, timeout, sleep=core.sleep):
 23      """Wait for the *aw* awaitable to complete, but cancel if it takes longer
 24      than *timeout* seconds. If *aw* is not a task then a task will be created
 25      from it.
 26  
 27      If a timeout occurs, it cancels the task and raises ``asyncio.TimeoutError``:
 28      this should be trapped by the caller.
 29  
 30      Returns the return value of *aw*.
 31  
 32      This is a coroutine.
 33      """
 34  
 35      aw = core._promote_to_task(aw)
 36      if timeout is None:
 37          return await aw
 38  
 39      async def runner(waiter, aw):
 40          nonlocal status, result
 41          try:
 42              result = await aw
 43              s = True
 44          except BaseException as er:
 45              s = er
 46          if status is None:
 47              # The waiter is still waiting, set status for it and cancel it.
 48              status = s
 49              waiter.cancel()
 50  
 51      # Run aw in a separate runner task that manages its exceptions.
 52      status = None
 53      result = None
 54      runner_task = core.create_task(runner(core.cur_task, aw))
 55  
 56      try:
 57          # Wait for the timeout to elapse.
 58          await sleep(timeout)
 59      except core.CancelledError as er:
 60          if status is True:
 61              # aw completed successfully and cancelled the sleep, so return aw's result.
 62              return result
 63          elif status is None:
 64              # This wait_for was cancelled externally, so cancel aw and re-raise.
 65              status = True
 66              runner_task.cancel()
 67              raise er
 68          else:
 69              # aw raised an exception, propagate it out to the caller.
 70              raise status
 71  
 72      # The sleep finished before aw, so cancel aw and raise TimeoutError.
 73      status = True
 74      runner_task.cancel()
 75      await runner_task
 76      raise core.TimeoutError
 77  
 78  
 79  def wait_for_ms(aw, timeout):
 80      """Similar to `wait_for` but *timeout* is an integer in milliseconds.
 81  
 82      This is a coroutine, and a MicroPython extension.
 83      """
 84  
 85      return wait_for(aw, timeout, core.sleep_ms)
 86  
 87  
 88  async def gather(*aws, return_exceptions=False):
 89      """Run all *aws* awaitables concurrently. Any *aws* that are not tasks
 90      are promoted to tasks.
 91  
 92      Returns a list of return values of all *aws*
 93  
 94      This is a coroutine.
 95      """
 96  
 97      ts = [core._promote_to_task(aw) for aw in aws]
 98      for i in range(len(ts)):
 99          try:
100              # TODO handle cancel of gather itself
101              # if ts[i].coro:
102              #    iter(ts[i]).waiting.push_head(cur_task)
103              #    try:
104              #        yield
105              #    except CancelledError as er:
106              #        # cancel all waiting tasks
107              #        raise er
108              ts[i] = await ts[i]
109          except Exception as er:
110              if return_exceptions:
111                  ts[i] = er
112              else:
113                  raise er
114      return ts