/ test / functional / interface_zmq.py
interface_zmq.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2015-present The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Test the ZMQ notification interface."""
  6  import os
  7  import struct
  8  import tempfile
  9  from io import BytesIO
 10  
 11  from test_framework.address import (
 12      ADDRESS_BCRT1_P2WSH_OP_TRUE,
 13      ADDRESS_BCRT1_UNSPENDABLE,
 14  )
 15  from test_framework.blocktools import (
 16      add_witness_commitment,
 17      create_block,
 18      create_coinbase,
 19  )
 20  from test_framework.test_framework import BitcoinTestFramework
 21  from test_framework.messages import (
 22      CBlock,
 23      hash256,
 24      tx_from_hex,
 25  )
 26  from test_framework.util import (
 27      assert_equal,
 28      assert_raises_rpc_error,
 29      ensure_for,
 30      p2p_port,
 31  )
 32  from test_framework.wallet import (
 33      MiniWallet,
 34  )
 35  from test_framework.netutil import test_ipv6_local, test_unix_socket
 36  
 37  
 38  # Test may be skipped and not have zmq installed
 39  try:
 40      import zmq
 41  except ImportError:
 42      pass
 43  
 44  def hash256_reversed(byte_str):
 45      return hash256(byte_str)[::-1]
 46  
 47  class ZMQSubscriber:
 48      def __init__(self, socket, topic):
 49          self.sequence = None  # no sequence number received yet
 50          self.socket = socket
 51          self.topic = topic
 52  
 53          self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
 54  
 55      # Receive message from publisher and verify that topic and sequence match
 56      def _receive_from_publisher_and_check(self):
 57          topic, body, seq = self.socket.recv_multipart()
 58          # Topic should match the subscriber topic.
 59          assert_equal(topic, self.topic)
 60          # Sequence should be incremental.
 61          received_seq = struct.unpack('<I', seq)[-1]
 62          if self.sequence is None:
 63              self.sequence = received_seq
 64          else:
 65              assert_equal(received_seq, self.sequence)
 66          self.sequence += 1
 67          return body
 68  
 69      def receive(self):
 70          return self._receive_from_publisher_and_check()
 71  
 72      def receive_sequence(self):
 73          body = self._receive_from_publisher_and_check()
 74          hash = body[:32].hex()
 75          label = chr(body[32])
 76          mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
 77          if mempool_sequence is not None:
 78              assert label == "A" or label == "R"
 79          else:
 80              assert label == "D" or label == "C"
 81          return (hash, label, mempool_sequence)
 82  
 83  
 84  class ZMQTestSetupBlock:
 85      """Helper class for setting up a ZMQ test via the "sync up" procedure.
 86      Generates a block on the specified node on instantiation and provides a
 87      method to check whether a ZMQ notification matches, i.e. the event was
 88      caused by this generated block.  Assumes that a notification either contains
 89      the generated block's hash, it's (coinbase) transaction id, the raw block or
 90      raw transaction data.
 91      """
 92      def __init__(self, test_framework, node):
 93          self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0]
 94          coinbase = node.getblock(self.block_hash, 2)['tx'][0]
 95          self.tx_hash = coinbase['txid']
 96          self.raw_tx = coinbase['hex']
 97          self.raw_block = node.getblock(self.block_hash, 0)
 98  
 99      def caused_notification(self, notification):
100          return (
101              self.block_hash in notification
102              or self.tx_hash in notification
103              or self.raw_block in notification
104              or self.raw_tx in notification
105          )
106  
107  
108  class ZMQTest (BitcoinTestFramework):
109      def set_test_params(self):
110          self.num_nodes = 2
111          # whitelist peers to speed up tx relay / mempool sync
112          self.noban_tx_relay = True
113          self.zmq_port_base = p2p_port(self.num_nodes + 1)
114  
115      def skip_test_if_missing_module(self):
116          self.skip_if_no_py3_zmq()
117          self.skip_if_no_bitcoind_zmq()
118  
119      def run_test(self):
120          self.wallet = MiniWallet(self.nodes[0])
121          self.ctx = zmq.Context()
122          try:
123              self.test_basic()
124              if test_unix_socket():
125                  self.test_basic(unix=True)
126              else:
127                  self.log.info("Skipping ipc test, because UNIX sockets are not supported.")
128              self.test_sequence()
129              self.test_mempool_sync()
130              self.test_reorg()
131              self.test_multiple_interfaces()
132              self.test_ipv6()
133          finally:
134              # Destroy the ZMQ context.
135              self.log.debug("Destroying ZMQ context")
136              self.ctx.destroy(linger=None)
137  
138      # Restart node with the specified zmq notifications enabled, subscribe to
139      # all of them and return the corresponding ZMQSubscriber objects.
140      def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True, ipv6=False):
141          subscribers = []
142          for topic, address in services:
143              socket = self.ctx.socket(zmq.SUB)
144              if ipv6:
145                  socket.setsockopt(zmq.IPV6, 1)
146              subscribers.append(ZMQSubscriber(socket, topic.encode()))
147  
148          self.restart_node(0, [f"-zmqpub{topic}={address.replace('ipc://', 'unix:')}" for topic, address in services])
149  
150          for i, sub in enumerate(subscribers):
151              sub.socket.connect(services[i][1])
152  
153          # Ensure that all zmq publisher notification interfaces are ready by
154          # running the following "sync up" procedure:
155          #   1. Generate a block on the node
156          #   2. Try to receive the corresponding notification on all subscribers
157          #   3. If all subscribers get the message within the timeout (1 second),
158          #      we are done, otherwise repeat starting from step 1
159          for sub in subscribers:
160              sub.socket.set(zmq.RCVTIMEO, 1000)
161          while True:
162              test_block = ZMQTestSetupBlock(self, self.nodes[0])
163              recv_failed = False
164              for sub in subscribers:
165                  try:
166                      while not test_block.caused_notification(sub.receive().hex()):
167                          self.log.debug("Ignoring sync-up notification for previously generated block.")
168                  except zmq.error.Again:
169                      self.log.debug("Didn't receive sync-up notification, trying again.")
170                      recv_failed = True
171              if not recv_failed:
172                  self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
173                  break
174  
175          # set subscriber's desired timeout for the test
176          for sub in subscribers:
177              sub.socket.set(zmq.RCVTIMEO, int(recv_timeout * self.options.timeout_factor * 1000))
178  
179          self.connect_nodes(0, 1)
180          if sync_blocks:
181              self.sync_blocks()
182  
183          return subscribers
184  
185      def test_basic(self, unix = False):
186          self.log.info(f"Running basic test with {'ipc' if unix else 'tcp'} protocol")
187  
188          # Invalid zmq arguments don't take down the node, see #17185.
189          self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
190  
191          address = f"tcp://127.0.0.1:{self.zmq_port_base}"
192  
193          if unix:
194              # Use the shortest temp path possible since paths may have as little as 92-char limit
195              socket_path = tempfile.NamedTemporaryFile().name
196              address = f"ipc://{socket_path}"
197  
198          subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
199  
200          hashblock = subs[0]
201          hashtx = subs[1]
202          rawblock = subs[2]
203          rawtx = subs[3]
204  
205          num_blocks = 5
206          self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)")
207          genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
208  
209          for x in range(num_blocks):
210              # Should receive the coinbase txid.
211              txid = hashtx.receive()
212  
213              # Should receive the coinbase raw transaction.
214              tx = tx_from_hex(rawtx.receive().hex())
215              assert_equal(tx.txid_hex, txid.hex())
216  
217              # Should receive the generated raw block.
218              hex = rawblock.receive()
219              block = CBlock()
220              block.deserialize(BytesIO(hex))
221              assert block.is_valid()
222              assert_equal(block.vtx[0].txid_hex, tx.txid_hex)
223              assert_equal(len(block.vtx), 1)
224              assert_equal(genhashes[x], hash256_reversed(hex[:80]).hex())
225  
226              # Should receive the generated block hash.
227              hash = hashblock.receive().hex()
228              assert_equal(genhashes[x], hash)
229              # The block should only have the coinbase txid.
230              assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])
231  
232  
233          self.log.info("Wait for tx from second node")
234          payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
235          payment_txid = payment_tx['txid']
236          self.sync_all()
237          # Should receive the broadcasted txid.
238          txid = hashtx.receive()
239          assert_equal(payment_txid, txid.hex())
240  
241          # Should receive the broadcasted raw transaction.
242          hex = rawtx.receive()
243          assert_equal(payment_tx['wtxid'], hash256_reversed(hex).hex())
244  
245          # Mining the block with this tx should result in second notification
246          # after coinbase tx notification
247          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
248          hashtx.receive()
249          txid = hashtx.receive()
250          assert_equal(payment_txid, txid.hex())
251  
252  
253          self.log.info("Test the getzmqnotifications RPC")
254          assert_equal(self.nodes[0].getzmqnotifications(), [
255              {"type": "pubhashblock", "address": address, "hwm": 1000},
256              {"type": "pubhashtx", "address": address, "hwm": 1000},
257              {"type": "pubrawblock", "address": address, "hwm": 1000},
258              {"type": "pubrawtx", "address": address, "hwm": 1000},
259          ])
260  
261          assert_equal(self.nodes[1].getzmqnotifications(), [])
262          if unix:
263              os.unlink(socket_path)
264  
265      def test_reorg(self):
266  
267          address = f"tcp://127.0.0.1:{self.zmq_port_base}"
268  
269          # Should only notify the tip if a reorg occurs
270          hashblock, hashtx = self.setup_zmq_test(
271              [(topic, address) for topic in ["hashblock", "hashtx"]],
272              recv_timeout=2)  # 2 second timeout to check end of notifications
273          self.disconnect_nodes(0, 1)
274  
275          # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
276          payment_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
277          disconnect_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]
278          disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
279          assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
280          assert_equal(hashtx.receive().hex(), payment_txid)
281          assert_equal(hashtx.receive().hex(), disconnect_cb)
282  
283          # Generate 2 blocks in nodes[1] to a different address to ensure split
284          connect_blocks = self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)
285  
286          # nodes[0] will reorg chain after connecting back nodes[1]
287          self.connect_nodes(0, 1)
288          self.sync_blocks() # tx in mempool valid but not advertised
289  
290          # Should receive nodes[1] tip
291          assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
292  
293          # During reorg:
294          # Get old payment transaction notification from disconnect and disconnected cb
295          assert_equal(hashtx.receive().hex(), payment_txid)
296          assert_equal(hashtx.receive().hex(), disconnect_cb)
297          # And the payment transaction again due to mempool entry
298          assert_equal(hashtx.receive().hex(), payment_txid)
299          assert_equal(hashtx.receive().hex(), payment_txid)
300          # And the new connected coinbases
301          for i in [0, 1]:
302              assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0])
303  
304          # If we do a simple invalidate we announce the disconnected coinbase
305          self.nodes[0].invalidateblock(connect_blocks[1])
306          assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
307          # And the current tip
308          assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
309  
    def test_sequence(self):
        """
        Sequence zmq notifications give every blockhash and txhash in order
        of processing, regardless of IBD, re-orgs, etc.
        Format of messages:
        <32-byte hash>C :                 Blockhash connected
        <32-byte hash>D :                 Blockhash disconnected
        <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
        <32-byte hash>A<8-byte LE uint> : Transactionhash added to mempool

        The test walks through a two-block reorg, a plain mempool addition, an
        RBF replacement, a mined block, an invalidateblock and finally a block
        that conflicts with a mempool transaction, checking each notification
        (and, for mempool events, its mempool sequence number) in order.
        """
        self.log.info("Testing 'sequence' publisher")
        [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])
        self.disconnect_nodes(0, 1)

        # Mempool sequence number starts at 1
        # seq_num tracks the mempool sequence expected in the next "A"/"R" message.
        seq_num = 1

        # Generate 1 block in nodes[0] and receive all notifications
        dc_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]

        # Note: We are not notified of any block transactions, coinbase or mined
        assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())

        # Generate 2 blocks in nodes[1] to a different address to ensure a chain split
        self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)

        # nodes[0] will reorg chain after connecting back nodes[1]
        self.connect_nodes(0, 1)

        # Then we receive all block (dis)connect notifications for the 2 block reorg
        assert_equal((dc_block, "D", None), seq.receive_sequence())
        block_count = self.nodes[1].getblockcount()
        assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
        assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())

        self.log.info("Wait for tx from second node")
        payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
        payment_txid = payment_tx['txid']
        self.sync_all()
        self.log.info("Testing sequence notifications with mempool sequence values")

        # Should receive the broadcasted txid.
        assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        self.log.info("Testing RBF notification")
        # Replace it to test eviction/addition notification
        # Lowering the output value by 1000 raises the fee of the replacement.
        payment_tx['tx'].vout[0].nValue -= 1000
        rbf_txid = self.nodes[1].sendrawtransaction(payment_tx['tx'].serialize().hex())
        self.sync_all()
        # The replaced transaction is removed first, then the replacement is added.
        assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
        seq_num += 1
        assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Doesn't get published when mined, make a block and tx to "flush" the possibility
        # though the mempool sequence number does go up by the number of transactions
        # removed from the mempool by the block mining it.
        mempool_size = len(self.nodes[0].getrawmempool())
        c_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)[0]
        # Make sure the number of mined transactions matches the number of txs out of mempool
        mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
        # The -1 excludes the block's first (coinbase) transaction.
        assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
        seq_num += mempool_size_delta
        payment_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[1])['txid']
        self.sync_all()
        assert_equal((c_block, "C", None), seq.receive_sequence())
        assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Spot check getrawmempool results that they only show up when asked for
        assert type(self.nodes[0].getrawmempool()) is list
        assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
        assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
        assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
        assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)

        self.log.info("Testing reorg notifications")
        # Manually invalidate the last block to test mempool re-entry
        # N.B. This part could be made more lenient in exact ordering
        # since it greatly depends on inner-workings of blocks/mempool
        # during "deep" re-orgs. Probably should "re-construct"
        # blockchain/mempool state from notifications instead.
        # NOTE(review): block_count is reassigned here but not read again in this method.
        block_count = self.nodes[0].getblockcount()
        best_hash = self.nodes[0].getbestblockhash()
        self.nodes[0].invalidateblock(best_hash)

        # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
        # of the time they were gathered.
        ensure_for(duration=2, f=lambda: self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num)

        # The block is disconnected, then its transaction re-enters the mempool.
        assert_equal((best_hash, "D", None), seq.receive_sequence())
        assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
        seq_num += 1

        # Other things may happen but aren't wallet-deterministic so we don't test for them currently
        self.nodes[0].reconsiderblock(best_hash)
        self.generatetoaddress(self.nodes[1], 1, ADDRESS_BCRT1_UNSPENDABLE)

        self.log.info("Evict mempool transaction by block conflict")
        orig_tx = self.wallet.send_self_transfer(from_node=self.nodes[0])
        orig_txid = orig_tx['txid']

        # More to be simply mined
        more_tx = []
        for _ in range(5):
            more_tx.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))

        # orig_tx['hex'] still holds the original transaction, while the
        # modified orig_tx['tx'] is submitted to the mempool as its replacement.
        orig_tx['tx'].vout[0].nValue -= 1000
        bump_txid = self.nodes[0].sendrawtransaction(orig_tx['tx'].serialize().hex())
        # Mine the pre-bump tx
        txs_to_add = [orig_tx['hex']] + [tx['hex'] for tx in more_tx]
        block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1), txlist=txs_to_add)
        add_witness_commitment(block)
        block.solve()
        assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
        tip = self.nodes[0].getbestblockhash()
        assert_equal(int(tip, 16), block.hash_int)
        orig_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']

        # Flush old notifications until evicted tx original entry
        (hash_str, label, mempool_seq) = seq.receive_sequence()
        while hash_str != orig_txid:
            (hash_str, label, mempool_seq) = seq.receive_sequence()
        # From here on mempool_seq is the sequence expected in the next mempool message.
        mempool_seq += 1

        # Added original tx
        assert_equal(label, "A")
        # More transactions to be simply mined
        for i in range(len(more_tx)):
            assert_equal((more_tx[i]['txid'], "A", mempool_seq), seq.receive_sequence())
            mempool_seq += 1
        # Bumped by rbf
        assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        assert_equal((bump_txid, "A", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        # Conflict announced first, then block
        assert_equal((bump_txid, "R", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        assert_equal((tip, "C", None), seq.receive_sequence())
        # The mined more_tx transactions advance the sequence without being announced.
        mempool_seq += len(more_tx)
        # Last tx
        assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
        mempool_seq += 1
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
        self.sync_all()  # want to make sure we didn't break "consensus" for other tests
457  
    def test_mempool_sync(self):
        """
        Use sequence notification plus getrawmempool sequence results to "sync mempool"

        The test plays the role of a client that starts consuming "sequence"
        notifications late: it reads the backlog until it sees a mempool
        sequence number, takes a getrawmempool snapshot, skips notifications
        already covered by the snapshot and then applies every later
        notification to its local view, which must end up matching the node.
        """

        self.log.info("Testing 'mempool sync' usage of sequence notifier")
        [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])

        # In-memory counter, should always start at 1
        next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
        assert_equal(next_mempool_seq, 1)

        # Some transactions have been happening but we aren't consuming zmq notifications yet
        # or we lost a ZMQ message somehow and want to start over
        txs = []
        num_txs = 5
        for _ in range(num_txs):
            txs.append(self.wallet.send_self_transfer(from_node=self.nodes[1]))
        self.sync_all()

        # 1) Consume backlog until we get a mempool sequence number
        (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
        while zmq_mem_seq is None:
            (hash_str, label, zmq_mem_seq) = seq.receive_sequence()

        assert label == "A" or label == "R"
        assert hash_str is not None

        # 2) We need to "seed" our view of the mempool
        mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
        mempool_view = set(mempool_snapshot["txids"])
        get_raw_seq = mempool_snapshot["mempool_sequence"]
        # num_txs transactions were added since the counter was 1.
        assert_equal(get_raw_seq, num_txs + 1)
        assert zmq_mem_seq < get_raw_seq

        # Things continue to happen in the "interim" while waiting for snapshot results
        # We have node 0 do all these to avoid p2p races with RBF announcements
        for _ in range(num_txs):
            txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))
        # Replace the most recent transaction with a version paying 1000 more in fee.
        txs[-1]['tx'].vout[0].nValue -= 1000
        self.nodes[0].sendrawtransaction(txs[-1]['tx'].serialize().hex())
        self.sync_all()
        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
        final_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']

        # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
        while True:
            # Stop once the last consumed notification is the one right before the snapshot.
            if zmq_mem_seq == get_raw_seq - 1:
                break
            (hash_str, label, mempool_sequence) = seq.receive_sequence()
            # Block notifications carry no mempool sequence and are skipped here.
            if mempool_sequence is not None:
                zmq_mem_seq = mempool_sequence
                if zmq_mem_seq > get_raw_seq:
                    raise Exception(f"We somehow jumped mempool sequence numbers! zmq_mem_seq: {zmq_mem_seq} > get_raw_seq: {get_raw_seq}")

        # 4) Moving forward, we apply the delta to our local view
        #    remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
        expected_sequence = get_raw_seq
        # r_gap accumulates how far "R" notifications ran ahead of expected_sequence;
        # it is subtracted again when the following block connect is processed.
        r_gap = 0
        for _ in range(num_txs + 2 + 1 + 1):
            (hash_str, label, mempool_sequence) = seq.receive_sequence()
            if mempool_sequence is not None:
                if mempool_sequence != expected_sequence:
                    # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
                    # position in the incoming block message "C"
                    if label == "R":
                        assert mempool_sequence > expected_sequence
                        r_gap += mempool_sequence - expected_sequence
                    else:
                        raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}")
            if label == "A":
                assert hash_str not in mempool_view
                mempool_view.add(hash_str)
                expected_sequence = mempool_sequence + 1
            elif label == "R":
                assert hash_str in mempool_view
                mempool_view.remove(hash_str)
                expected_sequence = mempool_sequence + 1
            elif label == "C":
                # (Attempt to) remove all txids from known block connects
                # [1:] skips the block's first (coinbase) transaction.
                block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
                for txid in block_txids:
                    if txid in mempool_view:
                        # Each tracked transaction removed by the block advances the sequence by one.
                        expected_sequence += 1
                        mempool_view.remove(txid)
                expected_sequence -= r_gap
                r_gap = 0
            elif label == "D":
                # Not useful for mempool tracking per se
                continue
            else:
                raise Exception("Unexpected ZMQ sequence label!")

        # The node's mempool must now hold only the final transaction, and the
        # locally tracked sequence must agree with the node's counter.
        assert_equal(self.nodes[0].getrawmempool(), [final_txid])
        assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)

        # 5) If you miss a zmq/mempool sequence number, go back to step (2)

        self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
557  
558      def test_multiple_interfaces(self):
559          # Set up two subscribers with different addresses
560          # (note that after the reorg test, syncing would fail due to different
561          # chain lengths on node0 and node1; for this test we only need node0, so
562          # we can disable syncing blocks on the setup)
563          subscribers = self.setup_zmq_test([
564              ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 1}"),
565              ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 2}"),
566          ], sync_blocks=False)
567  
568          # Generate 1 block in nodes[0] and receive all notifications
569          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)
570  
571          # Should receive the same block hash on both subscribers
572          assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
573          assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex())
574  
575      def test_ipv6(self):
576          if not test_ipv6_local():
577              self.log.info("Skipping IPv6 test, because IPv6 is not supported.")
578              return
579          self.log.info("Testing IPv6")
580          # Set up subscriber using IPv6 loopback address
581          subscribers = self.setup_zmq_test([
582              ("hashblock", f"tcp://[::1]:{self.zmq_port_base}")
583          ], ipv6=True)
584  
585          # Generate 1 block in nodes[0]
586          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
587  
588          # Should receive the same block hash
589          assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
590  
591  
# Run the test when this file is executed as a script.
if __name__ == '__main__':
    ZMQTest(__file__).main()