zmqpublishnotifier.cpp
1 // Copyright (c) 2015-2022 The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #include <zmq/zmqpublishnotifier.h> 6 7 #include <chain.h> 8 #include <chainparams.h> 9 #include <crypto/common.h> 10 #include <kernel/cs_main.h> 11 #include <logging.h> 12 #include <netaddress.h> 13 #include <netbase.h> 14 #include <node/blockstorage.h> 15 #include <primitives/block.h> 16 #include <primitives/transaction.h> 17 #include <rpc/server.h> 18 #include <serialize.h> 19 #include <streams.h> 20 #include <sync.h> 21 #include <uint256.h> 22 #include <zmq/zmqutil.h> 23 24 #include <zmq.h> 25 26 #include <cassert> 27 #include <cstdarg> 28 #include <cstddef> 29 #include <cstdint> 30 #include <cstring> 31 #include <map> 32 #include <optional> 33 #include <string> 34 #include <utility> 35 #include <vector> 36 37 namespace Consensus { 38 struct Params; 39 } 40 41 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; 42 43 static const char *MSG_HASHBLOCK = "hashblock"; 44 static const char *MSG_HASHTX = "hashtx"; 45 static const char *MSG_RAWBLOCK = "rawblock"; 46 static const char *MSG_RAWTX = "rawtx"; 47 static const char *MSG_SEQUENCE = "sequence"; 48 49 // Internal function to send multipart message 50 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) 51 { 52 va_list args; 53 va_start(args, size); 54 55 while (1) 56 { 57 zmq_msg_t msg; 58 59 int rc = zmq_msg_init_size(&msg, size); 60 if (rc != 0) 61 { 62 zmqError("Unable to initialize ZMQ msg"); 63 va_end(args); 64 return -1; 65 } 66 67 void *buf = zmq_msg_data(&msg); 68 memcpy(buf, data, size); 69 70 data = va_arg(args, const void*); 71 72 rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); 73 if (rc == -1) 74 { 75 zmqError("Unable to send ZMQ msg"); 76 zmq_msg_close(&msg); 77 va_end(args); 78 return -1; 79 } 80 81 zmq_msg_close(&msg); 82 83 if (!data) 84 break; 85 86 size = va_arg(args, size_t); 87 } 88 va_end(args); 89 return 0; 90 } 91 92 static bool IsZMQAddressIPV6(const std::string &zmq_address) 93 { 94 const std::string tcp_prefix = "tcp://"; 95 const size_t tcp_index = zmq_address.rfind(tcp_prefix); 96 const size_t colon_index = zmq_address.rfind(':'); 97 if (tcp_index == 0 && colon_index != std::string::npos) { 98 const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length()); 99 const std::optional<CNetAddr> addr{LookupHost(ip, false)}; 100 if (addr.has_value() && addr.value().IsIPv6()) return true; 101 } 102 return false; 103 } 104 105 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) 106 { 107 assert(!psocket); 108 109 // check if address is being used by other publish notifier 110 std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address); 111 112 if (i==mapPublishNotifiers.end()) 113 { 114 psocket = zmq_socket(pcontext, ZMQ_PUB); 115 if (!psocket) 116 { 117 zmqError("Failed to create socket"); 118 return false; 119 } 120 121 LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); 122 123 int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark)); 124 if (rc != 0) 125 { 126 zmqError("Failed to set outbound message high water mark"); 127 zmq_close(psocket); 128 return false; 129 } 130 131 const int so_keepalive_option {1}; 132 rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option)); 133 if (rc != 0) { 134 zmqError("Failed to set SO_KEEPALIVE"); 135 zmq_close(psocket); 136 return false; 137 } 138 139 // On some systems (e.g. OpenBSD) the ZMQ_IPV6 must not be enabled, if the address to bind isn't IPv6 140 const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0}; 141 rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6)); 142 if (rc != 0) { 143 zmqError("Failed to set ZMQ_IPV6"); 144 zmq_close(psocket); 145 return false; 146 } 147 148 rc = zmq_bind(psocket, address.c_str()); 149 if (rc != 0) 150 { 151 zmqError("Failed to bind address"); 152 zmq_close(psocket); 153 return false; 154 } 155 156 // register this notifier for the address, so it can be reused for other publish notifier 157 mapPublishNotifiers.insert(std::make_pair(address, this)); 158 return true; 159 } 160 else 161 { 162 LogPrint(BCLog::ZMQ, "Reusing socket for address %s\n", address); 163 LogPrint(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); 164 165 psocket = i->second->psocket; 166 mapPublishNotifiers.insert(std::make_pair(address, this)); 167 168 return true; 169 } 170 } 171 172 void CZMQAbstractPublishNotifier::Shutdown() 173 { 174 // Early return if Initialize was not called 175 if (!psocket) return; 176 177 int count = mapPublishNotifiers.count(address); 178 179 // remove this notifier from the list of publishers using this address 180 typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator; 181 std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address); 182 183 for (iterator it = iterpair.first; it != iterpair.second; ++it) 184 { 185 if (it->second==this) 186 { 187 mapPublishNotifiers.erase(it); 188 break; 189 } 190 } 191 192 if (count == 1) 193 { 194 LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address); 195 int linger = 0; 196 zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); 197 zmq_close(psocket); 198 } 199 200 psocket = nullptr; 201 } 202 203 bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size) 204 { 205 assert(psocket); 206 207 /* send three parts, command & data & a LE 4byte sequence number */ 208 unsigned char msgseq[sizeof(uint32_t)]; 209 WriteLE32(msgseq, nSequence); 210 int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr); 211 if (rc == -1) 212 return false; 213 214 /* increment memory only sequence number after sending */ 215 nSequence++; 216 217 return true; 218 } 219 220 bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) 221 { 222 uint256 hash = pindex->GetBlockHash(); 223 LogPrint(BCLog::ZMQ, "Publish hashblock %s to %s\n", hash.GetHex(), this->address); 224 uint8_t data[32]; 225 for (unsigned int i = 0; i < 32; i++) { 226 data[31 - i] = hash.begin()[i]; 227 } 228 return SendZmqMessage(MSG_HASHBLOCK, data, 32); 229 } 230 231 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) 232 { 233 uint256 hash = transaction.GetHash(); 234 LogPrint(BCLog::ZMQ, "Publish hashtx %s to %s\n", hash.GetHex(), this->address); 235 uint8_t data[32]; 236 for (unsigned int i = 0; i < 32; i++) { 237 data[31 - i] = hash.begin()[i]; 238 } 239 return SendZmqMessage(MSG_HASHTX, data, 32); 240 } 241 242 bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) 243 { 244 LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); 245 246 std::vector<uint8_t> block{}; 247 if (!m_get_block_by_index(block, *pindex)) { 248 zmqError("Can't read block from disk"); 249 return false; 250 } 251 252 return SendZmqMessage(MSG_RAWBLOCK, block.data(), block.size()); 253 } 254 255 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) 256 { 257 uint256 hash = transaction.GetHash(); 258 LogPrint(BCLog::ZMQ, "Publish rawtx %s to %s\n", hash.GetHex(), this->address); 259 DataStream ss; 260 ss << TX_WITH_WITNESS(transaction); 261 return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); 262 } 263 264 // Helper function to send a 'sequence' topic message with the following structure: 265 // <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional) 266 static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {}) 267 { 268 unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)]; 269 for (unsigned int i = 0; i < sizeof(hash); ++i) { 270 data[sizeof(hash) - 1 - i] = hash.begin()[i]; 271 } 272 data[sizeof(hash)] = label; 273 if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence); 274 return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label)); 275 } 276 277 bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) 278 { 279 uint256 hash = pindex->GetBlockHash(); 280 LogPrint(BCLog::ZMQ, "Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); 281 return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C'); 282 } 283 284 bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) 285 { 286 uint256 hash = pindex->GetBlockHash(); 287 LogPrint(BCLog::ZMQ, "Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); 288 return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D'); 289 } 290 291 bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) 292 { 293 uint256 hash = transaction.GetHash(); 294 LogPrint(BCLog::ZMQ, "Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); 295 return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence); 296 } 297 298 bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) 299 { 300 uint256 hash = transaction.GetHash(); 301 LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); 302 return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence); 303 }