zmqpublishnotifier.cpp
1 // Copyright (c) 2015-present The Bitcoin Core developers 2 // Distributed under the MIT software license, see the accompanying 3 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 5 #include <zmq/zmqpublishnotifier.h> 6 7 #include <chain.h> 8 #include <crypto/common.h> 9 #include <logging.h> 10 #include <netaddress.h> 11 #include <netbase.h> 12 #include <primitives/transaction.h> 13 #include <serialize.h> 14 #include <streams.h> 15 #include <uint256.h> 16 #include <util/check.h> 17 #include <zmq/zmqutil.h> 18 19 #include <zmq.h> 20 21 #include <cstdarg> 22 #include <cstddef> 23 #include <cstdint> 24 #include <cstring> 25 #include <map> 26 #include <optional> 27 #include <string> 28 #include <utility> 29 #include <vector> 30 31 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; 32 33 static const char *MSG_HASHBLOCK = "hashblock"; 34 static const char *MSG_HASHTX = "hashtx"; 35 static const char *MSG_RAWBLOCK = "rawblock"; 36 static const char *MSG_RAWTX = "rawtx"; 37 static const char *MSG_SEQUENCE = "sequence"; 38 39 // Internal function to send multipart message 40 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) 41 { 42 va_list args; 43 va_start(args, size); 44 45 while (1) 46 { 47 zmq_msg_t msg; 48 49 int rc = zmq_msg_init_size(&msg, size); 50 if (rc != 0) 51 { 52 zmqError("Unable to initialize ZMQ msg"); 53 va_end(args); 54 return -1; 55 } 56 57 void *buf = zmq_msg_data(&msg); 58 memcpy(buf, data, size); 59 60 data = va_arg(args, const void*); 61 62 rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); 63 if (rc == -1) 64 { 65 zmqError("Unable to send ZMQ msg"); 66 zmq_msg_close(&msg); 67 va_end(args); 68 return -1; 69 } 70 71 zmq_msg_close(&msg); 72 73 if (!data) 74 break; 75 76 size = va_arg(args, size_t); 77 } 78 va_end(args); 79 return 0; 80 } 81 82 static bool IsZMQAddressIPV6(const std::string &zmq_address) 83 { 84 const std::string tcp_prefix = "tcp://"; 85 const size_t tcp_index = zmq_address.rfind(tcp_prefix); 86 const size_t colon_index = zmq_address.rfind(':'); 87 if (tcp_index == 0 && colon_index != std::string::npos) { 88 const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length()); 89 const std::optional<CNetAddr> addr{LookupHost(ip, false)}; 90 if (addr.has_value() && addr.value().IsIPv6()) return true; 91 } 92 return false; 93 } 94 95 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) 96 { 97 assert(!psocket); 98 99 // check if address is being used by other publish notifier 100 std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address); 101 102 if (i==mapPublishNotifiers.end()) 103 { 104 psocket = zmq_socket(pcontext, ZMQ_PUB); 105 if (!psocket) 106 { 107 zmqError("Failed to create socket"); 108 return false; 109 } 110 111 LogDebug(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); 112 113 int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark)); 114 if (rc != 0) 115 { 116 zmqError("Failed to set outbound message high water mark"); 117 zmq_close(psocket); 118 return false; 119 } 120 121 const int so_keepalive_option {1}; 122 rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option)); 123 if (rc != 0) { 124 zmqError("Failed to set SO_KEEPALIVE"); 125 zmq_close(psocket); 126 return false; 127 } 128 129 // On some systems (e.g. OpenBSD) the ZMQ_IPV6 must not be enabled, if the address to bind isn't IPv6 130 const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0}; 131 rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6)); 132 if (rc != 0) { 133 zmqError("Failed to set ZMQ_IPV6"); 134 zmq_close(psocket); 135 return false; 136 } 137 138 rc = zmq_bind(psocket, address.c_str()); 139 if (rc != 0) 140 { 141 zmqError("Failed to bind address"); 142 zmq_close(psocket); 143 return false; 144 } 145 146 // register this notifier for the address, so it can be reused for other publish notifier 147 mapPublishNotifiers.insert(std::make_pair(address, this)); 148 return true; 149 } 150 else 151 { 152 LogDebug(BCLog::ZMQ, "Reusing socket for address %s\n", address); 153 LogDebug(BCLog::ZMQ, "Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); 154 155 psocket = i->second->psocket; 156 mapPublishNotifiers.insert(std::make_pair(address, this)); 157 158 return true; 159 } 160 } 161 162 void CZMQAbstractPublishNotifier::Shutdown() 163 { 164 // Early return if Initialize was not called 165 if (!psocket) return; 166 167 int count = mapPublishNotifiers.count(address); 168 169 // remove this notifier from the list of publishers using this address 170 typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator; 171 std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address); 172 173 for (iterator it = iterpair.first; it != iterpair.second; ++it) 174 { 175 if (it->second==this) 176 { 177 mapPublishNotifiers.erase(it); 178 break; 179 } 180 } 181 182 if (count == 1) 183 { 184 LogDebug(BCLog::ZMQ, "Close socket at address %s\n", address); 185 int linger = 0; 186 zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); 187 zmq_close(psocket); 188 } 189 190 psocket = nullptr; 191 } 192 193 bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size) 194 { 195 assert(psocket); 196 197 /* send three parts, command & data & a LE 4byte sequence number */ 198 unsigned char msgseq[sizeof(uint32_t)]; 199 WriteLE32(msgseq, nSequence); 200 int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr); 201 if (rc == -1) 202 return false; 203 204 /* increment memory only sequence number after sending */ 205 nSequence++; 206 207 return true; 208 } 209 210 bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) 211 { 212 uint256 hash = pindex->GetBlockHash(); 213 LogDebug(BCLog::ZMQ, "Publish hashblock %s to %s\n", hash.GetHex(), this->address); 214 uint8_t data[32]; 215 for (unsigned int i = 0; i < 32; i++) { 216 data[31 - i] = hash.begin()[i]; 217 } 218 return SendZmqMessage(MSG_HASHBLOCK, data, 32); 219 } 220 221 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) 222 { 223 uint256 hash = transaction.GetHash().ToUint256(); 224 LogDebug(BCLog::ZMQ, "Publish hashtx %s to %s\n", hash.GetHex(), this->address); 225 uint8_t data[32]; 226 for (unsigned int i = 0; i < 32; i++) { 227 data[31 - i] = hash.begin()[i]; 228 } 229 return SendZmqMessage(MSG_HASHTX, data, 32); 230 } 231 232 bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) 233 { 234 LogDebug(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); 235 236 std::vector<std::byte> block{}; 237 if (!m_get_block_by_index(block, *pindex)) { 238 zmqError("Can't read block from disk"); 239 return false; 240 } 241 242 return SendZmqMessage(MSG_RAWBLOCK, block.data(), block.size()); 243 } 244 245 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) 246 { 247 uint256 hash = transaction.GetHash().ToUint256(); 248 LogDebug(BCLog::ZMQ, "Publish rawtx %s to %s\n", hash.GetHex(), this->address); 249 DataStream ss; 250 ss << TX_WITH_WITNESS(transaction); 251 return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); 252 } 253 254 // Helper function to send a 'sequence' topic message with the following structure: 255 // <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional) 256 static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {}) 257 { 258 unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)]; 259 for (unsigned int i = 0; i < sizeof(hash); ++i) { 260 data[sizeof(hash) - 1 - i] = hash.begin()[i]; 261 } 262 data[sizeof(hash)] = label; 263 if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence); 264 return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label)); 265 } 266 267 bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) 268 { 269 uint256 hash = pindex->GetBlockHash(); 270 LogDebug(BCLog::ZMQ, "Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); 271 return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C'); 272 } 273 274 bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) 275 { 276 uint256 hash = pindex->GetBlockHash(); 277 LogDebug(BCLog::ZMQ, "Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); 278 return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D'); 279 } 280 281 bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) 282 { 283 uint256 hash = transaction.GetHash().ToUint256(); 284 LogDebug(BCLog::ZMQ, "Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); 285 return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence); 286 } 287 288 bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) 289 { 290 uint256 hash = transaction.GetHash().ToUint256(); 291 LogDebug(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); 292 return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence); 293 }