/ 9_Firmware / 9_3_GUI / test_radar_dashboard.py
test_radar_dashboard.py
  1  #!/usr/bin/env python3
  2  """
  3  Tests for AERIS-10 Radar Dashboard protocol parsing, command building,
  4  data recording, and acquisition logic.
  5  
  6  Run: python -m pytest test_radar_dashboard.py -v
  7    or: python test_radar_dashboard.py
  8  """
  9  
 10  import struct
 11  import time
 12  import queue
 13  import os
 14  import tempfile
 15  import unittest
 16  import numpy as np
 17  
 18  from radar_protocol import (
 19      RadarProtocol, FT2232HConnection, DataRecorder, RadarAcquisition,
 20      RadarFrame, StatusResponse, Opcode,
 21      HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE,
 22      NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS,
 23      DATA_PACKET_SIZE,
 24      _HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES,
 25  )
 26  
 27  
 28  class TestRadarProtocol(unittest.TestCase):
 29      """Test packet parsing and command building against usb_data_interface.v."""
 30  
 31      # ----------------------------------------------------------------
 32      # Command building
 33      # ----------------------------------------------------------------
 34      def test_build_command_trigger(self):
 35          """Opcode 0x01, value 1 → {0x01, 0x00, 0x0001}."""
 36          cmd = RadarProtocol.build_command(0x01, 1)
 37          self.assertEqual(len(cmd), 4)
 38          word = struct.unpack(">I", cmd)[0]
 39          self.assertEqual((word >> 24) & 0xFF, 0x01)  # opcode
 40          self.assertEqual((word >> 16) & 0xFF, 0x00)  # addr
 41          self.assertEqual(word & 0xFFFF, 1)            # value
 42  
 43      def test_build_command_cfar_alpha(self):
 44          """Opcode 0x23, value 0x30 (alpha=3.0 Q4.4)."""
 45          cmd = RadarProtocol.build_command(0x23, 0x30)
 46          word = struct.unpack(">I", cmd)[0]
 47          self.assertEqual((word >> 24) & 0xFF, 0x23)
 48          self.assertEqual(word & 0xFFFF, 0x30)
 49  
 50      def test_build_command_status_request(self):
 51          """Opcode 0xFF, value 0."""
 52          cmd = RadarProtocol.build_command(0xFF, 0)
 53          word = struct.unpack(">I", cmd)[0]
 54          self.assertEqual((word >> 24) & 0xFF, 0xFF)
 55          self.assertEqual(word & 0xFFFF, 0)
 56  
 57      def test_build_command_with_addr(self):
 58          """Command with non-zero addr field."""
 59          cmd = RadarProtocol.build_command(0x10, 500, addr=0x42)
 60          word = struct.unpack(">I", cmd)[0]
 61          self.assertEqual((word >> 24) & 0xFF, 0x10)
 62          self.assertEqual((word >> 16) & 0xFF, 0x42)
 63          self.assertEqual(word & 0xFFFF, 500)
 64  
 65      def test_build_command_value_clamp(self):
 66          """Value > 0xFFFF should be masked to 16 bits."""
 67          cmd = RadarProtocol.build_command(0x01, 0x1FFFF)
 68          word = struct.unpack(">I", cmd)[0]
 69          self.assertEqual(word & 0xFFFF, 0xFFFF)
 70  
 71      # ----------------------------------------------------------------
 72      # Data packet parsing
 73      # ----------------------------------------------------------------
 74      def _make_data_packet(self, range_i=100, range_q=200,
 75                            dop_i=300, dop_q=400, detection=0):
 76          """Build a synthetic 11-byte data packet matching FT2232H format."""
 77          pkt = bytearray()
 78          pkt.append(HEADER_BYTE)
 79          pkt += struct.pack(">h", range_q & 0xFFFF if range_q >= 0 else range_q)
 80          pkt += struct.pack(">h", range_i & 0xFFFF if range_i >= 0 else range_i)
 81          pkt += struct.pack(">h", dop_i & 0xFFFF if dop_i >= 0 else dop_i)
 82          pkt += struct.pack(">h", dop_q & 0xFFFF if dop_q >= 0 else dop_q)
 83          pkt.append(detection & 0x01)
 84          pkt.append(FOOTER_BYTE)
 85          return bytes(pkt)
 86  
 87      def test_parse_data_packet_basic(self):
 88          raw = self._make_data_packet(100, 200, 300, 400, 0)
 89          result = RadarProtocol.parse_data_packet(raw)
 90          self.assertIsNotNone(result)
 91          self.assertEqual(result["range_i"], 100)
 92          self.assertEqual(result["range_q"], 200)
 93          self.assertEqual(result["doppler_i"], 300)
 94          self.assertEqual(result["doppler_q"], 400)
 95          self.assertEqual(result["detection"], 0)
 96  
 97      def test_parse_data_packet_with_detection(self):
 98          raw = self._make_data_packet(0, 0, 0, 0, 1)
 99          result = RadarProtocol.parse_data_packet(raw)
100          self.assertIsNotNone(result)
101          self.assertEqual(result["detection"], 1)
102  
103      def test_parse_data_packet_negative_values(self):
104          """Signed 16-bit values should round-trip correctly."""
105          raw = self._make_data_packet(-1000, -2000, -500, 32000, 0)
106          result = RadarProtocol.parse_data_packet(raw)
107          self.assertIsNotNone(result)
108          self.assertEqual(result["range_i"], -1000)
109          self.assertEqual(result["range_q"], -2000)
110          self.assertEqual(result["doppler_i"], -500)
111          self.assertEqual(result["doppler_q"], 32000)
112  
113      def test_parse_data_packet_too_short(self):
114          self.assertIsNone(RadarProtocol.parse_data_packet(b"\xAA\x00"))
115  
116      def test_parse_data_packet_wrong_header(self):
117          raw = self._make_data_packet()
118          bad = b"\x00" + raw[1:]
119          self.assertIsNone(RadarProtocol.parse_data_packet(bad))
120  
121      # ----------------------------------------------------------------
122      # Status packet parsing
123      # ----------------------------------------------------------------
124      def _make_status_packet(self, mode=1, stream=7, threshold=10000,
125                              long_chirp=3000, long_listen=13700,
126                              guard=17540, short_chirp=50,
127                              short_listen=17450, chirps=32, range_mode=0,
128                              st_flags=0, st_detail=0, st_busy=0):
129          """Build a 26-byte status response matching FPGA format (Build 26)."""
130          pkt = bytearray()
131          pkt.append(STATUS_HEADER_BYTE)
132  
133          # Word 0: {0xFF, 3'b0, mode[1:0], 5'b0, stream[2:0], threshold[15:0]}
134          w0 = (0xFF << 24) | ((mode & 0x03) << 21) | ((stream & 0x07) << 16) | (threshold & 0xFFFF)
135          pkt += struct.pack(">I", w0)
136  
137          # Word 1: {long_chirp, long_listen}
138          w1 = ((long_chirp & 0xFFFF) << 16) | (long_listen & 0xFFFF)
139          pkt += struct.pack(">I", w1)
140  
141          # Word 2: {guard, short_chirp}
142          w2 = ((guard & 0xFFFF) << 16) | (short_chirp & 0xFFFF)
143          pkt += struct.pack(">I", w2)
144  
145          # Word 3: {short_listen, 10'd0, chirps[5:0]}
146          w3 = ((short_listen & 0xFFFF) << 16) | (chirps & 0x3F)
147          pkt += struct.pack(">I", w3)
148  
149          # Word 4: {30'd0, range_mode[1:0]}
150          w4 = range_mode & 0x03
151          pkt += struct.pack(">I", w4)
152  
153          # Word 5: {7'd0, self_test_busy, 8'd0, self_test_detail[7:0],
154          #           3'd0, self_test_flags[4:0]}
155          w5 = ((st_busy & 0x01) << 24) | ((st_detail & 0xFF) << 8) | (st_flags & 0x1F)
156          pkt += struct.pack(">I", w5)
157  
158          pkt.append(FOOTER_BYTE)
159          return bytes(pkt)
160  
161      def test_parse_status_defaults(self):
162          raw = self._make_status_packet()
163          sr = RadarProtocol.parse_status_packet(raw)
164          self.assertIsNotNone(sr)
165          self.assertEqual(sr.radar_mode, 1)
166          self.assertEqual(sr.stream_ctrl, 7)
167          self.assertEqual(sr.cfar_threshold, 10000)
168          self.assertEqual(sr.long_chirp, 3000)
169          self.assertEqual(sr.long_listen, 13700)
170          self.assertEqual(sr.guard, 17540)
171          self.assertEqual(sr.short_chirp, 50)
172          self.assertEqual(sr.short_listen, 17450)
173          self.assertEqual(sr.chirps_per_elev, 32)
174          self.assertEqual(sr.range_mode, 0)
175  
176      def test_parse_status_range_mode(self):
177          raw = self._make_status_packet(range_mode=2)
178          sr = RadarProtocol.parse_status_packet(raw)
179          self.assertEqual(sr.range_mode, 2)
180  
181      def test_parse_status_too_short(self):
182          self.assertIsNone(RadarProtocol.parse_status_packet(b"\xBB" + b"\x00" * 20))
183  
184      def test_parse_status_wrong_header(self):
185          raw = self._make_status_packet()
186          bad = b"\xAA" + raw[1:]
187          self.assertIsNone(RadarProtocol.parse_status_packet(bad))
188  
189      def test_parse_status_wrong_footer(self):
190          raw = bytearray(self._make_status_packet())
191          raw[25] = 0x00  # corrupt footer (was at index 21 in old 5-word format)
192          self.assertIsNone(RadarProtocol.parse_status_packet(bytes(raw)))
193  
194      def test_parse_status_self_test_all_pass(self):
195          """Status with all self-test flags set (all tests pass)."""
196          raw = self._make_status_packet(st_flags=0x1F, st_detail=0xA5, st_busy=0)
197          sr = RadarProtocol.parse_status_packet(raw)
198          self.assertIsNotNone(sr)
199          self.assertEqual(sr.self_test_flags, 0x1F)
200          self.assertEqual(sr.self_test_detail, 0xA5)
201          self.assertEqual(sr.self_test_busy, 0)
202  
203      def test_parse_status_self_test_busy(self):
204          """Status with self-test busy flag set."""
205          raw = self._make_status_packet(st_flags=0x00, st_detail=0x00, st_busy=1)
206          sr = RadarProtocol.parse_status_packet(raw)
207          self.assertIsNotNone(sr)
208          self.assertEqual(sr.self_test_busy, 1)
209          self.assertEqual(sr.self_test_flags, 0)
210          self.assertEqual(sr.self_test_detail, 0)
211  
212      def test_parse_status_self_test_partial_fail(self):
213          """Status with partial self-test failures (flags=0b10110)."""
214          raw = self._make_status_packet(st_flags=0b10110, st_detail=0x42, st_busy=0)
215          sr = RadarProtocol.parse_status_packet(raw)
216          self.assertIsNotNone(sr)
217          self.assertEqual(sr.self_test_flags, 0b10110)
218          self.assertEqual(sr.self_test_detail, 0x42)
219          self.assertEqual(sr.self_test_busy, 0)
220          # T0 (BRAM) failed, T1 (CIC) passed, T2 (FFT) passed, T3 (arith) failed, T4 (ADC) passed
221          self.assertFalse(sr.self_test_flags & 0x01)  # T0 fail
222          self.assertTrue(sr.self_test_flags & 0x02)    # T1 pass
223          self.assertTrue(sr.self_test_flags & 0x04)    # T2 pass
224          self.assertFalse(sr.self_test_flags & 0x08)   # T3 fail
225          self.assertTrue(sr.self_test_flags & 0x10)     # T4 pass
226  
227      def test_parse_status_self_test_zero_word5(self):
228          """Status with zero word 5 (self-test never run)."""
229          raw = self._make_status_packet()
230          sr = RadarProtocol.parse_status_packet(raw)
231          self.assertEqual(sr.self_test_flags, 0)
232          self.assertEqual(sr.self_test_detail, 0)
233          self.assertEqual(sr.self_test_busy, 0)
234  
235      def test_status_packet_is_26_bytes(self):
236          """Verify status packet is exactly 26 bytes."""
237          raw = self._make_status_packet()
238          self.assertEqual(len(raw), 26)
239  
240      # ----------------------------------------------------------------
241      # Boundary detection
242      # ----------------------------------------------------------------
243      def test_find_boundaries_mixed(self):
244          data_pkt = self._make_data_packet()
245          status_pkt = self._make_status_packet()
246          buf = b"\x00\x00" + data_pkt + b"\x00" + status_pkt + data_pkt
247          boundaries = RadarProtocol.find_packet_boundaries(buf)
248          self.assertEqual(len(boundaries), 3)
249          self.assertEqual(boundaries[0][2], "data")
250          self.assertEqual(boundaries[1][2], "status")
251          self.assertEqual(boundaries[2][2], "data")
252  
253      def test_find_boundaries_empty(self):
254          self.assertEqual(RadarProtocol.find_packet_boundaries(b""), [])
255  
256      def test_find_boundaries_truncated(self):
257          """Truncated packet should not be returned."""
258          data_pkt = self._make_data_packet()
259          buf = data_pkt[:6]  # truncated (less than 11-byte packet size)
260          boundaries = RadarProtocol.find_packet_boundaries(buf)
261          self.assertEqual(len(boundaries), 0)
262  
263  
class TestFT2232HConnection(unittest.TestCase):
    """Test mock FT2232H connection."""

    def _open_mock(self):
        """Return an opened mock connection that is closed after the test.

        The close is registered with ``addCleanup`` so it also runs when an
        assertion in the test body fails.
        """
        conn = FT2232HConnection(mock=True)
        conn.open()
        self.addCleanup(conn.close)
        return conn

    def test_mock_open_close(self):
        """open() reports success and is_open tracks open/close."""
        conn = FT2232HConnection(mock=True)
        self.assertTrue(conn.open())
        self.assertTrue(conn.is_open)
        conn.close()
        self.assertFalse(conn.is_open)

    def test_mock_read_returns_data(self):
        """Reading from an open mock connection yields a non-empty buffer."""
        conn = self._open_mock()
        data = conn.read(4096)
        self.assertIsNotNone(data)
        self.assertGreater(len(data), 0)

    def test_mock_read_contains_valid_packets(self):
        """Mock data should contain parseable data packets."""
        conn = self._open_mock()
        raw = conn.read(4096)
        packets = RadarProtocol.find_packet_boundaries(raw)
        self.assertGreater(len(packets), 0)
        for start, end, ptype in packets:
            if ptype == "data":
                result = RadarProtocol.parse_data_packet(raw[start:end])
                self.assertIsNotNone(result)

    def test_mock_write(self):
        """Writing a command to an open mock connection reports success."""
        conn = self._open_mock()
        cmd = RadarProtocol.build_command(0x01, 1)
        self.assertTrue(conn.write(cmd))

    def test_read_when_closed(self):
        """read() on a never-opened connection returns None."""
        conn = FT2232HConnection(mock=True)
        self.assertIsNone(conn.read())

    def test_write_when_closed(self):
        """write() on a never-opened connection returns a falsy value."""
        conn = FT2232HConnection(mock=True)
        self.assertFalse(conn.write(b"\x00\x00\x00\x00"))
309  
310  
311  class TestDataRecorder(unittest.TestCase):
312      """Test HDF5 recording (skipped if h5py not available)."""
313  
314      def setUp(self):
315          self.tmpdir = tempfile.mkdtemp()
316          self.filepath = os.path.join(self.tmpdir, "test_recording.h5")
317  
318      def tearDown(self):
319          if os.path.exists(self.filepath):
320              os.remove(self.filepath)
321          os.rmdir(self.tmpdir)
322  
323      @unittest.skipUnless(
324          (lambda: (__import__("importlib.util") and __import__("importlib").util.find_spec("h5py") is not None))(),
325          "h5py not installed"
326      )
327      def test_record_and_stop(self):
328          import h5py
329          rec = DataRecorder()
330          rec.start(self.filepath)
331          self.assertTrue(rec.recording)
332  
333          # Record 3 frames
334          for i in range(3):
335              frame = RadarFrame()
336              frame.frame_number = i
337              frame.timestamp = time.time()
338              frame.magnitude = np.random.rand(NUM_RANGE_BINS, NUM_DOPPLER_BINS)
339              frame.range_profile = np.random.rand(NUM_RANGE_BINS)
340              rec.record_frame(frame)
341  
342          rec.stop()
343          self.assertFalse(rec.recording)
344  
345          # Verify HDF5 contents
346          with h5py.File(self.filepath, "r") as f:
347              self.assertEqual(f.attrs["total_frames"], 3)
348              self.assertIn("frames", f)
349              self.assertIn("frame_000000", f["frames"])
350              self.assertIn("frame_000002", f["frames"])
351              mag = f["frames/frame_000001/magnitude"][:]
352              self.assertEqual(mag.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS))
353  
354  
class TestRadarAcquisition(unittest.TestCase):
    """Test acquisition thread with mock connection."""

    def test_acquisition_produces_frames(self):
        """A frame taken from the queue has the expected type and shape.

        If no frame arrives within the timeout the test is reported as
        skipped rather than passed, so a run that verified nothing is
        visible in the results without failing a fast CI run.
        """
        conn = FT2232HConnection(mock=True)
        conn.open()
        fq = queue.Queue(maxsize=16)
        acq = RadarAcquisition(conn, fq)
        acq.start()

        # With mock data producing 32 packets per read at 50ms interval,
        # a full frame (2048 samples) takes ~3.2s. Allow up to 10s.
        try:
            frame = fq.get(timeout=10)
        except queue.Empty:
            frame = None
        finally:
            # Shut the thread and connection down on every path, including
            # an unexpected exception from the queue.
            acq.stop()
            acq.join(timeout=3)
            conn.close()

        if frame is None:
            self.skipTest("no frame produced within 10 s timeout")

        self.assertIsInstance(frame, RadarFrame)
        self.assertEqual(frame.magnitude.shape,
                         (NUM_RANGE_BINS, NUM_DOPPLER_BINS))

    def test_acquisition_stop(self):
        """stop() followed by join() leaves the acquisition thread dead."""
        conn = FT2232HConnection(mock=True)
        conn.open()
        # Registered up front so the connection is closed even if the
        # is_alive assertion below fails.
        self.addCleanup(conn.close)
        fq = queue.Queue(maxsize=4)
        acq = RadarAcquisition(conn, fq)
        acq.start()
        time.sleep(0.2)
        acq.stop()
        acq.join(timeout=3)
        self.assertFalse(acq.is_alive())
396  
397  
class TestRadarFrameDefaults(unittest.TestCase):
    """Check the state of a freshly constructed RadarFrame."""

    def test_default_shapes(self):
        """Every 2-D array is 64x32, the range profile has 64 entries."""
        frame = RadarFrame()
        grid_shape = (64, 32)
        for attr in ("range_doppler_i", "range_doppler_q",
                     "magnitude", "detections"):
            self.assertEqual(getattr(frame, attr).shape, grid_shape)
        self.assertEqual(frame.range_profile.shape, (64,))
        self.assertEqual(frame.detection_count, 0)

    def test_default_zeros(self):
        """Magnitude and detection maps start out all zero."""
        frame = RadarFrame()
        for grid in (frame.magnitude, frame.detections):
            self.assertTrue(np.all(grid == 0))
414  
415  
class TestEndToEnd(unittest.TestCase):
    """End-to-end: build command → parse response → verify round-trip."""

    def test_command_roundtrip_all_opcodes(self):
        """Verify all opcodes produce valid 4-byte commands."""
        opcodes = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x10, 0x11, 0x12,
                   0x13, 0x14, 0x15, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25,
                   0x26, 0x27, 0x30, 0x31, 0xFF]
        for op in opcodes:
            # subTest reports every failing opcode by name instead of
            # stopping at the first one.
            with self.subTest(opcode=f"0x{op:02X}"):
                cmd = RadarProtocol.build_command(op, 42)
                self.assertEqual(len(cmd), 4, f"opcode 0x{op:02X}")
                word = struct.unpack(">I", cmd)[0]
                self.assertEqual((word >> 24) & 0xFF, op)
                self.assertEqual(word & 0xFFFF, 42)

    def test_data_packet_roundtrip(self):
        """Build an 11-byte data packet, parse it, verify values match."""
        ri, rq, di, dq = 1234, -5678, 9012, -3456

        # Wire order is range_q, range_i, doppler_i, doppler_q, each a
        # big-endian signed 16-bit word, followed by the detection byte.
        pkt = (
            bytes([HEADER_BYTE])
            + struct.pack(">hhhh", rq, ri, di, dq)
            + bytes([1, FOOTER_BYTE])
        )

        self.assertEqual(len(pkt), DATA_PACKET_SIZE)

        result = RadarProtocol.parse_data_packet(pkt)
        self.assertIsNotNone(result)
        self.assertEqual(result["range_i"], ri)
        self.assertEqual(result["range_q"], rq)
        self.assertEqual(result["doppler_i"], di)
        self.assertEqual(result["doppler_q"], dq)
        self.assertEqual(result["detection"], 1)
453  
454  
class TestReplayConnection(unittest.TestCase):
    """Test ReplayConnection with real .npy data files.

    Every test is skipped when the recorded data set is not present.
    """

    NPY_DIR = os.path.join(
        os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim",
        "real_data", "hex"
    )

    def _npy_available(self):
        """Check if the npy data files exist."""
        return os.path.isfile(os.path.join(self.NPY_DIR,
                                            "fullchain_mti_doppler_i.npy"))

    def setUp(self):
        """Skip the test up front when the replay data files are missing."""
        if not self._npy_available():
            self.skipTest("npy data files not found")

    def _open_replay(self, **kwargs):
        """Return an opened ReplayConnection that is closed after the test.

        ``kwargs`` are forwarded to the ReplayConnection constructor. The
        import stays local, as in the original tests, and the close is
        registered with ``addCleanup`` so it runs even if an assertion fails.
        """
        from radar_protocol import ReplayConnection
        conn = ReplayConnection(self.NPY_DIR, **kwargs)
        conn.open()
        self.addCleanup(conn.close)
        return conn

    @staticmethod
    def _count_detections(conn):
        """Count packets in the connection's current frame with detection set."""
        raw = conn._packets
        boundaries = RadarProtocol.find_packet_boundaries(raw)
        return sum(1 for s, e, _ in boundaries
                   if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0))

    def test_replay_open_close(self):
        """ReplayConnection opens and closes without error."""
        from radar_protocol import ReplayConnection
        conn = ReplayConnection(self.NPY_DIR, use_mti=True)
        self.assertTrue(conn.open())
        self.assertTrue(conn.is_open)
        conn.close()
        self.assertFalse(conn.is_open)

    def test_replay_packet_count(self):
        """Replay builds exactly NUM_CELLS (2048) packets."""
        conn = self._open_replay(use_mti=True)
        # Each packet is 11 bytes, total = 2048 * 11
        expected_bytes = NUM_CELLS * DATA_PACKET_SIZE
        self.assertEqual(conn._frame_len, expected_bytes)

    def test_replay_packets_parseable(self):
        """Every packet from replay can be parsed by RadarProtocol."""
        conn = self._open_replay(use_mti=True)
        raw = conn._packets
        boundaries = RadarProtocol.find_packet_boundaries(raw)
        self.assertEqual(len(boundaries), NUM_CELLS)
        parsed_count = 0
        det_count = 0
        for start, end, ptype in boundaries:
            self.assertEqual(ptype, "data")
            result = RadarProtocol.parse_data_packet(raw[start:end])
            self.assertIsNotNone(result)
            parsed_count += 1
            if result["detection"]:
                det_count += 1
        self.assertEqual(parsed_count, NUM_CELLS)
        # Default: MTI=ON, DC_notch=2, CFAR CA g=2 t=8 a=0x30 → 4 detections
        self.assertEqual(det_count, 4)

    def test_replay_read_loops(self):
        """Repeated reads keep returning data."""
        conn = self._open_replay(use_mti=True, replay_fps=1000)
        total_read = 0
        for _ in range(100):
            chunk = conn.read(1024)
            self.assertIsNotNone(chunk)
            total_read += len(chunk)
        self.assertGreater(total_read, 0)

    def test_replay_no_mti(self):
        """ReplayConnection works with use_mti=False (CFAR still runs)."""
        conn = self._open_replay(use_mti=False)
        self.assertEqual(conn._frame_len, NUM_CELLS * DATA_PACKET_SIZE)
        # No-MTI with DC notch=2 and default CFAR → 0 detections
        self.assertEqual(self._count_detections(conn), 0)

    def test_replay_write_returns_true(self):
        """Write on replay connection returns True."""
        conn = self._open_replay()
        self.assertTrue(conn.write(b"\x01\x00\x00\x01"))

    def test_replay_adjustable_param_cfar_guard(self):
        """Changing CFAR guard via write() triggers re-processing."""
        conn = self._open_replay(use_mti=True)
        # Initial: guard=2 → 4 detections
        self.assertFalse(conn._needs_rebuild)
        # Send CFAR_GUARD=4
        conn.write(RadarProtocol.build_command(0x21, 4))
        self.assertTrue(conn._needs_rebuild)
        self.assertEqual(conn._cfar_guard, 4)
        # Read triggers rebuild
        conn.read(1024)
        self.assertFalse(conn._needs_rebuild)

    def test_replay_adjustable_param_mti_toggle(self):
        """Toggling MTI via write() triggers re-processing."""
        conn = self._open_replay(use_mti=True)
        # Disable MTI
        conn.write(RadarProtocol.build_command(0x26, 0))
        self.assertTrue(conn._needs_rebuild)
        self.assertFalse(conn._mti_enable)
        conn.read(1024)  # triggers rebuild
        # No-MTI with default CFAR → 0 detections
        self.assertEqual(self._count_detections(conn), 0)

    def test_replay_adjustable_param_dc_notch(self):
        """Changing DC notch width via write() triggers re-processing."""
        conn = self._open_replay(use_mti=True)
        # Change DC notch to 0 (no notch)
        conn.write(RadarProtocol.build_command(0x27, 0))
        self.assertTrue(conn._needs_rebuild)
        self.assertEqual(conn._dc_notch_width, 0)
        conn.read(1024)  # triggers rebuild
        # DC notch=0 with MTI → 6 detections (more noise passes through)
        self.assertEqual(self._count_detections(conn), 6)

    def test_replay_hardware_opcode_ignored(self):
        """Hardware-only opcodes don't trigger rebuild."""
        conn = self._open_replay(use_mti=True)
        # Send TRIGGER (hardware-only)
        conn.write(RadarProtocol.build_command(0x01, 1))
        self.assertFalse(conn._needs_rebuild)
        # Send STREAM_ENABLE (hardware-only)
        conn.write(RadarProtocol.build_command(0x05, 7))
        self.assertFalse(conn._needs_rebuild)

    def test_replay_same_value_no_rebuild(self):
        """Setting same value as current doesn't trigger rebuild."""
        conn = self._open_replay(use_mti=True)
        # CFAR guard already 2
        conn.write(RadarProtocol.build_command(0x21, 2))
        self.assertFalse(conn._needs_rebuild)

    def test_replay_self_test_opcodes_are_hardware_only(self):
        """Self-test opcodes 0x30/0x31 are hardware-only (ignored in replay)."""
        conn = self._open_replay(use_mti=True)
        # Send self-test trigger
        conn.write(RadarProtocol.build_command(0x30, 1))
        self.assertFalse(conn._needs_rebuild)
        # Send self-test status request
        conn.write(RadarProtocol.build_command(0x31, 0))
        self.assertFalse(conn._needs_rebuild)
665  
666  
class TestOpcodeEnum(unittest.TestCase):
    """Check the Opcode enum against the RTL host register map."""

    def test_gain_shift_is_0x06(self):
        """GAIN_SHIFT is 0x06, not 0x16."""
        expected = 0x06
        self.assertEqual(Opcode.GAIN_SHIFT, expected)

    def test_no_digital_gain_alias(self):
        """The bogus DIGITAL_GAIN (0x16) alias must be absent."""
        has_alias = hasattr(Opcode, 'DIGITAL_GAIN')
        self.assertFalse(has_alias)

    def test_self_test_trigger(self):
        """SELF_TEST_TRIGGER is 0x30."""
        expected = 0x30
        self.assertEqual(Opcode.SELF_TEST_TRIGGER, expected)

    def test_self_test_status(self):
        """SELF_TEST_STATUS is 0x31."""
        expected = 0x31
        self.assertEqual(Opcode.SELF_TEST_STATUS, expected)

    def test_self_test_in_hardware_only(self):
        """Both self-test opcodes belong to _HARDWARE_ONLY_OPCODES."""
        for code in (0x30, 0x31):
            self.assertIn(code, _HARDWARE_ONLY_OPCODES)

    def test_0x16_not_in_hardware_only(self):
        """The bogus 0x16 is not listed in _HARDWARE_ONLY_OPCODES."""
        bogus = 0x16
        self.assertNotIn(bogus, _HARDWARE_ONLY_OPCODES)

    def test_stream_enable_is_0x05(self):
        """STREAM_ENABLE is 0x05, not 0x04."""
        expected = 0x05
        self.assertEqual(Opcode.STREAM_ENABLE, expected)

    def test_all_rtl_opcodes_present(self):
        """Each RTL opcode value appears among the Opcode enum members."""
        required = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
                    0x10, 0x11, 0x12, 0x13, 0x14, 0x15,
                    0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27,
                    0x30, 0x31, 0xFF}
        defined = {int(member) for member in Opcode}
        for code in required:
            self.assertIn(code, defined, f"0x{code:02X} missing from Opcode enum")
708  
709  
class TestStatusResponseDefaults(unittest.TestCase):
    """Check the self-test fields exposed by StatusResponse."""

    def test_default_self_test_fields(self):
        """All three self-test fields are 0 on a default instance."""
        status = StatusResponse()
        for field in ("self_test_flags", "self_test_detail", "self_test_busy"):
            self.assertEqual(getattr(status, field), 0)

    def test_self_test_fields_set(self):
        """Self-test fields passed as keyword arguments are stored as given."""
        given = {
            "self_test_flags": 0x1F,
            "self_test_detail": 0xAB,
            "self_test_busy": 1,
        }
        status = StatusResponse(**given)
        for field, value in given.items():
            self.assertEqual(getattr(status, field), value)
726  
727  
if __name__ == "__main__":
    # Direct invocation (python test_radar_dashboard.py): run every test
    # case in this module with verbose per-test output.
    unittest.main(verbosity=2)