core.py
1 # SPDX-FileCopyrightText: 2019 Damien P. George 2 # 3 # SPDX-License-Identifier: MIT 4 # 5 # MicroPython uasyncio module 6 # MIT license; Copyright (c) 2019 Damien P. George 7 # 8 # This code comes from MicroPython, and has not been run through black or pylint there. 9 # Altering these files significantly would make merging difficult, so we will not use 10 # pylint or black. 11 # pylint: skip-file 12 # fmt: off 13 """ 14 Core 15 ==== 16 """ 17 18 from adafruit_ticks import ticks_ms as ticks, ticks_diff, ticks_add 19 import sys, select, traceback 20 21 # Import TaskQueue and Task, preferring built-in C code over Python code 22 try: 23 from _asyncio import TaskQueue, Task 24 except: 25 from .task import TaskQueue, Task 26 27 28 ################################################################################ 29 # Exceptions 30 31 32 class CancelledError(BaseException): 33 """Injected into a task when calling `Task.cancel()`""" 34 35 pass 36 37 38 class TimeoutError(Exception): 39 """Raised when waiting for a task longer than the specified timeout.""" 40 41 pass 42 43 44 # Used when calling Loop.call_exception_handler 45 _exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None} 46 47 48 ################################################################################ 49 # Sleep functions 50 51 # "Yield" once, then raise StopIteration 52 class SingletonGenerator: 53 def __init__(self): 54 self.state = None 55 self.exc = StopIteration() 56 57 def __iter__(self): 58 return self 59 60 def __await__(self): 61 return self 62 63 def __next__(self): 64 if self.state is not None: 65 _task_queue.push_sorted(cur_task, self.state) 66 self.state = None 67 return None 68 else: 69 self.exc.__traceback__ = None 70 raise self.exc 71 72 73 # Pause task execution for the given time (integer in milliseconds, uPy extension) 74 # Use a SingletonGenerator to do it without allocating on the heap 75 def sleep_ms(t, sgen=SingletonGenerator()): 76 """Sleep for *t* milliseconds. 77 78 This is a coroutine, and a MicroPython extension. 79 """ 80 81 assert sgen.state is None, "Check for a missing `await` in your code" 82 sgen.state = ticks_add(ticks(), max(0, t)) 83 return sgen 84 85 86 # Pause task execution for the given time (in seconds) 87 def sleep(t): 88 """Sleep for *t* seconds 89 90 This is a coroutine. 91 """ 92 93 return sleep_ms(int(t * 1000)) 94 95 96 ################################################################################ 97 # Queue and poller for stream IO 98 99 100 class IOQueue: 101 def __init__(self): 102 self.poller = select.poll() 103 self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream] 104 105 def _enqueue(self, s, idx): 106 if id(s) not in self.map: 107 entry = [None, None, s] 108 entry[idx] = cur_task 109 self.map[id(s)] = entry 110 self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT) 111 else: 112 sm = self.map[id(s)] 113 assert sm[idx] is None 114 assert sm[1 - idx] is not None 115 sm[idx] = cur_task 116 self.poller.modify(s, select.POLLIN | select.POLLOUT) 117 # Link task to this IOQueue so it can be removed if needed 118 cur_task.data = self 119 120 def _dequeue(self, s): 121 del self.map[id(s)] 122 self.poller.unregister(s) 123 124 def queue_read(self, s): 125 self._enqueue(s, 0) 126 127 def queue_write(self, s): 128 self._enqueue(s, 1) 129 130 def remove(self, task): 131 while True: 132 del_s = None 133 for k in self.map: # Iterate without allocating on the heap 134 q0, q1, s = self.map[k] 135 if q0 is task or q1 is task: 136 del_s = s 137 break 138 if del_s is not None: 139 self._dequeue(s) 140 else: 141 break 142 143 def wait_io_event(self, dt): 144 for s, ev in self.poller.ipoll(dt): 145 sm = self.map[id(s)] 146 # print('poll', s, sm, ev) 147 if ev & ~select.POLLOUT and sm[0] is not None: 148 # POLLIN or error 149 _task_queue.push_head(sm[0]) 150 sm[0] = None 151 if ev & ~select.POLLIN and sm[1] is not None: 152 # POLLOUT or error 153 _task_queue.push_head(sm[1]) 154 sm[1] = None 155 if sm[0] is None and sm[1] is None: 156 self._dequeue(s) 157 elif sm[0] is None: 158 self.poller.modify(s, select.POLLOUT) 159 else: 160 self.poller.modify(s, select.POLLIN) 161 162 163 ################################################################################ 164 # Main run loop 165 166 # Ensure the awaitable is a task 167 def _promote_to_task(aw): 168 return aw if isinstance(aw, Task) else create_task(aw) 169 170 171 # Create and schedule a new task from a coroutine 172 def create_task(coro): 173 """Create a new task from the given coroutine and schedule it to run. 174 175 Returns the corresponding `Task` object. 176 """ 177 178 if not hasattr(coro, "send"): 179 raise TypeError("coroutine expected") 180 t = Task(coro, globals()) 181 _task_queue.push_head(t) 182 return t 183 184 185 # Keep scheduling tasks until there are none left to schedule 186 def run_until_complete(main_task=None): 187 """Run the given *main_task* until it completes.""" 188 189 global cur_task 190 excs_all = (CancelledError, Exception) # To prevent heap allocation in loop 191 excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop 192 while True: 193 # Wait until the head of _task_queue is ready to run 194 dt = 1 195 while dt > 0: 196 dt = -1 197 t = _task_queue.peek() 198 if t: 199 # A task waiting on _task_queue; "ph_key" is time to schedule task at 200 dt = max(0, ticks_diff(t.ph_key, ticks())) 201 elif not _io_queue.map: 202 # No tasks can be woken so finished running 203 return 204 # print('(poll {})'.format(dt), len(_io_queue.map)) 205 _io_queue.wait_io_event(dt) 206 207 # Get next task to run and continue it 208 t = _task_queue.pop_head() 209 cur_task = t 210 try: 211 # Continue running the coroutine, it's responsible for rescheduling itself 212 exc = t.data 213 if not exc: 214 t.coro.send(None) 215 else: 216 # If the task is finished and on the run queue and gets here, then it 217 # had an exception and was not await'ed on. Throwing into it now will 218 # raise StopIteration and the code below will catch this and run the 219 # call_exception_handler function. 220 t.data = None 221 t.coro.throw(exc) 222 except excs_all as er: 223 # Check the task is not on any event queue 224 assert t.data is None 225 # This task is done, check if it's the main task and then loop should stop 226 if t is main_task: 227 if isinstance(er, StopIteration): 228 return er.value 229 raise er 230 if t.state: 231 # Task was running but is now finished. 232 waiting = False 233 if t.state is True: 234 # "None" indicates that the task is complete and not await'ed on (yet). 235 t.state = None 236 else: 237 # Schedule any other tasks waiting on the completion of this task. 238 while t.state.peek(): 239 _task_queue.push_head(t.state.pop_head()) 240 waiting = True 241 # "False" indicates that the task is complete and has been await'ed on. 242 t.state = False 243 if not waiting and not isinstance(er, excs_stop): 244 # An exception ended this detached task, so queue it for later 245 # execution to handle the uncaught exception if no other task retrieves 246 # the exception in the meantime (this is handled by Task.throw). 247 _task_queue.push_head(t) 248 # Save return value of coro to pass up to caller. 249 t.data = er 250 elif t.state is None: 251 # Task is already finished and nothing await'ed on the task, 252 # so call the exception handler. 253 _exc_context["exception"] = exc 254 _exc_context["future"] = t 255 Loop.call_exception_handler(_exc_context) 256 257 258 # Create a new task from a coroutine and run it until it finishes 259 def run(coro): 260 """Create a new task from the given coroutine and run it until it completes. 261 262 Returns the value returned by *coro*. 263 """ 264 265 return run_until_complete(create_task(coro)) 266 267 268 ################################################################################ 269 # Event loop wrapper 270 271 272 async def _stopper(): 273 pass 274 275 276 _stop_task = None 277 278 279 class Loop: 280 """Class representing the event loop""" 281 282 _exc_handler = None 283 284 def create_task(coro): 285 """Create a task from the given *coro* and return the new `Task` object.""" 286 287 return create_task(coro) 288 289 def run_forever(): 290 """Run the event loop until `Loop.stop()` is called.""" 291 292 global _stop_task 293 _stop_task = Task(_stopper(), globals()) 294 run_until_complete(_stop_task) 295 # TODO should keep running until .stop() is called, even if there're no tasks left 296 297 def run_until_complete(aw): 298 """Run the given *awaitable* until it completes. If *awaitable* is not a task then 299 it will be promoted to one. 300 """ 301 302 return run_until_complete(_promote_to_task(aw)) 303 304 def stop(): 305 """Stop the event loop""" 306 307 global _stop_task 308 if _stop_task is not None: 309 _task_queue.push_head(_stop_task) 310 # If stop() is called again, do nothing 311 _stop_task = None 312 313 def close(): 314 """Close the event loop.""" 315 316 pass 317 318 def set_exception_handler(handler): 319 """Set the exception handler to call when a Task raises an exception that is not 320 caught. The *handler* should accept two arguments: ``(loop, context)`` 321 """ 322 323 Loop._exc_handler = handler 324 325 def get_exception_handler(): 326 """Get the current exception handler. Returns the handler, or ``None`` if no 327 custom handler is set. 328 """ 329 330 return Loop._exc_handler 331 332 def default_exception_handler(loop, context): 333 """The default exception handler that is called.""" 334 335 exc = context["exception"] 336 traceback.print_exception(None, exc, exc.__traceback__) 337 338 def call_exception_handler(context): 339 """Call the current exception handler. The argument *context* is passed through 340 and is a dictionary containing keys: 341 ``'message'``, ``'exception'``, ``'future'`` 342 """ 343 (Loop._exc_handler or Loop.default_exception_handler)(Loop, context) 344 345 346 # The runq_len and waitq_len arguments are for legacy uasyncio compatibility 347 def get_event_loop(runq_len=0, waitq_len=0): 348 """Return the event loop used to schedule and run tasks. See `Loop`.""" 349 350 return Loop 351 352 353 def current_task(): 354 """Return the `Task` object associated with the currently running task.""" 355 356 return cur_task 357 358 359 def new_event_loop(): 360 """Reset the event loop and return it. 361 362 **NOTE**: Since MicroPython only has a single event loop, this function just resets 363 the loop's state, it does not create a new one 364 """ 365 366 global _task_queue, _io_queue 367 # TaskQueue of Task instances 368 _task_queue = TaskQueue() 369 # Task queue and poller for stream IO 370 _io_queue = IOQueue() 371 return Loop 372 373 374 # Initialise default event loop 375 new_event_loop()