#!/usr/bin/env python3
# Copyright (c) 2015-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# File: interface_zmq.py
"""Test the ZMQ notification interface."""
import struct
from time import sleep
from io import BytesIO

from test_framework.address import (
    ADDRESS_BCRT1_P2WSH_OP_TRUE,
    ADDRESS_BCRT1_UNSPENDABLE,
)
from test_framework.blocktools import (
    add_witness_commitment,
    create_block,
    create_coinbase,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.messages import (
    CBlock,
    hash256,
    tx_from_hex,
)
from test_framework.util import (
    assert_equal,
    assert_raises_rpc_error,
    p2p_port,
)
from test_framework.wallet import (
    MiniWallet,
)
from test_framework.netutil import test_ipv6_local


# Test may be skipped and not have zmq installed
try:
    import zmq
except ImportError:
    pass


def hash256_reversed(byte_str):
    """Return hash256(byte_str) with the byte order reversed."""
    return hash256(byte_str)[::-1]


class ZMQSubscriber:
    """A ZMQ SUB socket subscribed to one topic, with sequence checking.

    Every received message must carry the subscribed topic, and its 4-byte
    little-endian sequence number must increase by exactly one per message
    (the first received value seeds the counter).
    """

    def __init__(self, socket, topic):
        """Subscribe `socket` to `topic` (bytes)."""
        self.sequence = None  # no sequence number received yet
        self.socket = socket
        self.topic = topic

        self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)

    def _receive_from_publisher_and_check(self):
        """Receive one (topic, body, seq) message, verify topic and sequence, return body."""
        topic, body, seq = self.socket.recv_multipart()
        # Topic should match the subscriber topic.
        assert_equal(topic, self.topic)
        # Sequence should be incremental.
        received_seq = struct.unpack('<I', seq)[-1]
        if self.sequence is None:
            self.sequence = received_seq
        else:
            assert_equal(received_seq, self.sequence)
        self.sequence += 1
        return body

    def receive(self):
        """Return the body of the next notification on this topic."""
        return self._receive_from_publisher_and_check()

    def receive_sequence(self):
        """Receive and decode one 'sequence' notification.

        Returns a tuple (hash_hex, label, mempool_sequence). The body is a
        32-byte hash followed by a one-character label; when an additional
        8-byte little-endian integer follows, it is returned as
        mempool_sequence, otherwise mempool_sequence is None.
        """
        body = self._receive_from_publisher_and_check()
        hash_hex = body[:32].hex()
        label = chr(body[32])
        if len(body) == 32 + 1 + 8:
            mempool_sequence = struct.unpack("<Q", body[32 + 1:])[0]
            # Only mempool add/remove events carry a mempool sequence.
            assert label in ("A", "R")
        else:
            mempool_sequence = None
            # Block disconnect/connect events carry no mempool sequence.
            assert label in ("D", "C")
        return (hash_hex, label, mempool_sequence)


class ZMQTestSetupBlock:
    """Helper class for setting up a ZMQ test via the "sync up" procedure.
    Generates a block on the specified node on instantiation and provides a
    method to check whether a ZMQ notification matches, i.e. the event was
    caused by this generated block.  Assumes that a notification either contains
    the generated block's hash, its (coinbase) transaction id, the raw block or
    raw transaction data.
    """
    def __init__(self, test_framework, node):
        """Generate one block on `node` and record its hash, raw data and coinbase."""
        self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0]
        coinbase = node.getblock(self.block_hash, 2)['tx'][0]
        self.tx_hash = coinbase['txid']
        self.raw_tx = coinbase['hex']
        self.raw_block = node.getblock(self.block_hash, 0)

    def caused_notification(self, notification):
        """Return True if `notification` (a hex string) contains data of the generated block."""
        return (
            self.block_hash in notification
            or self.tx_hash in notification
            or self.raw_block in notification
            or self.raw_tx in notification
        )


class ZMQTest(BitcoinTestFramework):
    """Functional test of the node's ZMQ publishers, run against two nodes."""

    def set_test_params(self):
        """Configure two nodes and pick the base port used for ZMQ publishers."""
        self.num_nodes = 2
        # whitelist peers to speed up tx relay / mempool sync
        self.noban_tx_relay = True
        self.zmq_port_base = p2p_port(self.num_nodes + 1)

    def skip_test_if_missing_module(self):
        """Skip when either the Python zmq module or bitcoind ZMQ support is missing."""
        self.skip_if_no_py3_zmq()
        self.skip_if_no_bitcoind_zmq()

    def run_test(self):
        """Run all sub-tests with a shared ZMQ context that is always destroyed."""
        self.wallet = MiniWallet(self.nodes[0])
        self.ctx = zmq.Context()
        try:
            self.test_basic()
            self.test_sequence()
            self.test_mempool_sync()
            self.test_reorg()
            self.test_multiple_interfaces()
            self.test_ipv6()
        finally:
            # Destroy the ZMQ context.
            self.log.debug("Destroying ZMQ context")
            self.ctx.destroy(linger=None)

    def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True, ipv6=False):
        """Restart node 0 with the given ZMQ publishers and subscribe to all of them.

        services:     list of (topic, address) pairs; one subscriber is created
                      per pair, in the same order.
        recv_timeout: receive timeout in seconds applied to every subscriber
                      once the sync-up procedure has completed.
        sync_blocks:  whether to call sync_blocks() after reconnecting the nodes.
        ipv6:         whether to enable the IPV6 option on the subscriber sockets.

        Returns the list of ZMQSubscriber objects.
        """
        subscribers = []
        for topic, _address in services:
            socket = self.ctx.socket(zmq.SUB)
            if ipv6:
                socket.setsockopt(zmq.IPV6, 1)
            subscribers.append(ZMQSubscriber(socket, topic.encode()))

        self.restart_node(0, [f"-zmqpub{topic}={address}" for topic, address in services])

        for sub, (_topic, address) in zip(subscribers, services):
            sub.socket.connect(address)

        # Ensure that all zmq publisher notification interfaces are ready by
        # running the following "sync up" procedure:
        #   1. Generate a block on the node
        #   2. Try to receive the corresponding notification on all subscribers
        #   3. If all subscribers get the message within the timeout (1 second),
        #      we are done, otherwise repeat starting from step 1
        for sub in subscribers:
            sub.socket.set(zmq.RCVTIMEO, 1000)
        while True:
            test_block = ZMQTestSetupBlock(self, self.nodes[0])
            recv_failed = False
            for sub in subscribers:
                try:
                    while not test_block.caused_notification(sub.receive().hex()):
                        self.log.debug("Ignoring sync-up notification for previously generated block.")
                except zmq.error.Again:
                    self.log.debug("Didn't receive sync-up notification, trying again.")
                    recv_failed = True
            if not recv_failed:
                self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
                break

        # set subscriber's desired timeout for the test
        for sub in subscribers:
            sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)

        self.connect_nodes(0, 1)
        if sync_blocks:
            self.sync_blocks()

        return subscribers

    def test_basic(self):
        """Check hashblock/hashtx/rawblock/rawtx notifications and getzmqnotifications."""

        # Invalid zmq arguments don't take down the node, see #17185.
        self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])

        address = f"tcp://127.0.0.1:{self.zmq_port_base}"
        subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])

        hashblock, hashtx, rawblock, rawtx = subs

        num_blocks = 5
        self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)")
        genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
        assert_equal(len(genhashes), num_blocks)

        for expected_hash in genhashes:
            # Should receive the coinbase txid.
            txid = hashtx.receive()

            # Should receive the coinbase raw transaction.
            tx = tx_from_hex(rawtx.receive().hex())
            tx.calc_sha256()
            assert_equal(tx.hash, txid.hex())

            # Should receive the generated raw block.
            raw_block = rawblock.receive()
            block = CBlock()
            block.deserialize(BytesIO(raw_block))
            assert block.is_valid()
            assert_equal(block.vtx[0].hash, tx.hash)
            assert_equal(len(block.vtx), 1)
            # The block hash is the reversed hash256 of the first 80 bytes.
            assert_equal(expected_hash, hash256_reversed(raw_block[:80]).hex())

            # Should receive the generated block hash.
            block_hash = hashblock.receive().hex()
            assert_equal(expected_hash, block_hash)
            # The block should only have the coinbase txid.
            assert_equal([txid.hex()], self.nodes[1].getblock(block_hash)["tx"])

        self.log.info("Wait for tx from second node")
        payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
        payment_txid = payment_tx['txid']
        self.sync_all()
        # Should receive the broadcasted txid.
        txid = hashtx.receive()
        assert_equal(payment_txid, txid.hex())

        # Should receive the broadcasted raw transaction.
        raw_tx = rawtx.receive()
        assert_equal(payment_tx['wtxid'], hash256_reversed(raw_tx).hex())

        # Mining the block with this tx should result in second notification
        # after coinbase tx notification
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
        hashtx.receive()
        txid = hashtx.receive()
        assert_equal(payment_txid, txid.hex())

        self.log.info("Test the getzmqnotifications RPC")
        assert_equal(self.nodes[0].getzmqnotifications(), [
            {"type": "pubhashblock", "address": address, "hwm": 1000},
            {"type": "pubhashtx", "address": address, "hwm": 1000},
            {"type": "pubrawblock", "address": address, "hwm": 1000},
            {"type": "pubrawtx", "address": address, "hwm": 1000},
        ])

        assert_equal(self.nodes[1].getzmqnotifications(), [])

    def test_reorg(self):
        """Check hashblock/hashtx notifications across a reorg and an invalidateblock."""

        address = f"tcp://127.0.0.1:{self.zmq_port_base}"

        # Should only notify the tip if a reorg occurs
        hashblock, hashtx = self.setup_zmq_test(
            [(topic, address) for topic in ["hashblock", "hashtx"]],
            recv_timeout=2)  # 2 second timeout to check end of notifications
        self.disconnect_nodes(0, 1)

        # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
        payment_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
        disconnect_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]
        disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
        assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
        assert_equal(hashtx.receive().hex(), payment_txid)
        assert_equal(hashtx.receive().hex(), disconnect_cb)

        # Generate 2 blocks in nodes[1] to a different address to ensure split
        connect_blocks = self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)

        # nodes[0] will reorg chain after connecting back nodes[1]
        self.connect_nodes(0, 1)
        self.sync_blocks()  # tx in mempool valid but not advertised

        # Should receive nodes[1] tip
        assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())

        # During reorg:
        # Get old payment transaction notification from disconnect and disconnected cb
        assert_equal(hashtx.receive().hex(), payment_txid)
        assert_equal(hashtx.receive().hex(), disconnect_cb)
        # And the payment transaction again due to mempool entry
        assert_equal(hashtx.receive().hex(), payment_txid)
        assert_equal(hashtx.receive().hex(), payment_txid)
        # And the new connected coinbases
        for connected_hash in connect_blocks:
            assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connected_hash)["tx"][0])

        # If we do a simple invalidate we announce the disconnected coinbase
        self.nodes[0].invalidateblock(connect_blocks[1])
        assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
        # And the current tip
        assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])

    def test_sequence(self):
        """
        Sequence zmq notifications give every blockhash and txhash in order
        of processing, regardless of IBD, re-orgs, etc.
        Format of messages:
        <32-byte hash>C :                 Blockhash connected
        <32-byte hash>D :                 Blockhash disconnected
        <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
        <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
        """
        self.log.info("Testing 'sequence' publisher")
        [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])
        self.disconnect_nodes(0, 1)

        # Mempool sequence number starts at 1
        seq_num = 1

        # Generate 1 block in nodes[0] and receive all notifications
        dc_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]

        # Note: We are not notified of any block transactions, coinbase or mined
        assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())

        # Generate 2 blocks in nodes[1] to a different address to ensure a chain split
        self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)

        # nodes[0] will reorg chain after connecting back nodes[1]
        self.connect_nodes(0, 1)

        # Then we receive all block (dis)connect notifications for the 2 block reorg
        assert_equal((dc_block, "D", None), seq.receive_sequence())
        block_count = self.nodes[1].getblockcount()
        assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
        assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())

        self.log.info("Wait for tx from second node")
        payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
        payment_txid = payment_tx['txid']
        self.sync_all()
        self.log.info("Testing sequence notifications with mempool sequence values")

        # Should receive the broadcasted txid.
        assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        self.log.info("Testing RBF notification")
        # Replace it to test eviction/addition notification
        payment_tx['tx'].vout[0].nValue -= 1000
        rbf_txid = self.nodes[1].sendrawtransaction(payment_tx['tx'].serialize().hex())
        self.sync_all()
        assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
        seq_num += 1
        assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Doesn't get published when mined, make a block and tx to "flush" the possibility
        # though the mempool sequence number does go up by the number of transactions
        # removed from the mempool by the block mining it.
        mempool_size = len(self.nodes[0].getrawmempool())
        c_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)[0]
        # Make sure the number of mined transactions matches the number of txs out of mempool
        mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
        assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
        seq_num += mempool_size_delta
        payment_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[1])['txid']
        self.sync_all()
        assert_equal((c_block, "C", None), seq.receive_sequence())
        assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Spot check getrawmempool results that they only show up when asked for
        assert isinstance(self.nodes[0].getrawmempool(), list)
        assert isinstance(self.nodes[0].getrawmempool(mempool_sequence=False), list)
        assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
        assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
        assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)

        self.log.info("Testing reorg notifications")
        # Manually invalidate the last block to test mempool re-entry
        # N.B. This part could be made more lenient in exact ordering
        # since it greatly depends on inner-workings of blocks/mempool
        # during "deep" re-orgs. Probably should "re-construct"
        # blockchain/mempool state from notifications instead.
        best_hash = self.nodes[0].getbestblockhash()
        self.nodes[0].invalidateblock(best_hash)
        sleep(2)  # Bit of room to make sure transaction things happened

        # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
        # of the time they were gathered.
        assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num

        assert_equal((best_hash, "D", None), seq.receive_sequence())
        assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Other things may happen but aren't wallet-deterministic so we don't test for them currently
        self.nodes[0].reconsiderblock(best_hash)
        self.generatetoaddress(self.nodes[1], 1, ADDRESS_BCRT1_UNSPENDABLE)

        self.log.info("Evict mempool transaction by block conflict")
        orig_tx = self.wallet.send_self_transfer(from_node=self.nodes[0])
        orig_txid = orig_tx['txid']

        # More to be simply mined
        more_tx = [self.wallet.send_self_transfer(from_node=self.nodes[0]) for _ in range(5)]

        orig_tx['tx'].vout[0].nValue -= 1000
        bump_txid = self.nodes[0].sendrawtransaction(orig_tx['tx'].serialize().hex())
        # Mine the pre-bump tx
        txs_to_add = [orig_tx['hex']] + [tx['hex'] for tx in more_tx]
        block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1), txlist=txs_to_add)
        add_witness_commitment(block)
        block.solve()
        assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
        tip = self.nodes[0].getbestblockhash()
        assert_equal(int(tip, 16), block.sha256)
        orig_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']

        # Flush old notifications until evicted tx original entry
        (hash_str, label, mempool_seq) = seq.receive_sequence()
        while hash_str != orig_txid:
            (hash_str, label, mempool_seq) = seq.receive_sequence()
        mempool_seq += 1

        # Added original tx
        assert_equal(label, "A")
        # More transactions to be simply mined
        for tx in more_tx:
            assert_equal((tx['txid'], "A", mempool_seq), seq.receive_sequence())
            mempool_seq += 1
        # Bumped by rbf
        assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        assert_equal((bump_txid, "A", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        # Conflict announced first, then block
        assert_equal((bump_txid, "R", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        assert_equal((tip, "C", None), seq.receive_sequence())
        mempool_seq += len(more_tx)
        # Last tx
        assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
        self.sync_all()  # want to make sure we didn't break "consensus" for other tests

    def test_mempool_sync(self):
        """
        Use sequence notification plus getrawmempool sequence results to "sync mempool"
        """

        self.log.info("Testing 'mempool sync' usage of sequence notifier")
        [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])

        # In-memory counter, should always start at 1
        next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
        assert_equal(next_mempool_seq, 1)

        # Some transactions have been happening but we aren't consuming zmq notifications yet
        # or we lost a ZMQ message somehow and want to start over
        txs = []
        num_txs = 5
        for _ in range(num_txs):
            txs.append(self.wallet.send_self_transfer(from_node=self.nodes[1]))
        self.sync_all()

        # 1) Consume backlog until we get a mempool sequence number
        (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
        while zmq_mem_seq is None:
            (hash_str, label, zmq_mem_seq) = seq.receive_sequence()

        assert label in ("A", "R")
        assert hash_str is not None

        # 2) We need to "seed" our view of the mempool
        mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
        mempool_view = set(mempool_snapshot["txids"])
        get_raw_seq = mempool_snapshot["mempool_sequence"]
        assert_equal(get_raw_seq, 6)
        # Snapshot may be too old compared to zmq message we read off latest
        while zmq_mem_seq >= get_raw_seq:
            sleep(2)
            mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
            mempool_view = set(mempool_snapshot["txids"])
            get_raw_seq = mempool_snapshot["mempool_sequence"]

        # Things continue to happen in the "interim" while waiting for snapshot results
        # We have node 0 do all these to avoid p2p races with RBF announcements
        for _ in range(num_txs):
            txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))
        txs[-1]['tx'].vout[0].nValue -= 1000
        self.nodes[0].sendrawtransaction(txs[-1]['tx'].serialize().hex())
        self.sync_all()
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
        final_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']

        # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
        while zmq_mem_seq != get_raw_seq - 1:
            (hash_str, label, mempool_sequence) = seq.receive_sequence()
            if mempool_sequence is not None:
                zmq_mem_seq = mempool_sequence
                if zmq_mem_seq > get_raw_seq:
                    raise Exception(f"We somehow jumped mempool sequence numbers! zmq_mem_seq: {zmq_mem_seq} > get_raw_seq: {get_raw_seq}")

        # 4) Moving forward, we apply the delta to our local view
        #    remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
        expected_sequence = get_raw_seq
        r_gap = 0
        for _ in range(num_txs + 2 + 1 + 1):
            (hash_str, label, mempool_sequence) = seq.receive_sequence()
            if mempool_sequence is not None:
                if mempool_sequence != expected_sequence:
                    # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
                    # position in the incoming block message "C"
                    if label == "R":
                        assert mempool_sequence > expected_sequence
                        r_gap += mempool_sequence - expected_sequence
                    else:
                        raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}")
            if label == "A":
                assert hash_str not in mempool_view
                mempool_view.add(hash_str)
                expected_sequence = mempool_sequence + 1
            elif label == "R":
                assert hash_str in mempool_view
                mempool_view.remove(hash_str)
                expected_sequence = mempool_sequence + 1
            elif label == "C":
                # (Attempt to) remove all txids from known block connects
                block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
                for txid in block_txids:
                    if txid in mempool_view:
                        expected_sequence += 1
                        mempool_view.remove(txid)
                expected_sequence -= r_gap
                r_gap = 0
            elif label == "D":
                # Not useful for mempool tracking per se
                continue
            else:
                raise Exception("Unexpected ZMQ sequence label!")

        assert_equal(self.nodes[0].getrawmempool(), [final_txid])
        assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)

        # 5) If you miss a zmq/mempool sequence number, go back to step (2)

        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)

    def test_multiple_interfaces(self):
        """Check that two hashblock publishers on different ports both notify."""
        # Set up two subscribers with different addresses
        # (note that after the reorg test, syncing would fail due to different
        # chain lengths on node0 and node1; for this test we only need node0, so
        # we can disable syncing blocks on the setup)
        subscribers = self.setup_zmq_test([
            ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 1}"),
            ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 2}"),
        ], sync_blocks=False)

        # Generate 1 block in nodes[0] and receive all notifications
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)

        # Should receive the same block hash on both subscribers
        assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
        assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex())

    def test_ipv6(self):
        """Check hashblock notifications over the IPv6 loopback address, if available."""
        if not test_ipv6_local():
            self.log.info("Skipping IPv6 test, because IPv6 is not supported.")
            return
        self.log.info("Testing IPv6")
        # Set up subscriber using IPv6 loopback address
        subscribers = self.setup_zmq_test([
            ("hashblock", f"tcp://[::1]:{self.zmq_port_base}")
        ], ipv6=True)

        # Generate 1 block in nodes[0]
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)

        # Should receive the same block hash
        assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())


if __name__ == '__main__':
    ZMQTest().main()