test_interface.py
1 """Tests for the `interface` module.""" 2 3 from __future__ import annotations 4 5 import os 6 import sys 7 import time 8 from pathlib import Path 9 from typing import TYPE_CHECKING, Any 10 11 import pyperclip 12 import pytest 13 from asciimatics.event import KeyboardEvent, MouseEvent 14 from asciimatics.screen import Screen 15 16 from aria2p import interface as tui 17 from tests import TESTS_DATA_DIR 18 from tests.conftest import Aria2Server 19 20 if TYPE_CHECKING: 21 from aria2p.api import API 22 23 tui.Interface.frames = 20 # reduce tests time 24 25 26 class SpecialEvent: 27 """Special events for special actions.""" 28 29 RESIZE = 1 30 PASS_N_FRAMES = 2 31 PASS_N_TICKS = 3 32 RAISE = 4 33 34 def __init__(self, event_type: int, value: Any = None) -> None: 35 """Initialize the special event. 36 37 Parameters: 38 event_type: The event type (enumeration value). 39 value: The event value (could be anything). 40 """ 41 self.type = event_type 42 self.value = value 43 44 45 class Event: 46 resize = SpecialEvent(SpecialEvent.RESIZE) 47 pass_frame = SpecialEvent(SpecialEvent.PASS_N_FRAMES, 1) 48 pass_tick = SpecialEvent(SpecialEvent.PASS_N_TICKS, 1) 49 pass_half_tick = SpecialEvent(SpecialEvent.PASS_N_FRAMES, tui.Interface.frames / 2) 50 pass_tick_and_a_half = SpecialEvent(SpecialEvent.PASS_N_FRAMES, tui.Interface.frames * 3 / 2) 51 up = KeyboardEvent(Screen.KEY_UP) 52 down = KeyboardEvent(Screen.KEY_DOWN) 53 left = KeyboardEvent(Screen.KEY_LEFT) 54 right = KeyboardEvent(Screen.KEY_RIGHT) 55 delete = KeyboardEvent(Screen.KEY_DELETE) 56 esc = KeyboardEvent(Screen.KEY_ESCAPE) 57 f1 = KeyboardEvent(Screen.KEY_F1) 58 f2 = KeyboardEvent(Screen.KEY_F2) 59 f3 = KeyboardEvent(Screen.KEY_F3) 60 f4 = KeyboardEvent(Screen.KEY_F4) 61 f5 = KeyboardEvent(Screen.KEY_F5) 62 f6 = KeyboardEvent(Screen.KEY_F6) 63 f7 = KeyboardEvent(Screen.KEY_F7) 64 f8 = KeyboardEvent(Screen.KEY_F8) 65 f9 = KeyboardEvent(Screen.KEY_F9) 66 f10 = KeyboardEvent(Screen.KEY_F10) 67 f11 = KeyboardEvent(Screen.KEY_F11) 68 f12 = KeyboardEvent(Screen.KEY_F12) 69 enter = KeyboardEvent(ord("\n")) 70 space = KeyboardEvent(ord(" ")) 71 72 @staticmethod 73 def hit(letter: str) -> KeyboardEvent: 74 return KeyboardEvent(ord(letter)) 75 76 @staticmethod 77 def exc(value: Exception | type[Exception]) -> SpecialEvent: 78 return SpecialEvent(SpecialEvent.RAISE, value) 79 80 @staticmethod 81 def pass_frames(value: int) -> SpecialEvent: 82 return SpecialEvent(SpecialEvent.PASS_N_FRAMES, value) 83 84 @staticmethod 85 def pass_ticks(value: int) -> SpecialEvent: 86 return SpecialEvent(SpecialEvent.PASS_N_TICKS, value) 87 88 89 def get_interface( 90 patcher: pytest.MonkeyPatch, 91 api: API | None = None, 92 events: list[KeyboardEvent | MouseEvent | SpecialEvent] | None = None, 93 *, 94 append_q: bool = True, 95 ) -> tui.Interface: 96 if not events: 97 events = [] 98 99 if append_q: 100 events.append(KeyboardEvent(ord("q"))) 101 102 class MockedManagedScreen: 103 def __enter__(self): 104 return MockedScreen(events) 105 106 def __exit__(self, exc_type, exc_val, exc_tb): # noqa: ANN001 107 pass 108 109 patcher.setattr(tui, "ManagedScreen", MockedManagedScreen) 110 return tui.Interface(api=api) 111 112 113 def run_interface( 114 patcher: pytest.MonkeyPatch, 115 api: API | None = None, 116 events: list[KeyboardEvent | MouseEvent | SpecialEvent] | None = None, 117 *, 118 append_q: bool = True, 119 **kwargs: Any, 120 ) -> tui.Interface: 121 interface = get_interface(patcher, api, events, append_q=append_q) 122 for key, value in kwargs.items(): 123 setattr(interface, key, value) 124 interface.run() 125 return interface 126 127 128 class MockedScreen: 129 def __init__(self, events: list[Event]) -> None: 130 """Initialize the mocked screen. 131 132 Parameters: 133 events: Pre-recorded events. 134 """ 135 self.events = events 136 self._has_resized = False 137 self._pass_n_frames = 0 138 self.print_at_calls: list[dict[str, Any]] = [] 139 self.paint_calls: list[dict[str, Any]] = [] 140 self.n_refresh = 0 141 142 @property 143 def dimensions(self) -> tuple[int, int]: 144 return 30, 80 145 146 def has_resized(self) -> bool: 147 return self._has_resized 148 149 def open(self) -> None: 150 pass 151 152 def close(self) -> None: 153 pass 154 155 def get_event(self) -> KeyboardEvent | MouseEvent | None: 156 if self._pass_n_frames > 0: 157 self._pass_n_frames -= 1 158 return None 159 event = self.events.pop(0) 160 if isinstance(event, (KeyboardEvent, MouseEvent)): 161 return event 162 if isinstance(event, SpecialEvent): 163 if event.type == SpecialEvent.RESIZE: 164 self._has_resized = True 165 elif event.type == SpecialEvent.PASS_N_FRAMES: 166 # we remove 1 because this event itself eats a frame 167 self._pass_n_frames = event.value - 1 168 elif event.type == SpecialEvent.PASS_N_TICKS: 169 # we remove 1 because this event itself eats a frame 170 self._pass_n_frames = (event.value * tui.Interface.frames) - 1 171 elif event.type == SpecialEvent.RAISE: 172 raise event.value 173 return None 174 175 def print_at(self, *args: Any, **kwargs: Any) -> None: 176 if args[0].strip(): 177 self.print_at_calls.append({"args": args, "kwargs": kwargs}) 178 179 def paint(self, *args: Any, **kwargs: Any) -> None: 180 if args[0].strip(): 181 self.paint_calls.append({"args": args, "kwargs": kwargs}) 182 183 def refresh(self) -> None: 184 self.n_refresh += 1 185 186 187 def test_run(monkeypatch: pytest.MonkeyPatch) -> None: 188 interface = run_interface(monkeypatch) 189 assert interface.screen 190 assert interface.height == 30 191 assert interface.width == 80 192 193 194 def test_resize(server: Aria2Server, monkeypatch: pytest.MonkeyPatch) -> None: 195 interface = run_interface(monkeypatch, server.api, events=[Event.resize]) 196 # assert screen was renewed 197 assert not interface.screen.has_resized() 198 199 200 def test_frames_plus_n(server: Aria2Server, monkeypatch: pytest.MonkeyPatch) -> None: 201 n = 10 202 interface = run_interface(monkeypatch, server.api, events=[Event.pass_frames(tui.Interface.frames + n)]) 203 assert interface.frame == n 204 205 206 def test_change_sort(server: Aria2Server, monkeypatch: pytest.MonkeyPatch) -> None: 207 interface = run_interface( 208 monkeypatch, 209 server.api, 210 events=[ 211 Event.hit(">"), 212 Event.pass_half_tick, 213 Event.hit("<"), 214 Event.pass_half_tick, 215 Event.hit("<"), 216 Event.pass_tick, 217 ], 218 ) 219 assert interface.sort == tui.Interface.sort - 1 220 221 222 def test_move_focus(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 223 with Aria2Server(tmp_path, port, session="2-dls-paused.txt") as server: 224 interface = run_interface( 225 monkeypatch, 226 server.api, 227 events=[Event.up, Event.pass_tick] + [Event.down] * 30 + [Event.pass_tick] + [Event.up] * 5, 228 ) 229 assert interface.focused == 0 230 231 232 def test_show_help(server: Aria2Server, monkeypatch: pytest.MonkeyPatch) -> None: 233 interface = run_interface(monkeypatch, server.api, events=[Event.f1, Event.pass_tick, Event.enter]) 234 assert interface.screen.print_at_calls[-1]["args"][0].startswith("Press any key to return.") 235 236 237 def test_horizontal_scrolling(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 238 with Aria2Server(tmp_path, port, session="3-magnets.txt") as server: 239 run_interface( 240 monkeypatch, 241 server.api, 242 events=[Event.left] 243 + [Event.pass_tick] 244 + [Event.right] * 2 245 + [Event.pass_tick_and_a_half] 246 + [Event.right] * 20 247 + [Event.pass_tick] 248 + [Event.left] * 5, 249 ) 250 251 252 def test_log_exception(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 253 with Aria2Server(tmp_path, port, session="2-dls-paused.txt") as server: 254 interface = get_interface(monkeypatch, server.api, events=[Event.exc(LookupError("some message"))]) 255 assert not interface.run() 256 with open(Path("tests") / "logs" / "test_interface" / "test_log_exception.log") as log_file: 257 lines = log_file.readlines() 258 first_line = "" 259 for line in lines: 260 if "ERROR" in line: 261 first_line = line 262 break 263 last_line = lines[-1] 264 assert "some message" in first_line 265 assert "LookupError" in last_line 266 267 268 def test_select_sort(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 269 with Aria2Server(tmp_path, port, session="3-magnets.txt") as server: 270 run_interface( 271 monkeypatch, 272 server.api, 273 events=[ 274 Event.f6, 275 Event.pass_tick_and_a_half, 276 Event.down, 277 Event.pass_tick, 278 Event.down, 279 Event.pass_tick, 280 Event.enter, 281 Event.pass_tick, 282 ], 283 ) 284 285 286 def test_mouse_event(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 287 reverse = tui.Interface.reverse 288 with Aria2Server(tmp_path, port, session="3-magnets.txt") as server: 289 interface = run_interface( 290 monkeypatch, 291 server.api, 292 events=[MouseEvent(x=tui.Interface.x_offset, y=tui.Interface.y_offset, buttons=MouseEvent.LEFT_CLICK)] * 2, 293 ) 294 assert interface.sort == 0 295 assert interface.reverse is not reverse 296 297 298 def test_vertical_scrolling(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 299 with Aria2Server(tmp_path, port, session="50-dls.txt") as server: 300 run_interface(monkeypatch, server.api, events=[Event.pass_frame] + [Event.down] * 40 + [Event.up] * 30) 301 302 303 def test_follow_focused(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 304 with Aria2Server(tmp_path, port, session="3-magnets.txt") as server: 305 interface = run_interface( 306 monkeypatch, 307 server.api, 308 events=[Event.hit("<"), Event.hit("<"), Event.pass_tick, Event.hit("F"), Event.hit("I"), Event.pass_tick], 309 ) 310 assert interface.sort == 0 311 assert interface.focused == 2 312 313 314 def test_remove_ask(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 315 with Aria2Server(tmp_path, port, session="very-small-download.txt") as server: 316 download = server.api.get_downloads()[0] 317 while not download.live.is_complete: 318 if download.has_failed: 319 pytest.xfail("Failed to establish connection (sporadic error)") 320 time.sleep(0.1) 321 assert download.root_files_paths[0].exists() 322 interface = run_interface( 323 monkeypatch, 324 server.api, 325 events=[Event.pass_frame, Event.delete, Event.pass_tick, Event.down, Event.enter, Event.pass_tick], 326 ) 327 assert not download.root_files_paths[0].exists() 328 assert interface.last_remove_choice == 1 329 330 331 def test_setup(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 332 with Aria2Server(tmp_path, port, session="2-dls-paused.txt") as server: 333 run_interface(monkeypatch, server.api, events=[Event.pass_frame, Event.f2, Event.pass_tick]) 334 335 336 def test_toggle_resume_pause(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 337 with Aria2Server(tmp_path, port, session="big-download.txt") as server: 338 interface = run_interface( 339 monkeypatch, 340 server.api, 341 events=[ 342 Event.pass_frame, 343 Event.space, 344 Event.pass_tick, 345 Event.pass_tick, 346 Event.space, 347 Event.pass_tick, 348 Event.pass_tick, 349 Event.space, 350 ], 351 ) 352 353 time.sleep(1) 354 interface.data[0].update() 355 if interface.data[0].has_failed: 356 pytest.xfail("Failed to establish connection (sporadic error)") 357 assert interface.data[0].is_paused 358 359 360 def test_priority(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 361 with Aria2Server(tmp_path, port, session="2-dls-paused.txt") as server: 362 run_interface( 363 monkeypatch, 364 server.api, 365 events=[ 366 Event.pass_frame, 367 Event.hit("["), 368 Event.pass_tick, 369 Event.hit("]"), 370 Event.pass_tick, 371 Event.hit("]"), 372 Event.pass_tick, 373 ], 374 sort=7, 375 ) 376 377 378 def test_sort_edges(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 379 with Aria2Server(tmp_path, port, session="2-dls-paused.txt") as server: 380 run_interface(monkeypatch, server.api, events=[Event.hit("<")], sort=0) 381 run_interface(monkeypatch, server.api, events=[Event.hit(">")], sort=7) 382 383 384 def test_remember_last_remove(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 385 with Aria2Server(tmp_path, port, session="2-dls-paused.txt") as server: 386 interface = run_interface( 387 monkeypatch, 388 server.api, 389 events=[Event.pass_frame, Event.delete, Event.down, Event.enter, Event.delete, Event.esc], 390 ) 391 assert interface.last_remove_choice == 1 392 assert interface.side_focused == 1 393 394 395 def test_autoclear(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 396 with Aria2Server(tmp_path, port, session="very-small-download.txt") as server: 397 download = server.api.get_downloads()[0] 398 while not download.live.is_complete: 399 if download.has_failed: 400 pytest.xfail("Failed to establish connection (sporadic error)") 401 time.sleep(0.1) 402 interface = run_interface(monkeypatch, server.api, events=[Event.pass_frame, Event.hit("c"), Event.pass_tick]) 403 assert not interface.data 404 405 406 def test_side_column_edges(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 407 with Aria2Server(tmp_path, port, session="2-dls-paused.txt") as server: 408 run_interface( 409 monkeypatch, 410 server.api, 411 events=[ 412 Event.pass_frame, 413 Event.delete, 414 Event.down, 415 Event.up, 416 Event.up, 417 Event.down, 418 Event.down, 419 Event.down, 420 Event.down, 421 Event.esc, 422 ], 423 ) 424 run_interface( 425 monkeypatch, 426 server.api, 427 events=[Event.pass_frame, Event.f6] 428 + [Event.up] * len(tui.Interface.columns_order) 429 + [Event.down] * len(tui.Interface.columns_order) 430 + [Event.esc], 431 ) 432 433 434 def test_click_row(tmp_path: Path, port: int, monkeypatch: pytest.MonkeyPatch) -> None: 435 with Aria2Server(tmp_path, port, session="2-dls-paused.txt") as server: 436 interface = run_interface( 437 monkeypatch, 438 server.api, 439 events=[Event.pass_frame, MouseEvent(x=10, y=2, buttons=MouseEvent.LEFT_CLICK)], 440 ) 441 assert interface.focused == 1 442 443 444 def test_click_out_bounds(server: Aria2Server, monkeypatch: pytest.MonkeyPatch) -> None: 445 run_interface( 446 monkeypatch, 447 server.api, 448 events=[Event.pass_frame, MouseEvent(x=1000, y=0, buttons=MouseEvent.LEFT_CLICK)], 449 ) 450 with open(Path("tests") / "logs" / "test_interface" / "test_click_out_bounds.log") as log_file: 451 lines = log_file.readlines() 452 error_line = None 453 for line in lines: 454 if "ERROR" in line: 455 error_line = line 456 break 457 assert error_line 458 assert "clicked outside of boundaries" in error_line 459 460 461 @pytest.mark.skipif( 462 sys.platform != "win32" and "CI" in os.environ, 463 reason="Can't get pyperclip to work on Linux or MacOS in CI", 464 ) 465 def test_add_downloads_uris(server: Aria2Server, monkeypatch: pytest.MonkeyPatch) -> None: 466 clipboard_selection_downloads = "" 467 primary_selection_downloads = "" 468 469 uri1 = "http://localhost:8779/1" 470 magnet1 = "magnet:?xt=urn:btih:RX46NCATYQRS3MCQNSEXVZGCCDNKTASQ" 471 clipboard_selection_downloads += "\n".join([uri1, magnet1]) 472 473 uri2 = "http://localhost:8779/2" 474 magnet2 = "magnet:?xt=urn:btih:VLYICEBJDQQ64SUGREZHD4IAD2FVCJCS" 475 primary_selection_downloads += "\n".join([uri2, magnet2]) 476 477 # clipboard selection 478 pyperclip.copy(clipboard_selection_downloads) 479 480 # primary selection 481 pyperclip.copy(primary_selection_downloads, primary=True) 482 483 interface = run_interface( 484 monkeypatch, 485 server.api, 486 events=[ 487 Event.pass_frame, 488 Event.hit("a"), 489 Event.esc, 490 Event.hit("a"), 491 Event.down, 492 Event.enter, 493 Event.enter, 494 Event.esc, 495 Event.pass_tick_and_a_half, 496 ], 497 ) 498 # clear clipboards 499 pyperclip.copy("") 500 pyperclip.copy("", primary=True) 501 assert len(interface.data) == 2 502 503 504 @pytest.mark.skipif( 505 sys.platform != "win32" and "CI" in os.environ, 506 reason="Can't get pyperclip to work on Linux or MacOS in CI", 507 ) 508 def test_add_downloads_torrents_and_metalinks(server: Aria2Server, monkeypatch: pytest.MonkeyPatch) -> None: 509 torrent_file = TESTS_DATA_DIR / "bunsenlabs-helium-4.iso.torrent" 510 metalink_file = TESTS_DATA_DIR / "debian.metalink" 511 512 clipboard_selection_download = f"{torrent_file.absolute()}" 513 primary_selection_download = f"{metalink_file.absolute()}" 514 515 # clipboard selection 516 pyperclip.copy(clipboard_selection_download) 517 518 # primary selection 519 pyperclip.copy(primary_selection_download, primary=True) 520 521 interface = run_interface( 522 monkeypatch, 523 server.api, 524 events=[ 525 Event.pass_frame, 526 Event.hit("a"), 527 Event.esc, 528 Event.pass_tick_and_a_half, 529 Event.hit("a"), 530 Event.down, 531 Event.up, 532 Event.enter, 533 Event.enter, 534 Event.pass_tick_and_a_half, 535 ], 536 ) 537 pyperclip.copy("") 538 pyperclip.copy("", primary=True) 539 if len(interface.data) != 2: 540 pytest.xfail("Empty data (sporadic error)") 541 assert len(interface.data) == 2