/ tests / tools / test_threaded_process_handle.py
test_threaded_process_handle.py
  1  """Tests for _ThreadedProcessHandle β€” the adapter for SDK backends."""
  2  
  3  import threading
  4  import time
  5  
  6  from tools.environments.base import _ThreadedProcessHandle
  7  
  8  
  9  class TestBasicExecution:
 10      def test_successful_execution(self):
 11          def exec_fn():
 12              return ("hello world", 0)
 13  
 14          handle = _ThreadedProcessHandle(exec_fn)
 15          handle.wait(timeout=5)
 16  
 17          assert handle.returncode == 0
 18          output = handle.stdout.read()
 19          assert "hello world" in output
 20  
 21      def test_nonzero_exit_code(self):
 22          def exec_fn():
 23              return ("error occurred", 42)
 24  
 25          handle = _ThreadedProcessHandle(exec_fn)
 26          handle.wait(timeout=5)
 27  
 28          assert handle.returncode == 42
 29          output = handle.stdout.read()
 30          assert "error occurred" in output
 31  
 32      def test_exception_in_exec_fn(self):
 33          def exec_fn():
 34              raise RuntimeError("boom")
 35  
 36          handle = _ThreadedProcessHandle(exec_fn)
 37          handle.wait(timeout=5)
 38  
 39          assert handle.returncode == 1
 40  
 41      def test_empty_output(self):
 42          def exec_fn():
 43              return ("", 0)
 44  
 45          handle = _ThreadedProcessHandle(exec_fn)
 46          handle.wait(timeout=5)
 47  
 48          assert handle.returncode == 0
 49          output = handle.stdout.read()
 50          assert output == ""
 51  
 52  
 53  class TestPolling:
 54      def test_poll_returns_none_while_running(self):
 55          event = threading.Event()
 56  
 57          def exec_fn():
 58              event.wait(timeout=5)
 59              return ("done", 0)
 60  
 61          handle = _ThreadedProcessHandle(exec_fn)
 62          assert handle.poll() is None
 63  
 64          event.set()
 65          handle.wait(timeout=5)
 66          assert handle.poll() == 0
 67  
 68      def test_poll_returns_returncode_when_done(self):
 69          def exec_fn():
 70              return ("ok", 0)
 71  
 72          handle = _ThreadedProcessHandle(exec_fn)
 73          handle.wait(timeout=5)
 74          assert handle.poll() == 0
 75  
 76  
 77  class TestCancelFn:
 78      def test_cancel_fn_called_on_kill(self):
 79          called = threading.Event()
 80  
 81          def cancel():
 82              called.set()
 83  
 84          def exec_fn():
 85              time.sleep(10)
 86              return ("", 0)
 87  
 88          handle = _ThreadedProcessHandle(exec_fn, cancel_fn=cancel)
 89          handle.kill()
 90          assert called.is_set()
 91  
 92      def test_cancel_fn_none_is_safe(self):
 93          def exec_fn():
 94              return ("ok", 0)
 95  
 96          handle = _ThreadedProcessHandle(exec_fn, cancel_fn=None)
 97          handle.kill()  # should not raise
 98          handle.wait(timeout=5)
 99          assert handle.returncode == 0
100  
101      def test_cancel_fn_exception_swallowed(self):
102          def cancel():
103              raise RuntimeError("cancel failed")
104  
105          def exec_fn():
106              return ("ok", 0)
107  
108          handle = _ThreadedProcessHandle(exec_fn, cancel_fn=cancel)
109          handle.kill()  # should not raise despite cancel raising
110          handle.wait(timeout=5)
111  
112  
113  class TestStdoutPipe:
114      def test_stdout_is_readable(self):
115          def exec_fn():
116              return ("line1\nline2\nline3\n", 0)
117  
118          handle = _ThreadedProcessHandle(exec_fn)
119          handle.wait(timeout=5)
120  
121          lines = handle.stdout.readlines()
122          assert len(lines) == 3
123          assert lines[0] == "line1\n"
124  
125      def test_stdout_iterable(self):
126          def exec_fn():
127              return ("a\nb\nc\n", 0)
128  
129          handle = _ThreadedProcessHandle(exec_fn)
130          handle.wait(timeout=5)
131  
132          collected = list(handle.stdout)
133          assert len(collected) == 3
134  
135      def test_unicode_output(self):
136          def exec_fn():
137              return ("hello δΈ–η•Œ 🌍\n", 0)
138  
139          handle = _ThreadedProcessHandle(exec_fn)
140          handle.wait(timeout=5)
141  
142          output = handle.stdout.read()
143          assert "δΈ–η•Œ" in output
144          assert "🌍" in output