/ asyncio / core.py
core.py
  1  # SPDX-FileCopyrightText: 2019 Damien P. George
  2  #
  3  # SPDX-License-Identifier: MIT
  4  #
  5  # MicroPython uasyncio module
  6  # MIT license; Copyright (c) 2019 Damien P. George
  7  #
  8  # This code comes from MicroPython, and has not been run through black or pylint there.
  9  # Altering these files significantly would make merging difficult, so we will not use
 10  # pylint or black.
 11  # pylint: skip-file
 12  # fmt: off
 13  """
 14  Core
 15  ====
 16  """
 17  
 18  from adafruit_ticks import ticks_ms as ticks, ticks_diff, ticks_add
 19  import sys, select, traceback
 20  
 21  # Import TaskQueue and Task, preferring built-in C code over Python code
 22  try:
 23      from _asyncio import TaskQueue, Task
 24  except:
 25      from .task import TaskQueue, Task
 26  
 27  
 28  ################################################################################
 29  # Exceptions
 30  
 31  
 32  class CancelledError(BaseException):
 33      """Injected into a task when calling `Task.cancel()`"""
 34  
 35      pass
 36  
 37  
 38  class TimeoutError(Exception):
 39      """Raised when waiting for a task longer than the specified timeout."""
 40  
 41      pass
 42  
 43  
 44  # Used when calling Loop.call_exception_handler
 45  _exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
 46  
 47  
 48  ################################################################################
 49  # Sleep functions
 50  
 51  # "Yield" once, then raise StopIteration
 52  class SingletonGenerator:
 53      def __init__(self):
 54          self.state = None
 55          self.exc = StopIteration()
 56  
 57      def __iter__(self):
 58          return self
 59  
 60      def __await__(self):
 61          return self
 62  
 63      def __next__(self):
 64          if self.state is not None:
 65              _task_queue.push_sorted(cur_task, self.state)
 66              self.state = None
 67              return None
 68          else:
 69              self.exc.__traceback__ = None
 70              raise self.exc
 71  
 72  
 73  # Pause task execution for the given time (integer in milliseconds, uPy extension)
 74  # Use a SingletonGenerator to do it without allocating on the heap
 75  def sleep_ms(t, sgen=SingletonGenerator()):
 76      """Sleep for *t* milliseconds.
 77  
 78      This is a coroutine, and a MicroPython extension.
 79      """
 80  
 81      assert sgen.state is None, "Check for a missing `await` in your code"
 82      sgen.state = ticks_add(ticks(), max(0, t))
 83      return sgen
 84  
 85  
 86  # Pause task execution for the given time (in seconds)
 87  def sleep(t):
 88      """Sleep for *t* seconds
 89  
 90      This is a coroutine.
 91      """
 92  
 93      return sleep_ms(int(t * 1000))
 94  
 95  
 96  ################################################################################
 97  # Queue and poller for stream IO
 98  
 99  
class IOQueue:
    """Tracks tasks that are blocked waiting for a stream to become readable or writable.

    ``map`` is keyed by ``id(stream)`` and each value is the three-item list
    ``[task_waiting_read, task_waiting_write, stream]``.  ``poller`` is the
    ``select.poll()`` object that the streams are registered with.
    """

    def __init__(self):
        self.poller = select.poll()
        self.map = {}  # id(stream) -> [task_waiting_read, task_waiting_write, stream]

    def _enqueue(self, s, idx):
        """Record the current task as waiting on *s*; *idx* is 0 for read, 1 for write."""
        key = id(s)
        if key in self.map:
            # Stream already tracked: the other direction must be the one in use.
            entry = self.map[key]
            assert entry[idx] is None
            assert entry[1 - idx] is not None
            entry[idx] = cur_task
            self.poller.modify(s, select.POLLIN | select.POLLOUT)
        else:
            entry = [None, None, s]
            entry[idx] = cur_task
            self.map[key] = entry
            self.poller.register(s, select.POLLOUT if idx != 0 else select.POLLIN)
        # Link task to this IOQueue so it can be removed if needed
        cur_task.data = self

    def _dequeue(self, s):
        """Forget stream *s*: drop its map entry and unregister it from the poller."""
        del self.map[id(s)]
        self.poller.unregister(s)

    def queue_read(self, s):
        """Queue the current task to wait until *s* is readable."""
        self._enqueue(s, 0)

    def queue_write(self, s):
        """Queue the current task to wait until *s* is writable."""
        self._enqueue(s, 1)

    def remove(self, task):
        """Dequeue every stream on which *task* is waiting (as reader or writer)."""
        while True:
            found = None
            for key in self.map:  # Iterate without allocating on the heap
                reader, writer, stream = self.map[key]
                if reader is task or writer is task:
                    found = stream
                    break
            if found is None:
                # No remaining entry references the task.
                break
            # The map changed, so restart the scan from the beginning afterwards.
            self._dequeue(found)

    def wait_io_event(self, dt):
        """Poll with timeout *dt* and move every task whose stream is ready onto the run queue."""
        for stream, events in self.poller.ipoll(dt):
            entry = self.map[id(stream)]
            # print('poll', stream, entry, events)
            if entry[0] is not None and events & ~select.POLLOUT:
                # POLLIN or error
                _task_queue.push_head(entry[0])
                entry[0] = None
            if entry[1] is not None and events & ~select.POLLIN:
                # POLLOUT or error
                _task_queue.push_head(entry[1])
                entry[1] = None
            # Keep polling only for the direction that still has a waiter.
            if entry[0] is not None:
                self.poller.modify(stream, select.POLLIN)
            elif entry[1] is not None:
                self.poller.modify(stream, select.POLLOUT)
            else:
                self._dequeue(stream)
161  
162  
163  ################################################################################
164  # Main run loop
165  
166  # Ensure the awaitable is a task
def _promote_to_task(aw):
    """Return *aw* unchanged if it is already a `Task`, otherwise wrap it with `create_task`."""
    if isinstance(aw, Task):
        return aw
    return create_task(aw)
169  
170  
171  # Create and schedule a new task from a coroutine
def create_task(coro):
    """Wrap the coroutine *coro* in a new `Task` and put it at the head of the run queue.

    Returns the new `Task` object.  Raises ``TypeError`` if *coro* has no
    ``send`` attribute, i.e. it does not look like a coroutine.
    """

    if hasattr(coro, "send"):
        task = Task(coro, globals())
        _task_queue.push_head(task)
        return task
    raise TypeError("coroutine expected")
183  
184  
185  # Keep scheduling tasks until there are none left to schedule
def run_until_complete(main_task=None):
    """Run the given *main_task* until it completes.

    Repeatedly waits until the task at the head of ``_task_queue`` is due
    (polling ``_io_queue`` for stream events while waiting), then resumes that
    task.

    Returns the ``StopIteration`` value of *main_task* when it finishes
    normally.  Any other ``Exception`` or ``CancelledError`` that ends
    *main_task* is re-raised to the caller.  If there is no queued task and no
    stream being waited on, the function returns ``None``.
    """

    global cur_task
    excs_all = (CancelledError, Exception)  # To prevent heap allocation in loop
    excs_stop = (CancelledError, StopIteration)  # To prevent heap allocation in loop
    while True:
        # Wait until the head of _task_queue is ready to run
        dt = 1  # Any positive value, just to enter the wait loop below
        while dt > 0:
            # -1 is what gets passed to the poller when no task is queued and only
            # IO can wake one (presumably "no timeout" -- confirm against select.poll)
            dt = -1
            t = _task_queue.peek()
            if t:
                # A task waiting on _task_queue; "ph_key" is time to schedule task at
                dt = max(0, ticks_diff(t.ph_key, ticks()))
            elif not _io_queue.map:
                # No tasks can be woken so finished running
                return
            # print('(poll {})'.format(dt), len(_io_queue.map))
            _io_queue.wait_io_event(dt)

        # Get next task to run and continue it
        t = _task_queue.pop_head()
        cur_task = t
        try:
            # Continue running the coroutine, it's responsible for rescheduling itself
            exc = t.data
            if not exc:
                t.coro.send(None)
            else:
                # If the task is finished and on the run queue and gets here, then it
                # had an exception and was not await'ed on.  Throwing into it now will
                # raise StopIteration and the code below will catch this and run the
                # call_exception_handler function.
                t.data = None
                t.coro.throw(exc)
        except excs_all as er:
            # Reaching here means the coroutine terminated (StopIteration included,
            # since it is an Exception subclass).
            # Check the task is not on any event queue
            assert t.data is None
            # This task is done, check if it's the main task and then loop should stop
            if t is main_task:
                if isinstance(er, StopIteration):
                    return er.value
                raise er
            if t.state:
                # Task was running but is now finished.
                waiting = False
                if t.state is True:
                    # "None" indicates that the task is complete and not await'ed on (yet).
                    t.state = None
                else:
                    # Any other truthy state is used as a queue of waiting tasks.
                    # Schedule any other tasks waiting on the completion of this task.
                    while t.state.peek():
                        _task_queue.push_head(t.state.pop_head())
                        waiting = True
                    # "False" indicates that the task is complete and has been await'ed on.
                    t.state = False
                if not waiting and not isinstance(er, excs_stop):
                    # An exception ended this detached task, so queue it for later
                    # execution to handle the uncaught exception if no other task retrieves
                    # the exception in the meantime (this is handled by Task.throw).
                    _task_queue.push_head(t)
                # Save return value of coro to pass up to caller.
                t.data = er
            elif t.state is None:
                # Task is already finished and nothing await'ed on the task,
                # so call the exception handler.
                # "exc" is the exception that was stored in t.data and thrown above.
                _exc_context["exception"] = exc
                _exc_context["future"] = t
                Loop.call_exception_handler(_exc_context)
256  
257  
258  # Create a new task from a coroutine and run it until it finishes
def run(coro):
    """Schedule *coro* as a new task and run the event loop until that task finishes.

    Returns whatever *coro* returns.
    """

    main_task = create_task(coro)
    return run_until_complete(main_task)
266  
267  
268  ################################################################################
269  # Event loop wrapper
270  
271  
async def _stopper():
    """Finish immediately; `Loop.run_forever` wraps this in the task it runs until completion."""
    return None
274  
275  
# Task created by Loop.run_forever(); Loop.stop() pushes it onto the run queue
# and then resets this to None so that a repeated stop() does nothing.
_stop_task = None
277  
278  
class Loop:
    """Class representing the event loop.

    The class itself is used as the loop object (see `get_event_loop`), so its
    functions take no ``self`` and are called directly on the class.
    """

    # Custom exception handler, or None to use default_exception_handler
    _exc_handler = None

    def create_task(coro):
        """Create a task from the given *coro* and return the new `Task` object."""

        new_task = create_task(coro)
        return new_task

    def run_forever():
        """Run the event loop until `Loop.stop()` is called."""

        global _stop_task
        stopper = Task(_stopper(), globals())
        _stop_task = stopper
        run_until_complete(stopper)
        # TODO should keep running until .stop() is called, even if there're no tasks left

    def run_until_complete(aw):
        """Run the given *awaitable* until it completes.  If *awaitable* is not a task then
        it will be promoted to one.
        """

        main_task = _promote_to_task(aw)
        return run_until_complete(main_task)

    def stop():
        """Stop the event loop"""

        global _stop_task
        if _stop_task is None:
            # Not inside run_forever(), or stop() was already called: do nothing
            return
        _task_queue.push_head(_stop_task)
        _stop_task = None

    def close():
        """Close the event loop.  This implementation does nothing."""

        return None

    def set_exception_handler(handler):
        """Set the exception handler to call when a Task raises an exception that is not
        caught.  The *handler* should accept two arguments: ``(loop, context)``
        """

        Loop._exc_handler = handler

    def get_exception_handler():
        """Get the current exception handler. Returns the handler, or ``None`` if no
        custom handler is set.
        """

        handler = Loop._exc_handler
        return handler

    def default_exception_handler(loop, context):
        """The default exception handler: print the traceback of ``context["exception"]``."""

        error = context["exception"]
        traceback.print_exception(None, error, error.__traceback__)

    def call_exception_handler(context):
        """Call the current exception handler. The argument *context* is passed through
        and is a dictionary containing keys:
        ``'message'``, ``'exception'``, ``'future'``
        """
        handler = Loop._exc_handler or Loop.default_exception_handler
        handler(Loop, context)
344  
345  
346  # The runq_len and waitq_len arguments are for legacy uasyncio compatibility
def get_event_loop(runq_len=0, waitq_len=0):
    """Return the event loop used to schedule and run tasks, which is the `Loop` class itself.

    *runq_len* and *waitq_len* are accepted but not used.
    """

    loop = Loop
    return loop
351  
352  
def current_task():
    """Return the `Task` object associated with the currently running task.

    Raises ``RuntimeError`` if no task has been run yet.  ``cur_task`` is only
    bound once `run_until_complete` resumes its first task, so without this
    check an early call would fail with an unhelpful ``NameError``.
    """

    try:
        task = cur_task
    except NameError:
        # The event loop has never resumed a task, so the global does not exist yet.
        task = None
    if task is None:
        raise RuntimeError("no running event loop")
    return task
357  
358  
def new_event_loop():
    """Reset the event loop's state and return the loop.

    **NOTE**: There is only a single event loop, so no new loop object is created.
    The module-level run queue and IO queue are replaced with fresh, empty ones
    and the same `Loop` class is returned.
    """

    global _task_queue, _io_queue
    # Run queue: a TaskQueue of Task instances
    _task_queue = TaskQueue()
    # IO queue: tasks waiting on streams, together with the poller for stream IO
    _io_queue = IOQueue()
    return Loop
372  
373  
# Initialise default event loop: this creates the module-level _task_queue and
# _io_queue at import time, so the functions above can use them straight away.
new_event_loop()