zmqnotificationinterface.cpp
1 // Copyright (c) 2015-2022 The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #include <zmq/zmqnotificationinterface.h> 6 7 #include <common/args.h> 8 #include <kernel/chain.h> 9 #include <kernel/mempool_entry.h> 10 #include <logging.h> 11 #include <netbase.h> 12 #include <primitives/block.h> 13 #include <primitives/transaction.h> 14 #include <validationinterface.h> 15 #include <zmq/zmqabstractnotifier.h> 16 #include <zmq/zmqpublishnotifier.h> 17 #include <zmq/zmqutil.h> 18 19 #include <zmq.h> 20 21 #include <cassert> 22 #include <map> 23 #include <string> 24 #include <utility> 25 #include <vector> 26 27 CZMQNotificationInterface::CZMQNotificationInterface() = default; 28 29 CZMQNotificationInterface::~CZMQNotificationInterface() 30 { 31 Shutdown(); 32 } 33 34 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const 35 { 36 std::list<const CZMQAbstractNotifier*> result; 37 for (const auto& n : notifiers) { 38 result.push_back(n.get()); 39 } 40 return result; 41 } 42 43 std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<std::byte>&, const CBlockIndex&)> get_block_by_index) 44 { 45 std::map<std::string, CZMQNotifierFactory> factories; 46 factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; 47 factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; 48 factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> { 49 return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index); 50 }; 51 factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; 52 factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>; 53 54 std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers; 55 for (const auto& entry : factories) 56 { 57 std::string arg("-zmq" + entry.first); 58 const auto& factory = entry.second; 59 for (std::string& address : gArgs.GetArgs(arg)) { 60 // libzmq uses prefix "ipc://" for UNIX domain sockets 61 if (address.starts_with(ADDR_PREFIX_UNIX)) { 62 address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC); 63 } 64 65 std::unique_ptr<CZMQAbstractNotifier> notifier = factory(); 66 notifier->SetType(entry.first); 67 notifier->SetAddress(address); 68 notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); 69 notifiers.push_back(std::move(notifier)); 70 } 71 } 72 73 if (!notifiers.empty()) 74 { 75 std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface()); 76 notificationInterface->notifiers = std::move(notifiers); 77 78 if (notificationInterface->Initialize()) { 79 return notificationInterface; 80 } 81 } 82 83 return nullptr; 84 } 85 86 // Called at startup to conditionally set up ZMQ socket(s) 87 bool CZMQNotificationInterface::Initialize() 88 { 89 int major = 0, minor = 0, patch = 0; 90 zmq_version(&major, &minor, &patch); 91 LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch); 92 93 LogDebug(BCLog::ZMQ, "Initialize notification interface\n"); 94 assert(!pcontext); 95 96 pcontext = zmq_ctx_new(); 97 98 if (!pcontext) 99 { 100 zmqError("Unable to initialize context"); 101 return false; 102 } 103 104 for (auto& notifier : notifiers) { 105 if (notifier->Initialize(pcontext)) { 106 LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); 107 } else { 108 LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); 109 return false; 110 } 111 } 112 113 return true; 114 } 115 116 // Called during shutdown sequence 117 void CZMQNotificationInterface::Shutdown() 118 { 119 LogDebug(BCLog::ZMQ, "Shutdown notification interface\n"); 120 if (pcontext) 121 { 122 for (auto& notifier : notifiers) { 123 LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); 124 notifier->Shutdown(); 125 } 126 zmq_ctx_term(pcontext); 127 128 pcontext = nullptr; 129 } 130 } 131 132 namespace { 133 134 template <typename Function> 135 void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func) 136 { 137 for (auto i = notifiers.begin(); i != notifiers.end(); ) { 138 CZMQAbstractNotifier* notifier = i->get(); 139 if (func(notifier)) { 140 ++i; 141 } else { 142 notifier->Shutdown(); 143 i = notifiers.erase(i); 144 } 145 } 146 } 147 148 } // anonymous namespace 149 150 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) 151 { 152 if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones 153 return; 154 155 TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) { 156 return notifier->NotifyBlock(pindexNew); 157 }); 158 } 159 160 void CZMQNotificationInterface::TransactionAddedToMempool(const NewMempoolTransactionInfo& ptx, uint64_t mempool_sequence) 161 { 162 const CTransaction& tx = *(ptx.info.m_tx); 163 164 TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { 165 return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence); 166 }); 167 } 168 169 void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence) 170 { 171 // Called for all non-block inclusion reasons 172 const CTransaction& tx = *ptx; 173 174 TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { 175 return notifier->NotifyTransactionRemoval(tx, mempool_sequence); 176 }); 177 } 178 179 void CZMQNotificationInterface::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) 180 { 181 if (role == ChainstateRole::BACKGROUND) { 182 return; 183 } 184 for (const CTransactionRef& ptx : pblock->vtx) { 185 const CTransaction& tx = *ptx; 186 TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { 187 return notifier->NotifyTransaction(tx); 188 }); 189 } 190 191 // Next we notify BlockConnect listeners for *all* blocks 192 TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) { 193 return notifier->NotifyBlockConnect(pindexConnected); 194 }); 195 } 196 197 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) 198 { 199 for (const CTransactionRef& ptx : pblock->vtx) { 200 const CTransaction& tx = *ptx; 201 TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { 202 return notifier->NotifyTransaction(tx); 203 }); 204 } 205 206 // Next we notify BlockDisconnect listeners for *all* blocks 207 TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) { 208 return notifier->NotifyBlockDisconnect(pindexDisconnected); 209 }); 210 } 211 212 std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;