test_dummylsu.py
1 import random 2 from collections import deque 3 from amaranth import * 4 5 from transactron.lib import Adapter, AdapterTrans 6 from transactron.utils import int_to_signed, signed_to_int 7 from transactron.utils.dependencies import DependencyContext 8 from transactron.testing.method_mock import MethodMock 9 from transactron.testing import CallTrigger, TestbenchIO, TestCaseWithSimulator, def_method_mock, TestbenchContext 10 from coreblocks.params import GenParams 11 from coreblocks.func_blocks.fu.lsu.dummyLsu import LSUDummy 12 from coreblocks.params.configurations import test_core_config 13 from coreblocks.arch import * 14 from coreblocks.interface.keys import CoreStateKey, CSRInstancesKey, ExceptionReportKey, InstructionPrecommitKey 15 from coreblocks.priv.csr.csr_instances import CSRInstances 16 from coreblocks.interface.layouts import ExceptionRegisterLayouts, RetirementLayouts 17 from ...peripherals.bus_mock import BusMockParameters, MockMasterAdapter 18 19 20 def generate_aligned_addr(max_reg_val: int) -> int: 21 return random.randint(0, max_reg_val // 4) * 4 22 23 24 def generate_random_op(ops: dict[str, tuple[OpType, Funct3]]) -> tuple[tuple[OpType, Funct3], int, bool]: 25 ops_k = list(ops.keys()) 26 op = ops[ops_k[random.randint(0, len(ops) - 1)]] 27 signess = False 28 mask = 0xF 29 if op[1] in {Funct3.B, Funct3.BU}: 30 mask = 0x1 31 if op[1] in {Funct3.H, Funct3.HU}: 32 mask = 0x3 33 if op[1] in {Funct3.B, Funct3.H}: 34 signess = True 35 return (op, mask, signess) 36 37 38 def generate_imm(max_imm_val: int) -> int: 39 if random.randint(0, 1): 40 return 0 41 else: 42 return random.randint(0, max_imm_val) 43 44 45 def shift_mask_based_on_addr(mask: int, addr: int) -> int: 46 rest = addr % 4 47 if mask == 0x1: 48 mask = mask << rest 49 elif mask == 0x3: 50 mask = mask << rest 51 return mask 52 53 54 def check_align(addr: int, op: tuple[OpType, Funct3]) -> bool: 55 rest = addr % 4 56 if op[1] in {Funct3.B, Funct3.BU}: 57 return True 58 if op[1] in {Funct3.H, Funct3.HU} and rest in {0, 2}: 59 return True 60 if op[1] == Funct3.W and rest == 0: 61 return True 62 return False 63 64 65 class DummyLSUTestCircuit(Elaboratable): 66 def __init__(self, gen: GenParams): 67 self.gen = gen 68 69 def elaborate(self, platform): 70 m = Module() 71 72 bus_mock_params = BusMockParameters(data_width=self.gen.isa.ilen, addr_width=32) 73 74 self.bus_master_adapter = MockMasterAdapter(bus_mock_params) 75 76 m.submodules.exception_report = self.exception_report = TestbenchIO( 77 Adapter(i=self.gen.get(ExceptionRegisterLayouts).report) 78 ) 79 80 DependencyContext.get().add_dependency(ExceptionReportKey(), lambda: self.exception_report.adapter.iface) 81 82 layouts = self.gen.get(RetirementLayouts) 83 m.submodules.precommit = self.precommit = TestbenchIO( 84 Adapter( 85 i=layouts.precommit_in, 86 o=layouts.precommit_out, 87 nonexclusive=True, 88 combiner=lambda m, args, runs: args[0], 89 ).set(with_validate_arguments=True) 90 ) 91 DependencyContext.get().add_dependency(InstructionPrecommitKey(), self.precommit.adapter.iface) 92 93 m.submodules.core_state = self.core_state = TestbenchIO(Adapter(o=layouts.core_state, nonexclusive=True)) 94 DependencyContext.get().add_dependency(CoreStateKey(), self.core_state.adapter.iface) 95 96 m.submodules.csr_instances = self.csr_instances = CSRInstances(self.gen) 97 DependencyContext.get().add_dependency(CSRInstancesKey(), self.csr_instances) 98 99 m.submodules.func_unit = func_unit = LSUDummy(self.gen, self.bus_master_adapter) 100 101 m.submodules.issue_mock = self.issue = TestbenchIO(AdapterTrans.create(func_unit.issue)) 102 m.submodules.push_result_mock = self.push_result = TestbenchIO(Adapter.create(func_unit.push_result)) 103 m.submodules.bus_master_adapter = self.bus_master_adapter 104 return m 105 106 107 class TestDummyLSULoads(TestCaseWithSimulator): 108 last_rob_id: int = 0 109 110 def generate_instr(self, max_reg_val, max_imm_val): 111 ops = { 112 "LB": (OpType.LOAD, Funct3.B), 113 "LBU": (OpType.LOAD, Funct3.BU), 114 "LH": (OpType.LOAD, Funct3.H), 115 "LHU": (OpType.LOAD, Funct3.HU), 116 "LW": (OpType.LOAD, Funct3.W), 117 } 118 for i in range(self.tests_number): 119 misaligned = False 120 bus_err = random.random() < 0.1 121 122 # generate new instructions till we generate correct one 123 while True: 124 # generate opcode 125 (op, mask, signess) = generate_random_op(ops) 126 # generate rp1, val1 which create addr 127 s1_val = generate_aligned_addr(max_reg_val) 128 imm = generate_imm(max_imm_val) 129 addr = s1_val + imm 130 131 if check_align(addr, op): 132 break 133 134 if random.random() < 0.1: 135 misaligned = True 136 break 137 138 exec_fn = {"op_type": op[0], "funct3": op[1], "funct7": 0} 139 140 # calculate word address and mask 141 mask = shift_mask_based_on_addr(mask, addr) 142 word_addr = addr >> 2 143 144 rp_dst = random.randint(0, 2**self.gen_params.phys_regs_bits - 1) 145 self.last_rob_id = (self.last_rob_id + 1) % 2**self.gen_params.rob_entries_bits 146 rob_id = self.last_rob_id 147 instr = { 148 "rp_dst": rp_dst, 149 "rob_id": rob_id, 150 "exec_fn": exec_fn, 151 "s1_val": s1_val, 152 "s2_val": 0, 153 "imm": imm, 154 "pc": 0, 155 } 156 self.instr_queue.appendleft(instr) 157 self.mem_data_queue.appendleft( 158 { 159 "addr": word_addr, 160 "mask": mask, 161 "sign": signess, 162 "rnd_bytes": bytes.fromhex(f"{random.randint(0, 2**32-1):08x}"), 163 "misaligned": misaligned, 164 "err": bus_err, 165 } 166 ) 167 168 if misaligned or bus_err: 169 self.exception_queue.appendleft( 170 { 171 "rob_id": rob_id, 172 "cause": ( 173 ExceptionCause.LOAD_ADDRESS_MISALIGNED if misaligned else ExceptionCause.LOAD_ACCESS_FAULT 174 ), 175 "pc": 0, 176 "mtval": addr, 177 } 178 ) 179 180 self.exception_result.append( 181 {"rob_id": rob_id, "err": misaligned or bus_err}, 182 ) 183 184 def setup_method(self) -> None: 185 random.seed(14) 186 self.tests_number = 100 187 self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=3, rob_entries_bits=4)) 188 self.test_module = DummyLSUTestCircuit(self.gen_params) 189 self.instr_queue = deque() 190 self.mem_data_queue = deque() 191 self.returned_data = deque() 192 self.exception_queue = deque() 193 self.exception_result = deque() 194 self.free_rob_id = set(range(2**self.gen_params.rob_entries_bits)) 195 self.generate_instr(2**7, 2**7) 196 self.max_wait = 10 197 198 async def bus_mock(self, sim: TestbenchContext): 199 while self.mem_data_queue: 200 generated_data = self.mem_data_queue.pop() 201 202 if generated_data["misaligned"]: 203 continue 204 205 mask = generated_data["mask"] 206 sign = generated_data["sign"] 207 req = await self.test_module.bus_master_adapter.request_read_mock.call(sim) 208 assert req.addr == generated_data["addr"] 209 assert req.sel == mask 210 await self.random_wait(sim, self.max_wait) 211 212 resp_data = int((generated_data["rnd_bytes"][:4]).hex(), 16) 213 data_shift = (mask & -mask).bit_length() - 1 214 assert mask.bit_length() == data_shift + mask.bit_count(), "Unexpected mask" 215 216 size = mask.bit_count() * 8 217 data_mask = 2**size - 1 218 data = (resp_data >> (data_shift * 8)) & data_mask 219 if sign: 220 data = int_to_signed(signed_to_int(data, size), 32) 221 if not generated_data["err"]: 222 self.returned_data.appendleft(data) 223 await self.test_module.bus_master_adapter.get_read_response_mock.call( 224 sim, data=resp_data, err=generated_data["err"] 225 ) 226 227 async def inserter(self, sim: TestbenchContext): 228 for i in range(self.tests_number): 229 req = self.instr_queue.pop() 230 while req["rob_id"] not in self.free_rob_id: 231 await sim.tick() 232 self.free_rob_id.remove(req["rob_id"]) 233 await self.test_module.issue.call(sim, req) 234 await self.random_wait(sim, self.max_wait) 235 236 async def consumer(self, sim: TestbenchContext): 237 for i in range(self.tests_number): 238 v = await self.test_module.push_result.call(sim) 239 rob_id = v["rob_id"] 240 assert rob_id not in self.free_rob_id 241 self.free_rob_id.add(rob_id) 242 243 exc = next(i for i in self.exception_result if i["rob_id"] == rob_id) 244 self.exception_result.remove(exc) 245 if not exc["err"]: 246 assert v["result"] == self.returned_data.pop() 247 assert v["exception"] == exc["err"] 248 249 await self.random_wait(sim, self.max_wait) 250 251 def test(self): 252 @def_method_mock(lambda: self.test_module.exception_report) 253 def exception_consumer(arg): 254 @MethodMock.effect 255 def eff(): 256 assert arg == self.exception_queue.pop() 257 258 @def_method_mock(lambda: self.test_module.precommit, validate_arguments=lambda rob_id: True) 259 def precommiter(rob_id): 260 return {"side_fx": 1} 261 262 @def_method_mock(lambda: self.test_module.core_state) 263 def core_state_process(): 264 return {"flushing": 0} 265 266 with self.run_simulation(self.test_module) as sim: 267 sim.add_testbench(self.bus_mock, background=True) 268 sim.add_testbench(self.inserter) 269 sim.add_testbench(self.consumer) 270 271 272 class TestDummyLSULoadsCycles(TestCaseWithSimulator): 273 def generate_instr(self, max_reg_val, max_imm_val): 274 s1_val = random.randint(0, max_reg_val // 4) * 4 275 imm = random.randint(0, max_imm_val // 4) * 4 276 rp_dst = random.randint(0, 2**self.gen_params.phys_regs_bits - 1) 277 rob_id = random.randint(0, 2**self.gen_params.rob_entries_bits - 1) 278 279 exec_fn = {"op_type": OpType.LOAD, "funct3": Funct3.W, "funct7": 0} 280 instr = { 281 "rp_dst": rp_dst, 282 "rob_id": rob_id, 283 "exec_fn": exec_fn, 284 "s1_val": s1_val, 285 "s2_val": 0, 286 "imm": imm, 287 "pc": 0, 288 } 289 290 data = { 291 "addr": (s1_val + imm) >> 2, 292 "mask": 0xF, 293 "rnd_bytes": bytes.fromhex(f"{random.randint(0, 2**32-1):08x}"), 294 } 295 return instr, data 296 297 def setup_method(self) -> None: 298 random.seed(14) 299 self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=3, rob_entries_bits=3)) 300 self.test_module = DummyLSUTestCircuit(self.gen_params) 301 302 async def one_instr_test(self, sim: TestbenchContext): 303 instr, data = self.generate_instr(2**7, 2**7) 304 305 await self.test_module.issue.call(sim, instr) 306 307 mask = data["mask"] 308 req = await self.test_module.bus_master_adapter.request_read_mock.call(sim) 309 assert req.addr == data["addr"] 310 assert req.sel == mask 311 data = data["rnd_bytes"][:4] 312 data = int(data.hex(), 16) 313 r, v = ( 314 await CallTrigger(sim) 315 .call(self.test_module.bus_master_adapter.get_read_response_mock, data=data, err=0) 316 .call(self.test_module.push_result) 317 ) 318 assert r is not None and v is not None 319 assert v["result"] == data 320 321 def test(self): 322 @def_method_mock(lambda: self.test_module.exception_report) 323 def exception_consumer(arg): 324 @MethodMock.effect 325 def eff(): 326 assert False 327 328 @def_method_mock(lambda: self.test_module.precommit, validate_arguments=lambda rob_id: True) 329 def precommiter(rob_id): 330 return {"side_fx": 1} 331 332 with self.run_simulation(self.test_module) as sim: 333 sim.add_testbench(self.one_instr_test) 334 335 336 class TestDummyLSUStores(TestCaseWithSimulator): 337 def generate_instr(self, max_reg_val, max_imm_val): 338 ops = { 339 "SB": (OpType.STORE, Funct3.B), 340 "SH": (OpType.STORE, Funct3.H), 341 "SW": (OpType.STORE, Funct3.W), 342 } 343 for i in range(self.tests_number): 344 while True: 345 # generate opcode 346 (op, mask, _) = generate_random_op(ops) 347 # generate address 348 s1_val = generate_aligned_addr(max_reg_val) 349 imm = generate_imm(max_imm_val) 350 addr = s1_val + imm 351 if check_align(addr, op): 352 break 353 354 data = s2_val = generate_aligned_addr(0xFFFFFFFF) 355 356 exec_fn = {"op_type": op[0], "funct3": op[1], "funct7": 0} 357 358 # calculate word address and mask 359 mask = shift_mask_based_on_addr(mask, addr) 360 addr = addr >> 2 361 362 rob_id = random.randint(0, 2**self.gen_params.rob_entries_bits - 1) 363 instr = { 364 "rp_dst": 0, 365 "rob_id": rob_id, 366 "exec_fn": exec_fn, 367 "s1_val": s1_val, 368 "s2_val": s2_val, 369 "imm": imm, 370 "pc": 0, 371 } 372 self.instr_queue.appendleft(instr) 373 self.mem_data_queue.appendleft({"addr": addr, "mask": mask, "data": bytes.fromhex(f"{data:08x}")}) 374 375 def setup_method(self) -> None: 376 random.seed(14) 377 self.tests_number = 100 378 self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=3, rob_entries_bits=3)) 379 self.test_module = DummyLSUTestCircuit(self.gen_params) 380 self.instr_queue = deque() 381 self.mem_data_queue = deque() 382 self.get_result_data = deque() 383 self.precommit_data = deque() 384 self.generate_instr(2**7, 2**7) 385 self.max_wait = 8 386 387 async def bus_mock(self, sim: TestbenchContext): 388 for i in range(self.tests_number): 389 generated_data = self.mem_data_queue.pop() 390 391 mask = generated_data["mask"] 392 b_dict = {1: 0, 2: 8, 4: 16, 8: 24} 393 h_dict = {3: 0, 0xC: 16} 394 if mask in b_dict: 395 data = (int(generated_data["data"][-1:].hex(), 16) & 0xFF) << b_dict[mask] 396 elif mask in h_dict: 397 data = (int(generated_data["data"][-2:].hex(), 16) & 0xFFFF) << h_dict[mask] 398 else: 399 data = int(generated_data["data"][-4:].hex(), 16) 400 req = await self.test_module.bus_master_adapter.request_write_mock.call(sim) 401 assert req.addr == generated_data["addr"] 402 assert req.data == data 403 assert req.sel == generated_data["mask"] 404 await self.random_wait(sim, self.max_wait) 405 406 await self.test_module.bus_master_adapter.get_write_response_mock.call(sim, err=0) 407 408 async def inserter(self, sim: TestbenchContext): 409 for i in range(self.tests_number): 410 req = self.instr_queue.pop() 411 self.get_result_data.appendleft(req["rob_id"]) 412 await self.test_module.issue.call(sim, req) 413 self.precommit_data.appendleft(req["rob_id"]) 414 await self.random_wait(sim, self.max_wait) 415 416 async def get_resulter(self, sim: TestbenchContext): 417 for i in range(self.tests_number): 418 v = await self.test_module.push_result.call(sim) 419 rob_id = self.get_result_data.pop() 420 assert v["rob_id"] == rob_id 421 assert v["rp_dst"] == 0 422 await self.random_wait(sim, self.max_wait) 423 self.precommit_data.pop() # retire 424 425 def precommit_validate(self, rob_id): 426 return len(self.precommit_data) > 0 and rob_id == self.precommit_data[-1] 427 428 @def_method_mock(lambda self: self.test_module.precommit, validate_arguments=precommit_validate) 429 def precommiter(self, rob_id): 430 return {"side_fx": 1} 431 432 def test(self): 433 @def_method_mock(lambda: self.test_module.exception_report) 434 def exception_consumer(arg): 435 @MethodMock.effect 436 def eff(): 437 assert False 438 439 with self.run_simulation(self.test_module) as sim: 440 sim.add_testbench(self.bus_mock) 441 sim.add_testbench(self.inserter) 442 sim.add_testbench(self.get_resulter) 443 444 445 class TestDummyLSUFence(TestCaseWithSimulator): 446 def get_instr(self, exec_fn): 447 return {"rp_dst": 1, "rob_id": 1, "exec_fn": exec_fn, "s1_val": 4, "s2_val": 1, "imm": 8, "pc": 0} 448 449 async def push_one_instr(self, sim: TestbenchContext, instr): 450 await self.test_module.issue.call(sim, instr) 451 452 v = await self.test_module.push_result.call(sim) 453 if instr["exec_fn"]["op_type"] == OpType.LOAD: 454 assert v.result == 1 455 456 async def process(self, sim: TestbenchContext): 457 # just tests if FENCE doens't hang up the LSU 458 load_fn = {"op_type": OpType.LOAD, "funct3": Funct3.W, "funct7": 0} 459 fence_fn = {"op_type": OpType.FENCE, "funct3": 0, "funct7": 0} 460 await self.push_one_instr(sim, self.get_instr(load_fn)) 461 await self.push_one_instr(sim, self.get_instr(fence_fn)) 462 await self.push_one_instr(sim, self.get_instr(load_fn)) 463 464 def test_fence(self): 465 self.gen_params = GenParams(test_core_config.replace(phys_regs_bits=3, rob_entries_bits=3)) 466 self.test_module = DummyLSUTestCircuit(self.gen_params) 467 468 @def_method_mock(lambda: self.test_module.exception_report) 469 def exception_consumer(arg): 470 @MethodMock.effect 471 def eff(): 472 assert False 473 474 @def_method_mock(lambda: self.test_module.precommit, validate_arguments=lambda rob_id: True) 475 def precommiter(rob_id): 476 return {"side_fx": 1} 477 478 pending_req = False 479 480 @def_method_mock(lambda: self.test_module.bus_master_adapter.request_read_mock, enable=lambda: not pending_req) 481 def request_read(addr, sel): 482 @MethodMock.effect 483 def eff(): 484 nonlocal pending_req 485 pending_req = True 486 487 @def_method_mock(lambda: self.test_module.bus_master_adapter.get_read_response_mock, enable=lambda: pending_req) 488 def read_response(): 489 @MethodMock.effect 490 def eff(): 491 nonlocal pending_req 492 pending_req = False 493 494 return {"data": 1, "err": 0} 495 496 with self.run_simulation(self.test_module) as sim: 497 sim.add_testbench(self.process)