/ test / functional / interface_zmq.py
interface_zmq.py
  1  #!/usr/bin/env python3
  2  # Copyright (c) 2015-2022 The Bitcoin Core developers
  3  # Distributed under the MIT software license, see the accompanying
  4  # file COPYING or http://www.opensource.org/licenses/mit-license.php.
  5  """Test the ZMQ notification interface."""
  6  import struct
  7  from time import sleep
  8  from io import BytesIO
  9  
 10  from test_framework.address import (
 11      ADDRESS_BCRT1_P2WSH_OP_TRUE,
 12      ADDRESS_BCRT1_UNSPENDABLE,
 13  )
 14  from test_framework.blocktools import (
 15      add_witness_commitment,
 16      create_block,
 17      create_coinbase,
 18  )
 19  from test_framework.test_framework import BitcoinTestFramework
 20  from test_framework.messages import (
 21      CBlock,
 22      hash256,
 23      tx_from_hex,
 24  )
 25  from test_framework.util import (
 26      assert_equal,
 27      assert_raises_rpc_error,
 28      p2p_port,
 29  )
 30  from test_framework.wallet import (
 31      MiniWallet,
 32  )
 33  from test_framework.netutil import test_ipv6_local
 34  
 35  
 36  # Test may be skipped and not have zmq installed
 37  try:
 38      import zmq
 39  except ImportError:
 40      pass
 41  
 42  def hash256_reversed(byte_str):
 43      return hash256(byte_str)[::-1]
 44  
 45  class ZMQSubscriber:
 46      def __init__(self, socket, topic):
 47          self.sequence = None  # no sequence number received yet
 48          self.socket = socket
 49          self.topic = topic
 50  
 51          self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
 52  
 53      # Receive message from publisher and verify that topic and sequence match
 54      def _receive_from_publisher_and_check(self):
 55          topic, body, seq = self.socket.recv_multipart()
 56          # Topic should match the subscriber topic.
 57          assert_equal(topic, self.topic)
 58          # Sequence should be incremental.
 59          received_seq = struct.unpack('<I', seq)[-1]
 60          if self.sequence is None:
 61              self.sequence = received_seq
 62          else:
 63              assert_equal(received_seq, self.sequence)
 64          self.sequence += 1
 65          return body
 66  
 67      def receive(self):
 68          return self._receive_from_publisher_and_check()
 69  
 70      def receive_sequence(self):
 71          body = self._receive_from_publisher_and_check()
 72          hash = body[:32].hex()
 73          label = chr(body[32])
 74          mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
 75          if mempool_sequence is not None:
 76              assert label == "A" or label == "R"
 77          else:
 78              assert label == "D" or label == "C"
 79          return (hash, label, mempool_sequence)
 80  
 81  
 82  class ZMQTestSetupBlock:
 83      """Helper class for setting up a ZMQ test via the "sync up" procedure.
 84      Generates a block on the specified node on instantiation and provides a
 85      method to check whether a ZMQ notification matches, i.e. the event was
 86      caused by this generated block.  Assumes that a notification either contains
 87      the generated block's hash, it's (coinbase) transaction id, the raw block or
 88      raw transaction data.
 89      """
 90      def __init__(self, test_framework, node):
 91          self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0]
 92          coinbase = node.getblock(self.block_hash, 2)['tx'][0]
 93          self.tx_hash = coinbase['txid']
 94          self.raw_tx = coinbase['hex']
 95          self.raw_block = node.getblock(self.block_hash, 0)
 96  
 97      def caused_notification(self, notification):
 98          return (
 99              self.block_hash in notification
100              or self.tx_hash in notification
101              or self.raw_block in notification
102              or self.raw_tx in notification
103          )
104  
105  
106  class ZMQTest (BitcoinTestFramework):
107      def set_test_params(self):
108          self.num_nodes = 2
109          # whitelist peers to speed up tx relay / mempool sync
110          self.noban_tx_relay = True
111          self.zmq_port_base = p2p_port(self.num_nodes + 1)
112  
113      def skip_test_if_missing_module(self):
114          self.skip_if_no_py3_zmq()
115          self.skip_if_no_bitcoind_zmq()
116  
117      def run_test(self):
118          self.wallet = MiniWallet(self.nodes[0])
119          self.ctx = zmq.Context()
120          try:
121              self.test_basic()
122              self.test_sequence()
123              self.test_mempool_sync()
124              self.test_reorg()
125              self.test_multiple_interfaces()
126              self.test_ipv6()
127          finally:
128              # Destroy the ZMQ context.
129              self.log.debug("Destroying ZMQ context")
130              self.ctx.destroy(linger=None)
131  
132      # Restart node with the specified zmq notifications enabled, subscribe to
133      # all of them and return the corresponding ZMQSubscriber objects.
134      def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True, ipv6=False):
135          subscribers = []
136          for topic, address in services:
137              socket = self.ctx.socket(zmq.SUB)
138              if ipv6:
139                  socket.setsockopt(zmq.IPV6, 1)
140              subscribers.append(ZMQSubscriber(socket, topic.encode()))
141  
142          self.restart_node(0, [f"-zmqpub{topic}={address}" for topic, address in services])
143  
144          for i, sub in enumerate(subscribers):
145              sub.socket.connect(services[i][1])
146  
147          # Ensure that all zmq publisher notification interfaces are ready by
148          # running the following "sync up" procedure:
149          #   1. Generate a block on the node
150          #   2. Try to receive the corresponding notification on all subscribers
151          #   3. If all subscribers get the message within the timeout (1 second),
152          #      we are done, otherwise repeat starting from step 1
153          for sub in subscribers:
154              sub.socket.set(zmq.RCVTIMEO, 1000)
155          while True:
156              test_block = ZMQTestSetupBlock(self, self.nodes[0])
157              recv_failed = False
158              for sub in subscribers:
159                  try:
160                      while not test_block.caused_notification(sub.receive().hex()):
161                          self.log.debug("Ignoring sync-up notification for previously generated block.")
162                  except zmq.error.Again:
163                      self.log.debug("Didn't receive sync-up notification, trying again.")
164                      recv_failed = True
165              if not recv_failed:
166                  self.log.debug("ZMQ sync-up completed, all subscribers are ready.")
167                  break
168  
169          # set subscriber's desired timeout for the test
170          for sub in subscribers:
171              sub.socket.set(zmq.RCVTIMEO, recv_timeout*1000)
172  
173          self.connect_nodes(0, 1)
174          if sync_blocks:
175              self.sync_blocks()
176  
177          return subscribers
178  
179      def test_basic(self):
180  
181          # Invalid zmq arguments don't take down the node, see #17185.
182          self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
183  
184          address = f"tcp://127.0.0.1:{self.zmq_port_base}"
185          subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
186  
187          hashblock = subs[0]
188          hashtx = subs[1]
189          rawblock = subs[2]
190          rawtx = subs[3]
191  
192          num_blocks = 5
193          self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)")
194          genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
195  
196          for x in range(num_blocks):
197              # Should receive the coinbase txid.
198              txid = hashtx.receive()
199  
200              # Should receive the coinbase raw transaction.
201              tx = tx_from_hex(rawtx.receive().hex())
202              tx.calc_sha256()
203              assert_equal(tx.hash, txid.hex())
204  
205              # Should receive the generated raw block.
206              hex = rawblock.receive()
207              block = CBlock()
208              block.deserialize(BytesIO(hex))
209              assert block.is_valid()
210              assert_equal(block.vtx[0].hash, tx.hash)
211              assert_equal(len(block.vtx), 1)
212              assert_equal(genhashes[x], hash256_reversed(hex[:80]).hex())
213  
214              # Should receive the generated block hash.
215              hash = hashblock.receive().hex()
216              assert_equal(genhashes[x], hash)
217              # The block should only have the coinbase txid.
218              assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])
219  
220  
221          self.log.info("Wait for tx from second node")
222          payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
223          payment_txid = payment_tx['txid']
224          self.sync_all()
225          # Should receive the broadcasted txid.
226          txid = hashtx.receive()
227          assert_equal(payment_txid, txid.hex())
228  
229          # Should receive the broadcasted raw transaction.
230          hex = rawtx.receive()
231          assert_equal(payment_tx['wtxid'], hash256_reversed(hex).hex())
232  
233          # Mining the block with this tx should result in second notification
234          # after coinbase tx notification
235          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
236          hashtx.receive()
237          txid = hashtx.receive()
238          assert_equal(payment_txid, txid.hex())
239  
240  
241          self.log.info("Test the getzmqnotifications RPC")
242          assert_equal(self.nodes[0].getzmqnotifications(), [
243              {"type": "pubhashblock", "address": address, "hwm": 1000},
244              {"type": "pubhashtx", "address": address, "hwm": 1000},
245              {"type": "pubrawblock", "address": address, "hwm": 1000},
246              {"type": "pubrawtx", "address": address, "hwm": 1000},
247          ])
248  
249          assert_equal(self.nodes[1].getzmqnotifications(), [])
250  
251      def test_reorg(self):
252  
253          address = f"tcp://127.0.0.1:{self.zmq_port_base}"
254  
255          # Should only notify the tip if a reorg occurs
256          hashblock, hashtx = self.setup_zmq_test(
257              [(topic, address) for topic in ["hashblock", "hashtx"]],
258              recv_timeout=2)  # 2 second timeout to check end of notifications
259          self.disconnect_nodes(0, 1)
260  
261          # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
262          payment_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
263          disconnect_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]
264          disconnect_cb = self.nodes[0].getblock(disconnect_block)["tx"][0]
265          assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())
266          assert_equal(hashtx.receive().hex(), payment_txid)
267          assert_equal(hashtx.receive().hex(), disconnect_cb)
268  
269          # Generate 2 blocks in nodes[1] to a different address to ensure split
270          connect_blocks = self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)
271  
272          # nodes[0] will reorg chain after connecting back nodes[1]
273          self.connect_nodes(0, 1)
274          self.sync_blocks() # tx in mempool valid but not advertised
275  
276          # Should receive nodes[1] tip
277          assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())
278  
279          # During reorg:
280          # Get old payment transaction notification from disconnect and disconnected cb
281          assert_equal(hashtx.receive().hex(), payment_txid)
282          assert_equal(hashtx.receive().hex(), disconnect_cb)
283          # And the payment transaction again due to mempool entry
284          assert_equal(hashtx.receive().hex(), payment_txid)
285          assert_equal(hashtx.receive().hex(), payment_txid)
286          # And the new connected coinbases
287          for i in [0, 1]:
288              assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0])
289  
290          # If we do a simple invalidate we announce the disconnected coinbase
291          self.nodes[0].invalidateblock(connect_blocks[1])
292          assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
293          # And the current tip
294          assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
295  
296      def test_sequence(self):
297          """
298          Sequence zmq notifications give every blockhash and txhash in order
299          of processing, regardless of IBD, re-orgs, etc.
300          Format of messages:
301          <32-byte hash>C :                 Blockhash connected
302          <32-byte hash>D :                 Blockhash disconnected
303          <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
304          <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
305          """
306          self.log.info("Testing 'sequence' publisher")
307          [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])
308          self.disconnect_nodes(0, 1)
309  
310          # Mempool sequence number starts at 1
311          seq_num = 1
312  
313          # Generate 1 block in nodes[0] and receive all notifications
314          dc_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)[0]
315  
316          # Note: We are not notified of any block transactions, coinbase or mined
317          assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence())
318  
319          # Generate 2 blocks in nodes[1] to a different address to ensure a chain split
320          self.generatetoaddress(self.nodes[1], 2, ADDRESS_BCRT1_P2WSH_OP_TRUE, sync_fun=self.no_op)
321  
322          # nodes[0] will reorg chain after connecting back nodes[1]
323          self.connect_nodes(0, 1)
324  
325          # Then we receive all block (dis)connect notifications for the 2 block reorg
326          assert_equal((dc_block, "D", None), seq.receive_sequence())
327          block_count = self.nodes[1].getblockcount()
328          assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence())
329          assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence())
330  
331          self.log.info("Wait for tx from second node")
332          payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
333          payment_txid = payment_tx['txid']
334          self.sync_all()
335          self.log.info("Testing sequence notifications with mempool sequence values")
336  
337          # Should receive the broadcasted txid.
338          assert_equal((payment_txid, "A", seq_num), seq.receive_sequence())
339          seq_num += 1
340  
341          self.log.info("Testing RBF notification")
342          # Replace it to test eviction/addition notification
343          payment_tx['tx'].vout[0].nValue -= 1000
344          rbf_txid = self.nodes[1].sendrawtransaction(payment_tx['tx'].serialize().hex())
345          self.sync_all()
346          assert_equal((payment_txid, "R", seq_num), seq.receive_sequence())
347          seq_num += 1
348          assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
349          seq_num += 1
350  
351          # Doesn't get published when mined, make a block and tx to "flush" the possibility
352          # though the mempool sequence number does go up by the number of transactions
353          # removed from the mempool by the block mining it.
354          mempool_size = len(self.nodes[0].getrawmempool())
355          c_block = self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)[0]
356          # Make sure the number of mined transactions matches the number of txs out of mempool
357          mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool())
358          assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta)
359          seq_num += mempool_size_delta
360          payment_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[1])['txid']
361          self.sync_all()
362          assert_equal((c_block, "C", None), seq.receive_sequence())
363          assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence())
364          seq_num += 1
365  
366          # Spot check getrawmempool results that they only show up when asked for
367          assert type(self.nodes[0].getrawmempool()) is list
368          assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list
369          assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True)
370          assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True)
371          assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num)
372  
373          self.log.info("Testing reorg notifications")
374          # Manually invalidate the last block to test mempool re-entry
375          # N.B. This part could be made more lenient in exact ordering
376          # since it greatly depends on inner-workings of blocks/mempool
377          # during "deep" re-orgs. Probably should "re-construct"
378          # blockchain/mempool state from notifications instead.
379          block_count = self.nodes[0].getblockcount()
380          best_hash = self.nodes[0].getbestblockhash()
381          self.nodes[0].invalidateblock(best_hash)
382          sleep(2)  # Bit of room to make sure transaction things happened
383  
384          # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective
385          # of the time they were gathered.
386          assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num
387  
388          assert_equal((best_hash, "D", None), seq.receive_sequence())
389          assert_equal((rbf_txid, "A", seq_num), seq.receive_sequence())
390          seq_num += 1
391  
392          # Other things may happen but aren't wallet-deterministic so we don't test for them currently
393          self.nodes[0].reconsiderblock(best_hash)
394          self.generatetoaddress(self.nodes[1], 1, ADDRESS_BCRT1_UNSPENDABLE)
395  
396          self.log.info("Evict mempool transaction by block conflict")
397          orig_tx = self.wallet.send_self_transfer(from_node=self.nodes[0])
398          orig_txid = orig_tx['txid']
399  
400          # More to be simply mined
401          more_tx = []
402          for _ in range(5):
403              more_tx.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))
404  
405          orig_tx['tx'].vout[0].nValue -= 1000
406          bump_txid = self.nodes[0].sendrawtransaction(orig_tx['tx'].serialize().hex())
407          # Mine the pre-bump tx
408          txs_to_add = [orig_tx['hex']] + [tx['hex'] for tx in more_tx]
409          block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1), txlist=txs_to_add)
410          add_witness_commitment(block)
411          block.solve()
412          assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None)
413          tip = self.nodes[0].getbestblockhash()
414          assert_equal(int(tip, 16), block.sha256)
415          orig_txid_2 = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
416  
417          # Flush old notifications until evicted tx original entry
418          (hash_str, label, mempool_seq) = seq.receive_sequence()
419          while hash_str != orig_txid:
420              (hash_str, label, mempool_seq) = seq.receive_sequence()
421          mempool_seq += 1
422  
423          # Added original tx
424          assert_equal(label, "A")
425          # More transactions to be simply mined
426          for i in range(len(more_tx)):
427              assert_equal((more_tx[i]['txid'], "A", mempool_seq), seq.receive_sequence())
428              mempool_seq += 1
429          # Bumped by rbf
430          assert_equal((orig_txid, "R", mempool_seq), seq.receive_sequence())
431          mempool_seq += 1
432          assert_equal((bump_txid, "A", mempool_seq), seq.receive_sequence())
433          mempool_seq += 1
434          # Conflict announced first, then block
435          assert_equal((bump_txid, "R", mempool_seq), seq.receive_sequence())
436          mempool_seq += 1
437          assert_equal((tip, "C", None), seq.receive_sequence())
438          mempool_seq += len(more_tx)
439          # Last tx
440          assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence())
441          mempool_seq += 1
442          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
443          self.sync_all()  # want to make sure we didn't break "consensus" for other tests
444  
445      def test_mempool_sync(self):
446          """
447          Use sequence notification plus getrawmempool sequence results to "sync mempool"
448          """
449  
450          self.log.info("Testing 'mempool sync' usage of sequence notifier")
451          [seq] = self.setup_zmq_test([("sequence", f"tcp://127.0.0.1:{self.zmq_port_base}")])
452  
453          # In-memory counter, should always start at 1
454          next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
455          assert_equal(next_mempool_seq, 1)
456  
457          # Some transactions have been happening but we aren't consuming zmq notifications yet
458          # or we lost a ZMQ message somehow and want to start over
459          txs = []
460          num_txs = 5
461          for _ in range(num_txs):
462              txs.append(self.wallet.send_self_transfer(from_node=self.nodes[1]))
463          self.sync_all()
464  
465          # 1) Consume backlog until we get a mempool sequence number
466          (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
467          while zmq_mem_seq is None:
468              (hash_str, label, zmq_mem_seq) = seq.receive_sequence()
469  
470          assert label == "A" or label == "R"
471          assert hash_str is not None
472  
473          # 2) We need to "seed" our view of the mempool
474          mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
475          mempool_view = set(mempool_snapshot["txids"])
476          get_raw_seq = mempool_snapshot["mempool_sequence"]
477          assert_equal(get_raw_seq, 6)
478          # Snapshot may be too old compared to zmq message we read off latest
479          while zmq_mem_seq >= get_raw_seq:
480              sleep(2)
481              mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True)
482              mempool_view = set(mempool_snapshot["txids"])
483              get_raw_seq = mempool_snapshot["mempool_sequence"]
484  
485          # Things continue to happen in the "interim" while waiting for snapshot results
486          # We have node 0 do all these to avoid p2p races with RBF announcements
487          for _ in range(num_txs):
488              txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0]))
489          txs[-1]['tx'].vout[0].nValue -= 1000
490          self.nodes[0].sendrawtransaction(txs[-1]['tx'].serialize().hex())
491          self.sync_all()
492          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
493          final_txid = self.wallet.send_self_transfer(from_node=self.nodes[0])['txid']
494  
495          # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot
496          while True:
497              if zmq_mem_seq == get_raw_seq - 1:
498                  break
499              (hash_str, label, mempool_sequence) = seq.receive_sequence()
500              if mempool_sequence is not None:
501                  zmq_mem_seq = mempool_sequence
502                  if zmq_mem_seq > get_raw_seq:
503                      raise Exception(f"We somehow jumped mempool sequence numbers! zmq_mem_seq: {zmq_mem_seq} > get_raw_seq: {get_raw_seq}")
504  
505          # 4) Moving forward, we apply the delta to our local view
506          #    remaining txs(5) + 1 rbf(A+R) + 1 block connect + 1 final tx
507          expected_sequence = get_raw_seq
508          r_gap = 0
509          for _ in range(num_txs + 2 + 1 + 1):
510              (hash_str, label, mempool_sequence) = seq.receive_sequence()
511              if mempool_sequence is not None:
512                  if mempool_sequence != expected_sequence:
513                      # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its
514                      # position in the incoming block message "C"
515                      if label == "R":
516                          assert mempool_sequence > expected_sequence
517                          r_gap += mempool_sequence - expected_sequence
518                      else:
519                          raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}")
520              if label == "A":
521                  assert hash_str not in mempool_view
522                  mempool_view.add(hash_str)
523                  expected_sequence = mempool_sequence + 1
524              elif label == "R":
525                  assert hash_str in mempool_view
526                  mempool_view.remove(hash_str)
527                  expected_sequence = mempool_sequence + 1
528              elif label == "C":
529                  # (Attempt to) remove all txids from known block connects
530                  block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]
531                  for txid in block_txids:
532                      if txid in mempool_view:
533                          expected_sequence += 1
534                          mempool_view.remove(txid)
535                  expected_sequence -= r_gap
536                  r_gap = 0
537              elif label == "D":
538                  # Not useful for mempool tracking per se
539                  continue
540              else:
541                  raise Exception("Unexpected ZMQ sequence label!")
542  
543          assert_equal(self.nodes[0].getrawmempool(), [final_txid])
544          assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence)
545  
546          # 5) If you miss a zmq/mempool sequence number, go back to step (2)
547  
548          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
549  
550      def test_multiple_interfaces(self):
551          # Set up two subscribers with different addresses
552          # (note that after the reorg test, syncing would fail due to different
553          # chain lengths on node0 and node1; for this test we only need node0, so
554          # we can disable syncing blocks on the setup)
555          subscribers = self.setup_zmq_test([
556              ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 1}"),
557              ("hashblock", f"tcp://127.0.0.1:{self.zmq_port_base + 2}"),
558          ], sync_blocks=False)
559  
560          # Generate 1 block in nodes[0] and receive all notifications
561          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE, sync_fun=self.no_op)
562  
563          # Should receive the same block hash on both subscribers
564          assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
565          assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex())
566  
567      def test_ipv6(self):
568          if not test_ipv6_local():
569              self.log.info("Skipping IPv6 test, because IPv6 is not supported.")
570              return
571          self.log.info("Testing IPv6")
572          # Set up subscriber using IPv6 loopback address
573          subscribers = self.setup_zmq_test([
574              ("hashblock", f"tcp://[::1]:{self.zmq_port_base}")
575          ], ipv6=True)
576  
577          # Generate 1 block in nodes[0]
578          self.generatetoaddress(self.nodes[0], 1, ADDRESS_BCRT1_UNSPENDABLE)
579  
580          # Should receive the same block hash
581          assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
582  
583  
584  if __name__ == '__main__':
585      ZMQTest().main()