/ test / functional / test_framework / ipc_util.py
ipc_util.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Shared utilities for IPC (multiprocess) interface tests."""
  6  import asyncio
  7  import inspect
  8  from contextlib import asynccontextmanager
  9  from dataclasses import dataclass
 10  from io import BytesIO
 11  from pathlib import Path
 12  import shutil
 13  from typing import Optional
 14  
 15  from test_framework.messages import CBlock
 16  from test_framework.util import (
 17      assert_equal
 18  )
 19  
 20  # Test may be skipped and not have capnp installed
 21  try:
 22      import capnp  # type: ignore[import] # noqa: F401
 23  except ModuleNotFoundError:
 24      pass
 25  
 26  
 27  # Stores the result of getCoinbaseTx()
 28  @dataclass
 29  class CoinbaseTxData:
 30      version: int
 31      sequence: int
 32      scriptSigPrefix: bytes
 33      witness: Optional[bytes]
 34      blockRewardRemaining: int
 35      requiredOutputs: list[bytes]
 36      lockTime: int
 37  
 38  
 39  @asynccontextmanager
 40  async def destroying(obj, ctx):
 41      """Call obj.destroy(ctx) at end of with: block. Similar to contextlib.closing."""
 42      try:
 43          yield obj
 44      finally:
 45          await obj.destroy(ctx)
 46  
 47  
 48  async def wait_and_do(wait_fn, do_fn):
 49      """Call wait_fn, then sleep, then call do_fn in a parallel task. Wait for
 50      both tasks to complete."""
 51      wait_started = asyncio.Event()
 52      result = None
 53  
 54      async def wait():
 55          nonlocal result
 56          wait_started.set()
 57          result = await wait_fn
 58  
 59      async def do():
 60          await wait_started.wait()
 61          await asyncio.sleep(0.1)
 62          # Let do_fn be either a callable or an awaitable object
 63          if inspect.isawaitable(do_fn):
 64              await do_fn
 65          else:
 66              do_fn()
 67  
 68      await asyncio.gather(wait(), do())
 69      return result
 70  
 71  
 72  def load_capnp_modules(config):
 73      if capnp_bin := shutil.which("capnp"):
 74          # Add the system cap'nproto path so include/capnp/c++.capnp can be found.
 75          capnp_dir = Path(capnp_bin).resolve().parent.parent / "include"
 76      else:
 77          # If there is no system cap'nproto, the pycapnp module should have its own "bundled"
 78          # includes at this location. If pycapnp was installed with bundled capnp,
 79          # capnp/c++.capnp can be found here.
 80          capnp_dir = Path(capnp.__path__[0]).parent
 81      src_dir = Path(config['environment']['SRCDIR']) / "src"
 82      mp_dir = src_dir / "ipc" / "libmultiprocess" / "include"
 83      # List of import directories. Note: it is important for mp_dir to be
 84      # listed first, in case there are other libmultiprocess installations on
 85      # the system, to ensure that `import "/mp/proxy.capnp"` lines load the
 86      # same file as capnp.load() loads directly below, and there are not
 87      # "failed: Duplicate ID @0xcc316e3f71a040fb" errors.
 88      imports = [str(mp_dir), str(capnp_dir), str(src_dir)]
 89      return {
 90          "proxy": capnp.load(str(mp_dir / "mp" / "proxy.capnp"), imports=imports),
 91          "init": capnp.load(str(src_dir / "ipc" / "capnp" / "init.capnp"), imports=imports),
 92          "echo": capnp.load(str(src_dir / "ipc" / "capnp" / "echo.capnp"), imports=imports),
 93          "mining": capnp.load(str(src_dir / "ipc" / "capnp" / "mining.capnp"), imports=imports),
 94      }
 95  
 96  
 97  async def make_capnp_init_ctx(self):
 98      node = self.nodes[0]
 99      # Establish a connection, and create Init proxy object.
100      connection = await capnp.AsyncIoStream.create_unix_connection(node.ipc_socket_path)
101      client = capnp.TwoPartyClient(connection)
102      init = client.bootstrap().cast_as(self.capnp_modules['init'].Init)
103      # Create a remote thread on the server for the IPC calls to be executed in.
104      threadmap = init.construct().threadMap
105      thread = threadmap.makeThread("pythread").result
106      ctx = self.capnp_modules['proxy'].Context()
107      ctx.thread = thread
108      # Return both.
109      return ctx, init
110  
111  
async def mining_create_block_template(mining, stack, ctx, opts):
    """Call mining.createNewBlock(ctx, opts) and return the resulting template.

    The template is entered into `stack` so that template.destroy(ctx) is
    called when the stack exits. Returns None if the response has no result.
    """
    reply = await mining.createNewBlock(ctx, opts)
    if reply._has("result"):
        return await stack.enter_async_context(destroying(reply.result, ctx))
    return None
118  
119  
async def mining_wait_next_template(template, stack, ctx, opts):
    """Call template.waitNext(ctx, opts) and return the resulting template.

    The new template is entered into `stack` so that its destroy(ctx) is
    called when the stack exits. Returns None if the response has no result.
    """
    reply = await template.waitNext(ctx, opts)
    if reply._has("result"):
        return await stack.enter_async_context(destroying(reply.result, ctx))
    return None
126  
127  
async def mining_get_block(block_template, ctx):
    """Call block_template.getBlock(ctx) and return the result deserialized into a CBlock."""
    stream = BytesIO((await block_template.getBlock(ctx)).result)
    parsed = CBlock()
    parsed.deserialize(stream)
    return parsed
133  
134  
async def mining_get_coinbase_tx(block_template, ctx) -> CoinbaseTxData:
    """Call block_template.getCoinbaseTx(ctx) and return its result as a CoinbaseTxData.

    All values are copied into plain Python ints and bytes. The `witness`
    field is None when the result has no witness set.
    """
    assert block_template is not None
    # Note: the capnp struct below is garbage-collected once this function
    # returns, so every Data field that has to outlive it must be copied with
    # a bytes() cast. Starting with pycapnp v2.2.0, Data fields have type
    # `memoryview` and are ephemeral.
    coinbase = (await block_template.getCoinbaseTx(ctx)).result
    witness: Optional[bytes] = bytes(coinbase.witness) if coinbase._has("witness") else None
    return CoinbaseTxData(
        version=int(coinbase.version),
        sequence=int(coinbase.sequence),
        scriptSigPrefix=bytes(coinbase.scriptSigPrefix),
        witness=witness,
        blockRewardRemaining=int(coinbase.blockRewardRemaining),
        requiredOutputs=[bytes(raw_output) for raw_output in coinbase.requiredOutputs],
        lockTime=int(coinbase.lockTime),
    )
154  
async def make_mining_ctx(self):
    """Create the IPC context and a Mining proxy object; return (ctx, mining)."""
    ctx, init_proxy = await make_capnp_init_ctx(self)
    self.log.debug("Create Mining proxy object")
    mining_proxy = init_proxy.makeMining(ctx).result
    return ctx, mining_proxy
161  
def assert_capnp_failed(e, description_prefix):
    """Assert that exception `e` has type "FAILED" and a description starting with description_prefix.

    The description is checked first, so a prefix mismatch is reported before
    a type mismatch.
    """
    assert e.description.startswith(description_prefix), (
        f"Expected description starting with '{description_prefix}', got '{e.description}'"
    )
    assert_equal(e.type, "FAILED")