/ test / core_structs / test_rf.py
test_rf.py
  1  import random
  2  from typing import Optional
  3  import pytest
  4  from collections import deque
  5  from transactron.testing import TestCaseWithSimulator, SimpleTestCircuit, TestbenchContext
  6  
  7  from coreblocks.core_structs.rf import RegisterFile
  8  from coreblocks.params import GenParams
  9  from coreblocks.params.configurations import test_core_config
 10  
 11  
 12  class TestRegisterFile(TestCaseWithSimulator):
 13      def tb_read_req(self, k: int):
 14          async def tb(sim: TestbenchContext):
 15              while True:
 16                  await self.random_wait_geom(sim, 0.95)
 17                  reg_id = random.randrange(0, self.gen_params.phys_regs)
 18                  await self.m.read_req[k].call(sim, reg_id=reg_id)
 19                  self.read_queues[k].append(reg_id)
 20  
 21          return tb
 22  
 23      def tb_read_resp(self, k: int):
 24          async def tb(sim: TestbenchContext):
 25              await sim.delay(1e-9)
 26              while True:
 27                  await self.random_wait_geom(sim, 0.95)
 28                  while not self.read_queues[k]:
 29                      await sim.tick()
 30                      await sim.delay(1e-9)
 31                  reg_id = self.read_queues[k].popleft()
 32                  resp = await self.m.read_resp[k].call(sim, reg_id=reg_id)
 33                  await sim.delay(1e-9)  # writes happen before asserts
 34                  assert bool(resp.valid) == (self.reg_values[reg_id] is not None)
 35                  assert self.reg_values[reg_id] is None or resp.reg_val == self.reg_values[reg_id]
 36  
 37          return tb
 38  
 39      def tb_write(self, k: int):
 40          async def tb(sim: TestbenchContext):
 41              for _ in range(self.num_writes):
 42                  await self.random_wait_geom(sim, 0.8)
 43                  await sim.delay(1e-9)
 44                  while not self.free_set:
 45                      await sim.tick()
 46                      await sim.delay(1e-9)
 47                  reg_id = random.choice(list(self.free_set))
 48                  self.free_set.remove(reg_id)
 49                  reg_val = random.randrange(0, 2**self.gen_params.isa.xlen)
 50                  await self.m.write[k].call(sim, reg_id=reg_id, reg_val=reg_val)
 51                  self.used_set.add(reg_id)
 52                  self.reg_values[reg_id] = reg_val
 53  
 54          return tb
 55  
 56      def tb_free(self, k: int):
 57          async def tb(sim: TestbenchContext):
 58              await sim.delay(2e-9)
 59              while True:
 60                  await self.random_wait_geom(sim, 0.5)
 61                  await sim.delay(2e-9)
 62                  while not self.used_set:
 63                      await sim.tick()
 64                      await sim.delay(2e-9)
 65                  reg_id = random.choice(list(self.used_set))
 66                  self.used_set.remove(reg_id)
 67                  await self.m.free[k].call(sim, reg_id=reg_id)
 68                  await sim.delay(2e-9)  # frees happen after asserts
 69                  self.free_set.add(reg_id)
 70                  self.reg_values[reg_id] = None
 71  
 72          return tb
 73  
 74      @pytest.mark.parametrize("read_ports, write_ports, free_ports", [(2, 1, 1), (4, 2, 2)])
 75      def test_randomized(self, read_ports: int, write_ports: int, free_ports: int):
 76          self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=4))
 77          self.m = m = SimpleTestCircuit(
 78              RegisterFile(
 79                  gen_params=self.gen_params, read_ports=read_ports, write_ports=write_ports, free_ports=free_ports
 80              )
 81          )
 82          self.num_writes = 1000
 83  
 84          self.reg_values: list[Optional[int]] = [None for _ in range(self.gen_params.phys_regs)]
 85          self.reg_values[0] = 0
 86  
 87          self.read_queues: list[deque[int]] = [deque() for _ in range(read_ports)]
 88          self.free_set: set[int] = set(range(1, self.gen_params.phys_regs))
 89          self.used_set: set[int] = set()
 90  
 91          random.seed(42)
 92  
 93          with self.run_simulation(m) as sim:
 94              for k in range(read_ports):
 95                  sim.add_testbench(self.tb_read_req(k), background=True)
 96                  sim.add_testbench(self.tb_read_resp(k), background=True)
 97              for k in range(write_ports):
 98                  sim.add_testbench(self.tb_write(k))
 99              for k in range(free_ports):
100                  sim.add_testbench(self.tb_free(k), background=True)