interface_zmq.py
1 #!/usr/bin/env python3 2 # Copyright (c) 2015-2022 The Bitcoin Core developers 3 # Distributed under the MIT software license, see the accompanying 4 # file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 """Test the ZMQ notification interface.""" 6 import os 7 import struct 8 import tempfile 9 from io import BytesIO 10 11 from test_framework.address import ( 12 ADDRESS_BCRT1_P2WSH_OP_TRUE, 13 ADDRESS_BCRT1_UNSPENDABLE, 14 ) 15 from test_framework.blocktools import ( 16 add_witness_commitment, 17 create_block, 18 create_coinbase, 19 ) 20 from test_framework.test_framework import BitcoinTestFramework 21 from test_framework.messages import ( 22 CBlock, 23 hash256, 24 tx_from_hex, 25 ) 26 from test_framework.util import ( 27 assert_equal, 28 assert_raises_rpc_error, 29 ensure_for, 30 p2p_port, 31 ) 32 from test_framework.wallet import ( 33 MiniWallet, 34 ) 35 from test_framework.netutil import test_ipv6_local, test_unix_socket 36 37 38 # Test may be skipped and not have zmq installed 39 try: 40 import zmq 41 except ImportError: 42 pass 43 44 def hash256_reversed(byte_str): 45 return hash256(byte_str)[::-1] 46 47 class ZMQSubscriber: 48 def __init__(self, socket, topic): 49 self.sequence = None # no sequence number received yet 50 self.socket = socket 51 self.topic = topic 52 53 self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) 54 55 # Receive message from publisher and verify that topic and sequence match 56 def _receive_from_publisher_and_check(self): 57 topic, body, seq = self.socket.recv_multipart() 58 # Topic should match the subscriber topic. 59 assert_equal(topic, self.topic) 60 # Sequence should be incremental. 61 received_seq = struct.unpack('<I', seq)[-1] 62 if self.sequence is None: 63 self.sequence = received_seq 64 else: 65 assert_equal(received_seq, self.sequence) 66 self.sequence += 1 67 return body 68 69 def receive(self): 70 return self._receive_from_publisher_and_check() 71 72 def receive_sequence(self): 73 body = self._receive_from_publisher_and_check() 74 hash = body[:32].hex() 75 label = chr(body[32]) 76 mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0] 77 if mempool_sequence is not None: 78 assert label == "A" or label == "R" 79 else: 80 assert label == "D" or label == "C" 81 return (hash, label, mempool_sequence) 82 83 84 class ZMQTestSetupBlock: 85 """Helper class for setting up a ZMQ test via the "sync up" procedure. 86 Generates a block on the specified node on instantiation and provides a 87 method to check whether a ZMQ notification matches, i.e. the event was 88 caused by this generated block. Assumes that a notification either contains 89 the generated block's hash, it's (coinbase) transaction id, the raw block or 90 raw transaction data. 91 """ 92 def __init__(self, test_framework, node): 93 self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0] 94 coinbase = node.getblock(self.block_hash, 2)['tx'][0] 95 self.tx_hash = coinbase['txid'] 96 self.raw_tx = coinbase['hex'] 97 self.raw_block = node.getblock(self.block_hash, 0) 98 99 def caused_notification(self, notification): 100 return ( 101 self.block_hash in notification 102 or self.tx_hash in notification 103 or self.raw_block in notification 104 or self.raw_tx in notification 105 ) 106 107 108 class ZMQTest (BitcoinTestFramework): 109 def set_test_params(self): 110 self.num_nodes = 2 111 # whitelist peers to speed up tx relay / mempool sync 112 self.noban_tx_relay = True 113 self.zmq_port_base = p2p_port(self.num_nodes + 1) 114 115 def skip_test_if_missing_module(self): 116 self.skip_if_no_py3_zmq() 117 self.skip_if_no_bitcoind_zmq() 118 119 def run_test(self): 120 self.wallet = MiniWallet(self.nodes[0]) 121 self.ctx = zmq.Context() 122 try: 123 self.test_basic() 124 if test_unix_socket(): 125 self.test_basic(unix=True) 126 else: 127 self.log.info("Skipping ipc test, because UNIX sockets are not supported.") 128 self.test_sequence() 129 self.test_mempool_sync() 130 self.test_reorg() 131 self.test_multiple_interfaces() 132 self.test_ipv6() 133 finally: 134 # Destroy the ZMQ context. 135 self.log.debug("Destroying ZMQ context") 136 self.ctx.destroy(linger=None) 137 138 # Restart node with the specified zmq notifications enabled, subscribe to 139 # all of them and return the corresponding ZMQSubscriber objects. 140 def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True, ipv6=False): 141 subscribers = [] 142 for topic, address in services: 143 socket = self.ctx.socket(zmq.SUB) 144 if ipv6: 145 socket.setsockopt(zmq.IPV6, 1) 146 subscribers.append(ZMQSubscriber(socket, topic.encode())) 147 148 self.restart_node(0, [f"-zmqpub{topic}={address.replace('ipc://', 'unix:')}" for topic, address in services]) 149 150 for i, sub in enumerate(subscribers): 151 sub.socket.connect(services[i][1]) 152 153 # Ensure that all zmq publisher notification interfaces are ready by 154 # running the following "sync up" procedure: 155 # 1. Generate a block on the node 156 # 2. Try to receive the corresponding notification on all subscribers 157 # 3. If all subscribers get the message within the timeout (1 second), 158 # we are done, otherwise repeat starting from step 1 159 for sub in subscribers: 160 sub.socket.set(zmq.RCVTIMEO, 1000) 161 while True: 162 test_block = ZMQTestSetupBlock(self, self.nodes[0]) 163 recv_failed = False 164 for sub in subscribers: 165 try: 166 while not test_block.caused_notification(sub.receive().hex()): 167 self.log.debug("Ignoring sync-up notification for previously generated block.") 168 except zmq.error.Again: 169 self.log.debug("Didn't receive sync-up notification, trying again.") 170 recv_failed = True 171 if not recv_failed: 172 self.log.debug("ZMQ sync-up completed, all subscribers are ready.") 173 break 174 175 # set subscriber's desired timeout for the test 176 for sub in subscribers: 177 sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000) 178 179 self.connect_nodes(0, 1) 180 if sync_blocks: 181 self.sync_blocks() 182 183 return subscribers 184 185 def test_basic(self, unix = False): 186 self.log.info(f"Running basic test with {'ipc' if unix else 'tcp'} protocol") 187 188 # Invalid zmq arguments don't take down the node, see #17185. 189 self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"]) 190 191 address = f"tcp://127.0.0.1:{self.zmq_port_base}" 192 193 if unix: 194 # Use the shortest temp path possible since paths may have as little as 92-char limit 195 socket_path = tempfile.NamedTemporaryFile().name 196 address = f"ipc://{socket_path}" 197 198 subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]]) 199 200 hashblock = subs[0] 201 hashtx = subs[1] 202 rawblock = subs[2] 203 rawtx = subs[3] 204 205 num_blocks = 5 206 self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)") 207 genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE) 208 209 for x in range(num_blocks): 210 # Should receive the coinbase txid. 211 txid = hashtx.receive() 212 213 # Should receive the coinbase raw transaction. 214 tx = tx_from_hex(rawtx.receive().hex()) 215 tx.calc_sha256() 216 assert_equal(tx.hash, txid.hex()) 217 218 # Should receive the generated raw block. 219 hex = rawblock.receive() 220 block = CBlock() 221 block.deserialize(BytesIO(hex)) 222 assert block.is_valid() 223 assert_equal(block.vtx[0].hash, tx.hash) 224 assert_equal(len(block.vtx), 1) 225 assert_equal(genhashes[x], hash256_reversed(hex[:80]).hex()) 226 227 # Should receive the generated block hash. 228 hash = hashblock.receive().hex() 229 assert_equal(genhashes[x], hash) 230 # The block should only have the coinbase txid. 231 assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"]) 232 233 234 self.log.info("Wait for tx from second node") 235 payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1]) 236 payment_txid = payment_tx['txid'] 237 self.sync_all() 238 # Should receive the broadcasted txid. 239 txid = hashtx.receive() 240 assert_equal(payment_txid, txid.hex()) 241 242 # Should receive the broadcasted raw transaction. 243 hex = rawtx.receive() 244 assert_equal(payment_tx['wtxid'], hash256_reversed(hex).hex()) 245 246 # Mining the block with this tx should result in second notification 247 # after coinbase tx notification 248 self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE) 249 hashtx.receive() 250 txid = hashtx.receive() 251 assert_equal(payment_txid, txid.hex()) 252 253 254 self.log.info("Test the getzmqnotifications RPC") 255 assert_equal(self.nodes[0].getzmqnotifications(), [ 256 {"type": "pubhashblock", "address": address, "hwm": 1000}, 257 {"type": "pubhashtx", "address": address, "hwm": 1000}, 258 {"type": "pubrawblock", "address": address, "hwm": 1000}, 259 {"type": "pubrawtx", "address": address, "hwm": 1000}, 260 ]) 261 262 assert_equal(self.nodes[1].getzmqnotifications(), []) 263 if unix: 264 os.unlink(socket_path) 265 266 def test_reorg(self): 267 268 address = f"tcp://127.0.0.1:{self.zmq_port_base}" 269 270 # Should only notify the tip if a reorg occurs 271 hashblock, hashtx = self.setup_zmq_test( 272 [(topic, address) for topic in ["hashblock", "hashtx"]], 273 recv_timeout=2) # 2 second timeout to check end of notifications 274 self.disconnect_nodes(0, 1) 275 276 # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications 277 payment_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid'] 278 disconnect_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0] 279 disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0] 280 assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex()) 281 assert_equal(hashtx.receive().hex(), payment_txid) 282 assert_equal(hashtx.receive().hex(), disconnect_cb) 283 284 # Generate 2 blocks in nodes[1] to a different address to ensure split 285 connect_blocks = self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op) 286 287 # nodes[0] will reorg chain after connecting back nodes[1] 288 self.connect_nodes(0, 1) 289 self.sync_blocks() # tx in mempool valid but not advertised 290 291 # Should receive nodes[1] tip 292 assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex()) 293 294 # During reorg: 295 # Get old payment transaction notification from disconnect and disconnected cb 296 assert_equal(hashtx.receive().hex(), payment_txid) 297 assert_equal(hashtx.receive().hex(), disconnect_cb) 298 # And the payment transaction again due to mempool entry 299 assert_equal(hashtx.receive().hex(), payment_txid) 300 assert_equal(hashtx.receive().hex(), payment_txid) 301 # And the new connected coinbases 302 for i in [0, 1]: 303 assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0]) 304 305 # If we do a simple invalidate we announce the disconnected coinbase 306 self.nodes[0].invalidateblock(connect_blocks[1]) 307 assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0]) 308 # And the current tip 309 assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0]) 310 311 def test_sequence(self): 312 """ 313 Sequence zmq notifications give every blockhash and txhash in order 314 of processing, regardless of IBD, re-orgs, etc. 315 Format of messages: 316 <32-byte hash>C : Blockhash connected 317 <32-byte hash>D : Blockhash disconnected 318 <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason 319 <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool 320 """ 321 self.log.info("Testing 'sequence' publisher") 322 [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")]) 323 self.disconnect_nodes(0, 1) 324 325 # Mempool sequence number starts at 1 326 seq_num = 1 327 328 # Generate 1 block in nodes[0] and receive all notifications 329 dc_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0] 330 331 # Note: We are not notified of any block transactions, coinbase or mined 332 assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence()) 333 334 # Generate 2 blocks in nodes[1] to a different address to ensure a chain split 335 self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op) 336 337 # nodes[0] will reorg chain after connecting back nodes[1] 338 self.connect_nodes(0, 1) 339 340 # Then we receive all block (dis)connect notifications for the 2 block reorg 341 assert_equal((dc_block, "D", None), seq.receive_sequence()) 342 block_count = self.nodes[1].getblockcount() 343 assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence()) 344 assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence()) 345 346 self.log.info("Wait for tx from second node") 347 payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1]) 348 payment_txid = payment_tx['txid'] 349 self.sync_all() 350 self.log.info("Testing sequence notifications with mempool sequence values") 351 352 # Should receive the broadcasted txid. 353 assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) 354 seq_num += 1 355 356 self.log.info("Testing RBF notification") 357 # Replace it to test eviction/addition notification 358 payment_tx['tx'].vout[0].nValue -= 1000 359 rbf_txid = self.nodes[1].sendrawtransaction(payment_tx['tx'].serialize().hex()) 360 self.sync_all() 361 assert_equal((payment_txid, "R", seq_num), seq.receive_sequence()) 362 seq_num += 1 363 assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence()) 364 seq_num += 1 365 366 # Doesn't get published when mined, make a block and tx to "flush" the possibility 367 # though the mempool sequence number does go up by the number of transactions 368 # removed from the mempool by the block mining it. 369 mempool_size = len(self.nodes[0].getrawmempool()) 370 c_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)[0] 371 # Make sure the number of mined transactions matches the number of txs out of mempool 372 mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool()) 373 assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta) 374 seq_num += mempool_size_delta 375 payment_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[1])['txid'] 376 self.sync_all() 377 assert_equal((c_block, "C", None), seq.receive_sequence()) 378 assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence()) 379 seq_num += 1 380 381 # Spot check getrawmempool results that they only show up when asked for 382 assert type(self.nodes[0].getrawmempool()) is list 383 assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list 384 assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True) 385 assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True) 386 assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num) 387 388 self.log.info("Testing reorg notifications") 389 # Manually invalidate the last block to test mempool re-entry 390 # N.B. This part could be made more lenient in exact ordering 391 # since it greatly depends on inner-workings of blocks/mempool 392 # during "deep" re-orgs. Probably should "re-construct" 393 # blockchain/mempool state from notifications instead. 394 block_count = self.nodes[0].getblockcount() 395 best_hash = self.nodes[0].getbestblockhash() 396 self.nodes[0].invalidateblock(best_hash) 397 398 # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective 399 # of the time they were gathered. 400 ensure_for(duration=2, f=lambda: self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num) 401 402 assert_equal((best_hash, "D", None), seq.receive_sequence()) 403 assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence()) 404 seq_num += 1 405 406 # Other things may happen but aren't wallet-deterministic so we don't test for them currently 407 self.nodes[0].reconsiderblock(best_hash) 408 self.generatetoaddress(self.nodes[1], 1, ADDRESS_BCRT1_UNSPENDABLE) 409 410 self.log.info("Evict mempool transaction by block conflict") 411 orig_tx = self.wallet.send_self_transfer(from_node=self.nodes[0]) 412 orig_txid = orig_tx['txid'] 413 414 # More to be simply mined 415 more_tx = [] 416 for _ in range(5): 417 more_tx.append(self.wallet.send_self_transfer(from_node=self.nodes[0])) 418 419 orig_tx['tx'].vout[0].nValue -= 1000 420 bump_txid = self.nodes[0].sendrawtransaction(orig_tx['tx'].serialize().hex()) 421 # Mine the pre-bump tx 422 txs_to_add = [orig_tx['hex']] + [tx['hex'] for tx in more_tx] 423 block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1), txlist=txs_to_add) 424 add_witness_commitment(block) 425 block.solve() 426 assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None) 427 tip = self.nodes[0].getbestblockhash() 428 assert_equal(int(tip, 16), block.sha256) 429 orig_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid'] 430 431 # Flush old notifications until evicted tx original entry 432 (hash_str, label, mempool_seq) = seq.receive_sequence() 433 while hash_str != orig_txid: 434 (hash_str, label, mempool_seq) = seq.receive_sequence() 435 mempool_seq += 1 436 437 # Added original tx 438 assert_equal(label, "A") 439 # More transactions to be simply mined 440 for i in range(len(more_tx)): 441 assert_equal((more_tx[i]['txid'], "A", mempool_seq), seq.receive_sequence()) 442 mempool_seq += 1 443 # Bumped by rbf 444 assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence()) 445 mempool_seq += 1 446 assert_equal((bump_txid, "A", mempool_seq), seq.receive_sequence()) 447 mempool_seq += 1 448 # Conflict announced first, then block 449 assert_equal((bump_txid, "R", mempool_seq), seq.receive_sequence()) 450 mempool_seq += 1 451 assert_equal((tip, "C", None), seq.receive_sequence()) 452 mempool_seq += len(more_tx) 453 # Last tx 454 assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence()) 455 mempool_seq += 1 456 self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE) 457 self.sync_all() # want to make sure we didn't break "consensus" for other tests 458 459 def test_mempool_sync(self): 460 """ 461 Use sequence notification plus getrawmempool sequence results to "sync mempool" 462 """ 463 464 self.log.info("Testing 'mempool sync' usage of sequence notifier") 465 [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")]) 466 467 # In-memory counter, should always start at 1 468 next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] 469 assert_equal(next_mempool_seq, 1) 470 471 # Some transactions have been happening but we aren't consuming zmq notifications yet 472 # or we lost a ZMQ message somehow and want to start over 473 txs = [] 474 num_txs = 5 475 for _ in range(num_txs): 476 txs.append(self.wallet.send_self_transfer(from_node=self.nodes[1])) 477 self.sync_all() 478 479 # 1) Consume backlog until we get a mempool sequence number 480 (hash_str, label, zmq_mem_seq) = seq.receive_sequence() 481 while zmq_mem_seq is None: 482 (hash_str, label, zmq_mem_seq) = seq.receive_sequence() 483 484 assert label == "A" or label == "R" 485 assert hash_str is not None 486 487 # 2) We need to "seed" our view of the mempool 488 mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) 489 mempool_view = set(mempool_snapshot["txids"]) 490 get_raw_seq = mempool_snapshot["mempool_sequence"] 491 assert_equal(get_raw_seq, num_txs + 1) 492 assert zmq_mem_seq < get_raw_seq 493 494 # Things continue to happen in the "interim" while waiting for snapshot results 495 # We have node 0 do all these to avoid p2p races with RBF announcements 496 for _ in range(num_txs): 497 txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0])) 498 txs[-1]['tx'].vout[0].nValue -= 1000 499 self.nodes[0].sendrawtransaction(txs[-1]['tx'].serialize().hex()) 500 self.sync_all() 501 self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE) 502 final_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid'] 503 504 # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot 505 while True: 506 if zmq_mem_seq == get_raw_seq - 1: 507 break 508 (hash_str, label, mempool_sequence) = seq.receive_sequence() 509 if mempool_sequence is not None: 510 zmq_mem_seq = mempool_sequence 511 if zmq_mem_seq > get_raw_seq: 512 raise Exception(f"We somehow jumped mempool sequence numbers! zmq_mem_seq: {zmq_mem_seq} > get_raw_seq: {get_raw_seq}") 513 514 # 4) Moving forward, we apply the delta to our local view 515 # remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx 516 expected_sequence = get_raw_seq 517 r_gap = 0 518 for _ in range(num_txs + 2 + 1 + 1): 519 (hash_str, label, mempool_sequence) = seq.receive_sequence() 520 if mempool_sequence is not None: 521 if mempool_sequence != expected_sequence: 522 # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its 523 # position in the incoming block message "C" 524 if label == "R": 525 assert mempool_sequence > expected_sequence 526 r_gap += mempool_sequence - expected_sequence 527 else: 528 raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}") 529 if label == "A": 530 assert hash_str not in mempool_view 531 mempool_view.add(hash_str) 532 expected_sequence = mempool_sequence + 1 533 elif label == "R": 534 assert hash_str in mempool_view 535 mempool_view.remove(hash_str) 536 expected_sequence = mempool_sequence + 1 537 elif label == "C": 538 # (Attempt to) remove all txids from known block connects 539 block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] 540 for txid in block_txids: 541 if txid in mempool_view: 542 expected_sequence += 1 543 mempool_view.remove(txid) 544 expected_sequence -= r_gap 545 r_gap = 0 546 elif label == "D": 547 # Not useful for mempool tracking per se 548 continue 549 else: 550 raise Exception("Unexpected ZMQ sequence label!") 551 552 assert_equal(self.nodes[0].getrawmempool(), [final_txid]) 553 assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence) 554 555 # 5) If you miss a zmq/mempool sequence number, go back to step (2) 556 557 self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE) 558 559 def test_multiple_interfaces(self): 560 # Set up two subscribers with different addresses 561 # (note that after the reorg test, syncing would fail due to different 562 # chain lengths on node0 and node1; for this test we only need node0, so 563 # we can disable syncing blocks on the setup) 564 subscribers = self.setup_zmq_test([ 565 ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 1}"), 566 ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 2}"), 567 ], sync_blocks=False) 568 569 # Generate 1 block in nodes[0] and receive all notifications 570 self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op) 571 572 # Should receive the same block hash on both subscribers 573 assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) 574 assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex()) 575 576 def test_ipv6(self): 577 if not test_ipv6_local(): 578 self.log.info("Skipping IPv6 test, because IPv6 is not supported.") 579 return 580 self.log.info("Testing IPv6") 581 # Set up subscriber using IPv6 loopback address 582 subscribers = self.setup_zmq_test([ 583 ("hashblock", f"tcp://[::1]:{self.zmq_port_base}") 584 ], ipv6=True) 585 586 # Generate 1 block in nodes[0] 587 self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE) 588 589 # Should receive the same block hash 590 assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) 591 592 593 if __name__ == '__main__': 594 ZMQTest(__file__).main()