/ test / peripherals / test_wishbone.py
test_wishbone.py
  1  from collections.abc import Iterable
  2  import random
  3  from collections import deque
  4  
  5  from amaranth.lib.wiring import connect
  6  from amaranth_types import ValueLike
  7  
  8  from coreblocks.peripherals.wishbone import *
  9  
 10  from transactron.lib import AdapterTrans
 11  
 12  from transactron.testing import *
 13  
 14  
 15  class WishboneInterfaceWrapper:
 16      def __init__(self, wishbone_interface: WishboneInterface):
 17          self.wb = wishbone_interface
 18  
 19      def master_set(self, sim: SimulatorContext, addr: int, data: int, we: int):
 20          sim.set(self.wb.dat_w, data)
 21          sim.set(self.wb.adr, addr)
 22          sim.set(self.wb.we, we)
 23          sim.set(self.wb.cyc, 1)
 24          sim.set(self.wb.stb, 1)
 25  
 26      def master_release(self, sim: SimulatorContext, release_cyc: bool = True):
 27          sim.set(self.wb.stb, 0)
 28          if release_cyc:
 29              sim.set(self.wb.cyc, 0)
 30  
 31      async def slave_wait(self, sim: SimulatorContext):
 32          *_, adr, we, sel, dat_w = (
 33              await sim.tick()
 34              .sample(self.wb.adr, self.wb.we, self.wb.sel, self.wb.dat_w)
 35              .until(self.wb.stb & self.wb.cyc)
 36          )
 37          return adr, we, sel, dat_w
 38  
 39      async def slave_wait_and_verify(
 40          self, sim: SimulatorContext, exp_addr: int, exp_data: int, exp_we: int, exp_sel: int = 0
 41      ):
 42          adr, we, sel, dat_w = await self.slave_wait(sim)
 43  
 44          assert adr == exp_addr
 45          assert we == exp_we
 46          assert sel == exp_sel
 47          if exp_we:
 48              assert dat_w == exp_data
 49  
 50      async def slave_tick_and_verify(
 51          self, sim: SimulatorContext, exp_addr: int, exp_data: int, exp_we: int, exp_sel: int = 0
 52      ):
 53          *_, adr, we, sel, dat_w, stb, cyc = await sim.tick().sample(
 54              self.wb.adr, self.wb.we, self.wb.sel, self.wb.dat_w, self.wb.stb, self.wb.cyc
 55          )
 56          assert stb and cyc
 57  
 58          assert adr == exp_addr
 59          assert we == exp_we
 60          assert sel == exp_sel
 61          if exp_we:
 62              assert dat_w == exp_data
 63  
 64      async def slave_respond(
 65          self,
 66          sim: SimulatorContext,
 67          data: int,
 68          ack: int = 1,
 69          err: int = 0,
 70          rty: int = 0,
 71          sample: Iterable[ValueLike] = (),
 72      ):
 73          assert sim.get(self.wb.stb) and sim.get(self.wb.cyc)
 74  
 75          sim.set(self.wb.dat_r, data)
 76          sim.set(self.wb.ack, ack)
 77          sim.set(self.wb.err, err)
 78          sim.set(self.wb.rty, rty)
 79          ret = await sim.tick().sample(*sample)
 80          sim.set(self.wb.ack, 0)
 81          sim.set(self.wb.err, 0)
 82          sim.set(self.wb.rty, 0)
 83          return ret
 84  
 85      async def slave_respond_master_verify(
 86          self, sim: SimulatorContext, master: WishboneInterface, data: int, ack: int = 1, err: int = 0, rty: int = 0
 87      ):
 88          *_, ack, dat_r = await self.slave_respond(sim, data, ack, err, rty, sample=[master.ack, master.dat_r])
 89          assert ack and dat_r == data
 90  
 91      async def wait_ack(self, sim: SimulatorContext):
 92          await sim.tick().until(self.wb.stb & self.wb.cyc & self.wb.ack)
 93  
 94  
 95  class TestWishboneMaster(TestCaseWithSimulator):
 96      class WishboneMasterTestModule(Elaboratable):
 97          def __init__(self):
 98              pass
 99  
100          def elaborate(self, platform):
101              m = Module()
102              m.submodules.wbm = self.wbm = wbm = WishboneMaster(WishboneParameters())
103              m.submodules.rqa = self.requestAdapter = TestbenchIO(AdapterTrans.create(wbm.request))
104              m.submodules.rsa = self.resultAdapter = TestbenchIO(AdapterTrans.create(wbm.result))
105              return m
106  
107      def test_manual(self):
108          twbm = TestWishboneMaster.WishboneMasterTestModule()
109  
110          async def process(sim: TestbenchContext):
111              # read request
112              await twbm.requestAdapter.call(sim, addr=2, data=0, we=0, sel=1)
113  
114              # read request after delay
115              await sim.tick()
116              await sim.tick()
117              await twbm.requestAdapter.call(sim, addr=1, data=0, we=0, sel=1)
118  
119              # write request
120              await twbm.requestAdapter.call(sim, addr=3, data=5, we=1, sel=0)
121  
122              # RTY and ERR responese
123              await twbm.requestAdapter.call(sim, addr=2, data=0, we=0, sel=0)
124              resp = await twbm.requestAdapter.call_try(sim, addr=0, data=0, we=0, sel=0)
125              assert resp is None  # verify cycle restart
126  
127          async def result_process(sim: TestbenchContext):
128              resp = await twbm.resultAdapter.call(sim)
129              assert resp["data"] == 8
130              assert not resp["err"]
131  
132              resp = await twbm.resultAdapter.call(sim)
133              assert resp["data"] == 3
134              assert not resp["err"]
135  
136              resp = await twbm.resultAdapter.call(sim)
137              assert not resp["err"]
138  
139              resp = await twbm.resultAdapter.call(sim)
140              assert resp["data"] == 1
141              assert resp["err"]
142  
143          async def slave(sim: TestbenchContext):
144              wwb = WishboneInterfaceWrapper(twbm.wbm.wb_master)
145  
146              await wwb.slave_wait_and_verify(sim, 2, 0, 0, 1)
147              await wwb.slave_respond(sim, 8)
148  
149              await wwb.slave_wait_and_verify(sim, 1, 0, 0, 1)
150              await wwb.slave_respond(sim, 3)
151  
152              await wwb.slave_tick_and_verify(sim, 3, 5, 1, 0)
153              await wwb.slave_respond(sim, 0)
154              await sim.tick()
155  
156              await wwb.slave_tick_and_verify(sim, 2, 0, 0, 0)
157              await wwb.slave_respond(sim, 1, ack=0, err=0, rty=1)
158              assert not sim.get(wwb.wb.stb)
159  
160              await wwb.slave_wait_and_verify(sim, 2, 0, 0, 0)
161              await wwb.slave_respond(sim, 1, ack=1, err=1, rty=0)
162  
163          with self.run_simulation(twbm) as sim:
164              sim.add_testbench(process)
165              sim.add_testbench(result_process)
166              sim.add_testbench(slave)
167  
168  
class TestWishboneMuxer(TestCaseWithSimulator):
    """Scripted test of ``WishboneMuxer`` routing one master to one of several slaves."""

    def test_manual(self):
        """Select slaves 0, 1 and 3 in turn and check request and response routing."""
        num_slaves = 4
        mux = WishboneMuxer(WishboneParameters(), num_slaves, Signal(num_slaves))
        slaves = [WishboneInterfaceWrapper(slave) for slave in mux.slaves]
        wb_master = WishboneInterfaceWrapper(mux.master_wb)

        async def transfer(sim: TestbenchContext, target: int, addr: int, we: int, resp: int, idle=None):
            """Request via the master, select slave ``target`` and complete the transfer there.

            When ``idle`` is given, that slave's ``stb`` must be low after the request tick.
            """
            wb_master.master_set(sim, addr, 0, we)
            sim.set(mux.sselTGA, 1 << target)  # single select bit for the target slave
            await slaves[target].slave_tick_and_verify(sim, addr, 0, we)
            if idle is not None:
                assert not sim.get(slaves[idle].wb.stb)
            await slaves[target].slave_respond_master_verify(sim, wb_master.wb, resp)

        async def process(sim: TestbenchContext):
            # check full communication
            await transfer(sim, target=0, addr=2, we=1, resp=4, idle=1)
            wb_master.master_release(sim, release_cyc=False)
            await sim.tick()

            # select without releasing cyc (only on stb)
            await transfer(sim, target=1, addr=3, we=0, resp=5, idle=0)
            wb_master.master_release(sim)
            await sim.tick()

            # normal selection
            await transfer(sim, target=3, addr=6, we=0, resp=1)

        with self.run_simulation(mux) as sim:
            sim.add_testbench(process)
202  
203  
class TestWishboneArbiter(TestCaseWithSimulator):
    """Scripted test of a two-master ``WishboneArbiter``."""

    def test_manual(self):
        """Check grant hand-over, alternation between masters and grant retention on ``cyc``."""
        arb = WishboneArbiter(WishboneParameters(), 2)
        slave = WishboneInterfaceWrapper(arb.slave_wb)
        masters = [WishboneInterfaceWrapper(master) for master in arb.masters]
        first, second = masters[0], masters[1]

        async def serve(sim: TestbenchContext, granted, addr: int, data: int, we: int, resp: int):
            """Wait for the given request on the slave side and answer it towards ``granted``."""
            await slave.slave_wait_and_verify(sim, addr, data, we)
            await slave.slave_respond_master_verify(sim, granted.wb, resp)

        def request_from_both(sim: TestbenchContext):
            """Make both masters request at once (addresses 1 and 2, reads)."""
            first.master_set(sim, 1, 0, 0)
            second.master_set(sim, 2, 0, 0)

        async def process(sim: TestbenchContext):
            # the first master owns the bus; the second one starts requesting mid-transfer
            first.master_set(sim, 2, 3, 1)
            await slave.slave_wait_and_verify(sim, 2, 3, 1)
            second.master_set(sim, 1, 4, 1)
            await slave.slave_respond_master_verify(sim, first.wb, 0)
            assert not sim.get(second.wb.ack)
            first.master_release(sim)
            await sim.tick()

            # check if bus is granted to next master if previous ends cycle
            await serve(sim, second, addr=1, data=4, we=1, resp=0)
            assert not sim.get(first.wb.ack)
            second.master_release(sim)
            await sim.tick()

            # check round robin behaviour (2 masters requesting *2)
            request_from_both(sim)
            await serve(sim, first, addr=1, data=0, we=0, resp=3)
            first.master_release(sim)
            second.master_release(sim)
            await sim.tick()
            assert not sim.get(slave.wb.cyc)

            request_from_both(sim)
            await serve(sim, second, addr=2, data=0, we=0, resp=0)

            # check if releasing stb keeps grant
            second.master_release(sim, release_cyc=False)
            await sim.tick()
            second.master_set(sim, 3, 0, 0)
            await serve(sim, second, addr=3, data=0, we=0, resp=0)

        with self.run_simulation(arb) as sim:
            sim.add_testbench(process)
250  
251  
class TestPipelinedWishboneMaster(TestCaseWithSimulator):
    """Randomized request/response test for ``PipelinedWishboneMaster``."""

    def test_randomized(self):
        """Push random requests through the master and check bus traffic and returned data.

        Three testbench processes cooperate through FIFOs: one issues requests,
        one plays the bus slave (echoing the written data back as read data with
        random acknowledge and stall timing), and one checks the results.
        """
        requests = 1000

        # FIFOs shared by the processes: producers ``appendleft``, consumers ``pop``.
        req_queue = deque()  # requests issued, not yet seen on the bus
        res_queue = deque()  # data the result method is expected to return, in order
        slave_queue = deque()  # data accepted by the slave, not yet acknowledged

        random.seed(42)
        wb_params = WishboneParameters()
        # ``sel`` carries one bit per granule of the data bus, so its range follows the
        # number of granules rather than the granule size in bits.
        # NOTE(review): the two coincide whenever granularity == data_width // granularity,
        # so generated values only change for parameter sets where they differ.
        sel_width = wb_params.data_width // wb_params.granularity
        pwbm = SimpleTestCircuit(PipelinedWishboneMaster(wb_params))

        async def request_process(sim: TestbenchContext):
            for _ in range(requests):
                request = {
                    "addr": random.randint(0, 2**wb_params.addr_width - 1),
                    "data": random.randint(0, 2**wb_params.data_width - 1),
                    "we": random.randint(0, 1),
                    "sel": random.randint(0, 2**sel_width - 1),
                }
                # Queue before calling so the slave process can match the request
                # as soon as it is visible on the bus.
                req_queue.appendleft(request)
                await pwbm.request.call(sim, request)

        async def verify_process(sim: TestbenchContext):
            for _ in range(requests):
                await self.random_wait_geom(sim, 0.8)

                result = await pwbm.result.call(sim)
                cres = res_queue.pop()
                assert result["data"] == cres
                assert not result["err"]

        async def slave_process(sim: TestbenchContext):
            wbw = pwbm._dut.wb
            async for *_, cyc, stb, stall, adr, dat_w, we, sel in sim.tick().sample(
                wbw.cyc, wbw.stb, wbw.stall, wbw.adr, wbw.dat_w, wbw.we, wbw.sel
            ):
                if cyc and stb:
                    # A request sampled together with a high stall is treated as an error.
                    assert not stall
                    assert req_queue
                    c_req = req_queue.pop()
                    assert adr == c_req["addr"]
                    assert dat_w == c_req["data"]
                    assert we == c_req["we"]
                    assert sel == c_req["sel"]

                    # The slave echoes the written data back as its response.
                    slave_queue.appendleft(dat_w)
                    res_queue.appendleft(dat_w)

                # Acknowledge a pending request with probability 0.4 per tick.
                if slave_queue and random.random() < 0.4:
                    sim.set(wbw.ack, 1)
                    sim.set(wbw.dat_r, slave_queue.pop())
                else:
                    sim.set(wbw.ack, 0)

                # Stall the bus with probability 0.3 per tick.
                sim.set(wbw.stall, random.random() < 0.3)

        with self.run_simulation(pwbm) as sim:
            sim.add_testbench(request_process)
            sim.add_testbench(verify_process)
            # The slave loops forever, so it runs as a background testbench.
            sim.add_testbench(slave_process, background=True)
313  
314  
class WishboneMemorySlaveCircuit(Elaboratable):
    """Test circuit wiring a ``WishboneMaster`` directly to a ``WishboneMemorySlave``.

    After elaboration the master's ``request`` and ``result`` methods are
    reachable through the ``request`` and ``result`` ``TestbenchIO`` attributes.
    """

    def __init__(self, wb_params: WishboneParameters, mem_args: dict):
        # ``mem_args`` is forwarded as keyword arguments to ``WishboneMemorySlave``.
        self.mem_args = mem_args
        self.wb_params = wb_params

    def elaborate(self, platform):
        m = Module()

        slave = WishboneMemorySlave(self.wb_params, **self.mem_args)
        m.submodules.mem_slave = slave
        self.mem_slave = slave

        master = WishboneMaster(self.wb_params)
        m.submodules.mem_master = master
        self.mem_master = master

        request_io = TestbenchIO(AdapterTrans.create(master.request))
        m.submodules.request = request_io
        self.request = request_io

        result_io = TestbenchIO(AdapterTrans.create(master.result))
        m.submodules.result = result_io
        self.result = result_io

        # Join the master's bus with the memory slave's bus.
        connect(m, master.wb_master, slave.bus)

        return m
331  
332  
class TestWishboneMemorySlave(TestCaseWithSimulator):
    """Randomized read/write test of ``WishboneMemorySlave`` against a software model."""

    def setup_method(self):
        self.iters = 300
        self.memsize = 43  # test some weird depth

        # number of address bits needed to index ``memsize`` words
        self.addr_width = (self.memsize - 1).bit_length()
        self.wb_params = WishboneParameters(data_width=32, addr_width=self.addr_width, granularity=16)
        mem_args = {"depth": self.memsize, "init": []}
        self.m = WishboneMemorySlaveCircuit(wb_params=self.wb_params, mem_args=mem_args)

        # one select bit per granule of the data word
        self.sel_width = self.wb_params.data_width // self.wb_params.granularity

        random.seed(42)

    def test_randomized(self):
        """Issue random reads and writes and compare each result with the software model."""
        req_queue = deque()

        # software model of the memory contents; starts out all zeros
        mem_state = [0] * self.memsize

        async def request_process(sim: TestbenchContext):
            for _ in range(self.iters):
                addr = random.randint(0, self.memsize - 1)
                data = random.randint(0, (1 << self.wb_params.data_width) - 1)
                we = random.randint(0, 1)
                sel = random.randint(0, (1 << self.sel_width) - 1)
                req = {"addr": addr, "data": data, "we": we, "sel": sel}
                req_queue.appendleft(req)

                await self.random_wait_geom(sim, 0.2)
                await self.m.request.call(sim, req)

        async def result_process(sim: TestbenchContext):
            lane_bits = self.wb_params.granularity
            lane_ones = (1 << lane_bits) - 1

            for _ in range(self.iters):
                await self.random_wait_geom(sim, 0.2)
                res = await self.m.result.call(sim)
                req = req_queue.pop()
                addr = req["addr"]

                if req["we"]:
                    # Combine the masks of all selected lanes, then update the model in one step.
                    write_mask = 0
                    for lane in range(self.sel_width):
                        if (req["sel"] >> lane) & 1:
                            write_mask |= lane_ones << (lane * lane_bits)
                    mem_state[addr] = (mem_state[addr] & ~write_mask) | (req["data"] & write_mask)

                    # After a write, the simulated memory word must equal the model.
                    stored = sim.get(Value.cast(self.m.mem_slave.mem.data[addr]))
                    assert stored == mem_state[addr]
                else:
                    assert res["data"] == mem_state[addr]

        with self.run_simulation(self.m, max_cycles=3000) as sim:
            for testbench in (request_process, result_process):
                sim.add_testbench(testbench)