/ src / zmq / zmqnotificationinterface.cpp
zmqnotificationinterface.cpp
  1  // Copyright (c) 2015-2022 The Bitcoin Core developers
  2  // Distributed under the MIT software license, see the accompanying
  3  // file COPYING or http://www.opensource.org/licenses/mit-license.php.
  4  
  5  #include <zmq/zmqnotificationinterface.h>
  6  
  7  #include <common/args.h>
  8  #include <kernel/chain.h>
  9  #include <kernel/mempool_entry.h>
 10  #include <logging.h>
 11  #include <netbase.h>
 12  #include <primitives/block.h>
 13  #include <primitives/transaction.h>
 14  #include <validationinterface.h>
 15  #include <zmq/zmqabstractnotifier.h>
 16  #include <zmq/zmqpublishnotifier.h>
 17  #include <zmq/zmqutil.h>
 18  
 19  #include <zmq.h>
 20  
 21  #include <cassert>
 22  #include <map>
 23  #include <string>
 24  #include <utility>
 25  #include <vector>
 26  
// Defaulted constructor. Within this file, instances are built only by Create(),
// which installs the notifier list and calls Initialize() afterwards.
CZMQNotificationInterface::CZMQNotificationInterface() = default;
 28  
// Destructor: delegates all teardown to Shutdown().
CZMQNotificationInterface::~CZMQNotificationInterface()
{
    // Shutdown() only touches the notifiers and the context while pcontext is
    // non-null, and it resets pcontext afterwards, so calling it here after an
    // explicit Shutdown() does not tear anything down twice.
    Shutdown();
}
 33  
 34  std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
 35  {
 36      std::list<const CZMQAbstractNotifier*> result;
 37      for (const auto& n : notifiers) {
 38          result.push_back(n.get());
 39      }
 40      return result;
 41  }
 42  
 43  std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<std::byte>&, const CBlockIndex&)> get_block_by_index)
 44  {
 45      std::map<std::string, CZMQNotifierFactory> factories;
 46      factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
 47      factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
 48      factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
 49          return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
 50      };
 51      factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
 52      factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
 53  
 54      std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
 55      for (const auto& entry : factories)
 56      {
 57          std::string arg("-zmq" + entry.first);
 58          const auto& factory = entry.second;
 59          for (std::string& address : gArgs.GetArgs(arg)) {
 60              // libzmq uses prefix "ipc://" for UNIX domain sockets
 61              if (address.starts_with(ADDR_PREFIX_UNIX)) {
 62                  address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
 63              }
 64  
 65              std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
 66              notifier->SetType(entry.first);
 67              notifier->SetAddress(address);
 68              notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
 69              notifiers.push_back(std::move(notifier));
 70          }
 71      }
 72  
 73      if (!notifiers.empty())
 74      {
 75          std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
 76          notificationInterface->notifiers = std::move(notifiers);
 77  
 78          if (notificationInterface->Initialize()) {
 79              return notificationInterface;
 80          }
 81      }
 82  
 83      return nullptr;
 84  }
 85  
 86  // Called at startup to conditionally set up ZMQ socket(s)
 87  bool CZMQNotificationInterface::Initialize()
 88  {
 89      int major = 0, minor = 0, patch = 0;
 90      zmq_version(&major, &minor, &patch);
 91      LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
 92  
 93      LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
 94      assert(!pcontext);
 95  
 96      pcontext = zmq_ctx_new();
 97  
 98      if (!pcontext)
 99      {
100          zmqError("Unable to initialize context");
101          return false;
102      }
103  
104      for (auto& notifier : notifiers) {
105          if (notifier->Initialize(pcontext)) {
106              LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
107          } else {
108              LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
109              return false;
110          }
111      }
112  
113      return true;
114  }
115  
116  // Called during shutdown sequence
117  void CZMQNotificationInterface::Shutdown()
118  {
119      LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
120      if (pcontext)
121      {
122          for (auto& notifier : notifiers) {
123              LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
124              notifier->Shutdown();
125          }
126          zmq_ctx_term(pcontext);
127  
128          pcontext = nullptr;
129      }
130  }
131  
132  namespace {
133  
134  template <typename Function>
135  void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
136  {
137      for (auto i = notifiers.begin(); i != notifiers.end(); ) {
138          CZMQAbstractNotifier* notifier = i->get();
139          if (func(notifier)) {
140              ++i;
141          } else {
142              notifier->Shutdown();
143              i = notifiers.erase(i);
144          }
145      }
146  }
147  
148  } // anonymous namespace
149  
150  void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
151  {
152      if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
153          return;
154  
155      TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
156          return notifier->NotifyBlock(pindexNew);
157      });
158  }
159  
160  void CZMQNotificationInterface::TransactionAddedToMempool(const NewMempoolTransactionInfo& ptx, uint64_t mempool_sequence)
161  {
162      const CTransaction& tx = *(ptx.info.m_tx);
163  
164      TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
165          return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
166      });
167  }
168  
169  void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence)
170  {
171      // Called for all non-block inclusion reasons
172      const CTransaction& tx = *ptx;
173  
174      TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
175          return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
176      });
177  }
178  
179  void CZMQNotificationInterface::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
180  {
181      if (role == ChainstateRole::BACKGROUND) {
182          return;
183      }
184      for (const CTransactionRef& ptx : pblock->vtx) {
185          const CTransaction& tx = *ptx;
186          TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
187              return notifier->NotifyTransaction(tx);
188          });
189      }
190  
191      // Next we notify BlockConnect listeners for *all* blocks
192      TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
193          return notifier->NotifyBlockConnect(pindexConnected);
194      });
195  }
196  
197  void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
198  {
199      for (const CTransactionRef& ptx : pblock->vtx) {
200          const CTransaction& tx = *ptx;
201          TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
202              return notifier->NotifyTransaction(tx);
203          });
204      }
205  
206      // Next we notify BlockDisconnect listeners for *all* blocks
207      TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
208          return notifier->NotifyBlockDisconnect(pindexDisconnected);
209      });
210  }
211  
// Global owning handle for the ZMQ notification interface. It is
// default-constructed here (i.e. null); nothing in this file assigns it.
// NOTE(review): presumably set from the result of CZMQNotificationInterface::Create()
// during node startup — confirm against the caller.
std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;