/ test / scheduler / test_rs_selection.py
test_rs_selection.py
  1  from collections import deque
  2  import random
  3  import pytest
  4  
  5  from coreblocks.params import GenParams
  6  from coreblocks.arch import Funct3, Funct7
  7  from coreblocks.arch import OpType
  8  from coreblocks.params.configurations import test_core_config
  9  from coreblocks.scheduler.scheduler import RSSelection
 10  from transactron.testing import SimpleTestCircuit, TestCaseWithSimulator, TestbenchIO, TestbenchContext
 11  from transactron.testing.functions import data_const_to_dict
 12  from transactron.testing.method_mock import MethodMock, def_method_mock
 13  from .test_scheduler import MockedBlockComponent
 14  
# Operation-type sets handed to the two MockedBlockComponent instances in
# TestRSSelect.setup_method and used as the accepted types of the
# rs_select[0] / rs_select[1] mocks. COMPARE is present in both sets, so their
# intersection (used by the test_only_rs* tests) is non-empty.
_rs1_optypes = {OpType.ARITHMETIC, OpType.COMPARE}
_rs2_optypes = {OpType.LOGIC, OpType.COMPARE}
 17  
 18  
 19  class TestRSSelect(TestCaseWithSimulator):
 20      @pytest.fixture(autouse=True)
 21      def setup_method(self):
 22          self.gen_params = GenParams(
 23              test_core_config.replace(
 24                  func_units_config=(MockedBlockComponent(_rs1_optypes, 4), MockedBlockComponent(_rs2_optypes, 4)),
 25                  allow_partial_extensions=True,
 26              )
 27          )
 28          self.m = SimpleTestCircuit(RSSelection(gen_params=self.gen_params))
 29          self.expected_out: deque[dict] = deque()
 30          self.input_instrs: deque[dict] = deque()
 31          self.instr_in: deque[dict] = deque()
 32          random.seed(1789)
 33  
 34      def gen_instrs(self, instr_count: int, optypes: set[OpType]):
 35          for i in range(instr_count):
 36              rp_dst = random.randrange(self.gen_params.phys_regs_bits)
 37              rp_s1 = random.randrange(self.gen_params.phys_regs_bits)
 38              rp_s2 = random.randrange(self.gen_params.phys_regs_bits)
 39  
 40              op_type = random.choice(list(optypes))
 41              funct3 = random.choice(list(Funct3))
 42              funct7 = random.choice(list(Funct7))
 43  
 44              immediate = random.randrange(2**32)
 45  
 46              rob_id = random.randrange(self.gen_params.rob_entries_bits)
 47              pc = random.randrange(2**32)
 48              csr = random.randrange(2**self.gen_params.isa.csr_alen)
 49  
 50              instr = {
 51                  "exec_fn": {
 52                      "op_type": op_type,
 53                      "funct3": funct3,
 54                      "funct7": funct7,
 55                  },
 56                  "regs_p": {
 57                      "rp_dst": rp_dst,
 58                      "rp_s1": rp_s1,
 59                      "rp_s2": rp_s2,
 60                  },
 61                  "rob_id": rob_id,
 62                  "imm": immediate,
 63                  "csr": csr,
 64                  "pc": pc,
 65                  "tag": 0,
 66              }
 67  
 68              self.input_instrs.append(instr)
 69  
 70      def create_instr_input_process(self, enable_prob: float = 1.0):
 71          @def_method_mock(lambda: self.m.get_instrs, enable=lambda: random.random() <= enable_prob)
 72          def process(count: int):
 73              @MethodMock.effect
 74              def eff():
 75                  for _ in range(count):
 76                      self.instr_in.append(self.input_instrs.popleft())
 77  
 78              return {"count": count, "data": [self.input_instrs[0]]}
 79  
 80          return process()
 81  
 82      def create_rs_alloc_process(self, io: TestbenchIO, rs_id: int, rs_optypes: set[OpType], enable_prob: float = 1):
 83          @def_method_mock(lambda: io, enable=lambda: random.random() <= enable_prob, delay=1e-9)
 84          def process():
 85              random_entry = random.randrange(self.gen_params.max_rs_entries)
 86  
 87              @MethodMock.effect
 88              def eff():
 89                  expected = self.instr_in.popleft()
 90                  assert expected["exec_fn"]["op_type"] in rs_optypes
 91                  expected["rs_entry_id"] = random_entry
 92                  expected["rs_selected"] = rs_id
 93                  self.expected_out.append(expected)
 94  
 95              return {"rs_entry_id": random_entry}
 96  
 97          return process()
 98  
 99      @def_method_mock(lambda self: self.m.peek_instrs, enable=lambda self: bool(self.input_instrs))
100      def peek_instrs_mock(self):
101          return {"count": 1, "data": [self.input_instrs[0]]}
102  
103      @def_method_mock(lambda self: self.m.rf_read_req[0])
104      def rf_read_req1_mock(self, reg_id):
105          pass
106  
107      @def_method_mock(lambda self: self.m.rf_read_req[1])
108      def rf_read_req2_mock(self, reg_id):
109          pass
110  
111      def create_output_process(self, instr_count: int, random_wait: int = 0):
112          async def process(sim: TestbenchContext):
113              for _ in range(instr_count):
114                  result = await self.m.push_instrs.call(sim)
115                  if result.count == 0:
116                      continue
117  
118                  outputs = self.expected_out.popleft()
119  
120                  await self.random_wait(sim, random_wait)
121                  assert result.count == 1
122                  assert data_const_to_dict(result.data[0]) == outputs
123  
124          return process
125  
126      def test_base_functionality(self):
127          """
128          Test checking basic functionality when both RS select methods are available.
129          """
130  
131          self.gen_instrs(100, _rs1_optypes.union(_rs2_optypes))
132          with self.run_simulation(self.m, max_cycles=1500) as sim:
133              self.add_mock(sim, self.create_instr_input_process())
134              self.add_mock(sim, self.create_rs_alloc_process(self.m.rs_select[0], rs_id=0, rs_optypes=_rs1_optypes))
135              self.add_mock(sim, self.create_rs_alloc_process(self.m.rs_select[1], rs_id=1, rs_optypes=_rs2_optypes))
136              sim.add_testbench(self.create_output_process(100))
137  
138      def test_only_rs1(self):
139          """
140          Test checking if instruction will get allocated if first the RS is full and
141          the RS select method is not available.
142          """
143  
144          self.gen_instrs(100, _rs1_optypes.intersection(_rs2_optypes))
145          with self.run_simulation(self.m, max_cycles=1500) as sim:
146              self.add_mock(sim, self.create_instr_input_process())
147              self.add_mock(sim, self.create_rs_alloc_process(self.m.rs_select[0], rs_id=0, rs_optypes=_rs1_optypes))
148              sim.add_testbench(self.create_output_process(100))
149  
150      def test_only_rs2(self):
151          """
152          Test checking if an instruction will get allocated if the second RS is full and
153          the RS select method is not available.
154          """
155  
156          self.gen_instrs(100, _rs1_optypes.intersection(_rs2_optypes))
157          with self.run_simulation(self.m, max_cycles=1500) as sim:
158              self.add_mock(sim, self.create_instr_input_process())
159              self.add_mock(sim, self.create_rs_alloc_process(self.m.rs_select[1], rs_id=1, rs_optypes=_rs2_optypes))
160              sim.add_testbench(self.create_output_process(100))
161  
162      def test_delays(self):
163          """
164          Test checking if instructions get allocated correctly if there are delays
165          in methods availability.
166          """
167  
168          self.gen_instrs(300, _rs1_optypes.union(_rs2_optypes))
169          with self.run_simulation(self.m, max_cycles=5000) as sim:
170              self.add_mock(sim, self.create_instr_input_process(enable_prob=0.3))
171              self.add_mock(
172                  sim,
173                  self.create_rs_alloc_process(self.m.rs_select[0], rs_id=0, rs_optypes=_rs1_optypes, enable_prob=0.1),
174              )
175              self.add_mock(
176                  sim,
177                  self.create_rs_alloc_process(self.m.rs_select[1], rs_id=1, rs_optypes=_rs2_optypes, enable_prob=0.1),
178              )
179              sim.add_testbench(self.create_output_process(300, random_wait=12))