# test_rf.py
import random
from typing import Optional
import pytest
from collections import deque
from transactron.testing import TestCaseWithSimulator, SimpleTestCircuit, TestbenchContext

from coreblocks.core_structs.rf import RegisterFile
from coreblocks.params import GenParams
from coreblocks.params.configurations import test_core_config


class TestRegisterFile(TestCaseWithSimulator):
    """Randomized test of ``RegisterFile`` checked against a small Python model.

    Model state (all created in ``test_randomized``):

    * ``reg_values`` -- expected value per physical register; ``None`` means the
      register is expected to be reported as not valid.
    * ``free_set`` -- register ids the write testbenches may pick next.
    * ``used_set`` -- register ids that were written and may be picked by the
      free testbenches.
    * ``read_queues`` -- one FIFO per read port with the register ids that were
      requested but whose response has not been checked yet.

    The testbenches coordinate through small ``sim.delay`` offsets: the read
    checkers and writers use ``1e-9`` and the free testbenches use ``2e-9``, so
    that model updates made by writes are visible to the asserts and updates made
    by frees are applied only after them (see the inline comments).
    NOTE(review): ``random_wait_geom`` is not defined in this file (presumably
    inherited from ``TestCaseWithSimulator``); its exact waiting semantics should
    be confirmed there.
    """

    def tb_read_req(self, k: int):
        """Build the testbench issuing random read requests on read port ``k``.

        Every issued register id is appended to ``read_queues[k]`` so that the
        matching response checker knows which register to expect.
        """

        async def tb(sim: TestbenchContext):
            while True:
                await self.random_wait_geom(sim, 0.95)
                # Any physical register may be read, including ones holding no value.
                reg_id = random.randrange(0, self.gen_params.phys_regs)
                await self.m.read_req[k].call(sim, reg_id=reg_id)
                self.read_queues[k].append(reg_id)

        return tb

    def tb_read_resp(self, k: int):
        """Build the testbench checking read responses on read port ``k``.

        For each queued request it calls ``read_resp[k]`` and compares the
        returned ``valid`` flag and ``reg_val`` with ``reg_values``.
        """

        async def tb(sim: TestbenchContext):
            await sim.delay(1e-9)
            while True:
                await self.random_wait_geom(sim, 0.95)
                # Wait until the request testbench has queued something for this port.
                while not self.read_queues[k]:
                    await sim.tick()
                    await sim.delay(1e-9)
                reg_id = self.read_queues[k].popleft()
                resp = await self.m.read_resp[k].call(sim, reg_id=reg_id)
                await sim.delay(1e-9)  # writes happen before asserts
                # The response is valid exactly when the model holds a value ...
                assert bool(resp.valid) == (self.reg_values[reg_id] is not None)
                # ... and in that case the value has to match the model.
                assert self.reg_values[reg_id] is None or resp.reg_val == self.reg_values[reg_id]

        return tb

    def tb_write(self, k: int):
        """Build the testbench performing ``num_writes`` writes on write port ``k``.

        Each write takes a register from ``free_set``, writes a random value to it
        and then records the register in ``used_set`` and the value in
        ``reg_values``.
        """

        async def tb(sim: TestbenchContext):
            for _ in range(self.num_writes):
                await self.random_wait_geom(sim, 0.8)
                await sim.delay(1e-9)
                # Block until some register is available for writing.
                while not self.free_set:
                    await sim.tick()
                    await sim.delay(1e-9)
                reg_id = random.choice(list(self.free_set))
                # Reserve the register before the call so no other writer picks it.
                self.free_set.remove(reg_id)
                reg_val = random.randrange(0, 2**self.gen_params.isa.xlen)
                await self.m.write[k].call(sim, reg_id=reg_id, reg_val=reg_val)
                # Only after the write was accepted does the model hold the value.
                self.used_set.add(reg_id)
                self.reg_values[reg_id] = reg_val

        return tb

    def tb_free(self, k: int):
        """Build the testbench freeing written registers on free port ``k``.

        Each iteration takes a register from ``used_set``, calls ``free[k]`` for
        it, and afterwards returns it to ``free_set`` and clears its entry in
        ``reg_values``.
        """

        async def tb(sim: TestbenchContext):
            await sim.delay(2e-9)
            while True:
                await self.random_wait_geom(sim, 0.5)
                await sim.delay(2e-9)
                # Block until some written register exists to be freed.
                while not self.used_set:
                    await sim.tick()
                    await sim.delay(2e-9)
                reg_id = random.choice(list(self.used_set))
                # Reserve the register before the call so no other free port picks it.
                self.used_set.remove(reg_id)
                await self.m.free[k].call(sim, reg_id=reg_id)
                await sim.delay(2e-9)  # frees happen after asserts
                self.free_set.add(reg_id)
                self.reg_values[reg_id] = None

        return tb

    @pytest.mark.parametrize("read_ports, write_ports, free_ports", [(2, 1, 1), (4, 2, 2)])
    def test_randomized(self, read_ports: int, write_ports: int, free_ports: int):
        """Run all testbenches concurrently for the given port configuration."""
        self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=4))
        self.m = m = SimpleTestCircuit(
            RegisterFile(
                gen_params=self.gen_params, read_ports=read_ports, write_ports=write_ports, free_ports=free_ports
            )
        )
        # Number of writes performed by each write testbench.
        self.num_writes = 1000

        self.reg_values: list[Optional[int]] = [None for _ in range(self.gen_params.phys_regs)]
        # Register 0 is modelled as always holding 0; it is also left out of
        # ``free_set`` below, so this test never writes or frees it.
        self.reg_values[0] = 0

        self.read_queues: list[deque[int]] = [deque() for _ in range(read_ports)]
        self.free_set: set[int] = set(range(1, self.gen_params.phys_regs))
        self.used_set: set[int] = set()

        # Fixed seed keeps the randomized run reproducible.
        random.seed(42)

        with self.run_simulation(m) as sim:
            # Only the write testbenches are added without ``background=True``.
            # NOTE(review): presumably the simulation finishes once they complete
            # and the background testbenches are then stopped -- confirm in transactron.
            for k in range(read_ports):
                sim.add_testbench(self.tb_read_req(k), background=True)
                sim.add_testbench(self.tb_read_resp(k), background=True)
            for k in range(write_ports):
                sim.add_testbench(self.tb_write(k))
            for k in range(free_ports):
                sim.add_testbench(self.tb_free(k), background=True)