/ asyncio / task.py
task.py
  1  # SPDX-FileCopyrightText: 2019-2020 Damien P. George
  2  #
  3  # SPDX-License-Identifier: MIT
  4  #
  5  # MicroPython uasyncio module
  6  # MIT license; Copyright (c) 2019-2020 Damien P. George
  7  #
  8  # This code comes from MicroPython, and has not been run through black or pylint there.
  9  # Altering these files significantly would make merging difficult, so we will not use
 10  # pylint or black.
 11  # pylint: skip-file
 12  # fmt: off
 13  """
 14  Tasks
 15  =====
 16  """
 17  
 18  # This file contains the core TaskQueue based on a pairing heap, and the core Task class.
 19  # They can optionally be replaced by C implementations.
 20  
 21  from . import core
 22  
 23  
 24  # pairing-heap meld of 2 heaps; O(1)
 25  def ph_meld(h1, h2):
 26      if h1 is None:
 27          return h2
 28      if h2 is None:
 29          return h1
 30      lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0
 31      if lt:
 32          if h1.ph_child is None:
 33              h1.ph_child = h2
 34          else:
 35              h1.ph_child_last.ph_next = h2
 36          h1.ph_child_last = h2
 37          h2.ph_next = None
 38          h2.ph_rightmost_parent = h1
 39          return h1
 40      else:
 41          h1.ph_next = h2.ph_child
 42          h2.ph_child = h1
 43          if h1.ph_next is None:
 44              h2.ph_child_last = h1
 45              h1.ph_rightmost_parent = h2
 46          return h2
 47  
 48  
 49  # pairing-heap pairing operation; amortised O(log N)
 50  def ph_pairing(child):
 51      heap = None
 52      while child is not None:
 53          n1 = child
 54          child = child.ph_next
 55          n1.ph_next = None
 56          if child is not None:
 57              n2 = child
 58              child = child.ph_next
 59              n2.ph_next = None
 60              n1 = ph_meld(n1, n2)
 61          heap = ph_meld(heap, n1)
 62      return heap
 63  
 64  
 65  # pairing-heap delete of a node; stable, amortised O(log N)
 66  def ph_delete(heap, node):
 67      if node is heap:
 68          child = heap.ph_child
 69          node.ph_child = None
 70          return ph_pairing(child)
 71      # Find parent of node
 72      parent = node
 73      while parent.ph_next is not None:
 74          parent = parent.ph_next
 75      parent = parent.ph_rightmost_parent
 76      # Replace node with pairing of its children
 77      if node is parent.ph_child and node.ph_child is None:
 78          parent.ph_child = node.ph_next
 79          node.ph_next = None
 80          return heap
 81      elif node is parent.ph_child:
 82          child = node.ph_child
 83          next = node.ph_next
 84          node.ph_child = None
 85          node.ph_next = None
 86          node = ph_pairing(child)
 87          parent.ph_child = node
 88      else:
 89          n = parent.ph_child
 90          while node is not n.ph_next:
 91              n = n.ph_next
 92          child = node.ph_child
 93          next = node.ph_next
 94          node.ph_child = None
 95          node.ph_next = None
 96          node = ph_pairing(child)
 97          if node is None:
 98              node = n
 99          else:
100              n.ph_next = node
101      node.ph_next = next
102      if next is None:
103          node.ph_rightmost_parent = parent
104          parent.ph_child_last = node
105      return heap
106  
107  
108  # TaskQueue class based on the above pairing-heap functions.
109  class TaskQueue:
110      def __init__(self):
111          self.heap = None
112  
113      def peek(self):
114          return self.heap
115  
116      def push_sorted(self, v, key):
117          v.data = None
118          v.ph_key = key
119          v.ph_child = None
120          v.ph_next = None
121          self.heap = ph_meld(v, self.heap)
122  
123      def push_head(self, v):
124          self.push_sorted(v, core.ticks())
125  
126      def pop_head(self):
127          v = self.heap
128          self.heap = ph_pairing(self.heap.ph_child)
129          return v
130  
131      def remove(self, v):
132          self.heap = ph_delete(self.heap, v)
133  
134  
135  # Task class representing a coroutine, can be waited on and cancelled.
136  class Task:
137      """This object wraps a coroutine into a running task. Tasks can be waited on
138      using ``await task``, which will wait for the task to complete and return the
139      return value of the task.
140  
141      Tasks should not be created directly, rather use ``create_task`` to create them.
142      """
143  
144      def __init__(self, coro, globals=None):
145          self.coro = coro  # Coroutine of this Task
146          self.data = None  # General data for queue it is waiting on
147          self.state = True  # None, False, True or a TaskQueue instance
148          self.ph_key = 0  # Pairing heap
149          self.ph_child = None  # Paring heap
150          self.ph_child_last = None  # Paring heap
151          self.ph_next = None  # Paring heap
152          self.ph_rightmost_parent = None  # Paring heap
153  
154      def __iter__(self):
155          if not self.state:
156              # Task finished, signal that is has been await'ed on.
157              self.state = False
158          elif self.state is True:
159              # Allocated head of linked list of Tasks waiting on completion of this task.
160              self.state = TaskQueue()
161          return self
162  
163      __await__ = __iter__
164  
165      def __next__(self):
166          if not self.state:
167              # Task finished, raise return value to caller so it can continue.
168              raise self.data
169          else:
170              # Put calling task on waiting queue.
171              self.state.push_head(core.cur_task)
172              # Set calling task's data to this task that it waits on, to double-link it.
173              core.cur_task.data = self
174  
175      def done(self):
176          """Whether the task is complete."""
177  
178          return not self.state
179  
180      def cancel(self):
181          """Cancel the task by injecting a ``CancelledError`` into it. The task
182          may or may not ignore this exception.
183          """
184  
185          # Check if task is already finished.
186          if not self.state:
187              return False
188          # Can't cancel self (not supported yet).
189          if self is core.cur_task:
190              raise RuntimeError("can't cancel self")
191          # If Task waits on another task then forward the cancel to the one it's waiting on.
192          while isinstance(self.data, Task):
193              self = self.data
194          # Reschedule Task as a cancelled task.
195          if hasattr(self.data, "remove"):
196              # Not on the main running queue, remove the task from the queue it's on.
197              self.data.remove(self)
198              core._task_queue.push_head(self)
199          elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
200              # On the main running queue but scheduled in the future, so bring it forward to now.
201              core._task_queue.remove(self)
202              core._task_queue.push_head(self)
203          self.data = core.CancelledError
204          return True