test_radar_dashboard.py
1 #!/usr/bin/env python3 2 """ 3 Tests for AERIS-10 Radar Dashboard protocol parsing, command building, 4 data recording, and acquisition logic. 5 6 Run: python -m pytest test_radar_dashboard.py -v 7 or: python test_radar_dashboard.py 8 """ 9 10 import struct 11 import time 12 import queue 13 import os 14 import tempfile 15 import unittest 16 import numpy as np 17 18 from radar_protocol import ( 19 RadarProtocol, FT2232HConnection, DataRecorder, RadarAcquisition, 20 RadarFrame, StatusResponse, Opcode, 21 HEADER_BYTE, FOOTER_BYTE, STATUS_HEADER_BYTE, 22 NUM_RANGE_BINS, NUM_DOPPLER_BINS, NUM_CELLS, 23 DATA_PACKET_SIZE, 24 _HARDWARE_ONLY_OPCODES, _REPLAY_ADJUSTABLE_OPCODES, 25 ) 26 27 28 class TestRadarProtocol(unittest.TestCase): 29 """Test packet parsing and command building against usb_data_interface.v.""" 30 31 # ---------------------------------------------------------------- 32 # Command building 33 # ---------------------------------------------------------------- 34 def test_build_command_trigger(self): 35 """Opcode 0x01, value 1 → {0x01, 0x00, 0x0001}.""" 36 cmd = RadarProtocol.build_command(0x01, 1) 37 self.assertEqual(len(cmd), 4) 38 word = struct.unpack(">I", cmd)[0] 39 self.assertEqual((word >> 24) & 0xFF, 0x01) # opcode 40 self.assertEqual((word >> 16) & 0xFF, 0x00) # addr 41 self.assertEqual(word & 0xFFFF, 1) # value 42 43 def test_build_command_cfar_alpha(self): 44 """Opcode 0x23, value 0x30 (alpha=3.0 Q4.4).""" 45 cmd = RadarProtocol.build_command(0x23, 0x30) 46 word = struct.unpack(">I", cmd)[0] 47 self.assertEqual((word >> 24) & 0xFF, 0x23) 48 self.assertEqual(word & 0xFFFF, 0x30) 49 50 def test_build_command_status_request(self): 51 """Opcode 0xFF, value 0.""" 52 cmd = RadarProtocol.build_command(0xFF, 0) 53 word = struct.unpack(">I", cmd)[0] 54 self.assertEqual((word >> 24) & 0xFF, 0xFF) 55 self.assertEqual(word & 0xFFFF, 0) 56 57 def test_build_command_with_addr(self): 58 """Command with non-zero addr field.""" 59 cmd = RadarProtocol.build_command(0x10, 500, addr=0x42) 60 word = struct.unpack(">I", cmd)[0] 61 self.assertEqual((word >> 24) & 0xFF, 0x10) 62 self.assertEqual((word >> 16) & 0xFF, 0x42) 63 self.assertEqual(word & 0xFFFF, 500) 64 65 def test_build_command_value_clamp(self): 66 """Value > 0xFFFF should be masked to 16 bits.""" 67 cmd = RadarProtocol.build_command(0x01, 0x1FFFF) 68 word = struct.unpack(">I", cmd)[0] 69 self.assertEqual(word & 0xFFFF, 0xFFFF) 70 71 # ---------------------------------------------------------------- 72 # Data packet parsing 73 # ---------------------------------------------------------------- 74 def _make_data_packet(self, range_i=100, range_q=200, 75 dop_i=300, dop_q=400, detection=0): 76 """Build a synthetic 11-byte data packet matching FT2232H format.""" 77 pkt = bytearray() 78 pkt.append(HEADER_BYTE) 79 pkt += struct.pack(">h", range_q & 0xFFFF if range_q >= 0 else range_q) 80 pkt += struct.pack(">h", range_i & 0xFFFF if range_i >= 0 else range_i) 81 pkt += struct.pack(">h", dop_i & 0xFFFF if dop_i >= 0 else dop_i) 82 pkt += struct.pack(">h", dop_q & 0xFFFF if dop_q >= 0 else dop_q) 83 pkt.append(detection & 0x01) 84 pkt.append(FOOTER_BYTE) 85 return bytes(pkt) 86 87 def test_parse_data_packet_basic(self): 88 raw = self._make_data_packet(100, 200, 300, 400, 0) 89 result = RadarProtocol.parse_data_packet(raw) 90 self.assertIsNotNone(result) 91 self.assertEqual(result["range_i"], 100) 92 self.assertEqual(result["range_q"], 200) 93 self.assertEqual(result["doppler_i"], 300) 94 self.assertEqual(result["doppler_q"], 400) 95 self.assertEqual(result["detection"], 0) 96 97 def test_parse_data_packet_with_detection(self): 98 raw = self._make_data_packet(0, 0, 0, 0, 1) 99 result = RadarProtocol.parse_data_packet(raw) 100 self.assertIsNotNone(result) 101 self.assertEqual(result["detection"], 1) 102 103 def test_parse_data_packet_negative_values(self): 104 """Signed 16-bit values should round-trip correctly.""" 105 raw = self._make_data_packet(-1000, -2000, -500, 32000, 0) 106 result = RadarProtocol.parse_data_packet(raw) 107 self.assertIsNotNone(result) 108 self.assertEqual(result["range_i"], -1000) 109 self.assertEqual(result["range_q"], -2000) 110 self.assertEqual(result["doppler_i"], -500) 111 self.assertEqual(result["doppler_q"], 32000) 112 113 def test_parse_data_packet_too_short(self): 114 self.assertIsNone(RadarProtocol.parse_data_packet(b"\xAA\x00")) 115 116 def test_parse_data_packet_wrong_header(self): 117 raw = self._make_data_packet() 118 bad = b"\x00" + raw[1:] 119 self.assertIsNone(RadarProtocol.parse_data_packet(bad)) 120 121 # ---------------------------------------------------------------- 122 # Status packet parsing 123 # ---------------------------------------------------------------- 124 def _make_status_packet(self, mode=1, stream=7, threshold=10000, 125 long_chirp=3000, long_listen=13700, 126 guard=17540, short_chirp=50, 127 short_listen=17450, chirps=32, range_mode=0, 128 st_flags=0, st_detail=0, st_busy=0): 129 """Build a 26-byte status response matching FPGA format (Build 26).""" 130 pkt = bytearray() 131 pkt.append(STATUS_HEADER_BYTE) 132 133 # Word 0: {0xFF, 3'b0, mode[1:0], 5'b0, stream[2:0], threshold[15:0]} 134 w0 = (0xFF << 24) | ((mode & 0x03) << 21) | ((stream & 0x07) << 16) | (threshold & 0xFFFF) 135 pkt += struct.pack(">I", w0) 136 137 # Word 1: {long_chirp, long_listen} 138 w1 = ((long_chirp & 0xFFFF) << 16) | (long_listen & 0xFFFF) 139 pkt += struct.pack(">I", w1) 140 141 # Word 2: {guard, short_chirp} 142 w2 = ((guard & 0xFFFF) << 16) | (short_chirp & 0xFFFF) 143 pkt += struct.pack(">I", w2) 144 145 # Word 3: {short_listen, 10'd0, chirps[5:0]} 146 w3 = ((short_listen & 0xFFFF) << 16) | (chirps & 0x3F) 147 pkt += struct.pack(">I", w3) 148 149 # Word 4: {30'd0, range_mode[1:0]} 150 w4 = range_mode & 0x03 151 pkt += struct.pack(">I", w4) 152 153 # Word 5: {7'd0, self_test_busy, 8'd0, self_test_detail[7:0], 154 # 3'd0, self_test_flags[4:0]} 155 w5 = ((st_busy & 0x01) << 24) | ((st_detail & 0xFF) << 8) | (st_flags & 0x1F) 156 pkt += struct.pack(">I", w5) 157 158 pkt.append(FOOTER_BYTE) 159 return bytes(pkt) 160 161 def test_parse_status_defaults(self): 162 raw = self._make_status_packet() 163 sr = RadarProtocol.parse_status_packet(raw) 164 self.assertIsNotNone(sr) 165 self.assertEqual(sr.radar_mode, 1) 166 self.assertEqual(sr.stream_ctrl, 7) 167 self.assertEqual(sr.cfar_threshold, 10000) 168 self.assertEqual(sr.long_chirp, 3000) 169 self.assertEqual(sr.long_listen, 13700) 170 self.assertEqual(sr.guard, 17540) 171 self.assertEqual(sr.short_chirp, 50) 172 self.assertEqual(sr.short_listen, 17450) 173 self.assertEqual(sr.chirps_per_elev, 32) 174 self.assertEqual(sr.range_mode, 0) 175 176 def test_parse_status_range_mode(self): 177 raw = self._make_status_packet(range_mode=2) 178 sr = RadarProtocol.parse_status_packet(raw) 179 self.assertEqual(sr.range_mode, 2) 180 181 def test_parse_status_too_short(self): 182 self.assertIsNone(RadarProtocol.parse_status_packet(b"\xBB" + b"\x00" * 20)) 183 184 def test_parse_status_wrong_header(self): 185 raw = self._make_status_packet() 186 bad = b"\xAA" + raw[1:] 187 self.assertIsNone(RadarProtocol.parse_status_packet(bad)) 188 189 def test_parse_status_wrong_footer(self): 190 raw = bytearray(self._make_status_packet()) 191 raw[25] = 0x00 # corrupt footer (was at index 21 in old 5-word format) 192 self.assertIsNone(RadarProtocol.parse_status_packet(bytes(raw))) 193 194 def test_parse_status_self_test_all_pass(self): 195 """Status with all self-test flags set (all tests pass).""" 196 raw = self._make_status_packet(st_flags=0x1F, st_detail=0xA5, st_busy=0) 197 sr = RadarProtocol.parse_status_packet(raw) 198 self.assertIsNotNone(sr) 199 self.assertEqual(sr.self_test_flags, 0x1F) 200 self.assertEqual(sr.self_test_detail, 0xA5) 201 self.assertEqual(sr.self_test_busy, 0) 202 203 def test_parse_status_self_test_busy(self): 204 """Status with self-test busy flag set.""" 205 raw = self._make_status_packet(st_flags=0x00, st_detail=0x00, st_busy=1) 206 sr = RadarProtocol.parse_status_packet(raw) 207 self.assertIsNotNone(sr) 208 self.assertEqual(sr.self_test_busy, 1) 209 self.assertEqual(sr.self_test_flags, 0) 210 self.assertEqual(sr.self_test_detail, 0) 211 212 def test_parse_status_self_test_partial_fail(self): 213 """Status with partial self-test failures (flags=0b10110).""" 214 raw = self._make_status_packet(st_flags=0b10110, st_detail=0x42, st_busy=0) 215 sr = RadarProtocol.parse_status_packet(raw) 216 self.assertIsNotNone(sr) 217 self.assertEqual(sr.self_test_flags, 0b10110) 218 self.assertEqual(sr.self_test_detail, 0x42) 219 self.assertEqual(sr.self_test_busy, 0) 220 # T0 (BRAM) failed, T1 (CIC) passed, T2 (FFT) passed, T3 (arith) failed, T4 (ADC) passed 221 self.assertFalse(sr.self_test_flags & 0x01) # T0 fail 222 self.assertTrue(sr.self_test_flags & 0x02) # T1 pass 223 self.assertTrue(sr.self_test_flags & 0x04) # T2 pass 224 self.assertFalse(sr.self_test_flags & 0x08) # T3 fail 225 self.assertTrue(sr.self_test_flags & 0x10) # T4 pass 226 227 def test_parse_status_self_test_zero_word5(self): 228 """Status with zero word 5 (self-test never run).""" 229 raw = self._make_status_packet() 230 sr = RadarProtocol.parse_status_packet(raw) 231 self.assertEqual(sr.self_test_flags, 0) 232 self.assertEqual(sr.self_test_detail, 0) 233 self.assertEqual(sr.self_test_busy, 0) 234 235 def test_status_packet_is_26_bytes(self): 236 """Verify status packet is exactly 26 bytes.""" 237 raw = self._make_status_packet() 238 self.assertEqual(len(raw), 26) 239 240 # ---------------------------------------------------------------- 241 # Boundary detection 242 # ---------------------------------------------------------------- 243 def test_find_boundaries_mixed(self): 244 data_pkt = self._make_data_packet() 245 status_pkt = self._make_status_packet() 246 buf = b"\x00\x00" + data_pkt + b"\x00" + status_pkt + data_pkt 247 boundaries = RadarProtocol.find_packet_boundaries(buf) 248 self.assertEqual(len(boundaries), 3) 249 self.assertEqual(boundaries[0][2], "data") 250 self.assertEqual(boundaries[1][2], "status") 251 self.assertEqual(boundaries[2][2], "data") 252 253 def test_find_boundaries_empty(self): 254 self.assertEqual(RadarProtocol.find_packet_boundaries(b""), []) 255 256 def test_find_boundaries_truncated(self): 257 """Truncated packet should not be returned.""" 258 data_pkt = self._make_data_packet() 259 buf = data_pkt[:6] # truncated (less than 11-byte packet size) 260 boundaries = RadarProtocol.find_packet_boundaries(buf) 261 self.assertEqual(len(boundaries), 0) 262 263 264 class TestFT2232HConnection(unittest.TestCase): 265 """Test mock FT2232H connection.""" 266 267 def test_mock_open_close(self): 268 conn = FT2232HConnection(mock=True) 269 self.assertTrue(conn.open()) 270 self.assertTrue(conn.is_open) 271 conn.close() 272 self.assertFalse(conn.is_open) 273 274 def test_mock_read_returns_data(self): 275 conn = FT2232HConnection(mock=True) 276 conn.open() 277 data = conn.read(4096) 278 self.assertIsNotNone(data) 279 self.assertGreater(len(data), 0) 280 conn.close() 281 282 def test_mock_read_contains_valid_packets(self): 283 """Mock data should contain parseable data packets.""" 284 conn = FT2232HConnection(mock=True) 285 conn.open() 286 raw = conn.read(4096) 287 packets = RadarProtocol.find_packet_boundaries(raw) 288 self.assertGreater(len(packets), 0) 289 for start, end, ptype in packets: 290 if ptype == "data": 291 result = RadarProtocol.parse_data_packet(raw[start:end]) 292 self.assertIsNotNone(result) 293 conn.close() 294 295 def test_mock_write(self): 296 conn = FT2232HConnection(mock=True) 297 conn.open() 298 cmd = RadarProtocol.build_command(0x01, 1) 299 self.assertTrue(conn.write(cmd)) 300 conn.close() 301 302 def test_read_when_closed(self): 303 conn = FT2232HConnection(mock=True) 304 self.assertIsNone(conn.read()) 305 306 def test_write_when_closed(self): 307 conn = FT2232HConnection(mock=True) 308 self.assertFalse(conn.write(b"\x00\x00\x00\x00")) 309 310 311 class TestDataRecorder(unittest.TestCase): 312 """Test HDF5 recording (skipped if h5py not available).""" 313 314 def setUp(self): 315 self.tmpdir = tempfile.mkdtemp() 316 self.filepath = os.path.join(self.tmpdir, "test_recording.h5") 317 318 def tearDown(self): 319 if os.path.exists(self.filepath): 320 os.remove(self.filepath) 321 os.rmdir(self.tmpdir) 322 323 @unittest.skipUnless( 324 (lambda: (__import__("importlib.util") and __import__("importlib").util.find_spec("h5py") is not None))(), 325 "h5py not installed" 326 ) 327 def test_record_and_stop(self): 328 import h5py 329 rec = DataRecorder() 330 rec.start(self.filepath) 331 self.assertTrue(rec.recording) 332 333 # Record 3 frames 334 for i in range(3): 335 frame = RadarFrame() 336 frame.frame_number = i 337 frame.timestamp = time.time() 338 frame.magnitude = np.random.rand(NUM_RANGE_BINS, NUM_DOPPLER_BINS) 339 frame.range_profile = np.random.rand(NUM_RANGE_BINS) 340 rec.record_frame(frame) 341 342 rec.stop() 343 self.assertFalse(rec.recording) 344 345 # Verify HDF5 contents 346 with h5py.File(self.filepath, "r") as f: 347 self.assertEqual(f.attrs["total_frames"], 3) 348 self.assertIn("frames", f) 349 self.assertIn("frame_000000", f["frames"]) 350 self.assertIn("frame_000002", f["frames"]) 351 mag = f["frames/frame_000001/magnitude"][:] 352 self.assertEqual(mag.shape, (NUM_RANGE_BINS, NUM_DOPPLER_BINS)) 353 354 355 class TestRadarAcquisition(unittest.TestCase): 356 """Test acquisition thread with mock connection.""" 357 358 def test_acquisition_produces_frames(self): 359 conn = FT2232HConnection(mock=True) 360 conn.open() 361 fq = queue.Queue(maxsize=16) 362 acq = RadarAcquisition(conn, fq) 363 acq.start() 364 365 # Wait for at least one frame (mock produces ~32 samples per read, 366 # need 2048 for a full frame, so may take a few seconds) 367 frame = None 368 try: 369 frame = fq.get(timeout=10) 370 except queue.Empty: 371 pass 372 373 acq.stop() 374 acq.join(timeout=3) 375 conn.close() 376 377 # With mock data producing 32 packets per read at 50ms interval, 378 # a full frame (2048 samples) takes ~3.2s. Allow up to 10s. 379 if frame is not None: 380 self.assertIsInstance(frame, RadarFrame) 381 self.assertEqual(frame.magnitude.shape, 382 (NUM_RANGE_BINS, NUM_DOPPLER_BINS)) 383 # If no frame arrived in timeout, that's still OK for a fast CI run 384 385 def test_acquisition_stop(self): 386 conn = FT2232HConnection(mock=True) 387 conn.open() 388 fq = queue.Queue(maxsize=4) 389 acq = RadarAcquisition(conn, fq) 390 acq.start() 391 time.sleep(0.2) 392 acq.stop() 393 acq.join(timeout=3) 394 self.assertFalse(acq.is_alive()) 395 conn.close() 396 397 398 class TestRadarFrameDefaults(unittest.TestCase): 399 """Test RadarFrame default initialization.""" 400 401 def test_default_shapes(self): 402 f = RadarFrame() 403 self.assertEqual(f.range_doppler_i.shape, (64, 32)) 404 self.assertEqual(f.range_doppler_q.shape, (64, 32)) 405 self.assertEqual(f.magnitude.shape, (64, 32)) 406 self.assertEqual(f.detections.shape, (64, 32)) 407 self.assertEqual(f.range_profile.shape, (64,)) 408 self.assertEqual(f.detection_count, 0) 409 410 def test_default_zeros(self): 411 f = RadarFrame() 412 self.assertTrue(np.all(f.magnitude == 0)) 413 self.assertTrue(np.all(f.detections == 0)) 414 415 416 class TestEndToEnd(unittest.TestCase): 417 """End-to-end: build command → parse response → verify round-trip.""" 418 419 def test_command_roundtrip_all_opcodes(self): 420 """Verify all opcodes produce valid 4-byte commands.""" 421 opcodes = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x10, 0x11, 0x12, 422 0x13, 0x14, 0x15, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 423 0x26, 0x27, 0x30, 0x31, 0xFF] 424 for op in opcodes: 425 cmd = RadarProtocol.build_command(op, 42) 426 self.assertEqual(len(cmd), 4, f"opcode 0x{op:02X}") 427 word = struct.unpack(">I", cmd)[0] 428 self.assertEqual((word >> 24) & 0xFF, op) 429 self.assertEqual(word & 0xFFFF, 42) 430 431 def test_data_packet_roundtrip(self): 432 """Build an 11-byte data packet, parse it, verify values match.""" 433 ri, rq, di, dq = 1234, -5678, 9012, -3456 434 435 pkt = bytearray() 436 pkt.append(HEADER_BYTE) 437 pkt += struct.pack(">h", rq) 438 pkt += struct.pack(">h", ri) 439 pkt += struct.pack(">h", di) 440 pkt += struct.pack(">h", dq) 441 pkt.append(1) 442 pkt.append(FOOTER_BYTE) 443 444 self.assertEqual(len(pkt), DATA_PACKET_SIZE) 445 446 result = RadarProtocol.parse_data_packet(bytes(pkt)) 447 self.assertIsNotNone(result) 448 self.assertEqual(result["range_i"], ri) 449 self.assertEqual(result["range_q"], rq) 450 self.assertEqual(result["doppler_i"], di) 451 self.assertEqual(result["doppler_q"], dq) 452 self.assertEqual(result["detection"], 1) 453 454 455 class TestReplayConnection(unittest.TestCase): 456 """Test ReplayConnection with real .npy data files.""" 457 458 NPY_DIR = os.path.join( 459 os.path.dirname(__file__), "..", "9_2_FPGA", "tb", "cosim", 460 "real_data", "hex" 461 ) 462 463 def _npy_available(self): 464 """Check if the npy data files exist.""" 465 return os.path.isfile(os.path.join(self.NPY_DIR, 466 "fullchain_mti_doppler_i.npy")) 467 468 def test_replay_open_close(self): 469 """ReplayConnection opens and closes without error.""" 470 if not self._npy_available(): 471 self.skipTest("npy data files not found") 472 from radar_protocol import ReplayConnection 473 conn = ReplayConnection(self.NPY_DIR, use_mti=True) 474 self.assertTrue(conn.open()) 475 self.assertTrue(conn.is_open) 476 conn.close() 477 self.assertFalse(conn.is_open) 478 479 def test_replay_packet_count(self): 480 """Replay builds exactly NUM_CELLS (2048) packets.""" 481 if not self._npy_available(): 482 self.skipTest("npy data files not found") 483 from radar_protocol import ReplayConnection 484 conn = ReplayConnection(self.NPY_DIR, use_mti=True) 485 conn.open() 486 # Each packet is 11 bytes, total = 2048 * 11 487 expected_bytes = NUM_CELLS * DATA_PACKET_SIZE 488 self.assertEqual(conn._frame_len, expected_bytes) 489 conn.close() 490 491 def test_replay_packets_parseable(self): 492 """Every packet from replay can be parsed by RadarProtocol.""" 493 if not self._npy_available(): 494 self.skipTest("npy data files not found") 495 from radar_protocol import ReplayConnection 496 conn = ReplayConnection(self.NPY_DIR, use_mti=True) 497 conn.open() 498 raw = conn._packets 499 boundaries = RadarProtocol.find_packet_boundaries(raw) 500 self.assertEqual(len(boundaries), NUM_CELLS) 501 parsed_count = 0 502 det_count = 0 503 for start, end, ptype in boundaries: 504 self.assertEqual(ptype, "data") 505 result = RadarProtocol.parse_data_packet(raw[start:end]) 506 self.assertIsNotNone(result) 507 parsed_count += 1 508 if result["detection"]: 509 det_count += 1 510 self.assertEqual(parsed_count, NUM_CELLS) 511 # Default: MTI=ON, DC_notch=2, CFAR CA g=2 t=8 a=0x30 → 4 detections 512 self.assertEqual(det_count, 4) 513 conn.close() 514 515 def test_replay_read_loops(self): 516 """Read returns data and loops back around.""" 517 if not self._npy_available(): 518 self.skipTest("npy data files not found") 519 from radar_protocol import ReplayConnection 520 conn = ReplayConnection(self.NPY_DIR, use_mti=True, replay_fps=1000) 521 conn.open() 522 total_read = 0 523 for _ in range(100): 524 chunk = conn.read(1024) 525 self.assertIsNotNone(chunk) 526 total_read += len(chunk) 527 self.assertGreater(total_read, 0) 528 conn.close() 529 530 def test_replay_no_mti(self): 531 """ReplayConnection works with use_mti=False (CFAR still runs).""" 532 if not self._npy_available(): 533 self.skipTest("npy data files not found") 534 from radar_protocol import ReplayConnection 535 conn = ReplayConnection(self.NPY_DIR, use_mti=False) 536 conn.open() 537 self.assertEqual(conn._frame_len, NUM_CELLS * DATA_PACKET_SIZE) 538 # No-MTI with DC notch=2 and default CFAR → 0 detections 539 raw = conn._packets 540 boundaries = RadarProtocol.find_packet_boundaries(raw) 541 det_count = sum(1 for s, e, t in boundaries 542 if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0)) 543 self.assertEqual(det_count, 0) 544 conn.close() 545 546 def test_replay_write_returns_true(self): 547 """Write on replay connection returns True.""" 548 if not self._npy_available(): 549 self.skipTest("npy data files not found") 550 from radar_protocol import ReplayConnection 551 conn = ReplayConnection(self.NPY_DIR) 552 conn.open() 553 self.assertTrue(conn.write(b"\x01\x00\x00\x01")) 554 conn.close() 555 556 def test_replay_adjustable_param_cfar_guard(self): 557 """Changing CFAR guard via write() triggers re-processing.""" 558 if not self._npy_available(): 559 self.skipTest("npy data files not found") 560 from radar_protocol import ReplayConnection 561 conn = ReplayConnection(self.NPY_DIR, use_mti=True) 562 conn.open() 563 # Initial: guard=2 → 4 detections 564 self.assertFalse(conn._needs_rebuild) 565 # Send CFAR_GUARD=4 566 cmd = RadarProtocol.build_command(0x21, 4) 567 conn.write(cmd) 568 self.assertTrue(conn._needs_rebuild) 569 self.assertEqual(conn._cfar_guard, 4) 570 # Read triggers rebuild 571 conn.read(1024) 572 self.assertFalse(conn._needs_rebuild) 573 conn.close() 574 575 def test_replay_adjustable_param_mti_toggle(self): 576 """Toggling MTI via write() triggers re-processing.""" 577 if not self._npy_available(): 578 self.skipTest("npy data files not found") 579 from radar_protocol import ReplayConnection 580 conn = ReplayConnection(self.NPY_DIR, use_mti=True) 581 conn.open() 582 # Disable MTI 583 cmd = RadarProtocol.build_command(0x26, 0) 584 conn.write(cmd) 585 self.assertTrue(conn._needs_rebuild) 586 self.assertFalse(conn._mti_enable) 587 # Read to trigger rebuild, then count detections 588 # Drain all packets after rebuild 589 conn.read(1024) # triggers rebuild 590 raw = conn._packets 591 boundaries = RadarProtocol.find_packet_boundaries(raw) 592 det_count = sum(1 for s, e, t in boundaries 593 if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0)) 594 # No-MTI with default CFAR → 0 detections 595 self.assertEqual(det_count, 0) 596 conn.close() 597 598 def test_replay_adjustable_param_dc_notch(self): 599 """Changing DC notch width via write() triggers re-processing.""" 600 if not self._npy_available(): 601 self.skipTest("npy data files not found") 602 from radar_protocol import ReplayConnection 603 conn = ReplayConnection(self.NPY_DIR, use_mti=True) 604 conn.open() 605 # Change DC notch to 0 (no notch) 606 cmd = RadarProtocol.build_command(0x27, 0) 607 conn.write(cmd) 608 self.assertTrue(conn._needs_rebuild) 609 self.assertEqual(conn._dc_notch_width, 0) 610 conn.read(1024) # triggers rebuild 611 raw = conn._packets 612 boundaries = RadarProtocol.find_packet_boundaries(raw) 613 det_count = sum(1 for s, e, t in boundaries 614 if RadarProtocol.parse_data_packet(raw[s:e]).get("detection", 0)) 615 # DC notch=0 with MTI → 6 detections (more noise passes through) 616 self.assertEqual(det_count, 6) 617 conn.close() 618 619 def test_replay_hardware_opcode_ignored(self): 620 """Hardware-only opcodes don't trigger rebuild.""" 621 if not self._npy_available(): 622 self.skipTest("npy data files not found") 623 from radar_protocol import ReplayConnection 624 conn = ReplayConnection(self.NPY_DIR, use_mti=True) 625 conn.open() 626 # Send TRIGGER (hardware-only) 627 cmd = RadarProtocol.build_command(0x01, 1) 628 conn.write(cmd) 629 self.assertFalse(conn._needs_rebuild) 630 # Send STREAM_ENABLE (hardware-only) 631 cmd = RadarProtocol.build_command(0x05, 7) 632 conn.write(cmd) 633 self.assertFalse(conn._needs_rebuild) 634 conn.close() 635 636 def test_replay_same_value_no_rebuild(self): 637 """Setting same value as current doesn't trigger rebuild.""" 638 if not self._npy_available(): 639 self.skipTest("npy data files not found") 640 from radar_protocol import ReplayConnection 641 conn = ReplayConnection(self.NPY_DIR, use_mti=True) 642 conn.open() 643 # CFAR guard already 2 644 cmd = RadarProtocol.build_command(0x21, 2) 645 conn.write(cmd) 646 self.assertFalse(conn._needs_rebuild) 647 conn.close() 648 649 def test_replay_self_test_opcodes_are_hardware_only(self): 650 """Self-test opcodes 0x30/0x31 are hardware-only (ignored in replay).""" 651 if not self._npy_available(): 652 self.skipTest("npy data files not found") 653 from radar_protocol import ReplayConnection 654 conn = ReplayConnection(self.NPY_DIR, use_mti=True) 655 conn.open() 656 # Send self-test trigger 657 cmd = RadarProtocol.build_command(0x30, 1) 658 conn.write(cmd) 659 self.assertFalse(conn._needs_rebuild) 660 # Send self-test status request 661 cmd = RadarProtocol.build_command(0x31, 0) 662 conn.write(cmd) 663 self.assertFalse(conn._needs_rebuild) 664 conn.close() 665 666 667 class TestOpcodeEnum(unittest.TestCase): 668 """Verify Opcode enum matches RTL host register map.""" 669 670 def test_gain_shift_is_0x06(self): 671 """GAIN_SHIFT opcode must be 0x06 (not 0x16).""" 672 self.assertEqual(Opcode.GAIN_SHIFT, 0x06) 673 674 def test_no_digital_gain_alias(self): 675 """DIGITAL_GAIN should NOT exist (was bogus 0x16 alias).""" 676 self.assertFalse(hasattr(Opcode, 'DIGITAL_GAIN')) 677 678 def test_self_test_trigger(self): 679 """SELF_TEST_TRIGGER opcode must be 0x30.""" 680 self.assertEqual(Opcode.SELF_TEST_TRIGGER, 0x30) 681 682 def test_self_test_status(self): 683 """SELF_TEST_STATUS opcode must be 0x31.""" 684 self.assertEqual(Opcode.SELF_TEST_STATUS, 0x31) 685 686 def test_self_test_in_hardware_only(self): 687 """Self-test opcodes must be in _HARDWARE_ONLY_OPCODES.""" 688 self.assertIn(0x30, _HARDWARE_ONLY_OPCODES) 689 self.assertIn(0x31, _HARDWARE_ONLY_OPCODES) 690 691 def test_0x16_not_in_hardware_only(self): 692 """Bogus 0x16 must not be in _HARDWARE_ONLY_OPCODES.""" 693 self.assertNotIn(0x16, _HARDWARE_ONLY_OPCODES) 694 695 def test_stream_enable_is_0x05(self): 696 """STREAM_ENABLE must be 0x05 (not 0x04).""" 697 self.assertEqual(Opcode.STREAM_ENABLE, 0x05) 698 699 def test_all_rtl_opcodes_present(self): 700 """Every RTL opcode has a matching Opcode enum member.""" 701 expected = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 702 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 703 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 704 0x30, 0x31, 0xFF} 705 enum_values = set(int(m) for m in Opcode) 706 for op in expected: 707 self.assertIn(op, enum_values, f"0x{op:02X} missing from Opcode enum") 708 709 710 class TestStatusResponseDefaults(unittest.TestCase): 711 """Verify StatusResponse dataclass has self-test fields.""" 712 713 def test_default_self_test_fields(self): 714 sr = StatusResponse() 715 self.assertEqual(sr.self_test_flags, 0) 716 self.assertEqual(sr.self_test_detail, 0) 717 self.assertEqual(sr.self_test_busy, 0) 718 719 def test_self_test_fields_set(self): 720 sr = StatusResponse(self_test_flags=0x1F, 721 self_test_detail=0xAB, 722 self_test_busy=1) 723 self.assertEqual(sr.self_test_flags, 0x1F) 724 self.assertEqual(sr.self_test_detail, 0xAB) 725 self.assertEqual(sr.self_test_busy, 1) 726 727 728 if __name__ == "__main__": 729 unittest.main(verbosity=2)