wishbone.py
from amaranth import *
import amaranth.lib.memory as memory
from amaranth.lib.wiring import In, Out, Component
from functools import reduce
import operator

from transactron import Method, def_method, TModule
from transactron.core import Transaction
from transactron.lib import AdapterTrans, BasicFifo
from transactron.utils import OneHotSwitchDynamic, assign, RoundRobin
from transactron.utils.amaranth_ext.component_interface import ComponentInterface, CIn, COut
from transactron.lib.connectors import Forwarder
from transactron.utils.transactron_helpers import make_layout
from transactron.lib import logging


class WishboneParameters:
    """Parameters of the Wishbone bus.

    Parameters
    ----------
    data_width: int
        Width of dat_r and dat_w Wishbone signals. Defaults to 64 bits
    addr_width: int
        Width of adr Wishbone signal. Defaults to 64 bits
    granularity: int
        The smallest unit of data transfer that a port is capable of transferring. Defaults to 8 bits
    """

    def __init__(self, *, data_width: int = 64, addr_width: int = 64, granularity: int = 8):
        self.data_width = data_width
        self.addr_width = addr_width
        self.granularity = granularity


class WishboneInterface(ComponentInterface):
    """Signal set of a Wishbone port.

    Members declared with `COut` are the ones driven by the masters defined in this file
    (`dat_w`, `rst`, `adr`, `cyc`, `lock`, `sel`, `stb`, `we`); members declared with `CIn`
    are the ones those masters read (`dat_r`, `ack`, `stall`, `err`, `rty`).

    Parameters
    ----------
    wb_params: WishboneParameters
        Bus parameters. `sel` gets one bit per `granularity`-sized lane of the data word.
    """

    def __init__(self, wb_params: WishboneParameters):
        self.dat_r = CIn(wb_params.data_width)
        self.dat_w = COut(wb_params.data_width)
        self.rst = COut()
        self.ack = CIn()
        self.adr = COut(wb_params.addr_width)
        self.cyc = COut()
        self.stall = CIn()
        self.err = CIn()
        self.lock = COut()
        self.rty = CIn()
        self.sel = COut(wb_params.data_width // wb_params.granularity)
        self.stb = COut()
        self.we = COut()


class WishboneMasterMethodLayout:
    """Wishbone master layouts for methods

    Parameters
    ----------
    wb_params: WishboneParameters
        Parameters used to generate Wishbone master layouts

    Attributes
    ----------
    request_layout: Layout
        Layout for request method of WishboneMaster.

    result_layout: Layout
        Layout for result method of WishboneMaster.
    """

    def __init__(self, wb_params: WishboneParameters):
        self.request_layout = make_layout(
            ("addr", wb_params.addr_width),
            ("data", wb_params.data_width),
            ("we", 1),
            ("sel", wb_params.data_width // wb_params.granularity),
        )

        self.result_layout = make_layout(("data", wb_params.data_width), ("err", 1))


class WishboneMaster(Component):
    """Wishbone bus master interface.

    Parameters
    ----------
    wb_params: WishboneParameters
        Parameters for bus generation.
    name: str, optional
        Name of this bus. Used for logging.

    Attributes
    ----------
    wb_master: WishboneInterface
        Wishbone bus output.
    request: Method
        Transactional method to start a new Wishbone request.
        Ready when no request is being executed and previous result is read.
        Takes `request_layout` as argument.
    result: Method
        Transactional method to read previous request result.
        Becomes ready after Wishbone request is completed.
        Returns state of request (error or success) and data (in case of read request) as `result_layout`.
    """

    wb_master: WishboneInterface

    def __init__(self, wb_params: WishboneParameters, name: str = ""):
        super().__init__({"wb_master": Out(WishboneInterface(wb_params).signature)})
        self.name = name
        self.wb_params = wb_params

        self.method_layouts = WishboneMasterMethodLayout(wb_params)

        self.request = Method(i=self.method_layouts.request_layout)
        self.result = Method(o=self.method_layouts.result_layout)

        # latched input signals
        self.txn_req = Signal(self.method_layouts.request_layout)

        # the logger is named "bus.wishbone", with ".<name>" appended when a name was given
        logger_name = "bus.wishbone"
        if name != "":
            logger_name += f".{name}"
        self.log = logging.HardwareLogger(logger_name)

    def elaborate(self, platform):
        m = TModule()

        m.submodules.result = result = Forwarder(self.method_layouts.result_layout)

        request_ready = Signal()

        def FSMWBCycStart(request):  # noqa: N802
            # internal FSM function that starts Wishbone cycle
            m.d.sync += self.wb_master.cyc.eq(1)
            m.d.sync += self.wb_master.stb.eq(1)
            m.d.sync += self.wb_master.adr.eq(request.addr)
            # dat_w is driven with zero for read requests
            m.d.sync += self.wb_master.dat_w.eq(Mux(request.we, request.data, 0))
            m.d.sync += self.wb_master.we.eq(request.we)
            m.d.sync += self.wb_master.sel.eq(request.sel)

        # NOTE: the FSM has to be declared before the `request` method body below. Both assign
        # cyc/stb in the sync domain, and the method body is meant to take priority over the
        # defaults set by the FSM states.
        with m.FSM("Reset"):
            with m.State("Reset"):
                m.d.sync += self.wb_master.rst.eq(1)
                m.next = "Idle"
            with m.State("Idle"):
                # default values for important signals
                m.d.sync += self.wb_master.rst.eq(0)
                m.d.sync += self.wb_master.stb.eq(0)
                m.d.sync += self.wb_master.cyc.eq(0)
                m.d.comb += request_ready.eq(1)
                with m.If(self.request.run):
                    m.next = "WBWaitACK"

            with m.State("WBCycStart"):
                # entered only after RTY: restart the cycle from the latched request
                FSMWBCycStart(self.txn_req)
                m.next = "WBWaitACK"

            with m.State("WBWaitACK"):
                with m.If(self.wb_master.ack | self.wb_master.err):
                    # a new request may be accepted in the termination cycle,
                    # but only if the result is being read at the same time
                    m.d.comb += request_ready.eq(result.read.run)
                    with Transaction().body(m):
                        # will be always ready, as we checked that in Idle
                        result.write(m, data=Mux(self.txn_req.we, 0, self.wb_master.dat_r), err=self.wb_master.err)
                    with m.If(self.request.run):
                        m.next = "WBWaitACK"
                    with m.Else():
                        m.d.sync += self.wb_master.cyc.eq(0)
                        m.d.sync += self.wb_master.stb.eq(0)
                        m.next = "Idle"
                with m.If(self.wb_master.rty):
                    # keep the bus cycle open, drop the strobe and reissue the request
                    m.d.sync += self.wb_master.cyc.eq(1)
                    m.d.sync += self.wb_master.stb.eq(0)
                    m.next = "WBCycStart"

        @def_method(m, self.result)
        def _():
            ret = result.read(m)

            self.log.debug(
                m,
                True,
                "response data=0x{:x} err={}",
                ret.data,
                ret.err,
            )

            return ret

        @def_method(m, self.request, ready=request_ready & result.write.ready)
        def _(arg):
            m.d.sync += assign(self.txn_req, arg)
            # do WBCycStart state in the same clock cycle
            FSMWBCycStart(arg)

            self.log.debug(
                m,
                True,
                "request addr=0x{:x} data=0x{:x} sel=0x{:x} write={}",
                arg.addr,
                arg.data,
                arg.sel,
                arg.we,
            )

        result.write.schedule_before(self.request)
        result.read.schedule_before(self.request)

        return m


class PipelinedWishboneMaster(Component):
    """Pipelined Wishbone bus master interface.

    Parameters
    ----------
    wb_params: WishboneParameters
        Parameters for bus generation.
    max_req: int
        Size of the response buffer, limits the number of pending requests. Defaults to 8.

    Attributes
    ----------
    wb: WishboneInterface
        Wishbone bus output.
    request: Method
        Transactional method to start a new Wishbone request.
        Ready if new request can be immediately sent.
        Takes `request_layout` as argument.
    result: Method
        Transactional method to read results from completed requests sequentially.
        Ready if buffered results are available.
        Returns state of request (error or success) and data (in case of read request) as `result_layout`.
    requests_finished: Signal, out
        True, if there are no requests waiting for response
    """

    wb: WishboneInterface

    def __init__(self, wb_params: WishboneParameters, *, max_req: int = 8):
        super().__init__({"wb": Out(WishboneInterface(wb_params).signature)})
        self.wb_params = wb_params
        self.max_req = max_req

        self.generate_method_layouts(wb_params)
        self.request = Method(i=self.request_in_layout)
        self.result = Method(o=self.result_out_layout)

        self.requests_finished = Signal()

    def generate_method_layouts(self, wb_params: WishboneParameters):
        """Create `request_in_layout` and `result_out_layout` from the bus parameters."""
        # generate method layouts locally
        self.request_in_layout = [
            ("addr", wb_params.addr_width),
            ("data", wb_params.data_width),
            ("we", 1),
            ("sel", wb_params.data_width // wb_params.granularity),
        ]

        self.result_out_layout = [("data", wb_params.data_width), ("err", 1)]

    def elaborate(self, platform):
        m = TModule()

        m.submodules.result_fifo = self.result_fifo = BasicFifo(self.result_out_layout, self.max_req)
        m.submodules.result_write_adapter = self.result_write_adapter = AdapterTrans.create(self.result_fifo.write)

        pending_req_cnt = Signal(range(self.max_req + 1))
        req_start = Signal()
        req_finish = Signal()

        request_ready = Signal()
        # assure that responses to all instructions in flight can be buffered
        m.d.comb += request_ready.eq(~self.wb.stall & (pending_req_cnt + self.result_fifo.level < self.max_req))
        m.d.comb += self.requests_finished.eq(pending_req_cnt == 0)

        # assert cyc when starting new request or waiting for ack
        # (`|` binds tighter than `>` in Python, so the comparison has to be parenthesised)
        m.d.comb += self.wb.cyc.eq(self.wb.stb | (pending_req_cnt > 0))

        with m.If(self.wb.ack | self.wb.err | self.wb.rty):
            m.d.comb += self.result_write_adapter.en.eq(1)
            m.d.comb += self.result_write_adapter.data_in.data.eq(self.wb.dat_r)
            # retrying is not possible in PipelinedMaster, treat RTY as ERR.
            m.d.comb += self.result_write_adapter.data_in.err.eq(self.wb.err | self.wb.rty)

            m.d.comb += req_finish.eq(1)

        self.result.provide(self.result_fifo.read)

        @def_method(m, self.request, ready=request_ready)
        def _(arg) -> None:
            m.d.comb += self.wb.stb.eq(1)

            m.d.top_comb += [
                self.wb.adr.eq(arg.addr),
                self.wb.dat_w.eq(arg.data),
                self.wb.we.eq(arg.we),
                self.wb.sel.eq(arg.sel),
            ]

            m.d.comb += req_start.eq(1)

        # the counter is left unchanged when a request starts and another one finishes in the same cycle
        with m.If(req_start & ~req_finish):
            m.d.sync += pending_req_cnt.eq(pending_req_cnt + 1)
        with m.If(req_finish & ~req_start):
            m.d.sync += pending_req_cnt.eq(pending_req_cnt - 1)

        return m


class WishboneMuxer(Component):
    """Wishbone Muxer.

    Connects one master to multiple slaves.

    Parameters
    ----------
    wb_params: WishboneParameters
        Parameters for bus generation.
    num_slaves: int
        Number of slave devices to multiplex.
    ssel_tga: Signal
        Signal that selects the slave to connect. Signal width is the number of slaves and each bit corresponds
        to a slave. This signal is a Wishbone TGA (address tag), so it needs to be valid and held every time Wishbone
        STB is asserted.
        Note that if Pipelined Wishbone implementation is used, then before starting any new request with
        different `ssel_tga` value, all pending request have to be finished (and `stall` cleared). Holding new requests
        should be implemented in block that controls `ssel_tga` signal, before the Wishbone Master.

    Attributes
    ----------
    master_wb: WishboneInterface
        Master interface.
    slaves: list of WishboneInterface
        List of connected slaves' Wishbone interfaces.
    """

    master_wb: WishboneInterface
    slaves: list[WishboneInterface]

    def __init__(self, wb_params: WishboneParameters, num_slaves: int, ssel_tga: Signal):
        super().__init__(
            {
                "master_wb": In(WishboneInterface(wb_params).signature),
                "slaves": Out(WishboneInterface(wb_params).signature).array(num_slaves),
            }
        )
        self.sselTGA = ssel_tga

        # the select signal carries one bit per slave
        select_bits = ssel_tga.shape().width
        assert select_bits == num_slaves
        self.txn_sel = Signal(select_bits)
        self.txn_sel_r = Signal(select_bits)

        self.prev_stb = Signal()

    def elaborate(self, platform):
        m = TModule()

        m.d.sync += self.prev_stb.eq(self.master_wb.stb)

        for i, slave in enumerate(self.slaves):
            # connect all M->S signals except stb
            # workaround for the lack of selective connecting in wiring
            for n in ["dat_w", "cyc", "lock", "adr", "we", "sel"]:
                m.d.comb += getattr(slave, n).eq(getattr(self.master_wb, n))

            # note that sselTGA follows TGA (address tag) spec, it must be asserted with STB and keep its value
            # until end of transfer (including termination cycle). It can change cycle after termination, without
            # deasserting STB in a block request.

            # use stb as a slave selector signal
            m.d.comb += slave.stb.eq(self.sselTGA[i] & self.master_wb.stb)

        # bus termination signals S->M should be ORed
        m.d.comb += self.master_wb.ack.eq(reduce(operator.or_, [slave.ack for slave in self.slaves]))
        m.d.comb += self.master_wb.err.eq(reduce(operator.or_, [slave.err for slave in self.slaves]))
        m.d.comb += self.master_wb.rty.eq(reduce(operator.or_, [slave.rty for slave in self.slaves]))
        for i in OneHotSwitchDynamic(m, self.sselTGA):
            # mux S->M data
            # workaround for the lack of selective connecting in wiring
            for n in ["dat_r", "stall"]:
                m.d.comb += getattr(self.master_wb, n).eq(getattr(self.slaves[i], n))
        return m


# connects multiple masters to one slave
class WishboneArbiter(Component):
    """Wishbone Arbiter.

    Connects multiple masters to one slave.
    Bus is requested by asserting CYC signal and is granted to masters in a round robin manner.

    Parameters
    ----------
    wb_params: WishboneParameters
        Parameters for bus generation.
    num_masters: int
        Number of master devices.

    Attributes
    ----------
    slave_wb: WishboneInterface
        Slave interface.
    masters: list of WishboneInterface
        List of master interfaces.
    """

    slave_wb: WishboneInterface
    masters: list[WishboneInterface]

    def __init__(self, wb_params: WishboneParameters, num_masters: int):
        super().__init__(
            {
                "slave_wb": Out(WishboneInterface(wb_params).signature),
                "masters": In(WishboneInterface(wb_params).signature).array(num_masters),
            }
        )

        self.prev_cyc = Signal()
        # Amaranth round robin signals
        self.arb_enable = Signal()
        self.req_signal = Signal(num_masters)

    def elaborate(self, platform):
        m = TModule()

        m.d.sync += self.prev_cyc.eq(self.slave_wb.cyc)

        m.submodules.rr = rr = RoundRobin(count=len(self.masters))
        m.d.comb += [self.req_signal[i].eq(master.cyc) for i, master in enumerate(self.masters)]
        m.d.comb += rr.requests.eq(Mux(self.arb_enable, self.req_signal, 0))

        master_array = Array(self.masters)
        # If master ends wb cycle, enable rr input to select new master on next cycle if available (cyc off for 1 cycle)
        # If selected master is active, disable rr request input to preserve grant signal and correct rr state.
        # prev_cyc is used to select next master in new bus cycle, if previously selected master asserts cyc at the
        # same time as another one
        m.d.comb += self.arb_enable.eq((~master_array[rr.grant].cyc) | (~self.prev_cyc))

        for i, master in enumerate(self.masters):
            # mux S->M termination signals
            m.d.comb += master.ack.eq((rr.grant == i) & self.slave_wb.ack)
            m.d.comb += master.err.eq((rr.grant == i) & self.slave_wb.err)
            m.d.comb += master.rty.eq((rr.grant == i) & self.slave_wb.rty)
            # remaining S->M signals are shared, master will only accept response if bus termination signal is present
            # workaround for the lack of selective connecting in wiring
            for n in ["dat_r", "stall"]:
                m.d.comb += getattr(master, n).eq(getattr(self.slave_wb, n))

        # combine reset signal
        m.d.comb += self.slave_wb.rst.eq(reduce(operator.or_, [master.rst for master in self.masters]))

        # mux all M->S signals
        with m.Switch(rr.grant):
            for i, master in enumerate(self.masters):
                with m.Case(i):
                    # workaround for the lack of selective connecting in wiring
                    for n in ["dat_w", "cyc", "lock", "adr", "we", "sel", "stb"]:
                        m.d.comb += getattr(self.slave_wb, n).eq(getattr(master, n))

        # Disable slave when round robin is not valid at start of new request
        # This prevents changing grant and muxes during Wishbone cycle
        # (has to stay after the Switch above so that it overrides the muxed stb)
        with m.If((~rr.valid) & self.arb_enable):
            m.d.comb += self.slave_wb.stb.eq(0)

        return m


class WishboneMemorySlave(Component):
    """Wishbone slave with memory
    Wishbone slave interface with addressable memory underneath.

    Parameters
    ----------
    wb_params: WishboneParameters
        Parameters for bus generation.
    **kwargs: dict
        Keyword arguments for the underlying Amaranth's `Memory`. If `shape` and `depth`
        are not specified, then they're inferred from `wb_params`: `data_width` becomes
        `shape` and `2 ** addr_width` becomes `depth`.

    Attributes
    ----------
    bus: WishboneInterface
        Wishbone bus interface.

    Raises
    ------
    RuntimeError
        If the memory shape or `wb_params.granularity` is not one of 8, 16, 32, 64.
    """

    bus: WishboneInterface

    def __init__(self, wb_params: WishboneParameters, **kwargs):
        super().__init__({"bus": In(WishboneInterface(wb_params).signature)})
        if "shape" not in kwargs:
            kwargs["shape"] = wb_params.data_width
        if kwargs["shape"] not in (8, 16, 32, 64):
            raise RuntimeError("Memory shape has to be one of: 8, 16, 32, 64")
        if "depth" not in kwargs:
            kwargs["depth"] = 2**wb_params.addr_width
        self.granularity = wb_params.granularity
        if self.granularity not in (8, 16, 32, 64):
            raise RuntimeError("Granularity has to be one of: 8, 16, 32, 64")

        self.mem = memory.Memory(**kwargs)

    def elaborate(self, platform):
        m = TModule()

        m.submodules.mem = self.mem
        wrport = self.mem.write_port(granularity=self.granularity)
        rdport = self.mem.read_port()

        with m.FSM():
            with m.State("Start"):
                with m.If(self.bus.stb & self.bus.cyc):
                    with m.If(~self.bus.we):
                        with m.If(self.bus.adr < self.mem.depth):
                            m.d.comb += rdport.addr.eq(self.bus.adr)
                            # asserting rdport.en not required in case of a transparent port
                            m.next = "Read"
                        with m.Else():  # access outside bounds
                            m.d.comb += self.bus.err.eq(1)
                    with m.Else():
                        # NOTE(review): unlike the read path, writes are not checked against
                        # `self.mem.depth` and are always acknowledged - confirm this is intended.
                        m.d.comb += wrport.addr.eq(self.bus.adr)
                        m.d.comb += wrport.en.eq(self.bus.sel)
                        m.d.comb += wrport.data.eq(self.bus.dat_w)
                        # writes can be ack'd earlier than reads because they don't return any data
                        m.d.comb += self.bus.ack.eq(1)

            with m.State("Read"):
                m.d.comb += self.bus.dat_r.eq(rdport.data)
                # ack can only be asserted when stb is asserted
                m.d.comb += self.bus.ack.eq(self.bus.stb)
                m.next = "Start"

        return m