streams.h
1 // Copyright (c) 2009-2010 Satoshi Nakamoto 2 // Copyright (c) 2009-present The Bitcoin Core developers 3 // Distributed under the MIT software license, see the accompanying 4 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 6 #ifndef BITCOIN_STREAMS_H 7 #define BITCOIN_STREAMS_H 8 9 #include <logging.h> 10 #include <serialize.h> 11 #include <span.h> 12 #include <support/allocators/zeroafterfree.h> 13 #include <util/check.h> 14 #include <util/obfuscation.h> 15 #include <util/overflow.h> 16 #include <util/syserror.h> 17 18 #include <algorithm> 19 #include <cassert> 20 #include <cstddef> 21 #include <cstdint> 22 #include <cstdio> 23 #include <cstring> 24 #include <ios> 25 #include <limits> 26 #include <optional> 27 #include <string> 28 #include <vector> 29 30 /* Minimal stream for overwriting and/or appending to an existing byte vector 31 * 32 * The referenced vector will grow as necessary 33 */ 34 class VectorWriter 35 { 36 public: 37 /* 38 * @param[in] vchDataIn Referenced byte vector to overwrite/append 39 * @param[in] nPosIn Starting position. Vector index where writes should start. The vector will initially 40 * grow as necessary to max(nPosIn, vec.size()). So to append, use vec.size(). 41 */ 42 VectorWriter(std::vector<unsigned char>& vchDataIn, size_t nPosIn) : vchData{vchDataIn}, nPos{nPosIn} 43 { 44 if(nPos > vchData.size()) 45 vchData.resize(nPos); 46 } 47 /* 48 * (other params same as above) 49 * @param[in] args A list of items to serialize starting at nPosIn. 50 */ 51 template <typename... Args> 52 VectorWriter(std::vector<unsigned char>& vchDataIn, size_t nPosIn, Args&&... args) : VectorWriter{vchDataIn, nPosIn} 53 { 54 ::SerializeMany(*this, std::forward<Args>(args)...); 55 } 56 void write(std::span<const std::byte> src) 57 { 58 assert(nPos <= vchData.size()); 59 size_t nOverwrite = std::min(src.size(), vchData.size() - nPos); 60 if (nOverwrite) { 61 memcpy(vchData.data() + nPos, src.data(), nOverwrite); 62 } 63 if (nOverwrite < src.size()) { 64 vchData.insert(vchData.end(), UCharCast(src.data()) + nOverwrite, UCharCast(src.data() + src.size())); 65 } 66 nPos += src.size(); 67 } 68 template <typename T> 69 VectorWriter& operator<<(const T& obj) 70 { 71 ::Serialize(*this, obj); 72 return (*this); 73 } 74 75 private: 76 std::vector<unsigned char>& vchData; 77 size_t nPos; 78 }; 79 80 /** Minimal stream for reading from an existing byte array by std::span. 81 */ 82 class SpanReader 83 { 84 private: 85 std::span<const std::byte> m_data; 86 87 public: 88 /** 89 * @param[in] data Referenced byte vector to overwrite/append 90 */ 91 explicit SpanReader(std::span<const unsigned char> data) : m_data{std::as_bytes(data)} {} 92 explicit SpanReader(std::span<const std::byte> data) : m_data{data} {} 93 94 template<typename T> 95 SpanReader& operator>>(T&& obj) 96 { 97 ::Unserialize(*this, obj); 98 return (*this); 99 } 100 101 size_t size() const { return m_data.size(); } 102 bool empty() const { return m_data.empty(); } 103 104 void read(std::span<std::byte> dst) 105 { 106 if (dst.size() == 0) { 107 return; 108 } 109 110 // Read from the beginning of the buffer 111 if (dst.size() > m_data.size()) { 112 throw std::ios_base::failure("SpanReader::read(): end of data"); 113 } 114 memcpy(dst.data(), m_data.data(), dst.size()); 115 m_data = m_data.subspan(dst.size()); 116 } 117 118 void ignore(size_t n) 119 { 120 m_data = m_data.subspan(n); 121 } 122 }; 123 124 /** Double ended buffer combining vector and stream-like interfaces. 125 * 126 * >> and << read and write unformatted data using the above serialization templates. 127 * Fills with data in linear time; some stringstream implementations take N^2 time. 128 */ 129 class DataStream 130 { 131 protected: 132 using vector_type = SerializeData; 133 vector_type vch; 134 vector_type::size_type m_read_pos{0}; 135 136 public: 137 typedef vector_type::allocator_type allocator_type; 138 typedef vector_type::size_type size_type; 139 typedef vector_type::difference_type difference_type; 140 typedef vector_type::reference reference; 141 typedef vector_type::const_reference const_reference; 142 typedef vector_type::value_type value_type; 143 typedef vector_type::iterator iterator; 144 typedef vector_type::const_iterator const_iterator; 145 typedef vector_type::reverse_iterator reverse_iterator; 146 147 explicit DataStream() = default; 148 explicit DataStream(std::span<const uint8_t> sp) : DataStream{std::as_bytes(sp)} {} 149 explicit DataStream(std::span<const value_type> sp) : vch(sp.data(), sp.data() + sp.size()) {} 150 151 std::string str() const 152 { 153 return std::string{UCharCast(data()), UCharCast(data() + size())}; 154 } 155 156 157 // 158 // Vector subset 159 // 160 const_iterator begin() const { return vch.begin() + m_read_pos; } 161 iterator begin() { return vch.begin() + m_read_pos; } 162 const_iterator end() const { return vch.end(); } 163 iterator end() { return vch.end(); } 164 size_type size() const { return vch.size() - m_read_pos; } 165 bool empty() const { return vch.size() == m_read_pos; } 166 void resize(size_type n, value_type c = value_type{}) { vch.resize(n + m_read_pos, c); } 167 void reserve(size_type n) { vch.reserve(n + m_read_pos); } 168 const_reference operator[](size_type pos) const { return vch[pos + m_read_pos]; } 169 reference operator[](size_type pos) { return vch[pos + m_read_pos]; } 170 void clear() { vch.clear(); m_read_pos = 0; } 171 value_type* data() { return vch.data() + m_read_pos; } 172 const value_type* data() const { return vch.data() + m_read_pos; } 173 174 inline void Compact() 175 { 176 vch.erase(vch.begin(), vch.begin() + m_read_pos); 177 m_read_pos = 0; 178 } 179 180 bool Rewind(std::optional<size_type> n = std::nullopt) 181 { 182 // Total rewind if no size is passed 183 if (!n) { 184 m_read_pos = 0; 185 return true; 186 } 187 // Rewind by n characters if the buffer hasn't been compacted yet 188 if (*n > m_read_pos) 189 return false; 190 m_read_pos -= *n; 191 return true; 192 } 193 194 195 // 196 // Stream subset 197 // 198 bool eof() const { return size() == 0; } 199 int in_avail() const { return size(); } 200 201 void read(std::span<value_type> dst) 202 { 203 if (dst.size() == 0) return; 204 205 // Read from the beginning of the buffer 206 auto next_read_pos{CheckedAdd(m_read_pos, dst.size())}; 207 if (!next_read_pos.has_value() || next_read_pos.value() > vch.size()) { 208 throw std::ios_base::failure("DataStream::read(): end of data"); 209 } 210 memcpy(dst.data(), &vch[m_read_pos], dst.size()); 211 if (next_read_pos.value() == vch.size()) { 212 m_read_pos = 0; 213 vch.clear(); 214 return; 215 } 216 m_read_pos = next_read_pos.value(); 217 } 218 219 void ignore(size_t num_ignore) 220 { 221 // Ignore from the beginning of the buffer 222 auto next_read_pos{CheckedAdd(m_read_pos, num_ignore)}; 223 if (!next_read_pos.has_value() || next_read_pos.value() > vch.size()) { 224 throw std::ios_base::failure("DataStream::ignore(): end of data"); 225 } 226 if (next_read_pos.value() == vch.size()) { 227 m_read_pos = 0; 228 vch.clear(); 229 return; 230 } 231 m_read_pos = next_read_pos.value(); 232 } 233 234 void write(std::span<const value_type> src) 235 { 236 // Write to the end of the buffer 237 vch.insert(vch.end(), src.begin(), src.end()); 238 } 239 240 template<typename T> 241 DataStream& operator<<(const T& obj) 242 { 243 ::Serialize(*this, obj); 244 return (*this); 245 } 246 247 template <typename T> 248 DataStream& operator>>(T&& obj) 249 { 250 ::Unserialize(*this, obj); 251 return (*this); 252 } 253 254 /** Compute total memory usage of this object (own memory + any dynamic memory). */ 255 size_t GetMemoryUsage() const noexcept; 256 }; 257 258 template <typename IStream> 259 class BitStreamReader 260 { 261 private: 262 IStream& m_istream; 263 264 /// Buffered byte read in from the input stream. A new byte is read into the 265 /// buffer when m_offset reaches 8. 266 uint8_t m_buffer{0}; 267 268 /// Number of high order bits in m_buffer already returned by previous 269 /// Read() calls. The next bit to be returned is at this offset from the 270 /// most significant bit position. 271 int m_offset{8}; 272 273 public: 274 explicit BitStreamReader(IStream& istream) : m_istream(istream) {} 275 276 /** Read the specified number of bits from the stream. The data is returned 277 * in the nbits least significant bits of a 64-bit uint. 278 */ 279 uint64_t Read(int nbits) { 280 if (nbits < 0 || nbits > 64) { 281 throw std::out_of_range("nbits must be between 0 and 64"); 282 } 283 284 uint64_t data = 0; 285 while (nbits > 0) { 286 if (m_offset == 8) { 287 m_istream >> m_buffer; 288 m_offset = 0; 289 } 290 291 int bits = std::min(8 - m_offset, nbits); 292 data <<= bits; 293 data |= static_cast<uint8_t>(m_buffer << m_offset) >> (8 - bits); 294 m_offset += bits; 295 nbits -= bits; 296 } 297 return data; 298 } 299 }; 300 301 template <typename OStream> 302 class BitStreamWriter 303 { 304 private: 305 OStream& m_ostream; 306 307 /// Buffered byte waiting to be written to the output stream. The byte is 308 /// written buffer when m_offset reaches 8 or Flush() is called. 309 uint8_t m_buffer{0}; 310 311 /// Number of high order bits in m_buffer already written by previous 312 /// Write() calls and not yet flushed to the stream. The next bit to be 313 /// written to is at this offset from the most significant bit position. 314 int m_offset{0}; 315 316 public: 317 explicit BitStreamWriter(OStream& ostream) : m_ostream(ostream) {} 318 319 ~BitStreamWriter() 320 { 321 Flush(); 322 } 323 324 /** Write the nbits least significant bits of a 64-bit int to the output 325 * stream. Data is buffered until it completes an octet. 326 */ 327 void Write(uint64_t data, int nbits) { 328 if (nbits < 0 || nbits > 64) { 329 throw std::out_of_range("nbits must be between 0 and 64"); 330 } 331 332 while (nbits > 0) { 333 int bits = std::min(8 - m_offset, nbits); 334 m_buffer |= (data << (64 - nbits)) >> (64 - 8 + m_offset); 335 m_offset += bits; 336 nbits -= bits; 337 338 if (m_offset == 8) { 339 Flush(); 340 } 341 } 342 } 343 344 /** Flush any unwritten bits to the output stream, padding with 0's to the 345 * next byte boundary. 346 */ 347 void Flush() { 348 if (m_offset == 0) { 349 return; 350 } 351 352 m_ostream << m_buffer; 353 m_buffer = 0; 354 m_offset = 0; 355 } 356 }; 357 358 /** Non-refcounted RAII wrapper for FILE* 359 * 360 * Will automatically close the file when it goes out of scope if not null. 361 * If you're returning the file pointer, return file.release(). 362 * If you need to close the file early, use autofile.fclose() instead of fclose(underlying_FILE). 363 * 364 * @note If the file has been written to, then the caller must close it 365 * explicitly with the `fclose()` method, check if it returns an error and treat 366 * such an error as if the `write()` method failed. The OS's `fclose(3)` may 367 * fail to flush to disk data that has been previously written, rendering the 368 * file corrupt. 369 */ 370 class AutoFile 371 { 372 protected: 373 std::FILE* m_file; 374 Obfuscation m_obfuscation; 375 std::optional<int64_t> m_position; 376 bool m_was_written{false}; 377 378 public: 379 explicit AutoFile(std::FILE* file, const Obfuscation& obfuscation = {}); 380 381 ~AutoFile() 382 { 383 if (m_was_written) { 384 // Callers that wrote to the file must have closed it explicitly 385 // with the fclose() method and checked that the close succeeded. 386 // This is because here in the destructor we have no way to signal 387 // errors from fclose() which, after write, could mean the file is 388 // corrupted and must be handled properly at the call site. 389 // Destructors in C++ cannot signal an error to the callers because 390 // they do not return a value and are not allowed to throw exceptions. 391 Assume(IsNull()); 392 } 393 394 if (fclose() != 0) { 395 LogError("Failed to close file: %s", SysErrorString(errno)); 396 } 397 } 398 399 // Disallow copies 400 AutoFile(const AutoFile&) = delete; 401 AutoFile& operator=(const AutoFile&) = delete; 402 403 bool feof() const { return std::feof(m_file); } 404 405 [[nodiscard]] int fclose() 406 { 407 if (auto rel{release()}) return std::fclose(rel); 408 return 0; 409 } 410 411 /** Get wrapped FILE* with transfer of ownership. 412 * @note This will invalidate the AutoFile object, and makes it the responsibility of the caller 413 * of this function to clean up the returned FILE*. 414 */ 415 std::FILE* release() 416 { 417 std::FILE* ret{m_file}; 418 m_file = nullptr; 419 return ret; 420 } 421 422 /** Return true if the wrapped FILE* is nullptr, false otherwise. 423 */ 424 bool IsNull() const { return m_file == nullptr; } 425 426 /** Continue with a different XOR key */ 427 void SetObfuscation(const Obfuscation& obfuscation) { m_obfuscation = obfuscation; } 428 429 /** Implementation detail, only used internally. */ 430 std::size_t detail_fread(std::span<std::byte> dst); 431 432 /** Wrapper around fseek(). Will throw if seeking is not possible. */ 433 void seek(int64_t offset, int origin); 434 435 /** Find position within the file. Will throw if unknown. */ 436 int64_t tell(); 437 438 /** Return the size of the file. Will throw if unknown. */ 439 int64_t size(); 440 441 /** Wrapper around FileCommit(). */ 442 bool Commit(); 443 444 /** Wrapper around TruncateFile(). */ 445 bool Truncate(unsigned size); 446 447 //! Write a mutable buffer more efficiently than write(), obfuscating the buffer in-place. 448 void write_buffer(std::span<std::byte> src); 449 450 // 451 // Stream subset 452 // 453 void read(std::span<std::byte> dst); 454 void ignore(size_t nSize); 455 void write(std::span<const std::byte> src); 456 457 template <typename T> 458 AutoFile& operator<<(const T& obj) 459 { 460 ::Serialize(*this, obj); 461 return *this; 462 } 463 464 template <typename T> 465 AutoFile& operator>>(T&& obj) 466 { 467 ::Unserialize(*this, obj); 468 return *this; 469 } 470 }; 471 472 using DataBuffer = std::vector<std::byte>; 473 474 /** Wrapper around an AutoFile& that implements a ring buffer to 475 * deserialize from. It guarantees the ability to rewind a given number of bytes. 476 * 477 * Will automatically close the file when it goes out of scope if not null. 478 * If you need to close the file early, use file.fclose() instead of fclose(file). 479 */ 480 class BufferedFile 481 { 482 private: 483 AutoFile& m_src; 484 uint64_t nSrcPos{0}; //!< how many bytes have been read from source 485 uint64_t m_read_pos{0}; //!< how many bytes have been read from this 486 uint64_t nReadLimit; //!< up to which position we're allowed to read 487 uint64_t nRewind; //!< how many bytes we guarantee to rewind 488 DataBuffer vchBuf; 489 490 //! read data from the source to fill the buffer 491 bool Fill() { 492 unsigned int pos = nSrcPos % vchBuf.size(); 493 unsigned int readNow = vchBuf.size() - pos; 494 unsigned int nAvail = vchBuf.size() - (nSrcPos - m_read_pos) - nRewind; 495 if (nAvail < readNow) 496 readNow = nAvail; 497 if (readNow == 0) 498 return false; 499 size_t nBytes{m_src.detail_fread(std::span{vchBuf}.subspan(pos, readNow))}; 500 if (nBytes == 0) { 501 throw std::ios_base::failure{m_src.feof() ? "BufferedFile::Fill: end of file" : "BufferedFile::Fill: fread failed"}; 502 } 503 nSrcPos += nBytes; 504 return true; 505 } 506 507 //! Advance the stream's read pointer (m_read_pos) by up to 'length' bytes, 508 //! filling the buffer from the file so that at least one byte is available. 509 //! Return a pointer to the available buffer data and the number of bytes 510 //! (which may be less than the requested length) that may be accessed 511 //! beginning at that pointer. 512 std::pair<std::byte*, size_t> AdvanceStream(size_t length) 513 { 514 assert(m_read_pos <= nSrcPos); 515 if (m_read_pos + length > nReadLimit) { 516 throw std::ios_base::failure("Attempt to position past buffer limit"); 517 } 518 // If there are no bytes available, read from the file. 519 if (m_read_pos == nSrcPos && length > 0) Fill(); 520 521 size_t buffer_offset{static_cast<size_t>(m_read_pos % vchBuf.size())}; 522 size_t buffer_available{static_cast<size_t>(vchBuf.size() - buffer_offset)}; 523 size_t bytes_until_source_pos{static_cast<size_t>(nSrcPos - m_read_pos)}; 524 size_t advance{std::min({length, buffer_available, bytes_until_source_pos})}; 525 m_read_pos += advance; 526 return std::make_pair(&vchBuf[buffer_offset], advance); 527 } 528 529 public: 530 BufferedFile(AutoFile& file LIFETIMEBOUND, uint64_t nBufSize, uint64_t nRewindIn) 531 : m_src{file}, nReadLimit{std::numeric_limits<uint64_t>::max()}, nRewind{nRewindIn}, vchBuf(nBufSize, std::byte{0}) 532 { 533 if (nRewindIn >= nBufSize) 534 throw std::ios_base::failure("Rewind limit must be less than buffer size"); 535 } 536 537 //! check whether we're at the end of the source file 538 bool eof() const { 539 return m_read_pos == nSrcPos && m_src.feof(); 540 } 541 542 //! read a number of bytes 543 void read(std::span<std::byte> dst) 544 { 545 while (dst.size() > 0) { 546 auto [buffer_pointer, length]{AdvanceStream(dst.size())}; 547 memcpy(dst.data(), buffer_pointer, length); 548 dst = dst.subspan(length); 549 } 550 } 551 552 //! Move the read position ahead in the stream to the given position. 553 //! Use SetPos() to back up in the stream, not SkipTo(). 554 void SkipTo(const uint64_t file_pos) 555 { 556 assert(file_pos >= m_read_pos); 557 while (m_read_pos < file_pos) AdvanceStream(file_pos - m_read_pos); 558 } 559 560 //! return the current reading position 561 uint64_t GetPos() const { 562 return m_read_pos; 563 } 564 565 //! rewind to a given reading position 566 bool SetPos(uint64_t nPos) { 567 size_t bufsize = vchBuf.size(); 568 if (nPos + bufsize < nSrcPos) { 569 // rewinding too far, rewind as far as possible 570 m_read_pos = nSrcPos - bufsize; 571 return false; 572 } 573 if (nPos > nSrcPos) { 574 // can't go this far forward, go as far as possible 575 m_read_pos = nSrcPos; 576 return false; 577 } 578 m_read_pos = nPos; 579 return true; 580 } 581 582 //! prevent reading beyond a certain position 583 //! no argument removes the limit 584 bool SetLimit(uint64_t nPos = std::numeric_limits<uint64_t>::max()) { 585 if (nPos < m_read_pos) 586 return false; 587 nReadLimit = nPos; 588 return true; 589 } 590 591 template<typename T> 592 BufferedFile& operator>>(T&& obj) { 593 ::Unserialize(*this, obj); 594 return (*this); 595 } 596 597 //! search for a given byte in the stream, and remain positioned on it 598 void FindByte(std::byte byte) 599 { 600 // For best performance, avoid mod operation within the loop. 601 size_t buf_offset{size_t(m_read_pos % uint64_t(vchBuf.size()))}; 602 while (true) { 603 if (m_read_pos == nSrcPos) { 604 // No more bytes available; read from the file into the buffer, 605 // setting nSrcPos to one beyond the end of the new data. 606 // Throws exception if end-of-file reached. 607 Fill(); 608 } 609 const size_t len{std::min<size_t>(vchBuf.size() - buf_offset, nSrcPos - m_read_pos)}; 610 const auto it_start{vchBuf.begin() + buf_offset}; 611 const auto it_find{std::find(it_start, it_start + len, byte)}; 612 const size_t inc{size_t(std::distance(it_start, it_find))}; 613 m_read_pos += inc; 614 if (inc < len) break; 615 buf_offset += inc; 616 if (buf_offset >= vchBuf.size()) buf_offset = 0; 617 } 618 } 619 }; 620 621 /** 622 * Wrapper that buffers reads from an underlying stream. 623 * Requires underlying stream to support read() and detail_fread() calls 624 * to support fixed-size and variable-sized reads, respectively. 625 */ 626 template <typename S> 627 class BufferedReader 628 { 629 S& m_src; 630 DataBuffer m_buf; 631 size_t m_buf_pos; 632 633 public: 634 //! Requires stream ownership to prevent leaving the stream at an unexpected position after buffered reads. 635 explicit BufferedReader(S&& stream LIFETIMEBOUND, size_t size = 1 << 16) 636 requires std::is_rvalue_reference_v<S&&> 637 : m_src{stream}, m_buf(size), m_buf_pos{size} {} 638 639 void read(std::span<std::byte> dst) 640 { 641 if (const auto available{std::min(dst.size(), m_buf.size() - m_buf_pos)}) { 642 std::copy_n(m_buf.begin() + m_buf_pos, available, dst.begin()); 643 m_buf_pos += available; 644 dst = dst.subspan(available); 645 } 646 if (dst.size()) { 647 assert(m_buf_pos == m_buf.size()); 648 m_src.read(dst); 649 650 m_buf_pos = 0; 651 m_buf.resize(m_src.detail_fread(m_buf)); 652 } 653 } 654 655 template <typename T> 656 BufferedReader& operator>>(T&& obj) 657 { 658 Unserialize(*this, obj); 659 return *this; 660 } 661 }; 662 663 /** 664 * Wrapper that buffers writes to an underlying stream. 665 * Requires underlying stream to support write_buffer() method 666 * for efficient buffer flushing and obfuscation. 667 */ 668 template <typename S> 669 class BufferedWriter 670 { 671 S& m_dst; 672 DataBuffer m_buf; 673 size_t m_buf_pos{0}; 674 675 public: 676 explicit BufferedWriter(S& stream LIFETIMEBOUND, size_t size = 1 << 16) : m_dst{stream}, m_buf(size) {} 677 678 ~BufferedWriter() { flush(); } 679 680 void flush() 681 { 682 if (m_buf_pos) m_dst.write_buffer(std::span{m_buf}.first(m_buf_pos)); 683 m_buf_pos = 0; 684 } 685 686 void write(std::span<const std::byte> src) 687 { 688 while (const auto available{std::min(src.size(), m_buf.size() - m_buf_pos)}) { 689 std::copy_n(src.begin(), available, m_buf.begin() + m_buf_pos); 690 m_buf_pos += available; 691 if (m_buf_pos == m_buf.size()) flush(); 692 src = src.subspan(available); 693 } 694 } 695 696 template <typename T> 697 BufferedWriter& operator<<(const T& obj) 698 { 699 Serialize(*this, obj); 700 return *this; 701 } 702 }; 703 704 #endif // BITCOIN_STREAMS_H