funcs.py
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Functions
=========
"""


from . import core


async def wait_for(aw, timeout, sleep=core.sleep):
    """Wait for the *aw* awaitable to finish, giving up once *timeout* has
    elapsed.  *aw* is promoted to a task if it is not one already.

    *timeout* is handed unchanged to *sleep*; with the default ``core.sleep``
    it is a number of seconds.  A *timeout* of ``None`` means no time limit.

    When the timeout expires first, the task is cancelled and
    ``asyncio.TimeoutError`` is raised: the caller is expected to trap it.
    An exception raised by *aw* is re-raised to the caller.  If ``wait_for``
    itself is cancelled, *aw* is cancelled too and the ``CancelledError`` is
    re-raised.

    Returns the return value of *aw*.

    This is a coroutine.
    """

    aw = core._promote_to_task(aw)
    if timeout is None:
        # No time limit requested: just wait for the task.
        return await aw

    # State shared with runner() below.
    #   outcome is None  -> nothing decided yet, the waiter is still sleeping
    #   outcome is True  -> aw returned normally, or the waiter stopped listening
    #   anything else    -> the exception instance that aw raised
    outcome = None
    value = None

    async def runner(waiter, task):
        nonlocal outcome, value
        try:
            value = await task
        except BaseException as er:
            finished = er
        else:
            finished = True
        if outcome is None:
            # The waiter has not given up yet: record how aw ended and wake
            # the waiter by cancelling its sleep.
            outcome = finished
            waiter.cancel()

    # aw is awaited from a separate runner task so that its exceptions are
    # captured there instead of escaping unhandled.
    runner_task = core.create_task(runner(core.cur_task, aw))

    try:
        # Sleep for the whole timeout; the runner interrupts this if aw ends first.
        await sleep(timeout)
    except core.CancelledError as er:
        if outcome is None:
            # Nobody recorded an outcome, so the cancellation came from outside:
            # stop listening, cancel aw and pass the cancellation on.
            outcome = True
            runner_task.cancel()
            raise er
        if outcome is True:
            # aw finished normally and interrupted the sleep: hand back its value.
            return value
        # aw failed: re-raise its exception to the caller.
        raise outcome

    # The sleep ran to completion before aw did: cancel aw, wait for the
    # runner to wind down, then report the timeout.
    outcome = True
    runner_task.cancel()
    await runner_task
    raise core.TimeoutError


def wait_for_ms(aw, timeout):
    """Like `wait_for`, except that *timeout* is an integer number of
    milliseconds (the wait is performed with ``core.sleep_ms``).

    This is a coroutine, and a MicroPython extension.
    """

    return wait_for(aw, timeout, sleep=core.sleep_ms)


async def gather(*aws, return_exceptions=False):
    """Run all of the *aws* awaitables concurrently.  Every one of them that
    is not already a task is promoted to a task.

    The tasks are awaited one after another in argument order.  If one of
    them raises an ``Exception`` and *return_exceptions* is false, that
    exception is re-raised to the caller; if *return_exceptions* is true the
    exception object is stored in the result list in place of a return value.

    Returns a list holding the return value of each of the *aws*, in order.

    This is a coroutine.
    """

    # Promoting everything up front is what lets the awaitables run
    # concurrently; the loop below only collects their outcomes.
    slots = [core._promote_to_task(aw) for aw in aws]
    for index, task in enumerate(slots):
        try:
            # TODO handle cancel of gather itself: when gather is cancelled
            # while waiting here, the tasks still being waited on should be
            # cancelled before the CancelledError is re-raised.
            slots[index] = await task
        except Exception as er:
            if not return_exceptions:
                raise er
            slots[index] = er
    return slots