ipc_util.py
#!/usr/bin/env python3
# Copyright (c) The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Shared utilities for IPC (multiprocess) interface tests."""
import asyncio
import inspect
from contextlib import asynccontextmanager
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
import shutil
from typing import Optional

from test_framework.messages import CBlock
from test_framework.util import (
    assert_equal
)

# Test may be skipped and not have capnp installed
try:
    import capnp  # type: ignore[import] # noqa: F401
except ModuleNotFoundError:
    pass


@dataclass
class CoinbaseTxData:
    """Plain-Python copy of the fields read from a getCoinbaseTx() result."""
    version: int
    sequence: int
    scriptSigPrefix: bytes
    # None when the capnp result has no "witness" field set.
    witness: Optional[bytes]
    blockRewardRemaining: int
    requiredOutputs: list[bytes]
    lockTime: int


@asynccontextmanager
async def destroying(obj, ctx):
    """Async context manager yielding obj and awaiting obj.destroy(ctx) on exit.

    The destroy call sits in a finally clause, so it also runs when the
    with: body raises. Comparable to contextlib.closing.
    """
    try:
        yield obj
    finally:
        await obj.destroy(ctx)


async def wait_and_do(wait_fn, do_fn):
    """Await wait_fn while triggering do_fn from a second, concurrent task.

    wait_fn is an awaitable. do_fn is either an awaitable (it is awaited) or
    a plain callable (it is called with no arguments). do_fn only runs after
    the waiting task has started and a further 0.1 second sleep has elapsed.
    Returns the value produced by awaiting wait_fn, once both tasks finished.
    """
    waiter_running = asyncio.Event()

    async def waiter():
        # Signal the other task right before blocking on wait_fn.
        waiter_running.set()
        return await wait_fn

    async def doer():
        await waiter_running.wait()
        await asyncio.sleep(0.1)
        # Let do_fn be either a callable or an awaitable object
        if not inspect.isawaitable(do_fn):
            do_fn()
            return
        await do_fn

    # gather() returns results in argument order; only the waiter's matters.
    waited_value, _ = await asyncio.gather(waiter(), doer())
    return waited_value


def load_capnp_modules(config):
    """Load the capnp schemas used by the IPC tests.

    config is indexed as config['environment']['SRCDIR'] to find the source
    tree. Returns a dict with keys "proxy", "init", "echo" and "mining", each
    mapping to the value returned by capnp.load() for that schema file.
    """
    capnp_bin = shutil.which("capnp")
    if capnp_bin:
        # Add the system cap'nproto path so include/capnp/c++.capnp can be found.
        capnp_dir = Path(capnp_bin).resolve().parent.parent / "include"
    else:
        # If there is no system cap'nproto, the pycapnp module should have its own "bundled"
        # includes at this location. If pycapnp was installed with bundled capnp,
        # capnp/c++.capnp can be found here.
        capnp_dir = Path(capnp.__path__[0]).parent

    src_dir = Path(config['environment']['SRCDIR']) / "src"
    mp_dir = src_dir / "ipc" / "libmultiprocess" / "include"
    schema_dir = src_dir / "ipc" / "capnp"

    # List of import directories. Note: it is important for mp_dir to be
    # listed first, in case there are other libmultiprocess installations on
    # the system, to ensure that `import "/mp/proxy.capnp"` lines load the
    # same file as capnp.load() loads directly below, and there are not
    # "failed: Duplicate ID @0xcc316e3f71a040fb" errors.
    imports = [str(mp_dir), str(capnp_dir), str(src_dir)]

    # Insertion order is the load order: proxy, init, echo, mining.
    schema_files = {
        "proxy": mp_dir / "mp" / "proxy.capnp",
        "init": schema_dir / "init.capnp",
        "echo": schema_dir / "echo.capnp",
        "mining": schema_dir / "mining.capnp",
    }
    return {
        name: capnp.load(str(schema_file), imports=imports)
        for name, schema_file in schema_files.items()
    }


async def make_capnp_init_ctx(self):
    """Connect to the first node's IPC socket and return (ctx, init).

    init is the bootstrap capability cast to the Init interface of the
    loaded "init" schema; ctx is a proxy Context whose thread field is set
    to a thread created through the Init object's thread map.
    """
    ipc_node = self.nodes[0]
    # Establish a connection, and create Init proxy object.
    stream = await capnp.AsyncIoStream.create_unix_connection(ipc_node.ipc_socket_path)
    rpc_client = capnp.TwoPartyClient(stream)
    init = rpc_client.bootstrap().cast_as(self.capnp_modules['init'].Init)
    # Create a remote thread on the server for the IPC calls to be executed in.
    remote_thread = init.construct().threadMap.makeThread("pythread").result
    ctx = self.capnp_modules['proxy'].Context()
    ctx.thread = remote_thread
    # Return both.
    return ctx, init


async def _enter_destroying(response, stack, ctx):
    """Register response.result on stack via destroying(); None if there is no result."""
    if response._has("result"):
        return await stack.enter_async_context(destroying(response.result, ctx))
    return None


async def mining_create_block_template(mining, stack, ctx, opts):
    """Await mining.createNewBlock(ctx, opts) and return the resulting template.

    The template is entered into stack wrapped in destroying(), so its
    destroy() is called when stack exits. Returns None when the response
    carries no result.
    """
    return await _enter_destroying(await mining.createNewBlock(ctx, opts), stack, ctx)


async def mining_wait_next_template(template, stack, ctx, opts):
    """Await template.waitNext(ctx, opts) and return the resulting template.

    The new template is entered into stack wrapped in destroying(), so its
    destroy() is called when stack exits. Returns None when the response
    carries no result.
    """
    return await _enter_destroying(await template.waitNext(ctx, opts), stack, ctx)


async def mining_get_block(block_template, ctx):
    """Await block_template.getBlock(ctx) and deserialize its result into a CBlock."""
    response = await block_template.getBlock(ctx)
    block = CBlock()
    block.deserialize(BytesIO(response.result))
    return block


async def mining_get_coinbase_tx(block_template, ctx) -> CoinbaseTxData:
    """Await block_template.getCoinbaseTx(ctx) and copy the result into CoinbaseTxData.

    block_template must not be None. witness is None when the result has no
    "witness" field.
    """
    assert block_template is not None
    # Note: the capnp struct will be garbage-collected when this method
    # returns, so it is important to copy any Data fields from it which need
    # to be accessed later using the bytes() cast. Starting with pycapnp
    # v2.2.0, Data fields have type `memoryview` and are ephemeral.
    coinbase_capnp = (await block_template.getCoinbaseTx(ctx)).result
    return CoinbaseTxData(
        version=int(coinbase_capnp.version),
        sequence=int(coinbase_capnp.sequence),
        scriptSigPrefix=bytes(coinbase_capnp.scriptSigPrefix),
        witness=bytes(coinbase_capnp.witness) if coinbase_capnp._has("witness") else None,
        blockRewardRemaining=int(coinbase_capnp.blockRewardRemaining),
        requiredOutputs=[bytes(raw_output) for raw_output in coinbase_capnp.requiredOutputs],
        lockTime=int(coinbase_capnp.lockTime),
    )


async def make_mining_ctx(self):
    """Create IPC context and Mining proxy object; return them as (ctx, mining)."""
    ctx, init_proxy = await make_capnp_init_ctx(self)
    self.log.debug("Create Mining proxy object")
    return ctx, init_proxy.makeMining(ctx).result


def assert_capnp_failed(e, description_prefix):
    """Assert e.description starts with description_prefix and e.type equals "FAILED"."""
    assert e.description.startswith(description_prefix), (
        f"Expected description starting with '{description_prefix}', got '{e.description}'"
    )
    assert_equal(e.type, "FAILED")