/ scripts / live.py
live.py
  1  #!/usr/bin/env python
  2  
  3  
  4  import os
  5  import subprocess
  6  import sys
  7  import time
  8  from multiprocessing import Process
  9  from pathlib import Path
 10  from threading import Thread
 11  from typing import Any, cast
 12  
 13  import bs4
 14  import pyperclip
 15  import requests
 16  from watchdog.events import FileSystemEvent, FileSystemEventHandler
 17  from watchdog.observers import Observer
 18  
 19  root_dir = Path(os.getcwd())
 20  while not (root_dir / "flake.nix").is_file():
 21      root_dir = root_dir.parent
 22  os.chdir(root_dir / "live")
 23  
 24  if len(sys.argv) != 3:
 25      print(f"usage: {sys.argv[0]} YEAR DAY")
 26      exit(1)
 27  
 28  year, day = map(int, sys.argv[1:3])
 29  
 30  
 31  def run_solution(input: Path) -> str | None:
 32      if not input.exists():
 33          print(f"\033[1m\033[31mInput file {input} does not exist\033[0m")
 34          return None
 35      py = os.environ.get("AOC_PYTHON", "python")
 36      inp = input.read_bytes()
 37      print(f"cmd: {py} live.py | input: {input.resolve().relative_to(root_dir)}")
 38      start = time.time()
 39      out = subprocess.run([py, "live.py"], input=inp, capture_output=True)
 40      delta = time.time() - start
 41      print(f"\033[3{'12'[out.returncode == 0]}mexit code: {out.returncode} | delta: {delta:.2f}s\033[0m")
 42      print("--- stderr ---")
 43      sys.stdout.buffer.write(out.stderr)
 44      print("--- stdout ---")
 45      sys.stdout.buffer.write(out.stdout)
 46  
 47      if out.returncode != 0:
 48          return None
 49  
 50      try:
 51          return out.stdout.decode().strip().splitlines()[-1].strip()
 52      except:
 53          return None
 54  
 55  
 56  def trigger():
 57  
 58      print(end="\033[H\033[2J\033[0m")
 59      ex_dir = Path(f"../examples/{year}/{day}")
 60      for ex in sorted(
 61          (x for x in (ex_dir.iterdir() if ex_dir.is_dir() else []) if x.name.isnumeric()), key=lambda f: int(f.name)
 62      ):
 63          n = int(ex.name)
 64          print(f"\033[1m\033[34m----- Example {n} -----\033[0m")
 65          run_solution(ex)
 66          print()
 67  
 68      print("\033[1m\033[34m----- Puzzle Input -----\033[0m")
 69      ans = run_solution(Path(f"../.cache/{year}/{day}"))
 70      if ans is not None:
 71          print(f"\n\033[1m\033[32mAnswer: {ans}\033[0m")
 72          pyperclip.copy(ans)
 73          if (part := input(f"Submit? level=")) in ["1", "2"]:
 74              print(f"(submitting answer for part {part})")
 75              session = (root_dir / ".cache/session").read_text().strip()
 76              resp = requests.post(
 77                  f"https://adventofcode.com/{year}/day/{day}/answer",
 78                  cookies={"session": session},
 79                  data={"level": part, "answer": ans},
 80              ).text
 81              bs = bs4.BeautifulSoup(resp, "html.parser")
 82              resp = cast(Any, bs).main.article.p.text
 83              ok = resp.startswith("That's the right answer!")
 84              print(f"\033[1m\033[3{'12'[ok]}m{resp}\033[0m")
 85      else:
 86          print("\033[1m\033[31m(failed to find answer in program output)\033[0m")
 87      print("(waiting for changes to live.py)")
 88  
 89  
 90  proc: Process | None = None
 91  
 92  
 93  def spawn_trigger_process():
 94      global proc
 95  
 96      if proc is not None and proc.is_alive():
 97          print("(process killed)")
 98          proc.kill()
 99  
100      def trigger_wrapper():
101          sys.stdin = open(0)
102          while True:
103              trigger()
104              input()
105  
106      proc = Process(target=trigger_wrapper)
107      proc.start()
108  
109  
class Handler(FileSystemEventHandler):
    """Restart the trigger process shortly after ``live.py`` is modified.

    Events are debounced: each modification bumps a counter and starts a
    short-lived thread that restarts the trigger process only if no newer
    modification arrived during its 0.1 s wait.
    """

    def __init__(self):
        super().__init__()
        # Number of live.py modification events seen so far; used to detect
        # whether a newer event arrived while a debounce thread was sleeping.
        self.cnt = 0

    def on_modified(self, event: FileSystemEvent) -> None:
        """Schedule a debounced restart when the modified file is ``live.py``."""
        # Compare normalised paths instead of raw strings so that "./live.py"
        # and "live.py" are treated alike; fsdecode also copes with a bytes
        # src_path.
        if Path(os.fsdecode(event.src_path)) != Path("live.py"):
            return

        self.cnt += 1
        cnt = self.cnt

        def inner():
            time.sleep(0.1)
            # Only the thread belonging to the most recent event restarts.
            if self.cnt == cnt:
                spawn_trigger_process()

        t = Thread(target=inner)
        t.start()
129  
130  
# Run everything once up front, then watch the working directory and re-run
# whenever the handler sees live.py change.
spawn_trigger_process()

handler = Handler()
observer = Observer()
observer.schedule(handler, ".", recursive=True)
observer.start()

try:
    # The observer and the trigger process do the work; the main thread only
    # has to stay alive.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to quit; exit without a traceback.
    pass
finally:
    observer.stop()
    observer.join()
    # Do not leave the trigger process behind when the watcher exits.
    if proc is not None and proc.is_alive():
        proc.kill()
        proc.join()