test_wishbone.py
1 from collections.abc import Iterable 2 import random 3 from collections import deque 4 5 from amaranth.lib.wiring import connect 6 from amaranth_types import ValueLike 7 8 from coreblocks.peripherals.wishbone import * 9 10 from transactron.lib import AdapterTrans 11 12 from transactron.testing import * 13 14 15 class WishboneInterfaceWrapper: 16 def __init__(self, wishbone_interface: WishboneInterface): 17 self.wb = wishbone_interface 18 19 def master_set(self, sim: SimulatorContext, addr: int, data: int, we: int): 20 sim.set(self.wb.dat_w, data) 21 sim.set(self.wb.adr, addr) 22 sim.set(self.wb.we, we) 23 sim.set(self.wb.cyc, 1) 24 sim.set(self.wb.stb, 1) 25 26 def master_release(self, sim: SimulatorContext, release_cyc: bool = True): 27 sim.set(self.wb.stb, 0) 28 if release_cyc: 29 sim.set(self.wb.cyc, 0) 30 31 async def slave_wait(self, sim: SimulatorContext): 32 *_, adr, we, sel, dat_w = ( 33 await sim.tick() 34 .sample(self.wb.adr, self.wb.we, self.wb.sel, self.wb.dat_w) 35 .until(self.wb.stb & self.wb.cyc) 36 ) 37 return adr, we, sel, dat_w 38 39 async def slave_wait_and_verify( 40 self, sim: SimulatorContext, exp_addr: int, exp_data: int, exp_we: int, exp_sel: int = 0 41 ): 42 adr, we, sel, dat_w = await self.slave_wait(sim) 43 44 assert adr == exp_addr 45 assert we == exp_we 46 assert sel == exp_sel 47 if exp_we: 48 assert dat_w == exp_data 49 50 async def slave_tick_and_verify( 51 self, sim: SimulatorContext, exp_addr: int, exp_data: int, exp_we: int, exp_sel: int = 0 52 ): 53 *_, adr, we, sel, dat_w, stb, cyc = await sim.tick().sample( 54 self.wb.adr, self.wb.we, self.wb.sel, self.wb.dat_w, self.wb.stb, self.wb.cyc 55 ) 56 assert stb and cyc 57 58 assert adr == exp_addr 59 assert we == exp_we 60 assert sel == exp_sel 61 if exp_we: 62 assert dat_w == exp_data 63 64 async def slave_respond( 65 self, 66 sim: SimulatorContext, 67 data: int, 68 ack: int = 1, 69 err: int = 0, 70 rty: int = 0, 71 sample: Iterable[ValueLike] = (), 72 ): 73 assert sim.get(self.wb.stb) and sim.get(self.wb.cyc) 74 75 sim.set(self.wb.dat_r, data) 76 sim.set(self.wb.ack, ack) 77 sim.set(self.wb.err, err) 78 sim.set(self.wb.rty, rty) 79 ret = await sim.tick().sample(*sample) 80 sim.set(self.wb.ack, 0) 81 sim.set(self.wb.err, 0) 82 sim.set(self.wb.rty, 0) 83 return ret 84 85 async def slave_respond_master_verify( 86 self, sim: SimulatorContext, master: WishboneInterface, data: int, ack: int = 1, err: int = 0, rty: int = 0 87 ): 88 *_, ack, dat_r = await self.slave_respond(sim, data, ack, err, rty, sample=[master.ack, master.dat_r]) 89 assert ack and dat_r == data 90 91 async def wait_ack(self, sim: SimulatorContext): 92 await sim.tick().until(self.wb.stb & self.wb.cyc & self.wb.ack) 93 94 95 class TestWishboneMaster(TestCaseWithSimulator): 96 class WishboneMasterTestModule(Elaboratable): 97 def __init__(self): 98 pass 99 100 def elaborate(self, platform): 101 m = Module() 102 m.submodules.wbm = self.wbm = wbm = WishboneMaster(WishboneParameters()) 103 m.submodules.rqa = self.requestAdapter = TestbenchIO(AdapterTrans.create(wbm.request)) 104 m.submodules.rsa = self.resultAdapter = TestbenchIO(AdapterTrans.create(wbm.result)) 105 return m 106 107 def test_manual(self): 108 twbm = TestWishboneMaster.WishboneMasterTestModule() 109 110 async def process(sim: TestbenchContext): 111 # read request 112 await twbm.requestAdapter.call(sim, addr=2, data=0, we=0, sel=1) 113 114 # read request after delay 115 await sim.tick() 116 await sim.tick() 117 await twbm.requestAdapter.call(sim, addr=1, data=0, we=0, sel=1) 118 119 # write request 120 await twbm.requestAdapter.call(sim, addr=3, data=5, we=1, sel=0) 121 122 # RTY and ERR responese 123 await twbm.requestAdapter.call(sim, addr=2, data=0, we=0, sel=0) 124 resp = await twbm.requestAdapter.call_try(sim, addr=0, data=0, we=0, sel=0) 125 assert resp is None # verify cycle restart 126 127 async def result_process(sim: TestbenchContext): 128 resp = await twbm.resultAdapter.call(sim) 129 assert resp["data"] == 8 130 assert not resp["err"] 131 132 resp = await twbm.resultAdapter.call(sim) 133 assert resp["data"] == 3 134 assert not resp["err"] 135 136 resp = await twbm.resultAdapter.call(sim) 137 assert not resp["err"] 138 139 resp = await twbm.resultAdapter.call(sim) 140 assert resp["data"] == 1 141 assert resp["err"] 142 143 async def slave(sim: TestbenchContext): 144 wwb = WishboneInterfaceWrapper(twbm.wbm.wb_master) 145 146 await wwb.slave_wait_and_verify(sim, 2, 0, 0, 1) 147 await wwb.slave_respond(sim, 8) 148 149 await wwb.slave_wait_and_verify(sim, 1, 0, 0, 1) 150 await wwb.slave_respond(sim, 3) 151 152 await wwb.slave_tick_and_verify(sim, 3, 5, 1, 0) 153 await wwb.slave_respond(sim, 0) 154 await sim.tick() 155 156 await wwb.slave_tick_and_verify(sim, 2, 0, 0, 0) 157 await wwb.slave_respond(sim, 1, ack=0, err=0, rty=1) 158 assert not sim.get(wwb.wb.stb) 159 160 await wwb.slave_wait_and_verify(sim, 2, 0, 0, 0) 161 await wwb.slave_respond(sim, 1, ack=1, err=1, rty=0) 162 163 with self.run_simulation(twbm) as sim: 164 sim.add_testbench(process) 165 sim.add_testbench(result_process) 166 sim.add_testbench(slave) 167 168 169 class TestWishboneMuxer(TestCaseWithSimulator): 170 def test_manual(self): 171 num_slaves = 4 172 mux = WishboneMuxer(WishboneParameters(), num_slaves, Signal(num_slaves)) 173 slaves = [WishboneInterfaceWrapper(slave) for slave in mux.slaves] 174 wb_master = WishboneInterfaceWrapper(mux.master_wb) 175 176 async def process(sim: TestbenchContext): 177 # check full communiaction 178 wb_master.master_set(sim, 2, 0, 1) 179 sim.set(mux.sselTGA, 0b0001) 180 await slaves[0].slave_tick_and_verify(sim, 2, 0, 1) 181 assert not sim.get(slaves[1].wb.stb) 182 await slaves[0].slave_respond_master_verify(sim, wb_master.wb, 4) 183 wb_master.master_release(sim, release_cyc=False) 184 await sim.tick() 185 # select without releasing cyc (only on stb) 186 wb_master.master_set(sim, 3, 0, 0) 187 sim.set(mux.sselTGA, 0b0010) 188 await slaves[1].slave_tick_and_verify(sim, 3, 0, 0) 189 assert not sim.get(slaves[0].wb.stb) 190 await slaves[1].slave_respond_master_verify(sim, wb_master.wb, 5) 191 wb_master.master_release(sim) 192 await sim.tick() 193 194 # normal selection 195 wb_master.master_set(sim, 6, 0, 0) 196 sim.set(mux.sselTGA, 0b1000) 197 await slaves[3].slave_tick_and_verify(sim, 6, 0, 0) 198 await slaves[3].slave_respond_master_verify(sim, wb_master.wb, 1) 199 200 with self.run_simulation(mux) as sim: 201 sim.add_testbench(process) 202 203 204 class TestWishboneArbiter(TestCaseWithSimulator): 205 def test_manual(self): 206 arb = WishboneArbiter(WishboneParameters(), 2) 207 slave = WishboneInterfaceWrapper(arb.slave_wb) 208 masters = [WishboneInterfaceWrapper(master) for master in arb.masters] 209 210 async def process(sim: TestbenchContext): 211 masters[0].master_set(sim, 2, 3, 1) 212 await slave.slave_wait_and_verify(sim, 2, 3, 1) 213 masters[1].master_set(sim, 1, 4, 1) 214 await slave.slave_respond_master_verify(sim, masters[0].wb, 0) 215 assert not sim.get(masters[1].wb.ack) 216 masters[0].master_release(sim) 217 await sim.tick() 218 219 # check if bus is granted to next master if previous ends cycle 220 await slave.slave_wait_and_verify(sim, 1, 4, 1) 221 await slave.slave_respond_master_verify(sim, masters[1].wb, 0) 222 assert not sim.get(masters[0].wb.ack) 223 masters[1].master_release(sim) 224 await sim.tick() 225 226 # check round robin behaviour (2 masters requesting *2) 227 masters[0].master_set(sim, 1, 0, 0) 228 masters[1].master_set(sim, 2, 0, 0) 229 await slave.slave_wait_and_verify(sim, 1, 0, 0) 230 await slave.slave_respond_master_verify(sim, masters[0].wb, 3) 231 masters[0].master_release(sim) 232 masters[1].master_release(sim) 233 await sim.tick() 234 assert not sim.get(slave.wb.cyc) 235 236 masters[0].master_set(sim, 1, 0, 0) 237 masters[1].master_set(sim, 2, 0, 0) 238 await slave.slave_wait_and_verify(sim, 2, 0, 0) 239 await slave.slave_respond_master_verify(sim, masters[1].wb, 0) 240 241 # check if releasing stb keeps grant 242 masters[1].master_release(sim, release_cyc=False) 243 await sim.tick() 244 masters[1].master_set(sim, 3, 0, 0) 245 await slave.slave_wait_and_verify(sim, 3, 0, 0) 246 await slave.slave_respond_master_verify(sim, masters[1].wb, 0) 247 248 with self.run_simulation(arb) as sim: 249 sim.add_testbench(process) 250 251 252 class TestPipelinedWishboneMaster(TestCaseWithSimulator): 253 def test_randomized(self): 254 requests = 1000 255 256 req_queue = deque() 257 res_queue = deque() 258 slave_queue = deque() 259 260 random.seed(42) 261 wb_params = WishboneParameters() 262 pwbm = SimpleTestCircuit(PipelinedWishboneMaster((wb_params))) 263 264 async def request_process(sim: TestbenchContext): 265 for _ in range(requests): 266 request = { 267 "addr": random.randint(0, 2**wb_params.addr_width - 1), 268 "data": random.randint(0, 2**wb_params.data_width - 1), 269 "we": random.randint(0, 1), 270 "sel": random.randint(0, 2**wb_params.granularity - 1), 271 } 272 req_queue.appendleft(request) 273 await pwbm.request.call(sim, request) 274 275 async def verify_process(sim: TestbenchContext): 276 for _ in range(requests): 277 await self.random_wait_geom(sim, 0.8) 278 279 result = await pwbm.result.call(sim) 280 cres = res_queue.pop() 281 assert result["data"] == cres 282 assert not result["err"] 283 284 async def slave_process(sim: TestbenchContext): 285 wbw = pwbm._dut.wb 286 async for *_, cyc, stb, stall, adr, dat_w, we, sel in sim.tick().sample( 287 wbw.cyc, wbw.stb, wbw.stall, wbw.adr, wbw.dat_w, wbw.we, wbw.sel 288 ): 289 if cyc and stb: 290 assert not stall 291 assert req_queue 292 c_req = req_queue.pop() 293 assert adr == c_req["addr"] 294 assert dat_w == c_req["data"] 295 assert we == c_req["we"] 296 assert sel == c_req["sel"] 297 298 slave_queue.appendleft(dat_w) 299 res_queue.appendleft(dat_w) 300 301 if slave_queue and random.random() < 0.4: 302 sim.set(wbw.ack, 1) 303 sim.set(wbw.dat_r, slave_queue.pop()) 304 else: 305 sim.set(wbw.ack, 0) 306 307 sim.set(wbw.stall, random.random() < 0.3) 308 309 with self.run_simulation(pwbm) as sim: 310 sim.add_testbench(request_process) 311 sim.add_testbench(verify_process) 312 sim.add_testbench(slave_process, background=True) 313 314 315 class WishboneMemorySlaveCircuit(Elaboratable): 316 def __init__(self, wb_params: WishboneParameters, mem_args: dict): 317 self.wb_params = wb_params 318 self.mem_args = mem_args 319 320 def elaborate(self, platform): 321 m = Module() 322 323 m.submodules.mem_slave = self.mem_slave = WishboneMemorySlave(self.wb_params, **self.mem_args) 324 m.submodules.mem_master = self.mem_master = WishboneMaster(self.wb_params) 325 m.submodules.request = self.request = TestbenchIO(AdapterTrans.create(self.mem_master.request)) 326 m.submodules.result = self.result = TestbenchIO(AdapterTrans.create(self.mem_master.result)) 327 328 connect(m, self.mem_master.wb_master, self.mem_slave.bus) 329 330 return m 331 332 333 class TestWishboneMemorySlave(TestCaseWithSimulator): 334 def setup_method(self): 335 self.memsize = 43 # test some weird depth 336 self.iters = 300 337 338 self.addr_width = (self.memsize - 1).bit_length() # nearest log2 >= log2(memsize) 339 self.wb_params = WishboneParameters(data_width=32, addr_width=self.addr_width, granularity=16) 340 self.m = WishboneMemorySlaveCircuit(wb_params=self.wb_params, mem_args={"depth": self.memsize, "init": []}) 341 342 self.sel_width = self.wb_params.data_width // self.wb_params.granularity 343 344 random.seed(42) 345 346 def test_randomized(self): 347 req_queue = deque() 348 349 mem_state = [0] * self.memsize 350 351 async def request_process(sim: TestbenchContext): 352 for _ in range(self.iters): 353 req = { 354 "addr": random.randint(0, self.memsize - 1), 355 "data": random.randint(0, 2**self.wb_params.data_width - 1), 356 "we": random.randint(0, 1), 357 "sel": random.randint(0, 2**self.sel_width - 1), 358 } 359 req_queue.appendleft(req) 360 361 await self.random_wait_geom(sim, 0.2) 362 await self.m.request.call(sim, req) 363 364 async def result_process(sim: TestbenchContext): 365 for _ in range(self.iters): 366 await self.random_wait_geom(sim, 0.2) 367 res = await self.m.result.call(sim) 368 req = req_queue.pop() 369 370 if not req["we"]: 371 assert res["data"] == mem_state[req["addr"]] 372 else: 373 for i in range(self.sel_width): 374 if req["sel"] & (1 << i): 375 granularity_mask = (2**self.wb_params.granularity - 1) << (i * self.wb_params.granularity) 376 mem_state[req["addr"]] &= ~granularity_mask 377 mem_state[req["addr"]] |= req["data"] & granularity_mask 378 val = sim.get(Value.cast(self.m.mem_slave.mem.data[req["addr"]])) 379 assert val == mem_state[req["addr"]] 380 381 with self.run_simulation(self.m, max_cycles=3000) as sim: 382 sim.add_testbench(request_process) 383 sim.add_testbench(result_process)