streams.h
1 // Copyright (c) 2009-2010 Satoshi Nakamoto 2 // Copyright (c) 2009-present The Bitcoin Core developers 3 // Distributed under the MIT software license, see the accompanying 4 // file COPYING or http://www.opensource.org/licenses/mit-license.php. 5 6 #ifndef BITCOIN_STREAMS_H 7 #define BITCOIN_STREAMS_H 8 9 #include <serialize.h> 10 #include <span.h> 11 #include <support/allocators/zeroafterfree.h> 12 #include <util/check.h> 13 #include <util/log.h> 14 #include <util/obfuscation.h> 15 #include <util/overflow.h> 16 #include <util/syserror.h> 17 18 #include <algorithm> 19 #include <cassert> 20 #include <cstddef> 21 #include <cstdint> 22 #include <cstdio> 23 #include <cstring> 24 #include <ios> 25 #include <limits> 26 #include <optional> 27 #include <string> 28 #include <vector> 29 30 /* Minimal stream for overwriting and/or appending to an existing byte vector 31 * 32 * The referenced vector will grow as necessary 33 */ 34 class VectorWriter 35 { 36 public: 37 /* 38 * @param[in] vchDataIn Referenced byte vector to overwrite/append 39 * @param[in] nPosIn Starting position. Vector index where writes should start. The vector will initially 40 * grow as necessary to max(nPosIn, vec.size()). So to append, use vec.size(). 41 */ 42 VectorWriter(std::vector<unsigned char>& vchDataIn, size_t nPosIn) : vchData{vchDataIn}, nPos{nPosIn} 43 { 44 if(nPos > vchData.size()) 45 vchData.resize(nPos); 46 } 47 /* 48 * (other params same as above) 49 * @param[in] args A list of items to serialize starting at nPosIn. 50 */ 51 template <typename... Args> 52 VectorWriter(std::vector<unsigned char>& vchDataIn, size_t nPosIn, Args&&... args) : VectorWriter{vchDataIn, nPosIn} 53 { 54 ::SerializeMany(*this, std::forward<Args>(args)...); 55 } 56 void write(std::span<const std::byte> src) 57 { 58 assert(nPos <= vchData.size()); 59 size_t nOverwrite = std::min(src.size(), vchData.size() - nPos); 60 if (nOverwrite) { 61 memcpy(vchData.data() + nPos, src.data(), nOverwrite); 62 } 63 if (nOverwrite < src.size()) { 64 vchData.insert(vchData.end(), UCharCast(src.data()) + nOverwrite, UCharCast(src.data() + src.size())); 65 } 66 nPos += src.size(); 67 } 68 template <typename T> 69 VectorWriter& operator<<(const T& obj) 70 { 71 ::Serialize(*this, obj); 72 return (*this); 73 } 74 75 private: 76 std::vector<unsigned char>& vchData; 77 size_t nPos; 78 }; 79 80 /** Minimal stream for reading from an existing byte array by std::span. 81 */ 82 class SpanReader 83 { 84 private: 85 std::span<const std::byte> m_data; 86 87 public: 88 /** 89 * @param[in] data Referenced byte vector to overwrite/append 90 */ 91 explicit SpanReader(std::span<const unsigned char> data) : m_data{std::as_bytes(data)} {} 92 explicit SpanReader(std::span<const std::byte> data) : m_data{data} {} 93 94 template<typename T> 95 SpanReader& operator>>(T&& obj) 96 { 97 ::Unserialize(*this, obj); 98 return (*this); 99 } 100 101 size_t size() const { return m_data.size(); } 102 bool empty() const { return m_data.empty(); } 103 104 void read(std::span<std::byte> dst) 105 { 106 if (dst.size() == 0) { 107 return; 108 } 109 110 // Read from the beginning of the buffer 111 if (dst.size() > m_data.size()) { 112 throw std::ios_base::failure("SpanReader::read(): end of data"); 113 } 114 memcpy(dst.data(), m_data.data(), dst.size()); 115 m_data = m_data.subspan(dst.size()); 116 } 117 118 void ignore(size_t n) 119 { 120 if (n > m_data.size()) { 121 throw std::ios_base::failure("SpanReader::ignore(): end of data"); 122 } 123 m_data = m_data.subspan(n); 124 } 125 }; 126 127 /** Minimal stream for writing to an existing span of bytes. 128 */ 129 class SpanWriter 130 { 131 private: 132 std::span<std::byte> m_dest; 133 134 public: 135 explicit SpanWriter(std::span<std::byte> dest) : m_dest{dest} {} 136 template <typename... Args> 137 SpanWriter(std::span<std::byte> dest, Args&&... args) : SpanWriter{dest} 138 { 139 ::SerializeMany(*this, std::forward<Args>(args)...); 140 } 141 142 void write(std::span<const std::byte> src) 143 { 144 if (src.size() > m_dest.size()) { 145 throw std::ios_base::failure("SpanWriter::write(): exceeded buffer size"); 146 } 147 memcpy(m_dest.data(), src.data(), src.size()); 148 m_dest = m_dest.subspan(src.size()); 149 } 150 151 template<typename T> 152 SpanWriter& operator<<(const T& obj) 153 { 154 ::Serialize(*this, obj); 155 return *this; 156 } 157 }; 158 159 /** Double ended buffer combining vector and stream-like interfaces. 160 * 161 * >> and << read and write unformatted data using the above serialization templates. 162 * Fills with data in linear time; some stringstream implementations take N^2 time. 163 */ 164 class DataStream 165 { 166 protected: 167 using vector_type = SerializeData; 168 vector_type vch; 169 vector_type::size_type m_read_pos{0}; 170 171 public: 172 typedef vector_type::allocator_type allocator_type; 173 typedef vector_type::size_type size_type; 174 typedef vector_type::difference_type difference_type; 175 typedef vector_type::reference reference; 176 typedef vector_type::const_reference const_reference; 177 typedef vector_type::value_type value_type; 178 typedef vector_type::iterator iterator; 179 typedef vector_type::const_iterator const_iterator; 180 typedef vector_type::reverse_iterator reverse_iterator; 181 182 explicit DataStream() = default; 183 explicit DataStream(std::span<const uint8_t> sp) : DataStream{std::as_bytes(sp)} {} 184 explicit DataStream(std::span<const value_type> sp) : vch(sp.data(), sp.data() + sp.size()) {} 185 186 std::string str() const 187 { 188 return std::string{UCharCast(data()), UCharCast(data() + size())}; 189 } 190 191 // 192 // Vector subset 193 // 194 const_iterator begin() const { return vch.begin() + m_read_pos; } 195 iterator begin() { return vch.begin() + m_read_pos; } 196 const_iterator end() const { return vch.end(); } 197 iterator end() { return vch.end(); } 198 size_type size() const { return vch.size() - m_read_pos; } 199 bool empty() const { return vch.size() == m_read_pos; } 200 void resize(size_type n, value_type c = value_type{}) { vch.resize(n + m_read_pos, c); } 201 void reserve(size_type n) { vch.reserve(n + m_read_pos); } 202 const_reference operator[](size_type pos) const { return vch[pos + m_read_pos]; } 203 reference operator[](size_type pos) { return vch[pos + m_read_pos]; } 204 void clear() { vch.clear(); m_read_pos = 0; } 205 value_type* data() { return vch.data() + m_read_pos; } 206 const value_type* data() const { return vch.data() + m_read_pos; } 207 208 // 209 // Stream subset 210 // 211 void read(std::span<value_type> dst) 212 { 213 if (dst.size() == 0) return; 214 215 // Read from the beginning of the buffer 216 auto next_read_pos{CheckedAdd(m_read_pos, dst.size())}; 217 if (!next_read_pos.has_value() || next_read_pos.value() > vch.size()) { 218 throw std::ios_base::failure("DataStream::read(): end of data"); 219 } 220 memcpy(dst.data(), &vch[m_read_pos], dst.size()); 221 if (next_read_pos.value() == vch.size()) { 222 // If fully consumed, reset to empty state. 223 clear(); 224 return; 225 } 226 m_read_pos = next_read_pos.value(); 227 } 228 229 void ignore(size_t num_ignore) 230 { 231 // Ignore from the beginning of the buffer 232 auto next_read_pos{CheckedAdd(m_read_pos, num_ignore)}; 233 if (!next_read_pos.has_value() || next_read_pos.value() > vch.size()) { 234 throw std::ios_base::failure("DataStream::ignore(): end of data"); 235 } 236 if (next_read_pos.value() == vch.size()) { 237 // If all bytes are ignored, reset to empty state. 238 clear(); 239 return; 240 } 241 m_read_pos = next_read_pos.value(); 242 } 243 244 void write(std::span<const value_type> src) 245 { 246 // Write to the end of the buffer 247 vch.insert(vch.end(), src.begin(), src.end()); 248 } 249 250 template<typename T> 251 DataStream& operator<<(const T& obj) 252 { 253 ::Serialize(*this, obj); 254 return (*this); 255 } 256 257 template <typename T> 258 DataStream& operator>>(T&& obj) 259 { 260 ::Unserialize(*this, obj); 261 return (*this); 262 } 263 264 /** Compute total memory usage of this object (own memory + any dynamic memory). */ 265 size_t GetMemoryUsage() const noexcept; 266 }; 267 268 template <typename IStream> 269 class BitStreamReader 270 { 271 private: 272 IStream& m_istream; 273 274 /// Buffered byte read in from the input stream. A new byte is read into the 275 /// buffer when m_offset reaches 8. 276 uint8_t m_buffer{0}; 277 278 /// Number of high order bits in m_buffer already returned by previous 279 /// Read() calls. The next bit to be returned is at this offset from the 280 /// most significant bit position. 281 int m_offset{8}; 282 283 public: 284 explicit BitStreamReader(IStream& istream) : m_istream(istream) {} 285 286 /** Read the specified number of bits from the stream. The data is returned 287 * in the nbits least significant bits of a 64-bit uint. 288 */ 289 uint64_t Read(int nbits) { 290 if (nbits < 0 || nbits > 64) { 291 throw std::out_of_range("nbits must be between 0 and 64"); 292 } 293 294 uint64_t data = 0; 295 while (nbits > 0) { 296 if (m_offset == 8) { 297 m_istream >> m_buffer; 298 m_offset = 0; 299 } 300 301 int bits = std::min(8 - m_offset, nbits); 302 data <<= bits; 303 data |= static_cast<uint8_t>(m_buffer << m_offset) >> (8 - bits); 304 m_offset += bits; 305 nbits -= bits; 306 } 307 return data; 308 } 309 }; 310 311 template <typename OStream> 312 class BitStreamWriter 313 { 314 private: 315 OStream& m_ostream; 316 317 /// Buffered byte waiting to be written to the output stream. The byte is 318 /// written buffer when m_offset reaches 8 or Flush() is called. 319 uint8_t m_buffer{0}; 320 321 /// Number of high order bits in m_buffer already written by previous 322 /// Write() calls and not yet flushed to the stream. The next bit to be 323 /// written to is at this offset from the most significant bit position. 324 int m_offset{0}; 325 326 public: 327 explicit BitStreamWriter(OStream& ostream) : m_ostream(ostream) {} 328 329 ~BitStreamWriter() 330 { 331 Flush(); 332 } 333 334 /** Write the nbits least significant bits of a 64-bit int to the output 335 * stream. Data is buffered until it completes an octet. 336 */ 337 void Write(uint64_t data, int nbits) { 338 if (nbits < 0 || nbits > 64) { 339 throw std::out_of_range("nbits must be between 0 and 64"); 340 } 341 342 while (nbits > 0) { 343 int bits = std::min(8 - m_offset, nbits); 344 m_buffer |= (data << (64 - nbits)) >> (64 - 8 + m_offset); 345 m_offset += bits; 346 nbits -= bits; 347 348 if (m_offset == 8) { 349 Flush(); 350 } 351 } 352 } 353 354 /** Flush any unwritten bits to the output stream, padding with 0's to the 355 * next byte boundary. 356 */ 357 void Flush() { 358 if (m_offset == 0) { 359 return; 360 } 361 362 m_ostream << m_buffer; 363 m_buffer = 0; 364 m_offset = 0; 365 } 366 }; 367 368 /** Non-refcounted RAII wrapper for FILE* 369 * 370 * Will automatically close the file when it goes out of scope if not null. 371 * If you're returning the file pointer, return file.release(). 372 * If you need to close the file early, use autofile.fclose() instead of fclose(underlying_FILE). 373 * 374 * @note If the file has been written to, then the caller must close it 375 * explicitly with the `fclose()` method, check if it returns an error and treat 376 * such an error as if the `write()` method failed. The OS's `fclose(3)` may 377 * fail to flush to disk data that has been previously written, rendering the 378 * file corrupt. 379 */ 380 class AutoFile 381 { 382 protected: 383 std::FILE* m_file; 384 Obfuscation m_obfuscation; 385 std::optional<int64_t> m_position; 386 bool m_was_written{false}; 387 388 public: 389 explicit AutoFile(std::FILE* file, const Obfuscation& obfuscation = {}); 390 391 ~AutoFile() 392 { 393 if (m_was_written) { 394 // Callers that wrote to the file must have closed it explicitly 395 // with the fclose() method and checked that the close succeeded. 396 // This is because here in the destructor we have no way to signal 397 // errors from fclose() which, after write, could mean the file is 398 // corrupted and must be handled properly at the call site. 399 // Destructors in C++ cannot signal an error to the callers because 400 // they do not return a value and are not allowed to throw exceptions. 401 Assume(IsNull()); 402 } 403 404 if (fclose() != 0) { 405 LogError("Failed to close file: %s", SysErrorString(errno)); 406 } 407 } 408 409 // Disallow copies 410 AutoFile(const AutoFile&) = delete; 411 AutoFile& operator=(const AutoFile&) = delete; 412 413 bool feof() const { return std::feof(m_file); } 414 415 [[nodiscard]] int fclose() 416 { 417 if (auto rel{release()}) return std::fclose(rel); 418 return 0; 419 } 420 421 /** Get wrapped FILE* with transfer of ownership. 422 * @note This will invalidate the AutoFile object, and makes it the responsibility of the caller 423 * of this function to clean up the returned FILE*. 424 */ 425 std::FILE* release() 426 { 427 std::FILE* ret{m_file}; 428 m_file = nullptr; 429 return ret; 430 } 431 432 /** Return true if the wrapped FILE* is nullptr, false otherwise. 433 */ 434 bool IsNull() const { return m_file == nullptr; } 435 436 /** Continue with a different XOR key */ 437 void SetObfuscation(const Obfuscation& obfuscation) { m_obfuscation = obfuscation; } 438 439 /** Implementation detail, only used internally. */ 440 std::size_t detail_fread(std::span<std::byte> dst); 441 442 /** Wrapper around fseek(). Will throw if seeking is not possible. */ 443 void seek(int64_t offset, int origin); 444 445 /** Find position within the file. Will throw if unknown. */ 446 int64_t tell(); 447 448 /** Return the size of the file. Will throw if unknown. */ 449 int64_t size(); 450 451 /** Wrapper around FileCommit(). */ 452 bool Commit(); 453 454 /** Wrapper around TruncateFile(). */ 455 bool Truncate(unsigned size); 456 457 //! Write a mutable buffer more efficiently than write(), obfuscating the buffer in-place. 458 void write_buffer(std::span<std::byte> src); 459 460 // 461 // Stream subset 462 // 463 void read(std::span<std::byte> dst); 464 void ignore(size_t nSize); 465 void write(std::span<const std::byte> src); 466 467 template <typename T> 468 AutoFile& operator<<(const T& obj) 469 { 470 ::Serialize(*this, obj); 471 return *this; 472 } 473 474 template <typename T> 475 AutoFile& operator>>(T&& obj) 476 { 477 ::Unserialize(*this, obj); 478 return *this; 479 } 480 }; 481 482 using DataBuffer = std::vector<std::byte>; 483 484 /** Wrapper around an AutoFile& that implements a ring buffer to 485 * deserialize from. It guarantees the ability to rewind a given number of bytes. 486 * 487 * Will automatically close the file when it goes out of scope if not null. 488 * If you need to close the file early, use file.fclose() instead of fclose(file). 489 */ 490 class BufferedFile 491 { 492 private: 493 AutoFile& m_src; 494 uint64_t nSrcPos{0}; //!< how many bytes have been read from source 495 uint64_t m_read_pos{0}; //!< how many bytes have been read from this 496 uint64_t nReadLimit; //!< up to which position we're allowed to read 497 uint64_t nRewind; //!< how many bytes we guarantee to rewind 498 DataBuffer vchBuf; 499 500 //! read data from the source to fill the buffer 501 bool Fill() { 502 unsigned int pos = nSrcPos % vchBuf.size(); 503 unsigned int readNow = vchBuf.size() - pos; 504 unsigned int nAvail = vchBuf.size() - (nSrcPos - m_read_pos) - nRewind; 505 if (nAvail < readNow) 506 readNow = nAvail; 507 if (readNow == 0) 508 return false; 509 size_t nBytes{m_src.detail_fread(std::span{vchBuf}.subspan(pos, readNow))}; 510 if (nBytes == 0) { 511 throw std::ios_base::failure{m_src.feof() ? "BufferedFile::Fill: end of file" : "BufferedFile::Fill: fread failed"}; 512 } 513 nSrcPos += nBytes; 514 return true; 515 } 516 517 //! Advance the stream's read pointer (m_read_pos) by up to 'length' bytes, 518 //! filling the buffer from the file so that at least one byte is available. 519 //! Return a pointer to the available buffer data and the number of bytes 520 //! (which may be less than the requested length) that may be accessed 521 //! beginning at that pointer. 522 std::pair<std::byte*, size_t> AdvanceStream(size_t length) 523 { 524 assert(m_read_pos <= nSrcPos); 525 if (m_read_pos + length > nReadLimit) { 526 throw std::ios_base::failure("Attempt to position past buffer limit"); 527 } 528 // If there are no bytes available, read from the file. 529 if (m_read_pos == nSrcPos && length > 0) Fill(); 530 531 size_t buffer_offset{static_cast<size_t>(m_read_pos % vchBuf.size())}; 532 size_t buffer_available{static_cast<size_t>(vchBuf.size() - buffer_offset)}; 533 size_t bytes_until_source_pos{static_cast<size_t>(nSrcPos - m_read_pos)}; 534 size_t advance{std::min({length, buffer_available, bytes_until_source_pos})}; 535 m_read_pos += advance; 536 return std::make_pair(&vchBuf[buffer_offset], advance); 537 } 538 539 public: 540 BufferedFile(AutoFile& file LIFETIMEBOUND, uint64_t nBufSize, uint64_t nRewindIn) 541 : m_src{file}, nReadLimit{std::numeric_limits<uint64_t>::max()}, nRewind{nRewindIn}, vchBuf(nBufSize, std::byte{0}) 542 { 543 if (nRewindIn >= nBufSize) 544 throw std::ios_base::failure("Rewind limit must be less than buffer size"); 545 } 546 547 //! check whether we're at the end of the source file 548 bool eof() const { 549 return m_read_pos == nSrcPos && m_src.feof(); 550 } 551 552 //! read a number of bytes 553 void read(std::span<std::byte> dst) 554 { 555 while (dst.size() > 0) { 556 auto [buffer_pointer, length]{AdvanceStream(dst.size())}; 557 memcpy(dst.data(), buffer_pointer, length); 558 dst = dst.subspan(length); 559 } 560 } 561 562 //! Move the read position ahead in the stream to the given position. 563 //! Use SetPos() to back up in the stream, not SkipTo(). 564 void SkipTo(const uint64_t file_pos) 565 { 566 assert(file_pos >= m_read_pos); 567 while (m_read_pos < file_pos) AdvanceStream(file_pos - m_read_pos); 568 } 569 570 //! return the current reading position 571 uint64_t GetPos() const { 572 return m_read_pos; 573 } 574 575 //! rewind to a given reading position 576 bool SetPos(uint64_t nPos) { 577 size_t bufsize = vchBuf.size(); 578 if (nPos + bufsize < nSrcPos) { 579 // rewinding too far, rewind as far as possible 580 m_read_pos = nSrcPos - bufsize; 581 return false; 582 } 583 if (nPos > nSrcPos) { 584 // can't go this far forward, go as far as possible 585 m_read_pos = nSrcPos; 586 return false; 587 } 588 m_read_pos = nPos; 589 return true; 590 } 591 592 //! prevent reading beyond a certain position 593 //! no argument removes the limit 594 bool SetLimit(uint64_t nPos = std::numeric_limits<uint64_t>::max()) { 595 if (nPos < m_read_pos) 596 return false; 597 nReadLimit = nPos; 598 return true; 599 } 600 601 template<typename T> 602 BufferedFile& operator>>(T&& obj) { 603 ::Unserialize(*this, obj); 604 return (*this); 605 } 606 607 //! search for a given byte in the stream, and remain positioned on it 608 void FindByte(std::byte byte) 609 { 610 // For best performance, avoid mod operation within the loop. 611 size_t buf_offset{size_t(m_read_pos % uint64_t(vchBuf.size()))}; 612 while (true) { 613 if (m_read_pos == nSrcPos) { 614 // No more bytes available; read from the file into the buffer, 615 // setting nSrcPos to one beyond the end of the new data. 616 // Throws exception if end-of-file reached. 617 Fill(); 618 } 619 const size_t len{std::min<size_t>(vchBuf.size() - buf_offset, nSrcPos - m_read_pos)}; 620 const auto it_start{vchBuf.begin() + buf_offset}; 621 const auto it_find{std::find(it_start, it_start + len, byte)}; 622 const size_t inc{size_t(std::distance(it_start, it_find))}; 623 m_read_pos += inc; 624 if (inc < len) break; 625 buf_offset += inc; 626 if (buf_offset >= vchBuf.size()) buf_offset = 0; 627 } 628 } 629 }; 630 631 /** 632 * Wrapper that buffers reads from an underlying stream. 633 * Requires underlying stream to support read() and detail_fread() calls 634 * to support fixed-size and variable-sized reads, respectively. 635 */ 636 template <typename S> 637 class BufferedReader 638 { 639 S& m_src; 640 DataBuffer m_buf; 641 size_t m_buf_pos; 642 643 public: 644 //! Requires stream ownership to prevent leaving the stream at an unexpected position after buffered reads. 645 explicit BufferedReader(S&& stream LIFETIMEBOUND, size_t size = 1 << 16) 646 requires std::is_rvalue_reference_v<S&&> 647 : m_src{stream}, m_buf(size), m_buf_pos{size} {} 648 649 void read(std::span<std::byte> dst) 650 { 651 if (const auto available{std::min(dst.size(), m_buf.size() - m_buf_pos)}) { 652 std::copy_n(m_buf.begin() + m_buf_pos, available, dst.begin()); 653 m_buf_pos += available; 654 dst = dst.subspan(available); 655 } 656 if (dst.size()) { 657 assert(m_buf_pos == m_buf.size()); 658 m_src.read(dst); 659 660 m_buf_pos = 0; 661 m_buf.resize(m_src.detail_fread(m_buf)); 662 } 663 } 664 665 template <typename T> 666 BufferedReader& operator>>(T&& obj) 667 { 668 Unserialize(*this, obj); 669 return *this; 670 } 671 }; 672 673 /** 674 * Wrapper that buffers writes to an underlying stream. 675 * Requires underlying stream to support write_buffer() method 676 * for efficient buffer flushing and obfuscation. 677 */ 678 template <typename S> 679 class BufferedWriter 680 { 681 S& m_dst; 682 DataBuffer m_buf; 683 size_t m_buf_pos{0}; 684 685 public: 686 explicit BufferedWriter(S& stream LIFETIMEBOUND, size_t size = 1 << 16) : m_dst{stream}, m_buf(size) {} 687 688 ~BufferedWriter() { flush(); } 689 690 void flush() 691 { 692 if (m_buf_pos) m_dst.write_buffer(std::span{m_buf}.first(m_buf_pos)); 693 m_buf_pos = 0; 694 } 695 696 void write(std::span<const std::byte> src) 697 { 698 while (const auto available{std::min(src.size(), m_buf.size() - m_buf_pos)}) { 699 std::copy_n(src.begin(), available, m_buf.begin() + m_buf_pos); 700 m_buf_pos += available; 701 if (m_buf_pos == m_buf.size()) flush(); 702 src = src.subspan(available); 703 } 704 } 705 706 template <typename T> 707 BufferedWriter& operator<<(const T& obj) 708 { 709 Serialize(*this, obj); 710 return *this; 711 } 712 }; 713 714 #endif // BITCOIN_STREAMS_H