/ src / aria2p / interface.py
interface.py
   1  """This module contains all the code responsible for the HTOP-like interface."""
   2  
   3  # Why using asciimatics?
   4  #
   5  # - curses is hard, and not working well on Windows
   6  # - blessings (curses-based) is easier, but does not provide input methods and is not maintained
   7  # - blessed (blessings fork) provides input methods, but they are blocking
   8  # - urwid seems less easy to use than asciimatics, and older
   9  # - clint is not maintained and does not provide input methods
  10  # - prompt_toolkit is designed to build interactive (like, very interactive) command line applications
  11  # - curtsies, pygcurse, unicurses, npyscreen: all based on curses anyway, which does not work well on Windows
  12  #
  13  # Well, asciimatics also provides a "top" example, so...
  14  
  15  from __future__ import annotations
  16  
  17  import contextlib
  18  import os
  19  import sys
  20  import time
  21  from collections import defaultdict
  22  from pathlib import Path
  23  from typing import TYPE_CHECKING, Callable, ClassVar, TypedDict
  24  
  25  import pyperclip
  26  import requests
  27  from asciimatics.event import KeyboardEvent, MouseEvent
  28  from asciimatics.screen import ManagedScreen, Screen
  29  from loguru import logger
  30  
  31  from aria2p.api import API
  32  from aria2p.utils import get_version, load_configuration
  33  
  34  if TYPE_CHECKING:
  35      from collections.abc import Sequence
  36  
  37      from aria2p.downloads import Download
  38  
  39  
  40  configs = load_configuration()
  41  
  42  
  43  def key_bind_parser(action: str) -> list[Key]:
  44      """Return a list of Key instances.
  45  
  46      Parameters:
  47          action: The action name.
  48  
  49      Returns:
  50          A list of keys.
  51      """
  52      default_bindings = configs["DEFAULT"]["key_bindings"]
  53      bindings = configs.get("USER", {}).get("key_bindings", default_bindings)
  54  
  55      key_binds = bindings.get(action, default_bindings[action])
  56  
  57      if isinstance(key_binds, list):
  58          return [Key(k) for k in key_binds]
  59      return [Key(key_binds)]
  60  
  61  
  62  def color_palette_parser(palette: str) -> tuple[int, int, int]:
  63      """Return a color tuple (foreground color, mode, background color).
  64  
  65      Parameters:
  66          palette: The palette name.
  67  
  68      Returns:
  69          Foreground color, mode, background color.
  70      """
  71      default_colors = configs["DEFAULT"]["colors"]
  72      colors = configs.get("USER", {}).get("colors", default_colors)
  73  
  74      # get values of colors and modes for ascimatics.screen module
  75      color_map = {
  76          "BLACK": Screen.COLOUR_BLACK,
  77          "WHITE": Screen.COLOUR_WHITE,
  78          "RED": Screen.COLOUR_RED,
  79          "CYAN": Screen.COLOUR_CYAN,
  80          "YELLOW": Screen.COLOUR_YELLOW,
  81          "BLUE": Screen.COLOUR_BLUE,
  82          "GREEN": Screen.COLOUR_GREEN,
  83          "DEFAULT": Screen.COLOUR_DEFAULT,
  84      }
  85      mode_map = {
  86          "NORMAL": Screen.A_NORMAL,
  87          "BOLD": Screen.A_BOLD,
  88          "UNDERLINE": Screen.A_UNDERLINE,
  89          "REVERSE": Screen.A_REVERSE,
  90      }
  91  
  92      palette_colors = colors.get(palette, default_colors[palette])
  93      palette_fg, palette_mode, palette_bg = palette_colors.split(" ")
  94  
  95      return (
  96          color_map[palette_fg],
  97          mode_map[palette_mode],
  98          color_map[palette_bg],
  99      )
 100  
 101  
 102  class Key:
 103      """A class to represent an input key."""
 104  
 105      OTHER_KEY_VALUES: ClassVar[dict[str, int]] = {
 106          "F1": Screen.KEY_F1,
 107          "F2": Screen.KEY_F2,
 108          "F3": Screen.KEY_F3,
 109          "F4": Screen.KEY_F4,
 110          "F5": Screen.KEY_F5,
 111          "F6": Screen.KEY_F6,
 112          "F7": Screen.KEY_F7,
 113          "F8": Screen.KEY_F8,
 114          "F9": Screen.KEY_F9,
 115          "F10": Screen.KEY_F10,
 116          "F11": Screen.KEY_F11,
 117          "F12": Screen.KEY_F12,
 118          "ESC": Screen.KEY_ESCAPE,
 119          "DEL": Screen.KEY_DELETE,
 120          "PAGE_UP": Screen.KEY_PAGE_UP,
 121          "PAGE_DOWN": Screen.KEY_PAGE_DOWN,
 122          "HOME": Screen.KEY_HOME,
 123          "END": Screen.KEY_END,
 124          "LEFT": Screen.KEY_LEFT,
 125          "UP": Screen.KEY_UP,
 126          "RIGHT": Screen.KEY_RIGHT,
 127          "DOWN": Screen.KEY_DOWN,
 128          "BACK": Screen.KEY_BACK,
 129          "TAB": Screen.KEY_TAB,
 130          "SPACE": ord(" "),
 131          "ENTER": ord("\n"),
 132      }
 133  
 134      def __init__(self, name: str, value: int | None = None) -> None:
 135          """Initialize the object.
 136  
 137          Parameters:
 138              name: The key name.
 139              value: The key value.
 140          """
 141          self.name = name
 142          if value is None:
 143              value = self.get_value(name)
 144          self.value = value
 145  
 146      def get_value(self, name: str) -> int:  # noqa: D102
 147          try:
 148              value = ord(name)
 149          except TypeError:
 150              value = self.OTHER_KEY_VALUES[name.upper()]
 151  
 152          return value
 153  
 154      def __eq__(self, value: object) -> bool:
 155          return self.value == value
 156  
 157  
 158  class Keys:
 159      """The actions and their shortcuts keys."""
 160  
 161      AUTOCLEAR = key_bind_parser("AUTOCLEAR")
 162      CANCEL = key_bind_parser("CANCEL")
 163      ENTER = key_bind_parser("ENTER")
 164      FILTER = key_bind_parser("FILTER")
 165      FOLLOW_ROW = key_bind_parser("FOLLOW_ROW")
 166      HELP = key_bind_parser("HELP")
 167      MOVE_DOWN = key_bind_parser("MOVE_DOWN")
 168      MOVE_LEFT = key_bind_parser("MOVE_LEFT")
 169      MOVE_RIGHT = key_bind_parser("MOVE_RIGHT")
 170      MOVE_UP = key_bind_parser("MOVE_UP")
 171      NEXT_SORT = key_bind_parser("NEXT_SORT")
 172      PREVIOUS_SORT = key_bind_parser("PREVIOUS_SORT")
 173      PRIORITY_DOWN = key_bind_parser("PRIORITY_DOWN")
 174      PRIORITY_UP = key_bind_parser("PRIORITY_UP")
 175      QUIT = key_bind_parser("QUIT")
 176      REMOVE_ASK = key_bind_parser("REMOVE_ASK")
 177      REVERSE_SORT = key_bind_parser("REVERSE_SORT")
 178      SEARCH = key_bind_parser("SEARCH")
 179      SELECT_SORT = key_bind_parser("SELECT_SORT")
 180      SETUP = key_bind_parser("SETUP")
 181      TOGGLE_EXPAND_COLLAPSE_ALL = key_bind_parser("TOGGLE_EXPAND_COLLAPSE_ALL")
 182      TOGGLE_EXPAND_COLLAPSE = key_bind_parser("TOGGLE_EXPAND_COLLAPSE")
 183      TOGGLE_RESUME_PAUSE = key_bind_parser("TOGGLE_RESUME_PAUSE")
 184      TOGGLE_RESUME_PAUSE_ALL = key_bind_parser("TOGGLE_RESUME_PAUSE_ALL")
 185      TOGGLE_SELECT = key_bind_parser("TOGGLE_SELECT")
 186      UN_SELECT_ALL = key_bind_parser("UN_SELECT_ALL")
 187      MOVE_HOME = key_bind_parser("MOVE_HOME")
 188      MOVE_END = key_bind_parser("MOVE_END")
 189      MOVE_UP_STEP = key_bind_parser("MOVE_UP_STEP")
 190      MOVE_DOWN_STEP = key_bind_parser("MOVE_DOWN_STEP")
 191      RETRY = key_bind_parser("RETRY")
 192      RETRY_ALL = key_bind_parser("RETRY_ALL")
 193      ADD_DOWNLOADS = key_bind_parser("ADD_DOWNLOADS")
 194  
 195      @staticmethod
 196      def names(keys_list: list[Key]) -> list[str]:  # noqa: D102
 197          return [key.name for key in keys_list]
 198  
 199      @staticmethod
 200      def values(keys_list: list[Key]) -> list[int]:  # noqa: D102
 201          return [key.value for key in keys_list]
 202  
 203  
 204  class Exit(Exception):  # noqa: N818
 205      """A simple exception to exit the interactive interface."""
 206  
 207  
 208  class Column:
 209      """A class to specify a column in the interface.
 210  
 211      It's composed of a header (the string to display on top), a padding (how to align the text),
 212      and three callable functions to get the text from a Python object, to sort between these objects,
 213      and to get a color palette based on the text.
 214      """
 215  
 216      def __init__(
 217          self,
 218          header: str,
 219          padding: str,
 220          get_text: Callable,
 221          get_sort: Callable,
 222          get_palette: Callable,
 223      ) -> None:
 224          """Initialize the object.
 225  
 226          Parameters:
 227              header: The string to display on top.
 228              padding: How to align the text.
 229              get_text: Function accepting a Download as argument and returning the text to display.
 230              get_sort: Function accepting a Download as argument and returning the attribute used to sort.
 231              get_palette: Function accepting text as argument and returning a palette or a palette identifier.
 232          """
 233          self.header = header
 234          self.padding = padding
 235          self.get_text = get_text
 236          self.get_sort = get_sort
 237          self.get_palette = get_palette
 238  
 239  
 240  class HorizontalScroll:
 241      """A wrapper around asciimatics' Screen.print_at and Screen.paint methods.
 242  
 243      It allows scroll the rows horizontally, used when moving left and right:
 244      the first N characters will not be printed.
 245      """
 246  
 247      def __init__(self, screen: Screen, scroll: int = 0) -> None:
 248          """Initialize the object.
 249  
 250          Parameters:
 251              screen (Screen): The asciimatics screen object.
 252              scroll (int): Base scroll to use when printing. Will decrease by one with each character skipped.
 253          """
 254          self.screen = screen
 255          self.scroll = scroll
 256  
 257      def set_scroll(self, scroll: int) -> None:
 258          """Set the scroll value."""
 259          self.scroll = scroll
 260  
 261      def print_at(self, text: str, x: int, y: int, palette: list | tuple) -> int:
 262          """Wrapper print_at method.
 263  
 264          Parameters:
 265              text: Text to print.
 266              x: X axis position / column.
 267              y: Y axis position / row.
 268              palette: A length-3 tuple or a list of length-3 tuples representing asciimatics palettes.
 269  
 270          Returns:
 271              The number of characters actually printed.
 272          """
 273          if self.scroll == 0:
 274              if isinstance(palette, list):
 275                  self.screen.paint(text, x, y, colour_map=palette)
 276              else:
 277                  self.screen.print_at(text, x, y, *palette)
 278              written = len(text)
 279          else:
 280              text_length = len(text)
 281              if text_length > self.scroll:
 282                  new_text = text[self.scroll :]
 283                  written = len(new_text)
 284                  if isinstance(palette, list):
 285                      new_palette = palette[self.scroll :]
 286                      self.screen.paint(new_text, x, y, colour_map=new_palette)
 287                  else:
 288                      self.screen.print_at(new_text, x, y, *palette)
 289                  self.scroll = 0
 290              else:
 291                  self.scroll -= text_length
 292                  written = 0
 293          return written
 294  
 295  
 296  class Palette:
 297      """A simple class to hold palettes getters."""
 298  
 299      @staticmethod
 300      def status(value: str) -> str:
 301          """Return the palette for a STATUS cell."""
 302          return "status_" + value
 303  
 304      @staticmethod
 305      def name(value: str) -> str | list[tuple[int, int, int]]:
 306          """Return the palette for a NAME cell."""
 307          if value.startswith("[METADATA]"):
 308              return (
 309                  [(Screen.COLOUR_GREEN, Screen.A_UNDERLINE, Screen.COLOUR_BLACK)] * 10
 310                  + [Interface.palettes["metadata"]] * (len(value.strip()) - 10)
 311                  + [Interface.palettes["row"]]
 312              )
 313          return "name"
 314  
 315  
 316  class Interface:
 317      """The main class responsible for drawing the HTOP-like interface.
 318  
 319      It should be instantiated with an API instance, and then ran with its `run` method.
 320  
 321      If you want to re-use this class' code to create an HTOP-like interface for another purpose,
 322      simply change these few things:
 323  
 324      - columns, columns_order and palettes attributes
 325      - sort and reverse attributes default values
 326      - get_data method. It should return a list of objects that can be compared by equality (==, __eq__, __hash__)
 327      - __init__ method to accept other arguments
 328      - remove/change the few events with "download" or "self.api" in the process_event method
 329      """
 330  
 331      class State:  # noqa: D106
 332          MAIN = 0
 333          HELP = 1
 334          SETUP = 2
 335          REMOVE_ASK = 3
 336          SELECT_SORT = 4
 337          ADD_DOWNLOADS = 9
 338  
 339      state = State.MAIN
 340      sleep = 0.005
 341      frames = 200  # 200 * 0.005 seconds == 1 second
 342      frame = 0
 343      focused = 0
 344      side_focused = 0
 345      sort = 2
 346      reverse = True
 347      x_scroll = 0
 348      x_offset = 0
 349      y_offset = 0
 350      row_offset = 0
 351      refresh = False
 352      width: int
 353      height: int
 354      screen: Screen
 355      data: list[Download]
 356      rows: list[Sequence[str]]
 357      scroller: HorizontalScroll
 358      follow = None
 359      bounds: list[Sequence[int]]
 360  
 361      palettes: ClassVar[dict[str, tuple[int, int, int]]] = defaultdict(lambda: color_palette_parser("UI"))
 362      palettes.update(
 363          {
 364              "ui": color_palette_parser("UI"),
 365              "header": color_palette_parser("HEADER"),
 366              "focused_header": color_palette_parser("FOCUSED_HEADER"),
 367              "focused_row": color_palette_parser("FOCUSED_ROW"),
 368              "status_active": color_palette_parser("STATUS_ACTIVE"),
 369              "status_paused": color_palette_parser("STATUS_PAUSED"),
 370              "status_waiting": color_palette_parser("STATUS_WAITING"),
 371              "status_error": color_palette_parser("STATUS_ERROR"),
 372              "status_complete": color_palette_parser("STATUS_COMPLETE"),
 373              "metadata": color_palette_parser("METADATA"),
 374              "side_column_header": color_palette_parser("SIDE_COLUMN_HEADER"),
 375              "side_column_row": color_palette_parser("SIDE_COLUMN_ROW"),
 376              "side_column_focused_row": color_palette_parser("SIDE_COLUMN_FOCUSED_ROW"),
 377              "bright_help": color_palette_parser("BRIGHT_HELP"),
 378          },
 379      )
 380  
 381      columns_order: ClassVar[list[str]] = ["gid", "status", "progress", "size", "down_speed", "up_speed", "eta", "name"]
 382      columns: ClassVar[dict[str, Column]] = {
 383          "gid": Column(
 384              header="GID",
 385              padding=">16",
 386              get_text=lambda d: d.gid,
 387              get_sort=lambda d: d.gid,
 388              get_palette=lambda d: "gid",
 389          ),
 390          "status": Column(
 391              header="STATUS",
 392              padding="<9",
 393              get_text=lambda d: d.status,
 394              get_sort=lambda d: d.status,
 395              get_palette=Palette.status,
 396          ),
 397          "progress": Column(
 398              header="PROGRESS",
 399              padding=">8",
 400              get_text=lambda d: d.progress_string(),
 401              get_sort=lambda d: d.progress,
 402              get_palette=lambda s: "progress",
 403          ),
 404          "size": Column(
 405              header="SIZE",
 406              padding=">11",
 407              get_text=lambda d: d.total_length_string(),
 408              get_sort=lambda d: d.total_length,
 409              get_palette=lambda s: "size",
 410          ),
 411          "down_speed": Column(
 412              header="DOWN_SPEED",
 413              padding=">13",
 414              get_text=lambda d: d.download_speed_string(),
 415              get_sort=lambda d: d.download_speed,
 416              get_palette=lambda s: "down_speed",
 417          ),
 418          "up_speed": Column(
 419              header="UP_SPEED",
 420              padding=">13",
 421              get_text=lambda d: d.upload_speed_string(),
 422              get_sort=lambda d: d.upload_speed,
 423              get_palette=lambda s: "up_speed",
 424          ),
 425          "eta": Column(
 426              header="ETA",
 427              padding=">8",
 428              get_text=lambda d: d.eta_string(precision=2),
 429              get_sort=lambda d: d.eta,
 430              get_palette=lambda s: "eta",
 431          ),
 432          "name": Column(
 433              header="NAME",
 434              padding="100%",
 435              get_text=lambda d: d.name,
 436              get_sort=lambda d: d.name,
 437              get_palette=Palette.name,
 438          ),
 439      }
 440  
 441      remove_ask_header = "Remove:"
 442      remove_ask_rows: ClassVar[list[tuple]] = [
 443          ("Remove", lambda d: d.remove(force=False, files=False)),
 444          ("Remove with files", lambda d: d.remove(force=False, files=True)),
 445          ("Force remove", lambda d: d.remove(force=True, files=False)),
 446          ("Force remove with files", lambda d: d.remove(force=True, files=True)),
 447      ]
 448      last_remove_choice = None
 449  
 450      select_sort_header = "Select sort:"
 451      select_sort_rows = columns_order
 452  
 453      downloads_uris: list[str]
 454      downloads_uris_header = (
 455          f"Add Download: [ Hit ENTER to download; Hit { ','.join(Keys.names(Keys.ADD_DOWNLOADS)) } to download all ]"
 456      )
 457  
 458      class StateConf(TypedDict):  # noqa: D106
 459          process_keyboard_event: Callable
 460          process_mouse_event: Callable
 461          print_functions: list[Callable]
 462  
 463      def __init__(self, api: API | None = None) -> None:
 464          """Initialize the object.
 465  
 466          Parameters:
 467              api: An instance of API.
 468          """
 469          if api is None:
 470              api = API()
 471          self.api = api
 472  
 473          self.rows = []
 474          self.data = []
 475          self.bounds = []
 476          self.downloads_uris = []
 477          self.height = 20
 478          self.width = 80
 479  
 480          # reduce curses' 1 second delay when hitting escape to 25 ms
 481          os.environ.setdefault("ESCDELAY", "25")
 482  
 483          self.state_mapping: dict[int, Interface.StateConf] = {
 484              self.State.MAIN: {
 485                  "process_keyboard_event": self.process_keyboard_event_main,
 486                  "process_mouse_event": self.process_mouse_event_main,
 487                  "print_functions": [self.print_table],
 488              },
 489              self.State.HELP: {
 490                  "process_keyboard_event": self.process_keyboard_event_help,
 491                  "process_mouse_event": self.process_mouse_event_help,
 492                  "print_functions": [self.print_help],
 493              },
 494              self.State.SETUP: {
 495                  "process_keyboard_event": self.process_keyboard_event_setup,
 496                  "process_mouse_event": self.process_mouse_event_setup,
 497                  "print_functions": [],
 498              },
 499              self.State.REMOVE_ASK: {
 500                  "process_keyboard_event": self.process_keyboard_event_remove_ask,
 501                  "process_mouse_event": self.process_mouse_event_remove_ask,
 502                  "print_functions": [self.print_remove_ask_column, self.print_table],
 503              },
 504              self.State.SELECT_SORT: {
 505                  "process_keyboard_event": self.process_keyboard_event_select_sort,
 506                  "process_mouse_event": self.process_mouse_event_select_sort,
 507                  "print_functions": [self.print_select_sort_column, self.print_table],
 508              },
 509              self.State.ADD_DOWNLOADS: {
 510                  "process_keyboard_event": self.process_keyboard_event_add_downloads,
 511                  "process_mouse_event": self.process_mouse_event_add_downloads,
 512                  "print_functions": [self.print_add_downloads, self.print_table],
 513              },
 514          }
 515  
 516      def run(self) -> bool:
 517          """The main drawing loop."""
 518          try:
 519              # outer loop to support screen resize
 520              while True:
 521                  with ManagedScreen() as screen:
 522                      logger.debug(f"Created new screen {screen}")
 523                      self.set_screen(screen)
 524                      self.frame = 0
 525                      # break (and re-enter) when screen has been resized
 526                      while not screen.has_resized():
 527                          # keep previous sort in memory to know if we have to re-sort the rows
 528                          # once all events are processed (to avoid useless/redundant sort passes)
 529                          previous_sort = (self.sort, self.reverse)
 530  
 531                          # we only refresh when explicitly asked for
 532                          self.refresh = False
 533  
 534                          # process all events before refreshing screen,
 535                          # otherwise the reactivity is slowed down a lot with fast inputs
 536                          event = screen.get_event()
 537                          logger.debug(f"Got event {event}")
 538                          while event:
 539                              # avoid crashing the interface if exceptions occur while processing an event
 540                              try:
 541                                  self.process_event(event)
 542                              except Exit:
 543                                  logger.debug("Received exit command")
 544                                  return True
 545                              except Exception as error:  # noqa: BLE001
 546                                  # TODO: display error in status bar
 547                                  logger.exception(error)
 548                              event = screen.get_event()
 549                              logger.debug(f"Got event {event}")
 550  
 551                          # time to update data and rows
 552                          if self.frame == 0:
 553                              logger.debug("Tick! Updating data and rows")
 554                              self.update_data()
 555                              self.update_rows()
 556                              self.refresh = True
 557  
 558                          # time to refresh the screen
 559                          if self.refresh:
 560                              logger.debug("Refresh! Printing text")
 561                              # sort if needed, unless it was just done at frame 0 when updating
 562                              if (self.sort, self.reverse) != previous_sort and self.frame != 0:
 563                                  self.sort_data()
 564                                  self.update_rows()
 565  
 566                              # actual printing and screen refresh
 567                              for print_function in self.state_mapping[self.state]["print_functions"]:
 568                                  print_function()
 569                              screen.refresh()
 570  
 571                          # sleep and increment frame
 572                          time.sleep(self.sleep)
 573                          self.frame = (self.frame + 1) % self.frames
 574                      logger.debug("Screen has resized")
 575                      self.post_resize()
 576          except Exception as error:  # noqa: BLE001
 577              logger.exception(error)
 578              return False
 579  
 580      def post_resize(self) -> None:  # noqa: D102
 581          logger.debug("Running post-resize function")
 582          logger.debug("Trying to re-apply pywal color theme")
 583          wal_sequences = Path.home() / ".cache" / "wal" / "sequences"
 584          try:
 585              with wal_sequences.open("rb") as fd:
 586                  contents = fd.read()
 587                  sys.stdout.buffer.write(contents)
 588          except Exception:  # noqa: BLE001,S110
 589              pass
 590  
 591      def update_select_sort_rows(self) -> None:  # noqa: D102
 592          self.select_sort_rows = self.columns_order
 593  
 594      def process_event(self, event: KeyboardEvent | MouseEvent) -> None:
 595          """Process an event.
 596  
 597          For reactivity purpose, this method should not compute expensive stuff, only change the state of the interface,
 598          changes that will be applied by update_data and update_rows methods.
 599  
 600          Parameters:
 601              event (KeyboardEvent | MouseEvent): The event to process.
 602          """
 603          if isinstance(event, KeyboardEvent):
 604              self.process_keyboard_event(event)
 605  
 606          elif isinstance(event, MouseEvent):
 607              self.process_mouse_event(event)
 608  
 609      def process_keyboard_event(self, event: KeyboardEvent) -> None:  # noqa: D102
 610          self.state_mapping[self.state]["process_keyboard_event"](event)
 611  
 612      def process_keyboard_event_main(self, event: KeyboardEvent) -> None:  # noqa: D102
 613          if event.key_code in Keys.MOVE_UP:
 614              if self.focused > 0:
 615                  self.focused -= 1
 616                  logger.debug(f"Move focus up: {self.focused}")
 617  
 618                  if self.focused < self.row_offset:
 619                      self.row_offset = self.focused
 620                  elif self.focused >= self.row_offset + (self.height - 1):
 621                      # happens when shrinking height
 622                      self.row_offset = self.focused + 1 - (self.height - 1)
 623                  self.follow = None
 624                  self.refresh = True
 625  
 626          elif event.key_code in Keys.MOVE_DOWN:
 627              if self.focused < len(self.rows) - 1:
 628                  self.focused += 1
 629                  logger.debug(f"Move focus down: {self.focused}")
 630                  if self.focused - self.row_offset >= (self.height - 1):
 631                      self.row_offset = self.focused + 1 - (self.height - 1)
 632                  self.follow = None
 633                  self.refresh = True
 634  
 635          elif event.key_code in Keys.MOVE_LEFT:
 636              if self.x_scroll > 0:
 637                  self.x_scroll = max(0, self.x_scroll - 5)
 638                  self.refresh = True
 639  
 640          elif event.key_code in Keys.MOVE_RIGHT:
 641              self.x_scroll += 5
 642              self.refresh = True
 643  
 644          elif event.key_code in Keys.HELP:
 645              self.state = self.State.HELP
 646              self.refresh = True
 647  
 648          elif event.key_code in Keys.SETUP:
 649              pass  # TODO
 650  
 651          elif event.key_code in Keys.TOGGLE_RESUME_PAUSE:
 652              download = self.data[self.focused]
 653              if download.is_active or download.is_waiting:
 654                  logger.debug(f"Pausing download {download.gid}")
 655                  download.pause()
 656              elif download.is_paused:
 657                  logger.debug(f"Resuming download {download.gid}")
 658                  download.resume()
 659  
 660          elif event.key_code in Keys.PRIORITY_UP:
 661              download = self.data[self.focused]
 662              if not download.is_active:
 663                  download.move_up()
 664                  self.follow = download
 665  
 666          elif event.key_code in Keys.PRIORITY_DOWN:
 667              download = self.data[self.focused]
 668              if not download.is_active:
 669                  download.move_down()
 670                  self.follow = download
 671  
 672          elif event.key_code in Keys.REVERSE_SORT:
 673              self.reverse = not self.reverse
 674              self.refresh = True
 675  
 676          elif event.key_code in Keys.NEXT_SORT:
 677              if self.sort < len(self.columns) - 1:
 678                  self.sort += 1
 679                  self.refresh = True
 680  
 681          elif event.key_code in Keys.PREVIOUS_SORT:
 682              if self.sort > 0:
 683                  self.sort -= 1
 684                  self.refresh = True
 685  
 686          elif event.key_code in Keys.SELECT_SORT:
 687              self.state = self.State.SELECT_SORT
 688              self.side_focused = self.sort
 689              self.x_offset = self.width_select_sort() + 1
 690              self.refresh = True
 691  
 692          elif event.key_code in Keys.REMOVE_ASK:
 693              logger.debug("Triggered removal")
 694              logger.debug(f"self.focused = {self.focused}")
 695              logger.debug(f"len(self.data) = {len(self.data)}")
 696              if self.follow_focused():
 697                  self.state = self.State.REMOVE_ASK
 698                  self.x_offset = self.width_remove_ask() + 1
 699                  if self.last_remove_choice is not None:
 700                      self.side_focused = self.last_remove_choice
 701                  self.refresh = True
 702              else:
 703                  logger.debug("Could not focus download")
 704  
 705          elif event.key_code in Keys.TOGGLE_EXPAND_COLLAPSE:  # noqa: SIM114
 706              pass  # TODO
 707  
 708          elif event.key_code in Keys.TOGGLE_EXPAND_COLLAPSE_ALL:
 709              pass  # TODO
 710  
 711          elif event.key_code in Keys.AUTOCLEAR:
 712              self.api.purge()
 713  
 714          elif event.key_code in Keys.FOLLOW_ROW:
 715              self.follow_focused()
 716  
 717          elif event.key_code in Keys.SEARCH:  # noqa: SIM114
 718              pass  # TODO
 719  
 720          elif event.key_code in Keys.FILTER:  # noqa: SIM114
 721              pass  # TODO
 722  
 723          elif event.key_code in Keys.TOGGLE_SELECT:  # noqa: SIM114
 724              pass  # TODO
 725  
 726          elif event.key_code in Keys.UN_SELECT_ALL:
 727              pass  # TODO
 728  
 729          elif event.key_code in Keys.MOVE_HOME:
 730              if self.focused > 0:
 731                  self.focused = 0
 732                  logger.debug(f"Move focus home: {self.focused}")
 733  
 734                  if self.focused < self.row_offset:
 735                      self.row_offset = self.focused
 736                  elif self.focused >= self.row_offset + (self.height - 1):
 737                      # happens when shrinking height
 738                      self.row_offset = self.focused + 1 - (self.height - 1)
 739                  self.follow = None
 740                  self.refresh = True
 741  
 742          elif event.key_code in Keys.MOVE_END:
 743              if self.focused < len(self.rows) - 1:
 744                  self.focused = len(self.rows) - 1
 745                  logger.debug(f"Move focus end: {self.focused}")
 746  
 747                  if self.focused - self.row_offset >= (self.height - 1):
 748                      self.row_offset = self.focused + 1 - (self.height - 1)
 749                  self.follow = None
 750                  self.refresh = True
 751  
 752          elif event.key_code in Keys.MOVE_UP_STEP:
 753              if self.focused > 0:
 754                  self.focused -= len(self.rows) // 5
 755  
 756                  self.focused = max(self.focused, 0)
 757                  logger.debug(f"Move focus up (step): {self.focused}")
 758  
 759                  if self.focused < self.row_offset:
 760                      self.row_offset = self.focused
 761                  elif self.focused >= self.row_offset + (self.height - 1):
 762                      # happens when shrinking height
 763                      self.row_offset = self.focused + 1 - (self.height - 1)
 764  
 765                  self.follow = None
 766                  self.refresh = True
 767  
 768          elif event.key_code in Keys.MOVE_DOWN_STEP:
 769              if self.focused < len(self.rows) - 1:
 770                  self.focused += len(self.rows) // 5
 771  
 772                  self.focused = min(self.focused, len(self.rows) - 1)
 773                  logger.debug(f"Move focus down (step): {self.focused}")
 774  
 775                  if self.focused - self.row_offset >= (self.height - 1):
 776                      self.row_offset = self.focused + 1 - (self.height - 1)
 777                  self.follow = None
 778                  self.refresh = True
 779  
 780          elif event.key_code in Keys.TOGGLE_RESUME_PAUSE_ALL:
 781              stats = self.api.get_stats()
 782              if stats.num_active:
 783                  self.api.pause_all()
 784              else:
 785                  self.api.resume_all()
 786  
 787          elif event.key_code in Keys.RETRY:
 788              download = self.data[self.focused]
 789              self.api.retry_downloads([download])
 790  
 791          elif event.key_code in Keys.RETRY_ALL:
 792              downloads = self.data[:]
 793              self.api.retry_downloads(downloads)
 794  
 795          elif event.key_code in Keys.ADD_DOWNLOADS:
 796              self.state = self.State.ADD_DOWNLOADS
 797              self.refresh = True
 798              self.side_focused = 0
 799              self.x_offset = self.width
 800  
 801              # build set of copied lines
 802              copied_lines = set()
 803              for line in pyperclip.paste().split("\n") + pyperclip.paste(primary=True).split("\n"):
 804                  copied_lines.add(line.strip())
 805              with contextlib.suppress(KeyError):
 806                  copied_lines.remove("")
 807  
 808              # add lines to download uris
 809              if copied_lines:
 810                  self.downloads_uris = sorted(copied_lines)
 811  
 812          elif event.key_code in Keys.QUIT:
 813              raise Exit
 814  
 815      def process_keyboard_event_help(self, event: KeyboardEvent) -> None:  # noqa: ARG002,D102
 816          self.state = self.State.MAIN
 817          self.refresh = True
 818  
 819      def process_keyboard_event_setup(self, event: KeyboardEvent) -> None:  # noqa: D102
 820          pass
 821  
 822      def process_keyboard_event_remove_ask(self, event: KeyboardEvent) -> None:  # noqa: D102
 823          if event.key_code in Keys.CANCEL:
 824              logger.debug("Canceling removal")
 825              self.state = self.State.MAIN
 826              self.x_offset = 0
 827              self.refresh = True
 828  
 829          elif event.key_code in Keys.ENTER:
 830              logger.debug("Validate removal")
 831              if self.follow:
 832                  self.remove_ask_rows[self.side_focused][1](self.follow)
 833                  self.follow = None
 834              else:
 835                  logger.debug("No download was targeted, not removing")
 836              self.last_remove_choice = self.side_focused
 837              self.state = self.State.MAIN
 838              self.x_offset = 0
 839  
 840              # force complete refresh
 841              self.frame = 0
 842  
 843          elif event.key_code in Keys.MOVE_UP:
 844              if self.side_focused > 0:
 845                  self.side_focused -= 1
 846                  logger.debug(f"Moving side focus up: {self.side_focused}")
 847                  self.refresh = True
 848  
 849          elif event.key_code in Keys.MOVE_DOWN:
 850              if self.side_focused < len(self.remove_ask_rows) - 1:
 851                  self.side_focused += 1
 852                  logger.debug(f"Moving side focus down: {self.side_focused}")
 853                  self.refresh = True
 854  
 855      def process_keyboard_event_select_sort(self, event: KeyboardEvent) -> None:  # noqa: D102
 856          if event.key_code in Keys.CANCEL:
 857              self.state = self.State.MAIN
 858              self.x_offset = 0
 859              self.refresh = True
 860  
 861          elif event.key_code in Keys.ENTER:
 862              self.sort = self.side_focused
 863              self.state = self.State.MAIN
 864              self.x_offset = 0
 865              self.refresh = True
 866  
 867          elif event.key_code in Keys.MOVE_UP:
 868              if self.side_focused > 0:
 869                  self.side_focused -= 1
 870                  self.refresh = True
 871  
 872          elif event.key_code in Keys.MOVE_DOWN:
 873              if self.side_focused < len(self.select_sort_rows) - 1:
 874                  self.side_focused += 1
 875                  self.refresh = True
 876  
 877      def process_keyboard_event_add_downloads(self, event: KeyboardEvent) -> None:  # noqa: D102
 878          if event.key_code in Keys.CANCEL:
 879              self.state = self.State.MAIN
 880              self.x_offset = 0
 881              self.refresh = True
 882  
 883          elif event.key_code in Keys.MOVE_UP:
 884              if self.side_focused > 0:
 885                  self.side_focused -= 1
 886  
 887                  if self.side_focused < self.row_offset:
 888                      self.row_offset = self.side_focused
 889                  elif self.side_focused >= self.row_offset + (self.height - 1):
 890                      # happens when shrinking height
 891                      self.row_offset = self.side_focused + 1 - (self.height - 1)
 892                  self.follow = None
 893                  self.refresh = True
 894  
 895          elif event.key_code in Keys.MOVE_DOWN:
 896              if self.side_focused < len(self.downloads_uris) - 1:
 897                  self.side_focused += 1
 898                  if self.side_focused - self.row_offset >= (self.height - 1):
 899                      self.row_offset = self.side_focused + 1 - (self.height - 1)
 900                  self.follow = None
 901                  self.refresh = True
 902  
 903          elif event.key_code in Keys.ENTER:
 904              if self.api.add(self.downloads_uris[self.side_focused]):
 905                  self.downloads_uris.pop(self.side_focused)
 906                  if 0 < self.side_focused > len(self.downloads_uris) - 1:
 907                      self.side_focused -= 1
 908                  self.refresh = True
 909  
 910          elif event.key_code in Keys.ADD_DOWNLOADS:
 911              for uri in self.downloads_uris:
 912                  self.api.add(uri)
 913  
 914              self.downloads_uris.clear()
 915              self.refresh = True
 916  
 917      def process_mouse_event(self, event: MouseEvent) -> None:  # noqa: D102
 918          self.state_mapping[self.state]["process_mouse_event"](event)
 919  
 920      def process_mouse_event_main(self, event: MouseEvent) -> None:  # noqa: D102
 921          if event.buttons & MouseEvent.LEFT_CLICK:
 922              if event.y == 0:
 923                  new_sort = self.get_column_at_x(event.x)
 924                  if new_sort == self.sort:
 925                      self.reverse = not self.reverse
 926                  else:
 927                      self.sort = new_sort
 928              else:
 929                  self.focused = min(event.y - 1 + self.row_offset, len(self.rows) - 1)
 930              self.refresh = True
 931  
 932          # elif event.buttons & MouseEvent.RIGHT_CLICK:
 933          #     pass  # TODO: expand/collapse
 934  
 935      def process_mouse_event_help(self, event: MouseEvent) -> None:  # noqa: D102
 936          pass
 937  
 938      def process_mouse_event_setup(self, event: MouseEvent) -> None:  # noqa: D102
 939          pass
 940  
 941      def process_mouse_event_remove_ask(self, event: MouseEvent) -> None:  # noqa: D102
 942          pass
 943  
 944      def process_mouse_event_select_sort(self, event: MouseEvent) -> None:  # noqa: D102
 945          pass
 946  
 947      def process_mouse_event_add_downloads(self, event: MouseEvent) -> None:  # noqa: D102
 948          pass
 949  
 950      def width_remove_ask(self) -> int:  # noqa: D102
 951          return max(len(self.remove_ask_header), max(len(row[0]) for row in self.remove_ask_rows))  # noqa: PLW3301
 952  
 953      def width_select_sort(self) -> int:  # noqa: D102
 954          return max(len(column_name) for column_name in [*self.columns_order, self.select_sort_header])
 955  
 956      def follow_focused(self) -> bool:  # noqa: D102
 957          if self.focused < len(self.data):
 958              self.follow = self.data[self.focused]
 959              return True
 960          return False
 961  
 962      def print_add_downloads(self) -> None:  # noqa: D102
 963          y = self.y_offset
 964          padding = self.width
 965          header_string = f"{self.downloads_uris_header:<{padding}}"
 966          len_header = len(header_string)
 967          self.screen.print_at(header_string, 0, y, *self.palettes["side_column_header"])
 968          self.screen.print_at(" ", len_header, y, *self.palettes["default"])
 969          y += 1
 970          self.screen.print_at(" " * self.width, 0, y, *self.palettes["ui"])
 971          separator = "..."
 972  
 973          for i, uri in enumerate(self.downloads_uris):
 974              y += 1
 975              palette = (
 976                  self.palettes["side_column_focused_row"] if i == self.side_focused else self.palettes["side_column_row"]
 977              )
 978              if len(uri) > self.width:
 979                  # print part of uri string
 980                  uri = f"{uri[:(self.width//2)-len(separator)]} {separator} {uri[-(self.width//2)+len(separator):]}"  # noqa: PLW2901
 981  
 982              self.screen.print_at(uri, 0, y, *palette)
 983              self.screen.print_at(" ", len(uri), y, *self.palettes["default"])
 984  
 985          for i in range(1, self.height - y):
 986              self.screen.print_at(" " * (padding + 1), 0, y + i, *self.palettes["ui"])
 987  
 988      def print_help(self) -> None:  # noqa: D102
 989          version = get_version()
 990          lines = [
 991              f"aria2p {version} — (C) 2018-2020 Timothée Mazzucotelli and contributors",
 992              "Released under the ISC license.",
 993              "",
 994          ]
 995  
 996          y = 0
 997          for line in lines:
 998              self.screen.print_at(f"{line:<{self.width}}", 0, y, *self.palettes["bright_help"])
 999              y += 1
1000  
1001          for keys, text in [
1002              (Keys.HELP, " show this help screen"),
1003              (Keys.MOVE_UP, " scroll downloads list"),
1004              (Keys.MOVE_UP_STEP, " scroll downloads list (steps)"),
1005              (Keys.MOVE_DOWN, " scroll downloads list"),
1006              (Keys.MOVE_DOWN_STEP, " scroll downloads list (steps)"),
1007              # not implemented: (Keys.SETUP, " setup"),
1008              (Keys.TOGGLE_RESUME_PAUSE, " toggle pause/resume"),
1009              (Keys.PRIORITY_UP, " priority up (-)"),
1010              (Keys.PRIORITY_DOWN, " priority down (+)"),
1011              (Keys.REVERSE_SORT, " invert sort order"),
1012              (Keys.NEXT_SORT, " sort next column"),
1013              (Keys.PREVIOUS_SORT, " sort previous column"),
1014              (Keys.SELECT_SORT, " select sort column"),
1015              (Keys.REMOVE_ASK, " remove download"),
1016              # not implemented: (Keys.TOGGLE_EXPAND_COLLAPSE, " toggle expand/collapse"),
1017              # not implemented: (Keys.TOGGLE_EXPAND_COLLAPSE_ALL, " toggle expand/collapse all"),
1018              (Keys.AUTOCLEAR, " autopurge downloads"),
1019              (Keys.FOLLOW_ROW, " cursor follows download"),
1020              # not implemented: (Keys.SEARCH, " name search"),
1021              # not implemented: (Keys.FILTER, " name filtering"),
1022              # not implemented: (Keys.TOGGLE_SELECT, " toggle select download"),
1023              # not implemented: (Keys.UN_SELECT_ALL, " unselect all downloads"),
1024              (Keys.MOVE_HOME, " move focus to first download"),
1025              (Keys.MOVE_END, " move focus to last download"),
1026              (Keys.RETRY, " retry failed download"),
1027              (Keys.RETRY_ALL, " retry all failed download"),
1028              (Keys.ADD_DOWNLOADS, " add downloads from clipboard"),
1029              (Keys.QUIT, " quit"),
1030          ]:
1031              self.print_keys(keys, text, y)
1032              y += 1
1033  
1034          self.screen.print_at(" " * self.width, 0, y, *self.palettes["ui"])
1035          y += 1
1036          self.screen.print_at(f"{'Press any key to return.':<{self.width}}", 0, y, *self.palettes["bright_help"])
1037          y += 1
1038  
1039          for i in range(self.height - y):
1040              self.screen.print_at(" " * self.width, 0, y + i, *self.palettes["ui"])
1041  
1042      def print_keys(self, keys: list[Key], text: str, y: int) -> None:  # noqa: D102
1043          self.print_keys_text(" ".join(Keys.names(keys)) + ":", text, y)
1044  
1045      def print_keys_text(self, keys_text: str, text: str, y: int) -> None:  # noqa: D102
1046          length = 8
1047          padding = self.width - length
1048          self.screen.print_at(f"{keys_text:>{length}}", 0, y, *self.palettes["bright_help"])
1049          self.screen.print_at(f"{text:<{padding}}", length, y, *self.palettes["default"])
1050  
1051      def print_remove_ask_column(self) -> None:  # noqa: D102
1052          y = self.y_offset
1053          padding = self.width_remove_ask()
1054          header_string = f"{self.remove_ask_header:<{padding}}"
1055          len_header = len(header_string)
1056          self.screen.print_at(header_string, 0, y, *self.palettes["side_column_header"])
1057          self.screen.print_at(" ", len_header, y, *self.palettes["default"])
1058          for i, row in enumerate(self.remove_ask_rows):
1059              y += 1
1060              palette = (
1061                  self.palettes["side_column_focused_row"] if i == self.side_focused else self.palettes["side_column_row"]
1062              )
1063              row_string = f"{row[0]:<{padding}}"
1064              len_row = len(row_string)
1065              self.screen.print_at(row_string, 0, y, *palette)
1066              self.screen.print_at(" ", len_row, y, *self.palettes["default"])
1067  
1068          for i in range(1, self.height - y):
1069              self.screen.print_at(" " * (padding + 1), 0, y + i, *self.palettes["ui"])
1070  
1071      def print_select_sort_column(self) -> None:  # noqa: D102
1072          y = self.y_offset
1073          padding = self.width_select_sort()
1074          header_string = f"{self.select_sort_header:<{padding}}"
1075          len_header = len(header_string)
1076          self.screen.print_at(header_string, 0, y, *self.palettes["side_column_header"])
1077          self.screen.print_at(" ", len_header, y, *self.palettes["default"])
1078          for i, row in enumerate(self.select_sort_rows):
1079              y += 1
1080              palette = (
1081                  self.palettes["side_column_focused_row"] if i == self.side_focused else self.palettes["side_column_row"]
1082              )
1083              row_string = f"{row:<{padding}}"
1084              len_row = len(row_string)
1085              self.screen.print_at(row_string, 0, y, *palette)
1086              self.screen.print_at(" ", len_row, y, *self.palettes["default"])
1087  
1088          for i in range(1, self.height - y):
1089              self.screen.print_at(" " * (padding + 1), 0, y + i, *self.palettes["ui"])
1090  
1091      def print_table(self) -> None:  # noqa: D102
1092          self.print_headers()
1093          self.print_rows()
1094  
1095      def print_headers(self) -> None:
1096          """Print the headers (columns names)."""
1097          self.scroller.set_scroll(self.x_scroll)
1098          x, y, c = self.x_offset, self.y_offset, 0
1099  
1100          for column_name in self.columns_order:
1101              column = self.columns[column_name]
1102              palette = self.palettes["focused_header"] if c == self.sort else self.palettes["header"]
1103  
1104              if column.padding == "100%":
1105                  header_string = f"{column.header}"
1106                  fill_up = " " * max(0, self.width - x - len(header_string))
1107                  written = self.scroller.print_at(header_string, x, y, palette)
1108                  self.scroller.print_at(fill_up, x + written, y, self.palettes["header"])
1109  
1110              else:
1111                  header_string = f"{column.header:{column.padding}} "
1112                  written = self.scroller.print_at(header_string, x, y, palette)
1113  
1114              x += written
1115              c += 1  # noqa: SIM113
1116  
1117      def print_rows(self) -> None:
1118          """Print the rows."""
1119          y = self.y_offset + 1
1120          for row in self.rows[self.row_offset : self.row_offset + self.height]:
1121              self.scroller.set_scroll(self.x_scroll)
1122              x = self.x_offset
1123  
1124              for i, column_name in enumerate(self.columns_order):
1125                  column = self.columns[column_name]
1126                  padding = f"<{max(0, self.width - x)}" if column.padding == "100%" else column.padding
1127  
1128                  if self.focused == y - self.y_offset - 1 + self.row_offset:
1129                      palette = self.palettes["focused_row"]
1130                  else:
1131                      palette = column.get_palette(row[i])
1132                      if isinstance(palette, str):
1133                          palette = self.palettes[palette]
1134  
1135                  field_string = f"{row[i]:{padding}} "
1136                  written = self.scroller.print_at(field_string, x, y, palette)
1137                  x += written
1138  
1139              y += 1
1140  
1141          for i in range(self.height - y):
1142              self.screen.print_at(" " * self.width, self.x_offset, y + i, *self.palettes["ui"])
1143  
1144      def get_column_at_x(self, x: int) -> int:
1145          """For an horizontal position X, return the column index."""
1146          for i, bound in enumerate(self.bounds):
1147              if bound[0] <= x <= bound[1]:
1148                  return i
1149          raise ValueError("clicked outside of boundaries")
1150  
1151      def set_screen(self, screen: Screen) -> None:
1152          """Set the screen object, its scroller wrapper, width, height, and columns bounds."""
1153          self.screen = screen
1154          self.height, self.width = screen.dimensions
1155          self.scroller = HorizontalScroll(screen)
1156          self.bounds = []
1157          for column_name in self.columns_order:
1158              column = self.columns[column_name]
1159              if column.padding == "100%":  # last column
1160                  self.bounds.append((self.bounds[-1][1] + 1, self.width))
1161              else:
1162                  padding = int(column.padding.lstrip("<>=^"))
1163                  if not self.bounds:
1164                      self.bounds = [(0, padding)]
1165                  else:
1166                      self.bounds.append((self.bounds[-1][1] + 1, self.bounds[-1][1] + 1 + padding))
1167  
1168      def get_data(self) -> list[Download]:
1169          """Return a list of objects."""
1170          return self.api.get_downloads()
1171  
1172      def update_data(self) -> None:
1173          """Set the interface data and rows contents."""
1174          try:
1175              self.data = self.get_data()
1176              self.sort_data()
1177          except requests.exceptions.Timeout:
1178              logger.debug("Request timeout")
1179  
1180      def sort_data(self) -> None:
1181          """Sort data according to interface state."""
1182          sort_function = self.columns[self.columns_order[self.sort]].get_sort
1183          self.data = sorted(self.data, key=sort_function, reverse=self.reverse)
1184  
1185      def update_rows(self) -> None:
1186          """Update rows contents according to data and interface state."""
1187          text_getters = [self.columns[c].get_text for c in self.columns_order]
1188          n_columns = len(self.columns_order)
1189          self.rows = [tuple(text_getters[i](item) for i in range(n_columns)) for item in self.data]
1190          if self.follow:
1191              self.focused = self.data.index(self.follow)