/ test / functional / interface_zmq.py
interface_zmq.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2015-2022 The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Test the ZMQ notification interface."""
  6  import os
  7  import struct
  8  import tempfile
  9  from io import BytesIO
 10  
 11  from test_framework.address import (
 12      ADDRESS_BCRT1_P2WSH_OP_TRUE,
 13      ADDRESS_BCRT1_UNSPENDABLE,
 14  )
 15  from test_framework.blocktools import (
 16      add_witness_commitment,
 17      create_block,
 18      create_coinbase,
 19  )
 20  from test_framework.test_framework import BitcoinTestFramework
 21  from test_framework.messages import (
 22      CBlock,
 23      hash256,
 24      tx_from_hex,
 25  )
 26  from test_framework.util import (
 27      assert_equal,
 28      assert_raises_rpc_error,
 29      ensure_for,
 30      p2p_port,
 31  )
 32  from test_framework.wallet import (
 33      MiniWallet,
 34  )
 35  from test_framework.netutil import test_ipv6_local, test_unix_socket
 36  
 37  
 38  # Test may be skipped and not have zmq installed
 39  try:
 40      import zmq
 41  except ImportError:
 42      pass
 43  
 44  def hash256_reversed(byte_str):
 45      return hash256(byte_str)[::-1]
 46  
 47  class ZMQSubscriber:
 48      def __init__(self, socket, topic):
 49          self.sequence = None  # no sequence number received yet
 50          self.socket = socket
 51          self.topic = topic
 52  
 53          self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
 54  
 55      # Receive message from publisher and verify that topic and sequence match
 56      def _receive_from_publisher_and_check(self):
 57          topic, body, seq = self.socket.recv_multipart()
 58          # Topic should match the subscriber topic.
 59          assert_equal(topic, self.topic)
 60          # Sequence should be incremental.
 61          received_seq = struct.unpack('<I', seq)[-1]
 62          if self.sequence is None:
 63              self.sequence = received_seq
 64          else:
 65              assert_equal(received_seq, self.sequence)
 66          self.sequence += 1
 67          return body
 68  
 69      def receive(self):
 70          return self._receive_from_publisher_and_check()
 71  
 72      def receive_sequence(self):
 73          body = self._receive_from_publisher_and_check()
 74          hash = body[:32].hex()
 75          label = chr(body[32])
 76          mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
 77          if mempool_sequence is not None:
 78              assert label == "A" or label == "R"
 79          else:
 80              assert label == "D" or label == "C"
 81          return (hash, label, mempool_sequence)
 82  
 83  
 84  class ZMQTestSetupBlock:
 85      """Helper class for setting up a ZMQ test via the "sync up" procedure.
 86      Generates a block on the specified node on instantiation and provides a
 87      method to check whether a ZMQ notification matches, i.e. the event was
 88      caused by this generated block.  Assumes that a notification either contains
 89      the generated block's hash, it's (coinbase) transaction id, the raw block or
 90      raw transaction data.
 91      """
 92      def __init__(self, test_framework, node):
 93          self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0]
 94          coinbase = node.getblock(self.block_hash, 2)['tx'][0]
 95          self.tx_hash = coinbase['txid']
 96          self.raw_tx = coinbase['hex']
 97          self.raw_block = node.getblock(self.block_hash, 0)
 98  
 99      def caused_notification(self, notification):
100          return (
101              self.block_hash in notification
102              or self.tx_hash in notification
103              or self.raw_block in notification
104              or self.raw_tx in notification
105          )
106  
107  
108  class ZMQTest (BitcoinTestFramework):
109      def set_test_params(self):
110          self.num_nodes = 2
111          # whitelist peers to speed up tx relay / mempool sync
112          self.noban_tx_relay = True
113          self.zmq_port_base = p2p_port(self.num_nodes + 1)
114  
115      def skip_test_if_missing_module(self):
116          self.skip_if_no_py3_zmq()
117          self.skip_if_no_bitcoind_zmq()
118  
119      def run_test(self):
120          self.wallet = MiniWallet(self.nodes[0])
121          self.ctx = zmq.Context()
122          try:
123              self.test_basic()
124              if test_unix_socket():
125                  self.test_basic(unix=True)
126              else:
127                  self.log.info("Skipping ipc test, because UNIX sockets are not supported.")
128              self.test_sequence()
129              self.test_mempool_sync()
130              self.test_reorg()
131              self.test_multiple_interfaces()
132              self.test_ipv6()
133          finally:
134              # Destroy the ZMQ context.
135              self.log.debug("Destroying ZMQ context")
136              self.ctx.destroy(linger=None)
137  
138      # Restart node with the specified zmq notifications enabled, subscribe to
139      # all of them and return the corresponding ZMQSubscriber objects.
140      def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True, ipv6=False):
141          subscribers = []
142          for topic, address in services:
143              socket = self.ctx.socket(zmq.SUB)
144              if ipv6:
145                  socket.setsockopt(zmq.IPV6, 1)
146              subscribers.append(ZMQSubscriber(socket, topic.encode()))
147  
148          self.restart_node(0, [f"-zmqpub{topic}={address.replace('ipc://', 'unix:')}" for topic, address in services])
149  
150          for i, sub in enumerate(subscribers):
151              sub.socket.connect(services[i][1])
152  
153          # Ensure that all zmq publisher notification interfaces are ready by
154          # running the following "sync up" procedure:
155          #   1. Generate a block on the node
156          #   2. Try to receive the corresponding notification on all subscribers
157          #   3. If all subscribers get the message within the timeout (1 second),
158          #      we are done, otherwise repeat starting from step 1
159          for sub in subscribers:
160              sub.socket.set(zmq.RCVTIMEO, 1000)
161          while True:
162              test_block = ZMQTestSetupBlock(self, self.nodes[0])
163              recv_failed = False
164              for sub in subscribers:
165                  try:
166                      while not test_block.caused_notification(sub.receive().hex()):
167                          self.log.debug("Ignoring sync-up notification for previously generated block.")
168                  except zmq.error.Again:
169                      self.log.debug("Didn't receive sync-up notification, trying again.")
170                      recv_failed = True
171              if not recv_failed:
172                  self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
173                  break
174  
175          # set subscriber's desired timeout for the test
176          for sub in subscribers:
177              sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
178  
179          self.connect_nodes(0, 1)
180          if sync_blocks:
181              self.sync_blocks()
182  
183          return subscribers
184  
185      def test_basic(self, unix = False):
186          self.log.info(f"Running basic test with {'ipc' if unix else 'tcp'} protocol")
187  
188          # Invalid zmq arguments don't take down the node, see #17185.
189          self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
190  
191          address = f"tcp://127.0.0.1:{self.zmq_port_base}"
192  
193          if unix:
194              # Use the shortest temp path possible since paths may have as little as 92-char limit
195              socket_path = tempfile.NamedTemporaryFile().name
196              address = f"ipc://{socket_path}"
197  
198          subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
199  
200          hashblock = subs[0]
201          hashtx = subs[1]
202          rawblock = subs[2]
203          rawtx = subs[3]
204  
205          num_blocks = 5
206          self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)")
207          genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
208  
209          for x in range(num_blocks):
210              # Should receive the coinbase txid.
211              txid = hashtx.receive()
212  
213              # Should receive the coinbase raw transaction.
214              tx = tx_from_hex(rawtx.receive().hex())
215              tx.calc_sha256()
216              assert_equal(tx.hash, txid.hex())
217  
218              # Should receive the generated raw block.
219              hex = rawblock.receive()
220              block = CBlock()
221              block.deserialize(BytesIO(hex))
222              assert block.is_valid()
223              assert_equal(block.vtx[0].hash, tx.hash)
224              assert_equal(len(block.vtx), 1)
225              assert_equal(genhashes[x], hash256_reversed(hex[:80]).hex())
226  
227              # Should receive the generated block hash.
228              hash = hashblock.receive().hex()
229              assert_equal(genhashes[x], hash)
230              # The block should only have the coinbase txid.
231              assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])
232  
233  
234          self.log.info("Wait for tx from second node")
235          payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
236          payment_txid = payment_tx['txid']
237          self.sync_all()
238          # Should receive the broadcasted txid.
239          txid = hashtx.receive()
240          assert_equal(payment_txid, txid.hex())
241  
242          # Should receive the broadcasted raw transaction.
243          hex = rawtx.receive()
244          assert_equal(payment_tx['wtxid'], hash256_reversed(hex).hex())
245  
246          # Mining the block with this tx should result in second notification
247          # after coinbase tx notification
248          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
249          hashtx.receive()
250          txid = hashtx.receive()
251          assert_equal(payment_txid, txid.hex())
252  
253  
254          self.log.info("Test the getzmqnotifications RPC")
255          assert_equal(self.nodes[0].getzmqnotifications(), [
256              {"type": "pubhashblock", "address": address, "hwm": 1000},
257              {"type": "pubhashtx", "address": address, "hwm": 1000},
258              {"type": "pubrawblock", "address": address, "hwm": 1000},
259              {"type": "pubrawtx", "address": address, "hwm": 1000},
260          ])
261  
262          assert_equal(self.nodes[1].getzmqnotifications(), [])
263          if unix:
264              os.unlink(socket_path)
265  
266      def test_reorg(self):
267  
268          address = f"tcp://127.0.0.1:{self.zmq_port_base}"
269  
270          # Should only notify the tip if a reorg occurs
271          hashblock, hashtx = self.setup_zmq_test(
272              [(topic, address) for topic in ["hashblock", "hashtx"]],
273              recv_timeout=2)  # 2 second timeout to check end of notifications
274          self.disconnect_nodes(0, 1)
275  
276          # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
277          payment_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
278          disconnect_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]
279          disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
280          assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
281          assert_equal(hashtx.receive().hex(), payment_txid)
282          assert_equal(hashtx.receive().hex(), disconnect_cb)
283  
284          # Generate 2 blocks in nodes[1] to a different address to ensure split
285          connect_blocks = self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)
286  
287          # nodes[0] will reorg chain after connecting back nodes[1]
288          self.connect_nodes(0, 1)
289          self.sync_blocks() # tx in mempool valid but not advertised
290  
291          # Should receive nodes[1] tip
292          assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
293  
294          # During reorg:
295          # Get old payment transaction notification from disconnect and disconnected cb
296          assert_equal(hashtx.receive().hex(), payment_txid)
297          assert_equal(hashtx.receive().hex(), disconnect_cb)
298          # And the payment transaction again due to mempool entry
299          assert_equal(hashtx.receive().hex(), payment_txid)
300          assert_equal(hashtx.receive().hex(), payment_txid)
301          # And the new connected coinbases
302          for i in [0, 1]:
303              assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0])
304  
305          # If we do a simple invalidate we announce the disconnected coinbase
306          self.nodes[0].invalidateblock(connect_blocks[1])
307          assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
308          # And the current tip
309          assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
310  
311      def test_sequence(self):
312          """
313          Sequence zmq notifications give every blockhash and txhash in order
314          of processing, regardless of IBD, re-orgs, etc.
315          Format of messages:
316          <32-byte hash>C :                 Blockhash connected
317          <32-byte hash>D :                 Blockhash disconnected
318          <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
319          <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
320          """
321          self.log.info("Testing 'sequence' publisher")
322          [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])
323          self.disconnect_nodes(0, 1)
324  
325          # Mempool sequence number starts at 1
326          seq_num = 1
327  
328          # Generate 1 block in nodes[0] and receive all notifications
329          dc_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]
330  
331          # Note: We are not notified of any block transactions, coinbase or mined
332          assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
333  
334          # Generate 2 blocks in nodes[1] to a different address to ensure a chain split
335          self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)
336  
337          # nodes[0] will reorg chain after connecting back nodes[1]
338          self.connect_nodes(0, 1)
339  
340          # Then we receive all block (dis)connect notifications for the 2 block reorg
341          assert_equal((dc_block, "D", None), seq.receive_sequence())
342          block_count = self.nodes[1].getblockcount()
343          assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
344          assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
345  
346          self.log.info("Wait for tx from second node")
347          payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
348          payment_txid = payment_tx['txid']
349          self.sync_all()
350          self.log.info("Testing sequence notifications with mempool sequence values")
351  
352          # Should receive the broadcasted txid.
353          assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
354          seq_num += 1
355  
356          self.log.info("Testing RBF notification")
357          # Replace it to test eviction/addition notification
358          payment_tx['tx'].vout[0].nValue -= 1000
359          rbf_txid = self.nodes[1].sendrawtransaction(payment_tx['tx'].serialize().hex())
360          self.sync_all()
361          assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
362          seq_num += 1
363          assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
364          seq_num += 1
365  
366          # Doesn't get published when mined, make a block and tx to "flush" the possibility
367          # though the mempool sequence number does go up by the number of transactions
368          # removed from the mempool by the block mining it.
369          mempool_size = len(self.nodes[0].getrawmempool())
370          c_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)[0]
371          # Make sure the number of mined transactions matches the number of txs out of mempool
372          mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
373          assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
374          seq_num += mempool_size_delta
375          payment_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[1])['txid']
376          self.sync_all()
377          assert_equal((c_block, "C", None), seq.receive_sequence())
378          assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
379          seq_num += 1
380  
381          # Spot check getrawmempool results that they only show up when asked for
382          assert type(self.nodes[0].getrawmempool()) is list
383          assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
384          assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
385          assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
386          assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
387  
388          self.log.info("Testing reorg notifications")
389          # Manually invalidate the last block to test mempool re-entry
390          # N.B. This part could be made more lenient in exact ordering
391          # since it greatly depends on inner-workings of blocks/mempool
392          # during "deep" re-orgs. Probably should "re-construct"
393          # blockchain/mempool state from notifications instead.
394          block_count = self.nodes[0].getblockcount()
395          best_hash = self.nodes[0].getbestblockhash()
396          self.nodes[0].invalidateblock(best_hash)
397  
398          # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
399          # of the time they were gathered.
400          ensure_for(duration=2, f=lambda: self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num)
401  
402          assert_equal((best_hash, "D", None), seq.receive_sequence())
403          assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
404          seq_num += 1
405  
406          # Other things may happen but aren't wallet-deterministic so we don't test for them currently
407          self.nodes[0].reconsiderblock(best_hash)
408          self.generatetoaddress(self.nodes[1], 1, ADDRESS_BCRT1_UNSPENDABLE)
409  
410          self.log.info("Evict mempool transaction by block conflict")
411          orig_tx = self.wallet.send_self_transfer(from_node=self.nodes[0])
412          orig_txid = orig_tx['txid']
413  
414          # More to be simply mined
415          more_tx = []
416          for _ in range(5):
417              more_tx.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))
418  
419          orig_tx['tx'].vout[0].nValue -= 1000
420          bump_txid = self.nodes[0].sendrawtransaction(orig_tx['tx'].serialize().hex())
421          # Mine the pre-bump tx
422          txs_to_add = [orig_tx['hex']] + [tx['hex'] for tx in more_tx]
423          block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1), txlist=txs_to_add)
424          add_witness_commitment(block)
425          block.solve()
426          assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
427          tip = self.nodes[0].getbestblockhash()
428          assert_equal(int(tip, 16), block.sha256)
429          orig_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
430  
431          # Flush old notifications until evicted tx original entry
432          (hash_str, label, mempool_seq) = seq.receive_sequence()
433          while hash_str != orig_txid:
434              (hash_str, label, mempool_seq) = seq.receive_sequence()
435          mempool_seq += 1
436  
437          # Added original tx
438          assert_equal(label, "A")
439          # More transactions to be simply mined
440          for i in range(len(more_tx)):
441              assert_equal((more_tx[i]['txid'], "A", mempool_seq), seq.receive_sequence())
442              mempool_seq += 1
443          # Bumped by rbf
444          assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
445          mempool_seq += 1
446          assert_equal((bump_txid, "A", mempool_seq), seq.receive_sequence())
447          mempool_seq += 1
448          # Conflict announced first, then block
449          assert_equal((bump_txid, "R", mempool_seq), seq.receive_sequence())
450          mempool_seq += 1
451          assert_equal((tip, "C", None), seq.receive_sequence())
452          mempool_seq += len(more_tx)
453          # Last tx
454          assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
455          mempool_seq += 1
456          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
457          self.sync_all()  # want to make sure we didn't break "consensus" for other tests
458  
459      def test_mempool_sync(self):
460          """
461          Use sequence notification plus getrawmempool sequence results to "sync mempool"
462          """
463  
464          self.log.info("Testing 'mempool sync' usage of sequence notifier")
465          [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])
466  
467          # In-memory counter, should always start at 1
468          next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
469          assert_equal(next_mempool_seq, 1)
470  
471          # Some transactions have been happening but we aren't consuming zmq notifications yet
472          # or we lost a ZMQ message somehow and want to start over
473          txs = []
474          num_txs = 5
475          for _ in range(num_txs):
476              txs.append(self.wallet.send_self_transfer(from_node=self.nodes[1]))
477          self.sync_all()
478  
479          # 1) Consume backlog until we get a mempool sequence number
480          (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
481          while zmq_mem_seq is None:
482              (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
483  
484          assert label == "A" or label == "R"
485          assert hash_str is not None
486  
487          # 2) We need to "seed" our view of the mempool
488          mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
489          mempool_view = set(mempool_snapshot["txids"])
490          get_raw_seq = mempool_snapshot["mempool_sequence"]
491          assert_equal(get_raw_seq, num_txs + 1)
492          assert zmq_mem_seq < get_raw_seq
493  
494          # Things continue to happen in the "interim" while waiting for snapshot results
495          # We have node 0 do all these to avoid p2p races with RBF announcements
496          for _ in range(num_txs):
497              txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))
498          txs[-1]['tx'].vout[0].nValue -= 1000
499          self.nodes[0].sendrawtransaction(txs[-1]['tx'].serialize().hex())
500          self.sync_all()
501          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
502          final_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
503  
504          # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
505          while True:
506              if zmq_mem_seq == get_raw_seq - 1:
507                  break
508              (hash_str, label, mempool_sequence) = seq.receive_sequence()
509              if mempool_sequence is not None:
510                  zmq_mem_seq = mempool_sequence
511                  if zmq_mem_seq > get_raw_seq:
512                      raise Exception(f"We somehow jumped mempool sequence numbers! zmq_mem_seq: {zmq_mem_seq} > get_raw_seq: {get_raw_seq}")
513  
514          # 4) Moving forward, we apply the delta to our local view
515          #    remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
516          expected_sequence = get_raw_seq
517          r_gap = 0
518          for _ in range(num_txs + 2 + 1 + 1):
519              (hash_str, label, mempool_sequence) = seq.receive_sequence()
520              if mempool_sequence is not None:
521                  if mempool_sequence != expected_sequence:
522                      # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
523                      # position in the incoming block message "C"
524                      if label == "R":
525                          assert mempool_sequence > expected_sequence
526                          r_gap += mempool_sequence - expected_sequence
527                      else:
528                          raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}")
529              if label == "A":
530                  assert hash_str not in mempool_view
531                  mempool_view.add(hash_str)
532                  expected_sequence = mempool_sequence + 1
533              elif label == "R":
534                  assert hash_str in mempool_view
535                  mempool_view.remove(hash_str)
536                  expected_sequence = mempool_sequence + 1
537              elif label == "C":
538                  # (Attempt to) remove all txids from known block connects
539                  block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
540                  for txid in block_txids:
541                      if txid in mempool_view:
542                          expected_sequence += 1
543                          mempool_view.remove(txid)
544                  expected_sequence -= r_gap
545                  r_gap = 0
546              elif label == "D":
547                  # Not useful for mempool tracking per se
548                  continue
549              else:
550                  raise Exception("Unexpected ZMQ sequence label!")
551  
552          assert_equal(self.nodes[0].getrawmempool(), [final_txid])
553          assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
554  
555          # 5) If you miss a zmq/mempool sequence number, go back to step (2)
556  
557          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
558  
559      def test_multiple_interfaces(self):
560          # Set up two subscribers with different addresses
561          # (note that after the reorg test, syncing would fail due to different
562          # chain lengths on node0 and node1; for this test we only need node0, so
563          # we can disable syncing blocks on the setup)
564          subscribers = self.setup_zmq_test([
565              ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 1}"),
566              ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 2}"),
567          ], sync_blocks=False)
568  
569          # Generate 1 block in nodes[0] and receive all notifications
570          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)
571  
572          # Should receive the same block hash on both subscribers
573          assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
574          assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex())
575  
576      def test_ipv6(self):
577          if not test_ipv6_local():
578              self.log.info("Skipping IPv6 test, because IPv6 is not supported.")
579              return
580          self.log.info("Testing IPv6")
581          # Set up subscriber using IPv6 loopback address
582          subscribers = self.setup_zmq_test([
583              ("hashblock", f"tcp://[::1]:{self.zmq_port_base}")
584          ], ipv6=True)
585  
586          # Generate 1 block in nodes[0]
587          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
588  
589          # Should receive the same block hash
590          assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
591  
592  
593  if __name__ == '__main__':
594      ZMQTest(__file__).main()