task.py
# SPDX-FileCopyrightText: 2019-2020 Damien P. George
#
# SPDX-License-Identifier: MIT
#
# MicroPython uasyncio module
# MIT license; Copyright (c) 2019-2020 Damien P. George
#
# This code comes from MicroPython, and has not been run through black or pylint there.
# Altering these files significantly would make merging difficult, so we will not use
# pylint or black.
# pylint: skip-file
# fmt: off
"""
Tasks
=====
"""

# This file contains the core TaskQueue based on a pairing heap, and the core Task class.
# They can optionally be replaced by C implementations.

from . import core


# pairing-heap meld of 2 heaps; O(1)
#
# Returns the root of the combined heap.  Either argument may be None (an empty
# heap), in which case the other argument is returned unchanged.  Keys are
# compared with core.ticks_diff().  If h1's key is strictly earlier, h1 stays
# the root and h2 is appended at the end of h1's child list; otherwise h2
# becomes the root and h1 is inserted at the front of h2's child list.
def ph_meld(h1, h2):
    if h1 is None:
        return h2
    if h2 is None:
        return h1
    lt = core.ticks_diff(h1.ph_key, h2.ph_key) < 0
    if lt:
        if h1.ph_child is None:
            h1.ph_child = h2
        else:
            # ph_child_last is only read when ph_child is not None.
            h1.ph_child_last.ph_next = h2
        h1.ph_child_last = h2
        h2.ph_next = None
        # h2 is now the last sibling, so it carries the link back to the parent.
        h2.ph_rightmost_parent = h1
        return h1
    else:
        h1.ph_next = h2.ph_child
        h2.ph_child = h1
        if h1.ph_next is None:
            # h1 is the only child, hence also the last one.
            h2.ph_child_last = h1
            h1.ph_rightmost_parent = h2
        return h2


# pairing-heap pairing operation; amortised O(log N)
#
# `child` is the first node of a sibling list linked through ph_next (or None).
# Siblings are unlinked and melded two at a time, and each result is melded
# into the accumulated heap.  Returns the new root, or None if `child` is None.
def ph_pairing(child):
    heap = None
    while child is not None:
        n1 = child
        child = child.ph_next
        n1.ph_next = None
        if child is not None:
            n2 = child
            child = child.ph_next
            n2.ph_next = None
            n1 = ph_meld(n1, n2)
        heap = ph_meld(heap, n1)
    return heap


# pairing-heap delete of a node; stable, amortised O(log N)
#
# Removes `node` from the heap rooted at `heap` and returns the (possibly new)
# root.  The removed node's ph_child and ph_next links are cleared.
def ph_delete(heap, node):
    if node is heap:
        child = heap.ph_child
        node.ph_child = None
        return ph_pairing(child)
    # Find parent of node
    # (only the last sibling of a child list has ph_rightmost_parent maintained,
    # so walk to the end of the sibling list first)
    parent = node
    while parent.ph_next is not None:
        parent = parent.ph_next
    parent = parent.ph_rightmost_parent
    # Replace node with pairing of its children
    if node is parent.ph_child and node.ph_child is None:
        # First child with no children of its own: just unlink it.
        # parent.ph_child_last is not touched here; if node was the only child
        # then parent.ph_child becomes None and ph_meld will not read the stale
        # ph_child_last.
        parent.ph_child = node.ph_next
        node.ph_next = None
        return heap
    elif node is parent.ph_child:
        # First child with children: the pairing of its children takes its place.
        child = node.ph_child
        next = node.ph_next
        node.ph_child = None
        node.ph_next = None
        node = ph_pairing(child)
        parent.ph_child = node
    else:
        # Not the first child: find the sibling immediately before node.
        n = parent.ph_child
        while node is not n.ph_next:
            n = n.ph_next
        child = node.ph_child
        next = node.ph_next
        node.ph_child = None
        node.ph_next = None
        node = ph_pairing(child)
        if node is None:
            # Nothing replaces the removed node; the previous sibling is linked
            # straight to `next` below.
            node = n
        else:
            n.ph_next = node
    node.ph_next = next
    if next is None:
        # The replacement is now the last sibling, so it must carry the parent link.
        node.ph_rightmost_parent = parent
        parent.ph_child_last = node
    return heap


# TaskQueue class based on the above pairing-heap functions.
class TaskQueue:
    def __init__(self):
        # Root node of the pairing heap, or None when the queue is empty.
        self.heap = None

    # Return the root of the heap without removing it (None if the queue is empty).
    def peek(self):
        return self.heap

    # Insert v with the given key.  This resets v.data to None and clears v's
    # child/sibling links before melding it into the heap.
    def push_sorted(self, v, key):
        v.data = None
        v.ph_key = key
        v.ph_child = None
        v.ph_next = None
        self.heap = ph_meld(v, self.heap)

    # Insert v keyed with the current value of core.ticks().
    def push_head(self, v):
        self.push_sorted(v, core.ticks())

    # Remove and return the root of the heap.  The queue must not be empty:
    # there is no check for self.heap being None.
    def pop_head(self):
        v = self.heap
        self.heap = ph_pairing(self.heap.ph_child)
        # Detach the popped node from its former children, as ph_delete() does
        # when it removes the root, so it does not keep a stale link into the heap.
        v.ph_child = None
        return v

    # Remove v from the queue; v may be the root or any other node in the heap.
    def remove(self, v):
        self.heap = ph_delete(self.heap, v)


# Task class representing a coroutine, can be waited on and cancelled.
class Task:
    """This object wraps a coroutine into a running task. Tasks can be waited on
    using ``await task``, which will wait for the task to complete and return the
    return value of the task.

    Tasks should not be created directly, rather use ``create_task`` to create them.
    """

    def __init__(self, coro, globals=None):
        # NOTE: `globals` is accepted but not used in this implementation.
        self.coro = coro  # Coroutine of this Task
        self.data = None  # General data for queue it is waiting on
        self.state = True  # None, False, True or a TaskQueue instance
        self.ph_key = 0  # Pairing heap
        self.ph_child = None  # Pairing heap
        self.ph_child_last = None  # Pairing heap
        self.ph_next = None  # Pairing heap
        self.ph_rightmost_parent = None  # Pairing heap

    def __iter__(self):
        if not self.state:
            # Task finished, signal that it has been await'ed on.
            self.state = False
        elif self.state is True:
            # Allocated head of linked list of Tasks waiting on completion of this task.
            self.state = TaskQueue()
        return self

    __await__ = __iter__

    def __next__(self):
        if not self.state:
            # Task finished, raise return value to caller so it can continue.
            raise self.data
        else:
            # Put calling task on waiting queue.
            self.state.push_head(core.cur_task)
            # Set calling task's data to this task that it waits on, to double-link it.
            # (Must come after push_head(), which resets the pushed task's data to None.)
            core.cur_task.data = self

    def done(self):
        """Whether the task is complete."""

        return not self.state

    def cancel(self):
        """Cancel the task by injecting a ``CancelledError`` into it. The task
        may or may not ignore this exception.

        Returns ``False`` if the task has already finished, otherwise ``True``.
        Raises ``RuntimeError`` if a task attempts to cancel itself.
        """

        # Check if task is already finished.
        if not self.state:
            return False
        # Can't cancel self (not supported yet).
        if self is core.cur_task:
            raise RuntimeError("can't cancel self")
        # If Task waits on another task then forward the cancel to the one it's waiting on.
        # (`self` is rebound, so everything below acts on the last task in the chain.)
        while isinstance(self.data, Task):
            self = self.data
        # Reschedule Task as a cancelled task.
        if hasattr(self.data, "remove"):
            # Not on the main running queue, remove the task from the queue it's on.
            self.data.remove(self)
            core._task_queue.push_head(self)
        elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
            # On the main running queue but scheduled in the future, so bring it forward to now.
            core._task_queue.remove(self)
            core._task_queue.push_head(self)
        # Set after re-queueing, because push_head() resets data to None.
        self.data = core.CancelledError
        return True