# test/func_blocks/lsu/test_dummylsu.py
  1  import random
  2  from collections import deque
  3  from amaranth import *
  4  
  5  from transactron.lib import Adapter, AdapterTrans
  6  from transactron.utils import int_to_signed, signed_to_int
  7  from transactron.utils.dependencies import DependencyContext
  8  from transactron.testing.method_mock import MethodMock
  9  from transactron.testing import CallTrigger, TestbenchIO, TestCaseWithSimulator, def_method_mock, TestbenchContext
 10  from coreblocks.params import GenParams
 11  from coreblocks.func_blocks.fu.lsu.dummyLsu import LSUDummy
 12  from coreblocks.params.configurations import test_core_config
 13  from coreblocks.arch import *
 14  from coreblocks.interface.keys import CoreStateKey, CSRInstancesKey, ExceptionReportKey, InstructionPrecommitKey
 15  from coreblocks.priv.csr.csr_instances import CSRInstances
 16  from coreblocks.interface.layouts import ExceptionRegisterLayouts, RetirementLayouts
 17  from ...peripherals.bus_mock import BusMockParameters, MockMasterAdapter
 18  
 19  
 20  def generate_aligned_addr(max_reg_val: int) -> int:
 21      return random.randint(0, max_reg_val // 4) * 4
 22  
 23  
 24  def generate_random_op(ops: dict[str, tuple[OpType, Funct3]]) -> tuple[tuple[OpType, Funct3], int, bool]:
 25      ops_k = list(ops.keys())
 26      op = ops[ops_k[random.randint(0, len(ops) - 1)]]
 27      signess = False
 28      mask = 0xF
 29      if op[1] in {Funct3.B, Funct3.BU}:
 30          mask = 0x1
 31      if op[1] in {Funct3.H, Funct3.HU}:
 32          mask = 0x3
 33      if op[1] in {Funct3.B, Funct3.H}:
 34          signess = True
 35      return (op, mask, signess)
 36  
 37  
 38  def generate_imm(max_imm_val: int) -> int:
 39      if random.randint(0, 1):
 40          return 0
 41      else:
 42          return random.randint(0, max_imm_val)
 43  
 44  
 45  def shift_mask_based_on_addr(mask: int, addr: int) -> int:
 46      rest = addr % 4
 47      if mask == 0x1:
 48          mask = mask << rest
 49      elif mask == 0x3:
 50          mask = mask << rest
 51      return mask
 52  
 53  
 54  def check_align(addr: int, op: tuple[OpType, Funct3]) -> bool:
 55      rest = addr % 4
 56      if op[1] in {Funct3.B, Funct3.BU}:
 57          return True
 58      if op[1] in {Funct3.H, Funct3.HU} and rest in {0, 2}:
 59          return True
 60      if op[1] == Funct3.W and rest == 0:
 61          return True
 62      return False
 63  
 64  
 65  class DummyLSUTestCircuit(Elaboratable):
 66      def __init__(self, gen: GenParams):
 67          self.gen = gen
 68  
 69      def elaborate(self, platform):
 70          m = Module()
 71  
 72          bus_mock_params = BusMockParameters(data_width=self.gen.isa.ilen, addr_width=32)
 73  
 74          self.bus_master_adapter = MockMasterAdapter(bus_mock_params)
 75  
 76          m.submodules.exception_report = self.exception_report = TestbenchIO(
 77              Adapter(i=self.gen.get(ExceptionRegisterLayouts).report)
 78          )
 79  
 80          DependencyContext.get().add_dependency(ExceptionReportKey(), lambda: self.exception_report.adapter.iface)
 81  
 82          layouts = self.gen.get(RetirementLayouts)
 83          m.submodules.precommit = self.precommit = TestbenchIO(
 84              Adapter(
 85                  i=layouts.precommit_in,
 86                  o=layouts.precommit_out,
 87                  nonexclusive=True,
 88                  combiner=lambda m, args, runs: args[0],
 89              ).set(with_validate_arguments=True)
 90          )
 91          DependencyContext.get().add_dependency(InstructionPrecommitKey(), self.precommit.adapter.iface)
 92  
 93          m.submodules.core_state = self.core_state = TestbenchIO(Adapter(o=layouts.core_state, nonexclusive=True))
 94          DependencyContext.get().add_dependency(CoreStateKey(), self.core_state.adapter.iface)
 95  
 96          m.submodules.csr_instances = self.csr_instances = CSRInstances(self.gen)
 97          DependencyContext.get().add_dependency(CSRInstancesKey(), self.csr_instances)
 98  
 99          m.submodules.func_unit = func_unit = LSUDummy(self.gen, self.bus_master_adapter)
100  
101          m.submodules.issue_mock = self.issue = TestbenchIO(AdapterTrans.create(func_unit.issue))
102          m.submodules.push_result_mock = self.push_result = TestbenchIO(Adapter.create(func_unit.push_result))
103          m.submodules.bus_master_adapter = self.bus_master_adapter
104          return m
105  
106  
class TestDummyLSULoads(TestCaseWithSimulator):
    """Randomised load test for the LSU.

    A batch of LB/LBU/LH/LHU/LW instructions is generated up front, some of
    them misaligned or answered with a bus error. Three testbench processes
    then issue the instructions, play the bus, and check results and
    exception flags.
    """

    # Most recently assigned ROB id; ``generate_instr`` hands ids out cyclically.
    last_rob_id: int = 0

    def generate_instr(self, max_reg_val, max_imm_val):
        """Fill the test queues with ``self.tests_number`` random load instructions.

        For each instruction this pushes the issue arguments to
        ``instr_queue``, the expected bus access to ``mem_data_queue``, the
        expected exception report to ``exception_queue`` (faulting loads only)
        and the expected outcome to ``exception_result``.
        """
        load_ops = {
            "LB": (OpType.LOAD, Funct3.B),
            "LBU": (OpType.LOAD, Funct3.BU),
            "LH": (OpType.LOAD, Funct3.H),
            "LHU": (OpType.LOAD, Funct3.HU),
            "LW": (OpType.LOAD, Funct3.W),
        }
        rob_size = 2**self.gen_params.rob_entries_bits
        max_phys_reg = 2**self.gen_params.phys_regs_bits - 1

        for _ in range(self.tests_number):
            bus_err = random.random() < 0.1
            misaligned = False

            # Redraw until the access is aligned; an unaligned draw is kept
            # with 10% probability to exercise the misaligned-address path.
            while True:
                op, mask, signess = generate_random_op(load_ops)
                s1_val = generate_aligned_addr(max_reg_val)
                imm = generate_imm(max_imm_val)
                addr = s1_val + imm

                if check_align(addr, op):
                    break
                if random.random() < 0.1:
                    misaligned = True
                    break

            # The bus is word-addressed: split the byte address into a word
            # address and a byte-select mask.
            mask = shift_mask_based_on_addr(mask, addr)
            word_addr = addr >> 2

            rp_dst = random.randint(0, max_phys_reg)
            self.last_rob_id = (self.last_rob_id + 1) % rob_size
            rob_id = self.last_rob_id

            self.instr_queue.appendleft(
                {
                    "rp_dst": rp_dst,
                    "rob_id": rob_id,
                    "exec_fn": {"op_type": op[0], "funct3": op[1], "funct7": 0},
                    "s1_val": s1_val,
                    "s2_val": 0,
                    "imm": imm,
                    "pc": 0,
                }
            )
            self.mem_data_queue.appendleft(
                {
                    "addr": word_addr,
                    "mask": mask,
                    "sign": signess,
                    # Four random bytes, stored big-endian.
                    "rnd_bytes": random.randint(0, 2**32 - 1).to_bytes(4, "big"),
                    "misaligned": misaligned,
                    "err": bus_err,
                }
            )

            failed = misaligned or bus_err
            if failed:
                # Misalignment takes precedence over a bus error.
                cause = ExceptionCause.LOAD_ADDRESS_MISALIGNED if misaligned else ExceptionCause.LOAD_ACCESS_FAULT
                self.exception_queue.appendleft({"rob_id": rob_id, "cause": cause, "pc": 0, "mtval": addr})

            self.exception_result.append({"rob_id": rob_id, "err": failed})

    def setup_method(self) -> None:
        """Seed the RNG, build the circuit and pre-generate the whole stimulus."""
        random.seed(14)
        self.tests_number = 100
        self.max_wait = 10
        self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=3, rob_entries_bits=4))
        self.test_module = DummyLSUTestCircuit(self.gen_params)

        self.instr_queue = deque()
        self.mem_data_queue = deque()
        self.returned_data = deque()
        self.exception_queue = deque()
        self.exception_result = deque()
        # ROB ids that are currently not held by an in-flight instruction.
        self.free_rob_id = set(range(2**self.gen_params.rob_entries_bits))

        self.generate_instr(2**7, 2**7)

    async def bus_mock(self, sim: TestbenchContext):
        """Play the bus: check each read request and answer it with the generated word."""
        bus = self.test_module.bus_master_adapter
        while self.mem_data_queue:
            expected = self.mem_data_queue.pop()

            # Entries marked misaligned are skipped without waiting for a bus request.
            if expected["misaligned"]:
                continue

            mask = expected["mask"]
            request = await bus.request_read_mock.call(sim)
            assert request.addr == expected["addr"]
            assert request.sel == mask
            await self.random_wait(sim, self.max_wait)

            resp_data = int.from_bytes(expected["rnd_bytes"][:4], "big")

            # Index of the lowest selected byte lane; the selected lanes must be contiguous.
            data_shift = (mask & -mask).bit_length() - 1
            lanes = mask.bit_count()
            assert mask.bit_length() == data_shift + lanes, "Unexpected mask"

            size = 8 * lanes
            data = (resp_data >> (8 * data_shift)) & ((1 << size) - 1)
            if expected["sign"]:
                data = int_to_signed(signed_to_int(data, size), 32)
            # A result value is only queued for comparison when the access does not fail.
            if not expected["err"]:
                self.returned_data.appendleft(data)

            await bus.get_read_response_mock.call(sim, data=resp_data, err=expected["err"])

    async def inserter(self, sim: TestbenchContext):
        """Issue the generated instructions, waiting until each one's ROB id is free again."""
        for _ in range(self.tests_number):
            instr = self.instr_queue.pop()
            rob_id = instr["rob_id"]
            while rob_id not in self.free_rob_id:
                await sim.tick()
            self.free_rob_id.remove(rob_id)
            await self.test_module.issue.call(sim, instr)
            await self.random_wait(sim, self.max_wait)

    async def consumer(self, sim: TestbenchContext):
        """Collect results, release their ROB ids and compare against the expectations."""
        for _ in range(self.tests_number):
            result = await self.test_module.push_result.call(sim)
            rob_id = result["rob_id"]
            assert rob_id not in self.free_rob_id
            self.free_rob_id.add(rob_id)

            # Oldest outstanding expectation recorded for this ROB id.
            outcome = next(entry for entry in self.exception_result if entry["rob_id"] == rob_id)
            self.exception_result.remove(outcome)
            if not outcome["err"]:
                assert result["result"] == self.returned_data.pop()
            assert result["exception"] == outcome["err"]

            await self.random_wait(sim, self.max_wait)

    def test(self):
        # Every exception report must match the next expected one, in order.
        @def_method_mock(lambda: self.test_module.exception_report)
        def exception_consumer(arg):
            @MethodMock.effect
            def eff():
                assert arg == self.exception_queue.pop()

        # Precommit accepts any rob_id and always answers side_fx=1.
        @def_method_mock(lambda: self.test_module.precommit, validate_arguments=lambda rob_id: True)
        def precommiter(rob_id):
            return {"side_fx": 1}

        # The core state mock always reports flushing=0.
        @def_method_mock(lambda: self.test_module.core_state)
        def core_state_process():
            return {"flushing": 0}

        with self.run_simulation(self.test_module) as sim:
            sim.add_testbench(self.bus_mock, background=True)
            sim.add_testbench(self.inserter)
            sim.add_testbench(self.consumer)
270  
271  
class TestDummyLSULoadsCycles(TestCaseWithSimulator):
    """Single aligned LW whose bus response and pushed result are handled by one ``CallTrigger``."""

    def generate_instr(self, max_reg_val, max_imm_val):
        """Build one word-aligned LW instruction and the bus access expected for it.

        Returns ``(instr, data)``: the issue arguments and a dict holding the
        expected word address, byte-select mask and four random response bytes.
        """
        # Both the register value and the immediate are multiples of 4, so their sum is too.
        s1_val = generate_aligned_addr(max_reg_val)
        imm = generate_aligned_addr(max_imm_val)
        rp_dst = random.randint(0, 2**self.gen_params.phys_regs_bits - 1)
        rob_id = random.randint(0, 2**self.gen_params.rob_entries_bits - 1)

        instr = {
            "rp_dst": rp_dst,
            "rob_id": rob_id,
            "exec_fn": {"op_type": OpType.LOAD, "funct3": Funct3.W, "funct7": 0},
            "s1_val": s1_val,
            "s2_val": 0,
            "imm": imm,
            "pc": 0,
        }
        data = {
            "addr": (s1_val + imm) >> 2,
            "mask": 0xF,
            "rnd_bytes": random.randint(0, 2**32 - 1).to_bytes(4, "big"),
        }
        return instr, data

    def setup_method(self) -> None:
        """Seed the RNG and build the circuit under test."""
        random.seed(14)
        self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=3, rob_entries_bits=3))
        self.test_module = DummyLSUTestCircuit(self.gen_params)

    async def one_instr_test(self, sim: TestbenchContext):
        """Issue one LW, check its bus request, then answer it and take the result together."""
        instr, expected = self.generate_instr(2**7, 2**7)

        await self.test_module.issue.call(sim, instr)

        bus = self.test_module.bus_master_adapter
        request = await bus.request_read_mock.call(sim)
        assert request.addr == expected["addr"]
        assert request.sel == expected["mask"]

        word = int.from_bytes(expected["rnd_bytes"][:4], "big")
        # Both calls go through a single CallTrigger and both must have completed.
        response, result = (
            await CallTrigger(sim)
            .call(bus.get_read_response_mock, data=word, err=0)
            .call(self.test_module.push_result)
        )
        assert response is not None and result is not None
        assert result["result"] == word

    def test(self):
        # No exception may be reported for an aligned, error-free load.
        @def_method_mock(lambda: self.test_module.exception_report)
        def exception_consumer(arg):
            @MethodMock.effect
            def eff():
                assert False

        # Precommit accepts any rob_id and always answers side_fx=1.
        @def_method_mock(lambda: self.test_module.precommit, validate_arguments=lambda rob_id: True)
        def precommiter(rob_id):
            return {"side_fx": 1}

        with self.run_simulation(self.test_module) as sim:
            sim.add_testbench(self.one_instr_test)
334  
335  
class TestDummyLSUStores(TestCaseWithSimulator):
    """Randomised store test for the LSU.

    A batch of aligned SB/SH/SW instructions is generated up front. Three
    testbench processes then issue them, check the resulting bus write
    requests and collect the pushed results, while the precommit mock only
    accepts the ROB id of the oldest instruction that has not retired yet.
    """

    def generate_instr(self, max_reg_val, max_imm_val):
        """Fill ``instr_queue`` and ``mem_data_queue`` with ``self.tests_number`` random stores.

        ``instr_queue`` receives the issue arguments; ``mem_data_queue``
        receives the expected word address, byte-select mask and the full
        32-bit source value as four big-endian bytes.
        """
        store_ops = {
            "SB": (OpType.STORE, Funct3.B),
            "SH": (OpType.STORE, Funct3.H),
            "SW": (OpType.STORE, Funct3.W),
        }
        max_rob_id = 2**self.gen_params.rob_entries_bits - 1

        for _ in range(self.tests_number):
            # Redraw until the access is aligned; this test never generates misaligned stores.
            while True:
                op, mask, _signess = generate_random_op(store_ops)
                s1_val = generate_aligned_addr(max_reg_val)
                imm = generate_imm(max_imm_val)
                addr = s1_val + imm
                if check_align(addr, op):
                    break

            # Arbitrary 32-bit store data. Drawing it with the aligned-address
            # helper would make every value a multiple of 4, so the low two
            # bits of the stored byte/halfword/word would never be exercised.
            s2_val = random.randint(0, 0xFFFFFFFF)

            # The bus is word-addressed: split the byte address into a word
            # address and a byte-select mask.
            mask = shift_mask_based_on_addr(mask, addr)
            word_addr = addr >> 2

            rob_id = random.randint(0, max_rob_id)
            self.instr_queue.appendleft(
                {
                    "rp_dst": 0,
                    "rob_id": rob_id,
                    "exec_fn": {"op_type": op[0], "funct3": op[1], "funct7": 0},
                    "s1_val": s1_val,
                    "s2_val": s2_val,
                    "imm": imm,
                    "pc": 0,
                }
            )
            self.mem_data_queue.appendleft({"addr": word_addr, "mask": mask, "data": s2_val.to_bytes(4, "big")})

    def setup_method(self) -> None:
        """Seed the RNG, build the circuit and pre-generate the whole stimulus."""
        random.seed(14)
        self.tests_number = 100
        self.max_wait = 8
        self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=3, rob_entries_bits=3))
        self.test_module = DummyLSUTestCircuit(self.gen_params)

        self.instr_queue = deque()
        self.mem_data_queue = deque()
        # ROB ids in issue order, used to check the order of pushed results.
        self.get_result_data = deque()
        # ROB ids of issued but not yet retired instructions; the oldest one is at index -1.
        self.precommit_data = deque()

        self.generate_instr(2**7, 2**7)

    async def bus_mock(self, sim: TestbenchContext):
        """Play the bus: check every write request against the expected address, data and mask."""
        # Bit offset of the stored value on the data bus, keyed by byte-select mask.
        byte_shift = {1: 0, 2: 8, 4: 16, 8: 24}
        half_shift = {3: 0, 0xC: 16}
        bus = self.test_module.bus_master_adapter

        for _ in range(self.tests_number):
            expected = self.mem_data_queue.pop()
            mask = expected["mask"]
            raw = expected["data"]

            # Only the low byte / low halfword of the source value is stored,
            # moved to the selected lanes; a word store uses the value as is.
            if mask in byte_shift:
                data = int.from_bytes(raw[-1:], "big") << byte_shift[mask]
            elif mask in half_shift:
                data = int.from_bytes(raw[-2:], "big") << half_shift[mask]
            else:
                data = int.from_bytes(raw[-4:], "big")

            request = await bus.request_write_mock.call(sim)
            assert request.addr == expected["addr"]
            assert request.data == data
            assert request.sel == mask
            await self.random_wait(sim, self.max_wait)

            await bus.get_write_response_mock.call(sim, err=0)

    async def inserter(self, sim: TestbenchContext):
        """Issue the generated stores; each becomes eligible for precommit once issued."""
        for _ in range(self.tests_number):
            instr = self.instr_queue.pop()
            self.get_result_data.appendleft(instr["rob_id"])
            await self.test_module.issue.call(sim, instr)
            self.precommit_data.appendleft(instr["rob_id"])
            await self.random_wait(sim, self.max_wait)

    async def get_resulter(self, sim: TestbenchContext):
        """Collect results, check they arrive in issue order, then retire the instruction."""
        for _ in range(self.tests_number):
            result = await self.test_module.push_result.call(sim)
            assert result["rob_id"] == self.get_result_data.pop()
            assert result["rp_dst"] == 0
            await self.random_wait(sim, self.max_wait)
            self.precommit_data.pop()  # retire

    def precommit_validate(self, rob_id):
        """Accept a precommit call only for the oldest issued, not yet retired ROB id."""
        return len(self.precommit_data) > 0 and rob_id == self.precommit_data[-1]

    @def_method_mock(lambda self: self.test_module.precommit, validate_arguments=precommit_validate)
    def precommiter(self, rob_id):
        return {"side_fx": 1}

    def test(self):
        # This test generates no faulting accesses, so any exception report is a failure.
        @def_method_mock(lambda: self.test_module.exception_report)
        def exception_consumer(arg):
            @MethodMock.effect
            def eff():
                assert False

        with self.run_simulation(self.test_module) as sim:
            sim.add_testbench(self.bus_mock)
            sim.add_testbench(self.inserter)
            sim.add_testbench(self.get_resulter)
443  
444  
class TestDummyLSUFence(TestCaseWithSimulator):
    """Checks that a FENCE issued between two loads does not hang the LSU."""

    def get_instr(self, exec_fn):
        """Return fixed issue arguments (rob_id 1, s1_val 4, imm 8) carrying ``exec_fn``."""
        return {
            "rp_dst": 1,
            "rob_id": 1,
            "exec_fn": exec_fn,
            "s1_val": 4,
            "s2_val": 1,
            "imm": 8,
            "pc": 0,
        }

    async def push_one_instr(self, sim: TestbenchContext, instr):
        """Issue ``instr`` and wait for its result; loads must return the mocked bus data (1)."""
        await self.test_module.issue.call(sim, instr)

        result = await self.test_module.push_result.call(sim)
        if instr["exec_fn"]["op_type"] == OpType.LOAD:
            assert result.result == 1

    async def process(self, sim: TestbenchContext):
        # Only checks that FENCE doesn't hang the LSU: the load issued after it must still complete.
        load_fn = {"op_type": OpType.LOAD, "funct3": Funct3.W, "funct7": 0}
        fence_fn = {"op_type": OpType.FENCE, "funct3": 0, "funct7": 0}
        for exec_fn in (load_fn, fence_fn, load_fn):
            await self.push_one_instr(sim, self.get_instr(exec_fn))

    def test_fence(self):
        self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=3, rob_entries_bits=3))
        self.test_module = DummyLSUTestCircuit(self.gen_params)

        # Any exception report fails the test.
        @def_method_mock(lambda: self.test_module.exception_report)
        def exception_consumer(arg):
            @MethodMock.effect
            def eff():
                assert False

        # Precommit accepts any rob_id and always answers side_fx=1.
        @def_method_mock(lambda: self.test_module.precommit, validate_arguments=lambda rob_id: True)
        def precommiter(rob_id):
            return {"side_fx": 1}

        # Bus model with at most one read in flight: a request is accepted only
        # when none is pending, and a response is offered only while one is.
        pending_req = False

        @def_method_mock(lambda: self.test_module.bus_master_adapter.request_read_mock, enable=lambda: not pending_req)
        def request_read(addr, sel):
            @MethodMock.effect
            def eff():
                nonlocal pending_req
                pending_req = True

        @def_method_mock(lambda: self.test_module.bus_master_adapter.get_read_response_mock, enable=lambda: pending_req)
        def read_response():
            @MethodMock.effect
            def eff():
                nonlocal pending_req
                pending_req = False

            return {"data": 1, "err": 0}

        with self.run_simulation(self.test_module) as sim:
            sim.add_testbench(self.process)