interface.py
1 """This module contains all the code responsible for the HTOP-like interface.""" 2 3 # Why using asciimatics? 4 # 5 # - curses is hard, and not working well on Windows 6 # - blessings (curses-based) is easier, but does not provide input methods and is not maintained 7 # - blessed (blessings fork) provides input methods, but they are blocking 8 # - urwid seems less easy to use than asciimatics, and older 9 # - clint is not maintained and does not provide input methods 10 # - prompt_toolkit is designed to build interactive (like, very interactive) command line applications 11 # - curtsies, pygcurse, unicurses, npyscreen: all based on curses anyway, which does not work well on Windows 12 # 13 # Well, asciimatics also provides a "top" example, so... 14 15 from __future__ import annotations 16 17 import contextlib 18 import os 19 import sys 20 import time 21 from collections import defaultdict 22 from pathlib import Path 23 from typing import TYPE_CHECKING, Callable, ClassVar, TypedDict 24 25 import pyperclip 26 import requests 27 from asciimatics.event import KeyboardEvent, MouseEvent 28 from asciimatics.screen import ManagedScreen, Screen 29 from loguru import logger 30 31 from aria2p.api import API 32 from aria2p.utils import get_version, load_configuration 33 34 if TYPE_CHECKING: 35 from collections.abc import Sequence 36 37 from aria2p.downloads import Download 38 39 40 configs = load_configuration() 41 42 43 def key_bind_parser(action: str) -> list[Key]: 44 """Return a list of Key instances. 45 46 Parameters: 47 action: The action name. 48 49 Returns: 50 A list of keys. 51 """ 52 default_bindings = configs["DEFAULT"]["key_bindings"] 53 bindings = configs.get("USER", {}).get("key_bindings", default_bindings) 54 55 key_binds = bindings.get(action, default_bindings[action]) 56 57 if isinstance(key_binds, list): 58 return [Key(k) for k in key_binds] 59 return [Key(key_binds)] 60 61 62 def color_palette_parser(palette: str) -> tuple[int, int, int]: 63 """Return a color tuple (foreground color, mode, background color). 64 65 Parameters: 66 palette: The palette name. 67 68 Returns: 69 Foreground color, mode, background color. 70 """ 71 default_colors = configs["DEFAULT"]["colors"] 72 colors = configs.get("USER", {}).get("colors", default_colors) 73 74 # get values of colors and modes for ascimatics.screen module 75 color_map = { 76 "BLACK": Screen.COLOUR_BLACK, 77 "WHITE": Screen.COLOUR_WHITE, 78 "RED": Screen.COLOUR_RED, 79 "CYAN": Screen.COLOUR_CYAN, 80 "YELLOW": Screen.COLOUR_YELLOW, 81 "BLUE": Screen.COLOUR_BLUE, 82 "GREEN": Screen.COLOUR_GREEN, 83 "DEFAULT": Screen.COLOUR_DEFAULT, 84 } 85 mode_map = { 86 "NORMAL": Screen.A_NORMAL, 87 "BOLD": Screen.A_BOLD, 88 "UNDERLINE": Screen.A_UNDERLINE, 89 "REVERSE": Screen.A_REVERSE, 90 } 91 92 palette_colors = colors.get(palette, default_colors[palette]) 93 palette_fg, palette_mode, palette_bg = palette_colors.split(" ") 94 95 return ( 96 color_map[palette_fg], 97 mode_map[palette_mode], 98 color_map[palette_bg], 99 ) 100 101 102 class Key: 103 """A class to represent an input key.""" 104 105 OTHER_KEY_VALUES: ClassVar[dict[str, int]] = { 106 "F1": Screen.KEY_F1, 107 "F2": Screen.KEY_F2, 108 "F3": Screen.KEY_F3, 109 "F4": Screen.KEY_F4, 110 "F5": Screen.KEY_F5, 111 "F6": Screen.KEY_F6, 112 "F7": Screen.KEY_F7, 113 "F8": Screen.KEY_F8, 114 "F9": Screen.KEY_F9, 115 "F10": Screen.KEY_F10, 116 "F11": Screen.KEY_F11, 117 "F12": Screen.KEY_F12, 118 "ESC": Screen.KEY_ESCAPE, 119 "DEL": Screen.KEY_DELETE, 120 "PAGE_UP": Screen.KEY_PAGE_UP, 121 "PAGE_DOWN": Screen.KEY_PAGE_DOWN, 122 "HOME": Screen.KEY_HOME, 123 "END": Screen.KEY_END, 124 "LEFT": Screen.KEY_LEFT, 125 "UP": Screen.KEY_UP, 126 "RIGHT": Screen.KEY_RIGHT, 127 "DOWN": Screen.KEY_DOWN, 128 "BACK": Screen.KEY_BACK, 129 "TAB": Screen.KEY_TAB, 130 "SPACE": ord(" "), 131 "ENTER": ord("\n"), 132 } 133 134 def __init__(self, name: str, value: int | None = None) -> None: 135 """Initialize the object. 136 137 Parameters: 138 name: The key name. 139 value: The key value. 140 """ 141 self.name = name 142 if value is None: 143 value = self.get_value(name) 144 self.value = value 145 146 def get_value(self, name: str) -> int: # noqa: D102 147 try: 148 value = ord(name) 149 except TypeError: 150 value = self.OTHER_KEY_VALUES[name.upper()] 151 152 return value 153 154 def __eq__(self, value: object) -> bool: 155 return self.value == value 156 157 158 class Keys: 159 """The actions and their shortcuts keys.""" 160 161 AUTOCLEAR = key_bind_parser("AUTOCLEAR") 162 CANCEL = key_bind_parser("CANCEL") 163 ENTER = key_bind_parser("ENTER") 164 FILTER = key_bind_parser("FILTER") 165 FOLLOW_ROW = key_bind_parser("FOLLOW_ROW") 166 HELP = key_bind_parser("HELP") 167 MOVE_DOWN = key_bind_parser("MOVE_DOWN") 168 MOVE_LEFT = key_bind_parser("MOVE_LEFT") 169 MOVE_RIGHT = key_bind_parser("MOVE_RIGHT") 170 MOVE_UP = key_bind_parser("MOVE_UP") 171 NEXT_SORT = key_bind_parser("NEXT_SORT") 172 PREVIOUS_SORT = key_bind_parser("PREVIOUS_SORT") 173 PRIORITY_DOWN = key_bind_parser("PRIORITY_DOWN") 174 PRIORITY_UP = key_bind_parser("PRIORITY_UP") 175 QUIT = key_bind_parser("QUIT") 176 REMOVE_ASK = key_bind_parser("REMOVE_ASK") 177 REVERSE_SORT = key_bind_parser("REVERSE_SORT") 178 SEARCH = key_bind_parser("SEARCH") 179 SELECT_SORT = key_bind_parser("SELECT_SORT") 180 SETUP = key_bind_parser("SETUP") 181 TOGGLE_EXPAND_COLLAPSE_ALL = key_bind_parser("TOGGLE_EXPAND_COLLAPSE_ALL") 182 TOGGLE_EXPAND_COLLAPSE = key_bind_parser("TOGGLE_EXPAND_COLLAPSE") 183 TOGGLE_RESUME_PAUSE = key_bind_parser("TOGGLE_RESUME_PAUSE") 184 TOGGLE_RESUME_PAUSE_ALL = key_bind_parser("TOGGLE_RESUME_PAUSE_ALL") 185 TOGGLE_SELECT = key_bind_parser("TOGGLE_SELECT") 186 UN_SELECT_ALL = key_bind_parser("UN_SELECT_ALL") 187 MOVE_HOME = key_bind_parser("MOVE_HOME") 188 MOVE_END = key_bind_parser("MOVE_END") 189 MOVE_UP_STEP = key_bind_parser("MOVE_UP_STEP") 190 MOVE_DOWN_STEP = key_bind_parser("MOVE_DOWN_STEP") 191 RETRY = key_bind_parser("RETRY") 192 RETRY_ALL = key_bind_parser("RETRY_ALL") 193 ADD_DOWNLOADS = key_bind_parser("ADD_DOWNLOADS") 194 195 @staticmethod 196 def names(keys_list: list[Key]) -> list[str]: # noqa: D102 197 return [key.name for key in keys_list] 198 199 @staticmethod 200 def values(keys_list: list[Key]) -> list[int]: # noqa: D102 201 return [key.value for key in keys_list] 202 203 204 class Exit(Exception): # noqa: N818 205 """A simple exception to exit the interactive interface.""" 206 207 208 class Column: 209 """A class to specify a column in the interface. 210 211 It's composed of a header (the string to display on top), a padding (how to align the text), 212 and three callable functions to get the text from a Python object, to sort between these objects, 213 and to get a color palette based on the text. 214 """ 215 216 def __init__( 217 self, 218 header: str, 219 padding: str, 220 get_text: Callable, 221 get_sort: Callable, 222 get_palette: Callable, 223 ) -> None: 224 """Initialize the object. 225 226 Parameters: 227 header: The string to display on top. 228 padding: How to align the text. 229 get_text: Function accepting a Download as argument and returning the text to display. 230 get_sort: Function accepting a Download as argument and returning the attribute used to sort. 231 get_palette: Function accepting text as argument and returning a palette or a palette identifier. 232 """ 233 self.header = header 234 self.padding = padding 235 self.get_text = get_text 236 self.get_sort = get_sort 237 self.get_palette = get_palette 238 239 240 class HorizontalScroll: 241 """A wrapper around asciimatics' Screen.print_at and Screen.paint methods. 242 243 It allows scroll the rows horizontally, used when moving left and right: 244 the first N characters will not be printed. 245 """ 246 247 def __init__(self, screen: Screen, scroll: int = 0) -> None: 248 """Initialize the object. 249 250 Parameters: 251 screen (Screen): The asciimatics screen object. 252 scroll (int): Base scroll to use when printing. Will decrease by one with each character skipped. 253 """ 254 self.screen = screen 255 self.scroll = scroll 256 257 def set_scroll(self, scroll: int) -> None: 258 """Set the scroll value.""" 259 self.scroll = scroll 260 261 def print_at(self, text: str, x: int, y: int, palette: list | tuple) -> int: 262 """Wrapper print_at method. 263 264 Parameters: 265 text: Text to print. 266 x: X axis position / column. 267 y: Y axis position / row. 268 palette: A length-3 tuple or a list of length-3 tuples representing asciimatics palettes. 269 270 Returns: 271 The number of characters actually printed. 272 """ 273 if self.scroll == 0: 274 if isinstance(palette, list): 275 self.screen.paint(text, x, y, colour_map=palette) 276 else: 277 self.screen.print_at(text, x, y, *palette) 278 written = len(text) 279 else: 280 text_length = len(text) 281 if text_length > self.scroll: 282 new_text = text[self.scroll :] 283 written = len(new_text) 284 if isinstance(palette, list): 285 new_palette = palette[self.scroll :] 286 self.screen.paint(new_text, x, y, colour_map=new_palette) 287 else: 288 self.screen.print_at(new_text, x, y, *palette) 289 self.scroll = 0 290 else: 291 self.scroll -= text_length 292 written = 0 293 return written 294 295 296 class Palette: 297 """A simple class to hold palettes getters.""" 298 299 @staticmethod 300 def status(value: str) -> str: 301 """Return the palette for a STATUS cell.""" 302 return "status_" + value 303 304 @staticmethod 305 def name(value: str) -> str | list[tuple[int, int, int]]: 306 """Return the palette for a NAME cell.""" 307 if value.startswith("[METADATA]"): 308 return ( 309 [(Screen.COLOUR_GREEN, Screen.A_UNDERLINE, Screen.COLOUR_BLACK)] * 10 310 + [Interface.palettes["metadata"]] * (len(value.strip()) - 10) 311 + [Interface.palettes["row"]] 312 ) 313 return "name" 314 315 316 class Interface: 317 """The main class responsible for drawing the HTOP-like interface. 318 319 It should be instantiated with an API instance, and then ran with its `run` method. 320 321 If you want to re-use this class' code to create an HTOP-like interface for another purpose, 322 simply change these few things: 323 324 - columns, columns_order and palettes attributes 325 - sort and reverse attributes default values 326 - get_data method. It should return a list of objects that can be compared by equality (==, __eq__, __hash__) 327 - __init__ method to accept other arguments 328 - remove/change the few events with "download" or "self.api" in the process_event method 329 """ 330 331 class State: # noqa: D106 332 MAIN = 0 333 HELP = 1 334 SETUP = 2 335 REMOVE_ASK = 3 336 SELECT_SORT = 4 337 ADD_DOWNLOADS = 9 338 339 state = State.MAIN 340 sleep = 0.005 341 frames = 200 # 200 * 0.005 seconds == 1 second 342 frame = 0 343 focused = 0 344 side_focused = 0 345 sort = 2 346 reverse = True 347 x_scroll = 0 348 x_offset = 0 349 y_offset = 0 350 row_offset = 0 351 refresh = False 352 width: int 353 height: int 354 screen: Screen 355 data: list[Download] 356 rows: list[Sequence[str]] 357 scroller: HorizontalScroll 358 follow = None 359 bounds: list[Sequence[int]] 360 361 palettes: ClassVar[dict[str, tuple[int, int, int]]] = defaultdict(lambda: color_palette_parser("UI")) 362 palettes.update( 363 { 364 "ui": color_palette_parser("UI"), 365 "header": color_palette_parser("HEADER"), 366 "focused_header": color_palette_parser("FOCUSED_HEADER"), 367 "focused_row": color_palette_parser("FOCUSED_ROW"), 368 "status_active": color_palette_parser("STATUS_ACTIVE"), 369 "status_paused": color_palette_parser("STATUS_PAUSED"), 370 "status_waiting": color_palette_parser("STATUS_WAITING"), 371 "status_error": color_palette_parser("STATUS_ERROR"), 372 "status_complete": color_palette_parser("STATUS_COMPLETE"), 373 "metadata": color_palette_parser("METADATA"), 374 "side_column_header": color_palette_parser("SIDE_COLUMN_HEADER"), 375 "side_column_row": color_palette_parser("SIDE_COLUMN_ROW"), 376 "side_column_focused_row": color_palette_parser("SIDE_COLUMN_FOCUSED_ROW"), 377 "bright_help": color_palette_parser("BRIGHT_HELP"), 378 }, 379 ) 380 381 columns_order: ClassVar[list[str]] = ["gid", "status", "progress", "size", "down_speed", "up_speed", "eta", "name"] 382 columns: ClassVar[dict[str, Column]] = { 383 "gid": Column( 384 header="GID", 385 padding=">16", 386 get_text=lambda d: d.gid, 387 get_sort=lambda d: d.gid, 388 get_palette=lambda d: "gid", 389 ), 390 "status": Column( 391 header="STATUS", 392 padding="<9", 393 get_text=lambda d: d.status, 394 get_sort=lambda d: d.status, 395 get_palette=Palette.status, 396 ), 397 "progress": Column( 398 header="PROGRESS", 399 padding=">8", 400 get_text=lambda d: d.progress_string(), 401 get_sort=lambda d: d.progress, 402 get_palette=lambda s: "progress", 403 ), 404 "size": Column( 405 header="SIZE", 406 padding=">11", 407 get_text=lambda d: d.total_length_string(), 408 get_sort=lambda d: d.total_length, 409 get_palette=lambda s: "size", 410 ), 411 "down_speed": Column( 412 header="DOWN_SPEED", 413 padding=">13", 414 get_text=lambda d: d.download_speed_string(), 415 get_sort=lambda d: d.download_speed, 416 get_palette=lambda s: "down_speed", 417 ), 418 "up_speed": Column( 419 header="UP_SPEED", 420 padding=">13", 421 get_text=lambda d: d.upload_speed_string(), 422 get_sort=lambda d: d.upload_speed, 423 get_palette=lambda s: "up_speed", 424 ), 425 "eta": Column( 426 header="ETA", 427 padding=">8", 428 get_text=lambda d: d.eta_string(precision=2), 429 get_sort=lambda d: d.eta, 430 get_palette=lambda s: "eta", 431 ), 432 "name": Column( 433 header="NAME", 434 padding="100%", 435 get_text=lambda d: d.name, 436 get_sort=lambda d: d.name, 437 get_palette=Palette.name, 438 ), 439 } 440 441 remove_ask_header = "Remove:" 442 remove_ask_rows: ClassVar[list[tuple]] = [ 443 ("Remove", lambda d: d.remove(force=False, files=False)), 444 ("Remove with files", lambda d: d.remove(force=False, files=True)), 445 ("Force remove", lambda d: d.remove(force=True, files=False)), 446 ("Force remove with files", lambda d: d.remove(force=True, files=True)), 447 ] 448 last_remove_choice = None 449 450 select_sort_header = "Select sort:" 451 select_sort_rows = columns_order 452 453 downloads_uris: list[str] 454 downloads_uris_header = ( 455 f"Add Download: [ Hit ENTER to download; Hit { ','.join(Keys.names(Keys.ADD_DOWNLOADS)) } to download all ]" 456 ) 457 458 class StateConf(TypedDict): # noqa: D106 459 process_keyboard_event: Callable 460 process_mouse_event: Callable 461 print_functions: list[Callable] 462 463 def __init__(self, api: API | None = None) -> None: 464 """Initialize the object. 465 466 Parameters: 467 api: An instance of API. 468 """ 469 if api is None: 470 api = API() 471 self.api = api 472 473 self.rows = [] 474 self.data = [] 475 self.bounds = [] 476 self.downloads_uris = [] 477 self.height = 20 478 self.width = 80 479 480 # reduce curses' 1 second delay when hitting escape to 25 ms 481 os.environ.setdefault("ESCDELAY", "25") 482 483 self.state_mapping: dict[int, Interface.StateConf] = { 484 self.State.MAIN: { 485 "process_keyboard_event": self.process_keyboard_event_main, 486 "process_mouse_event": self.process_mouse_event_main, 487 "print_functions": [self.print_table], 488 }, 489 self.State.HELP: { 490 "process_keyboard_event": self.process_keyboard_event_help, 491 "process_mouse_event": self.process_mouse_event_help, 492 "print_functions": [self.print_help], 493 }, 494 self.State.SETUP: { 495 "process_keyboard_event": self.process_keyboard_event_setup, 496 "process_mouse_event": self.process_mouse_event_setup, 497 "print_functions": [], 498 }, 499 self.State.REMOVE_ASK: { 500 "process_keyboard_event": self.process_keyboard_event_remove_ask, 501 "process_mouse_event": self.process_mouse_event_remove_ask, 502 "print_functions": [self.print_remove_ask_column, self.print_table], 503 }, 504 self.State.SELECT_SORT: { 505 "process_keyboard_event": self.process_keyboard_event_select_sort, 506 "process_mouse_event": self.process_mouse_event_select_sort, 507 "print_functions": [self.print_select_sort_column, self.print_table], 508 }, 509 self.State.ADD_DOWNLOADS: { 510 "process_keyboard_event": self.process_keyboard_event_add_downloads, 511 "process_mouse_event": self.process_mouse_event_add_downloads, 512 "print_functions": [self.print_add_downloads, self.print_table], 513 }, 514 } 515 516 def run(self) -> bool: 517 """The main drawing loop.""" 518 try: 519 # outer loop to support screen resize 520 while True: 521 with ManagedScreen() as screen: 522 logger.debug(f"Created new screen {screen}") 523 self.set_screen(screen) 524 self.frame = 0 525 # break (and re-enter) when screen has been resized 526 while not screen.has_resized(): 527 # keep previous sort in memory to know if we have to re-sort the rows 528 # once all events are processed (to avoid useless/redundant sort passes) 529 previous_sort = (self.sort, self.reverse) 530 531 # we only refresh when explicitly asked for 532 self.refresh = False 533 534 # process all events before refreshing screen, 535 # otherwise the reactivity is slowed down a lot with fast inputs 536 event = screen.get_event() 537 logger.debug(f"Got event {event}") 538 while event: 539 # avoid crashing the interface if exceptions occur while processing an event 540 try: 541 self.process_event(event) 542 except Exit: 543 logger.debug("Received exit command") 544 return True 545 except Exception as error: # noqa: BLE001 546 # TODO: display error in status bar 547 logger.exception(error) 548 event = screen.get_event() 549 logger.debug(f"Got event {event}") 550 551 # time to update data and rows 552 if self.frame == 0: 553 logger.debug("Tick! Updating data and rows") 554 self.update_data() 555 self.update_rows() 556 self.refresh = True 557 558 # time to refresh the screen 559 if self.refresh: 560 logger.debug("Refresh! Printing text") 561 # sort if needed, unless it was just done at frame 0 when updating 562 if (self.sort, self.reverse) != previous_sort and self.frame != 0: 563 self.sort_data() 564 self.update_rows() 565 566 # actual printing and screen refresh 567 for print_function in self.state_mapping[self.state]["print_functions"]: 568 print_function() 569 screen.refresh() 570 571 # sleep and increment frame 572 time.sleep(self.sleep) 573 self.frame = (self.frame + 1) % self.frames 574 logger.debug("Screen has resized") 575 self.post_resize() 576 except Exception as error: # noqa: BLE001 577 logger.exception(error) 578 return False 579 580 def post_resize(self) -> None: # noqa: D102 581 logger.debug("Running post-resize function") 582 logger.debug("Trying to re-apply pywal color theme") 583 wal_sequences = Path.home() / ".cache" / "wal" / "sequences" 584 try: 585 with wal_sequences.open("rb") as fd: 586 contents = fd.read() 587 sys.stdout.buffer.write(contents) 588 except Exception: # noqa: BLE001,S110 589 pass 590 591 def update_select_sort_rows(self) -> None: # noqa: D102 592 self.select_sort_rows = self.columns_order 593 594 def process_event(self, event: KeyboardEvent | MouseEvent) -> None: 595 """Process an event. 596 597 For reactivity purpose, this method should not compute expensive stuff, only change the state of the interface, 598 changes that will be applied by update_data and update_rows methods. 599 600 Parameters: 601 event (KeyboardEvent | MouseEvent): The event to process. 602 """ 603 if isinstance(event, KeyboardEvent): 604 self.process_keyboard_event(event) 605 606 elif isinstance(event, MouseEvent): 607 self.process_mouse_event(event) 608 609 def process_keyboard_event(self, event: KeyboardEvent) -> None: # noqa: D102 610 self.state_mapping[self.state]["process_keyboard_event"](event) 611 612 def process_keyboard_event_main(self, event: KeyboardEvent) -> None: # noqa: D102 613 if event.key_code in Keys.MOVE_UP: 614 if self.focused > 0: 615 self.focused -= 1 616 logger.debug(f"Move focus up: {self.focused}") 617 618 if self.focused < self.row_offset: 619 self.row_offset = self.focused 620 elif self.focused >= self.row_offset + (self.height - 1): 621 # happens when shrinking height 622 self.row_offset = self.focused + 1 - (self.height - 1) 623 self.follow = None 624 self.refresh = True 625 626 elif event.key_code in Keys.MOVE_DOWN: 627 if self.focused < len(self.rows) - 1: 628 self.focused += 1 629 logger.debug(f"Move focus down: {self.focused}") 630 if self.focused - self.row_offset >= (self.height - 1): 631 self.row_offset = self.focused + 1 - (self.height - 1) 632 self.follow = None 633 self.refresh = True 634 635 elif event.key_code in Keys.MOVE_LEFT: 636 if self.x_scroll > 0: 637 self.x_scroll = max(0, self.x_scroll - 5) 638 self.refresh = True 639 640 elif event.key_code in Keys.MOVE_RIGHT: 641 self.x_scroll += 5 642 self.refresh = True 643 644 elif event.key_code in Keys.HELP: 645 self.state = self.State.HELP 646 self.refresh = True 647 648 elif event.key_code in Keys.SETUP: 649 pass # TODO 650 651 elif event.key_code in Keys.TOGGLE_RESUME_PAUSE: 652 download = self.data[self.focused] 653 if download.is_active or download.is_waiting: 654 logger.debug(f"Pausing download {download.gid}") 655 download.pause() 656 elif download.is_paused: 657 logger.debug(f"Resuming download {download.gid}") 658 download.resume() 659 660 elif event.key_code in Keys.PRIORITY_UP: 661 download = self.data[self.focused] 662 if not download.is_active: 663 download.move_up() 664 self.follow = download 665 666 elif event.key_code in Keys.PRIORITY_DOWN: 667 download = self.data[self.focused] 668 if not download.is_active: 669 download.move_down() 670 self.follow = download 671 672 elif event.key_code in Keys.REVERSE_SORT: 673 self.reverse = not self.reverse 674 self.refresh = True 675 676 elif event.key_code in Keys.NEXT_SORT: 677 if self.sort < len(self.columns) - 1: 678 self.sort += 1 679 self.refresh = True 680 681 elif event.key_code in Keys.PREVIOUS_SORT: 682 if self.sort > 0: 683 self.sort -= 1 684 self.refresh = True 685 686 elif event.key_code in Keys.SELECT_SORT: 687 self.state = self.State.SELECT_SORT 688 self.side_focused = self.sort 689 self.x_offset = self.width_select_sort() + 1 690 self.refresh = True 691 692 elif event.key_code in Keys.REMOVE_ASK: 693 logger.debug("Triggered removal") 694 logger.debug(f"self.focused = {self.focused}") 695 logger.debug(f"len(self.data) = {len(self.data)}") 696 if self.follow_focused(): 697 self.state = self.State.REMOVE_ASK 698 self.x_offset = self.width_remove_ask() + 1 699 if self.last_remove_choice is not None: 700 self.side_focused = self.last_remove_choice 701 self.refresh = True 702 else: 703 logger.debug("Could not focus download") 704 705 elif event.key_code in Keys.TOGGLE_EXPAND_COLLAPSE: # noqa: SIM114 706 pass # TODO 707 708 elif event.key_code in Keys.TOGGLE_EXPAND_COLLAPSE_ALL: 709 pass # TODO 710 711 elif event.key_code in Keys.AUTOCLEAR: 712 self.api.purge() 713 714 elif event.key_code in Keys.FOLLOW_ROW: 715 self.follow_focused() 716 717 elif event.key_code in Keys.SEARCH: # noqa: SIM114 718 pass # TODO 719 720 elif event.key_code in Keys.FILTER: # noqa: SIM114 721 pass # TODO 722 723 elif event.key_code in Keys.TOGGLE_SELECT: # noqa: SIM114 724 pass # TODO 725 726 elif event.key_code in Keys.UN_SELECT_ALL: 727 pass # TODO 728 729 elif event.key_code in Keys.MOVE_HOME: 730 if self.focused > 0: 731 self.focused = 0 732 logger.debug(f"Move focus home: {self.focused}") 733 734 if self.focused < self.row_offset: 735 self.row_offset = self.focused 736 elif self.focused >= self.row_offset + (self.height - 1): 737 # happens when shrinking height 738 self.row_offset = self.focused + 1 - (self.height - 1) 739 self.follow = None 740 self.refresh = True 741 742 elif event.key_code in Keys.MOVE_END: 743 if self.focused < len(self.rows) - 1: 744 self.focused = len(self.rows) - 1 745 logger.debug(f"Move focus end: {self.focused}") 746 747 if self.focused - self.row_offset >= (self.height - 1): 748 self.row_offset = self.focused + 1 - (self.height - 1) 749 self.follow = None 750 self.refresh = True 751 752 elif event.key_code in Keys.MOVE_UP_STEP: 753 if self.focused > 0: 754 self.focused -= len(self.rows) // 5 755 756 self.focused = max(self.focused, 0) 757 logger.debug(f"Move focus up (step): {self.focused}") 758 759 if self.focused < self.row_offset: 760 self.row_offset = self.focused 761 elif self.focused >= self.row_offset + (self.height - 1): 762 # happens when shrinking height 763 self.row_offset = self.focused + 1 - (self.height - 1) 764 765 self.follow = None 766 self.refresh = True 767 768 elif event.key_code in Keys.MOVE_DOWN_STEP: 769 if self.focused < len(self.rows) - 1: 770 self.focused += len(self.rows) // 5 771 772 self.focused = min(self.focused, len(self.rows) - 1) 773 logger.debug(f"Move focus down (step): {self.focused}") 774 775 if self.focused - self.row_offset >= (self.height - 1): 776 self.row_offset = self.focused + 1 - (self.height - 1) 777 self.follow = None 778 self.refresh = True 779 780 elif event.key_code in Keys.TOGGLE_RESUME_PAUSE_ALL: 781 stats = self.api.get_stats() 782 if stats.num_active: 783 self.api.pause_all() 784 else: 785 self.api.resume_all() 786 787 elif event.key_code in Keys.RETRY: 788 download = self.data[self.focused] 789 self.api.retry_downloads([download]) 790 791 elif event.key_code in Keys.RETRY_ALL: 792 downloads = self.data[:] 793 self.api.retry_downloads(downloads) 794 795 elif event.key_code in Keys.ADD_DOWNLOADS: 796 self.state = self.State.ADD_DOWNLOADS 797 self.refresh = True 798 self.side_focused = 0 799 self.x_offset = self.width 800 801 # build set of copied lines 802 copied_lines = set() 803 for line in pyperclip.paste().split("\n") + pyperclip.paste(primary=True).split("\n"): 804 copied_lines.add(line.strip()) 805 with contextlib.suppress(KeyError): 806 copied_lines.remove("") 807 808 # add lines to download uris 809 if copied_lines: 810 self.downloads_uris = sorted(copied_lines) 811 812 elif event.key_code in Keys.QUIT: 813 raise Exit 814 815 def process_keyboard_event_help(self, event: KeyboardEvent) -> None: # noqa: ARG002,D102 816 self.state = self.State.MAIN 817 self.refresh = True 818 819 def process_keyboard_event_setup(self, event: KeyboardEvent) -> None: # noqa: D102 820 pass 821 822 def process_keyboard_event_remove_ask(self, event: KeyboardEvent) -> None: # noqa: D102 823 if event.key_code in Keys.CANCEL: 824 logger.debug("Canceling removal") 825 self.state = self.State.MAIN 826 self.x_offset = 0 827 self.refresh = True 828 829 elif event.key_code in Keys.ENTER: 830 logger.debug("Validate removal") 831 if self.follow: 832 self.remove_ask_rows[self.side_focused][1](self.follow) 833 self.follow = None 834 else: 835 logger.debug("No download was targeted, not removing") 836 self.last_remove_choice = self.side_focused 837 self.state = self.State.MAIN 838 self.x_offset = 0 839 840 # force complete refresh 841 self.frame = 0 842 843 elif event.key_code in Keys.MOVE_UP: 844 if self.side_focused > 0: 845 self.side_focused -= 1 846 logger.debug(f"Moving side focus up: {self.side_focused}") 847 self.refresh = True 848 849 elif event.key_code in Keys.MOVE_DOWN: 850 if self.side_focused < len(self.remove_ask_rows) - 1: 851 self.side_focused += 1 852 logger.debug(f"Moving side focus down: {self.side_focused}") 853 self.refresh = True 854 855 def process_keyboard_event_select_sort(self, event: KeyboardEvent) -> None: # noqa: D102 856 if event.key_code in Keys.CANCEL: 857 self.state = self.State.MAIN 858 self.x_offset = 0 859 self.refresh = True 860 861 elif event.key_code in Keys.ENTER: 862 self.sort = self.side_focused 863 self.state = self.State.MAIN 864 self.x_offset = 0 865 self.refresh = True 866 867 elif event.key_code in Keys.MOVE_UP: 868 if self.side_focused > 0: 869 self.side_focused -= 1 870 self.refresh = True 871 872 elif event.key_code in Keys.MOVE_DOWN: 873 if self.side_focused < len(self.select_sort_rows) - 1: 874 self.side_focused += 1 875 self.refresh = True 876 877 def process_keyboard_event_add_downloads(self, event: KeyboardEvent) -> None: # noqa: D102 878 if event.key_code in Keys.CANCEL: 879 self.state = self.State.MAIN 880 self.x_offset = 0 881 self.refresh = True 882 883 elif event.key_code in Keys.MOVE_UP: 884 if self.side_focused > 0: 885 self.side_focused -= 1 886 887 if self.side_focused < self.row_offset: 888 self.row_offset = self.side_focused 889 elif self.side_focused >= self.row_offset + (self.height - 1): 890 # happens when shrinking height 891 self.row_offset = self.side_focused + 1 - (self.height - 1) 892 self.follow = None 893 self.refresh = True 894 895 elif event.key_code in Keys.MOVE_DOWN: 896 if self.side_focused < len(self.downloads_uris) - 1: 897 self.side_focused += 1 898 if self.side_focused - self.row_offset >= (self.height - 1): 899 self.row_offset = self.side_focused + 1 - (self.height - 1) 900 self.follow = None 901 self.refresh = True 902 903 elif event.key_code in Keys.ENTER: 904 if self.api.add(self.downloads_uris[self.side_focused]): 905 self.downloads_uris.pop(self.side_focused) 906 if 0 < self.side_focused > len(self.downloads_uris) - 1: 907 self.side_focused -= 1 908 self.refresh = True 909 910 elif event.key_code in Keys.ADD_DOWNLOADS: 911 for uri in self.downloads_uris: 912 self.api.add(uri) 913 914 self.downloads_uris.clear() 915 self.refresh = True 916 917 def process_mouse_event(self, event: MouseEvent) -> None: # noqa: D102 918 self.state_mapping[self.state]["process_mouse_event"](event) 919 920 def process_mouse_event_main(self, event: MouseEvent) -> None: # noqa: D102 921 if event.buttons & MouseEvent.LEFT_CLICK: 922 if event.y == 0: 923 new_sort = self.get_column_at_x(event.x) 924 if new_sort == self.sort: 925 self.reverse = not self.reverse 926 else: 927 self.sort = new_sort 928 else: 929 self.focused = min(event.y - 1 + self.row_offset, len(self.rows) - 1) 930 self.refresh = True 931 932 # elif event.buttons & MouseEvent.RIGHT_CLICK: 933 # pass # TODO: expand/collapse 934 935 def process_mouse_event_help(self, event: MouseEvent) -> None: # noqa: D102 936 pass 937 938 def process_mouse_event_setup(self, event: MouseEvent) -> None: # noqa: D102 939 pass 940 941 def process_mouse_event_remove_ask(self, event: MouseEvent) -> None: # noqa: D102 942 pass 943 944 def process_mouse_event_select_sort(self, event: MouseEvent) -> None: # noqa: D102 945 pass 946 947 def process_mouse_event_add_downloads(self, event: MouseEvent) -> None: # noqa: D102 948 pass 949 950 def width_remove_ask(self) -> int: # noqa: D102 951 return max(len(self.remove_ask_header), max(len(row[0]) for row in self.remove_ask_rows)) # noqa: PLW3301 952 953 def width_select_sort(self) -> int: # noqa: D102 954 return max(len(column_name) for column_name in [*self.columns_order, self.select_sort_header]) 955 956 def follow_focused(self) -> bool: # noqa: D102 957 if self.focused < len(self.data): 958 self.follow = self.data[self.focused] 959 return True 960 return False 961 962 def print_add_downloads(self) -> None: # noqa: D102 963 y = self.y_offset 964 padding = self.width 965 header_string = f"{self.downloads_uris_header:<{padding}}" 966 len_header = len(header_string) 967 self.screen.print_at(header_string, 0, y, *self.palettes["side_column_header"]) 968 self.screen.print_at(" ", len_header, y, *self.palettes["default"]) 969 y += 1 970 self.screen.print_at(" " * self.width, 0, y, *self.palettes["ui"]) 971 separator = "..." 972 973 for i, uri in enumerate(self.downloads_uris): 974 y += 1 975 palette = ( 976 self.palettes["side_column_focused_row"] if i == self.side_focused else self.palettes["side_column_row"] 977 ) 978 if len(uri) > self.width: 979 # print part of uri string 980 uri = f"{uri[:(self.width//2)-len(separator)]} {separator} {uri[-(self.width//2)+len(separator):]}" # noqa: PLW2901 981 982 self.screen.print_at(uri, 0, y, *palette) 983 self.screen.print_at(" ", len(uri), y, *self.palettes["default"]) 984 985 for i in range(1, self.height - y): 986 self.screen.print_at(" " * (padding + 1), 0, y + i, *self.palettes["ui"]) 987 988 def print_help(self) -> None: # noqa: D102 989 version = get_version() 990 lines = [ 991 f"aria2p {version} — (C) 2018-2020 Timothée Mazzucotelli and contributors", 992 "Released under the ISC license.", 993 "", 994 ] 995 996 y = 0 997 for line in lines: 998 self.screen.print_at(f"{line:<{self.width}}", 0, y, *self.palettes["bright_help"]) 999 y += 1 1000 1001 for keys, text in [ 1002 (Keys.HELP, " show this help screen"), 1003 (Keys.MOVE_UP, " scroll downloads list"), 1004 (Keys.MOVE_UP_STEP, " scroll downloads list (steps)"), 1005 (Keys.MOVE_DOWN, " scroll downloads list"), 1006 (Keys.MOVE_DOWN_STEP, " scroll downloads list (steps)"), 1007 # not implemented: (Keys.SETUP, " setup"), 1008 (Keys.TOGGLE_RESUME_PAUSE, " toggle pause/resume"), 1009 (Keys.PRIORITY_UP, " priority up (-)"), 1010 (Keys.PRIORITY_DOWN, " priority down (+)"), 1011 (Keys.REVERSE_SORT, " invert sort order"), 1012 (Keys.NEXT_SORT, " sort next column"), 1013 (Keys.PREVIOUS_SORT, " sort previous column"), 1014 (Keys.SELECT_SORT, " select sort column"), 1015 (Keys.REMOVE_ASK, " remove download"), 1016 # not implemented: (Keys.TOGGLE_EXPAND_COLLAPSE, " toggle expand/collapse"), 1017 # not implemented: (Keys.TOGGLE_EXPAND_COLLAPSE_ALL, " toggle expand/collapse all"), 1018 (Keys.AUTOCLEAR, " autopurge downloads"), 1019 (Keys.FOLLOW_ROW, " cursor follows download"), 1020 # not implemented: (Keys.SEARCH, " name search"), 1021 # not implemented: (Keys.FILTER, " name filtering"), 1022 # not implemented: (Keys.TOGGLE_SELECT, " toggle select download"), 1023 # not implemented: (Keys.UN_SELECT_ALL, " unselect all downloads"), 1024 (Keys.MOVE_HOME, " move focus to first download"), 1025 (Keys.MOVE_END, " move focus to last download"), 1026 (Keys.RETRY, " retry failed download"), 1027 (Keys.RETRY_ALL, " retry all failed download"), 1028 (Keys.ADD_DOWNLOADS, " add downloads from clipboard"), 1029 (Keys.QUIT, " quit"), 1030 ]: 1031 self.print_keys(keys, text, y) 1032 y += 1 1033 1034 self.screen.print_at(" " * self.width, 0, y, *self.palettes["ui"]) 1035 y += 1 1036 self.screen.print_at(f"{'Press any key to return.':<{self.width}}", 0, y, *self.palettes["bright_help"]) 1037 y += 1 1038 1039 for i in range(self.height - y): 1040 self.screen.print_at(" " * self.width, 0, y + i, *self.palettes["ui"]) 1041 1042 def print_keys(self, keys: list[Key], text: str, y: int) -> None: # noqa: D102 1043 self.print_keys_text(" ".join(Keys.names(keys)) + ":", text, y) 1044 1045 def print_keys_text(self, keys_text: str, text: str, y: int) -> None: # noqa: D102 1046 length = 8 1047 padding = self.width - length 1048 self.screen.print_at(f"{keys_text:>{length}}", 0, y, *self.palettes["bright_help"]) 1049 self.screen.print_at(f"{text:<{padding}}", length, y, *self.palettes["default"]) 1050 1051 def print_remove_ask_column(self) -> None: # noqa: D102 1052 y = self.y_offset 1053 padding = self.width_remove_ask() 1054 header_string = f"{self.remove_ask_header:<{padding}}" 1055 len_header = len(header_string) 1056 self.screen.print_at(header_string, 0, y, *self.palettes["side_column_header"]) 1057 self.screen.print_at(" ", len_header, y, *self.palettes["default"]) 1058 for i, row in enumerate(self.remove_ask_rows): 1059 y += 1 1060 palette = ( 1061 self.palettes["side_column_focused_row"] if i == self.side_focused else self.palettes["side_column_row"] 1062 ) 1063 row_string = f"{row[0]:<{padding}}" 1064 len_row = len(row_string) 1065 self.screen.print_at(row_string, 0, y, *palette) 1066 self.screen.print_at(" ", len_row, y, *self.palettes["default"]) 1067 1068 for i in range(1, self.height - y): 1069 self.screen.print_at(" " * (padding + 1), 0, y + i, *self.palettes["ui"]) 1070 1071 def print_select_sort_column(self) -> None: # noqa: D102 1072 y = self.y_offset 1073 padding = self.width_select_sort() 1074 header_string = f"{self.select_sort_header:<{padding}}" 1075 len_header = len(header_string) 1076 self.screen.print_at(header_string, 0, y, *self.palettes["side_column_header"]) 1077 self.screen.print_at(" ", len_header, y, *self.palettes["default"]) 1078 for i, row in enumerate(self.select_sort_rows): 1079 y += 1 1080 palette = ( 1081 self.palettes["side_column_focused_row"] if i == self.side_focused else self.palettes["side_column_row"] 1082 ) 1083 row_string = f"{row:<{padding}}" 1084 len_row = len(row_string) 1085 self.screen.print_at(row_string, 0, y, *palette) 1086 self.screen.print_at(" ", len_row, y, *self.palettes["default"]) 1087 1088 for i in range(1, self.height - y): 1089 self.screen.print_at(" " * (padding + 1), 0, y + i, *self.palettes["ui"]) 1090 1091 def print_table(self) -> None: # noqa: D102 1092 self.print_headers() 1093 self.print_rows() 1094 1095 def print_headers(self) -> None: 1096 """Print the headers (columns names).""" 1097 self.scroller.set_scroll(self.x_scroll) 1098 x, y, c = self.x_offset, self.y_offset, 0 1099 1100 for column_name in self.columns_order: 1101 column = self.columns[column_name] 1102 palette = self.palettes["focused_header"] if c == self.sort else self.palettes["header"] 1103 1104 if column.padding == "100%": 1105 header_string = f"{column.header}" 1106 fill_up = " " * max(0, self.width - x - len(header_string)) 1107 written = self.scroller.print_at(header_string, x, y, palette) 1108 self.scroller.print_at(fill_up, x + written, y, self.palettes["header"]) 1109 1110 else: 1111 header_string = f"{column.header:{column.padding}} " 1112 written = self.scroller.print_at(header_string, x, y, palette) 1113 1114 x += written 1115 c += 1 # noqa: SIM113 1116 1117 def print_rows(self) -> None: 1118 """Print the rows.""" 1119 y = self.y_offset + 1 1120 for row in self.rows[self.row_offset : self.row_offset + self.height]: 1121 self.scroller.set_scroll(self.x_scroll) 1122 x = self.x_offset 1123 1124 for i, column_name in enumerate(self.columns_order): 1125 column = self.columns[column_name] 1126 padding = f"<{max(0, self.width - x)}" if column.padding == "100%" else column.padding 1127 1128 if self.focused == y - self.y_offset - 1 + self.row_offset: 1129 palette = self.palettes["focused_row"] 1130 else: 1131 palette = column.get_palette(row[i]) 1132 if isinstance(palette, str): 1133 palette = self.palettes[palette] 1134 1135 field_string = f"{row[i]:{padding}} " 1136 written = self.scroller.print_at(field_string, x, y, palette) 1137 x += written 1138 1139 y += 1 1140 1141 for i in range(self.height - y): 1142 self.screen.print_at(" " * self.width, self.x_offset, y + i, *self.palettes["ui"]) 1143 1144 def get_column_at_x(self, x: int) -> int: 1145 """For an horizontal position X, return the column index.""" 1146 for i, bound in enumerate(self.bounds): 1147 if bound[0] <= x <= bound[1]: 1148 return i 1149 raise ValueError("clicked outside of boundaries") 1150 1151 def set_screen(self, screen: Screen) -> None: 1152 """Set the screen object, its scroller wrapper, width, height, and columns bounds.""" 1153 self.screen = screen 1154 self.height, self.width = screen.dimensions 1155 self.scroller = HorizontalScroll(screen) 1156 self.bounds = [] 1157 for column_name in self.columns_order: 1158 column = self.columns[column_name] 1159 if column.padding == "100%": # last column 1160 self.bounds.append((self.bounds[-1][1] + 1, self.width)) 1161 else: 1162 padding = int(column.padding.lstrip("<>=^")) 1163 if not self.bounds: 1164 self.bounds = [(0, padding)] 1165 else: 1166 self.bounds.append((self.bounds[-1][1] + 1, self.bounds[-1][1] + 1 + padding)) 1167 1168 def get_data(self) -> list[Download]: 1169 """Return a list of objects.""" 1170 return self.api.get_downloads() 1171 1172 def update_data(self) -> None: 1173 """Set the interface data and rows contents.""" 1174 try: 1175 self.data = self.get_data() 1176 self.sort_data() 1177 except requests.exceptions.Timeout: 1178 logger.debug("Request timeout") 1179 1180 def sort_data(self) -> None: 1181 """Sort data according to interface state.""" 1182 sort_function = self.columns[self.columns_order[self.sort]].get_sort 1183 self.data = sorted(self.data, key=sort_function, reverse=self.reverse) 1184 1185 def update_rows(self) -> None: 1186 """Update rows contents according to data and interface state.""" 1187 text_getters = [self.columns[c].get_text for c in self.columns_order] 1188 n_columns = len(self.columns_order) 1189 self.rows = [tuple(text_getters[i](item) for i in range(n_columns)) for item in self.data] 1190 if self.follow: 1191 self.focused = self.data.index(self.follow)