/ src / leveldb / db / log_reader.cc
log_reader.cc
  1  // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2  // Use of this source code is governed by a BSD-style license that can be
  3  // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4  
  5  #include "db/log_reader.h"
  6  
  7  #include <stdio.h>
  8  
  9  #include "leveldb/env.h"
 10  #include "util/coding.h"
 11  #include "util/crc32c.h"
 12  
 13  namespace leveldb {
 14  namespace log {
 15  
 16  Reader::Reporter::~Reporter() = default;
 17  
 18  Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
 19                 uint64_t initial_offset)
 20      : file_(file),
 21        reporter_(reporter),
 22        checksum_(checksum),
 23        backing_store_(new char[kBlockSize]),
 24        buffer_(),
 25        eof_(false),
 26        last_record_offset_(0),
 27        end_of_buffer_offset_(0),
 28        initial_offset_(initial_offset),
 29        resyncing_(initial_offset > 0) {}
 30  
 31  Reader::~Reader() { delete[] backing_store_; }
 32  
 33  bool Reader::SkipToInitialBlock() {
 34    const size_t offset_in_block = initial_offset_ % kBlockSize;
 35    uint64_t block_start_location = initial_offset_ - offset_in_block;
 36  
 37    // Don't search a block if we'd be in the trailer
 38    if (offset_in_block > kBlockSize - 6) {
 39      block_start_location += kBlockSize;
 40    }
 41  
 42    end_of_buffer_offset_ = block_start_location;
 43  
 44    // Skip to start of first block that can contain the initial record
 45    if (block_start_location > 0) {
 46      Status skip_status = file_->Skip(block_start_location);
 47      if (!skip_status.ok()) {
 48        ReportDrop(block_start_location, skip_status);
 49        return false;
 50      }
 51    }
 52  
 53    return true;
 54  }
 55  
 56  bool Reader::ReadRecord(Slice* record, std::string* scratch) {
 57    if (last_record_offset_ < initial_offset_) {
 58      if (!SkipToInitialBlock()) {
 59        return false;
 60      }
 61    }
 62  
 63    scratch->clear();
 64    record->clear();
 65    bool in_fragmented_record = false;
 66    // Record offset of the logical record that we're reading
 67    // 0 is a dummy value to make compilers happy
 68    uint64_t prospective_record_offset = 0;
 69  
 70    Slice fragment;
 71    while (true) {
 72      const unsigned int record_type = ReadPhysicalRecord(&fragment);
 73  
 74      // ReadPhysicalRecord may have only had an empty trailer remaining in its
 75      // internal buffer. Calculate the offset of the next physical record now
 76      // that it has returned, properly accounting for its header size.
 77      uint64_t physical_record_offset =
 78          end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
 79  
 80      if (resyncing_) {
 81        if (record_type == kMiddleType) {
 82          continue;
 83        } else if (record_type == kLastType) {
 84          resyncing_ = false;
 85          continue;
 86        } else {
 87          resyncing_ = false;
 88        }
 89      }
 90  
 91      switch (record_type) {
 92        case kFullType:
 93          if (in_fragmented_record) {
 94            // Handle bug in earlier versions of log::Writer where
 95            // it could emit an empty kFirstType record at the tail end
 96            // of a block followed by a kFullType or kFirstType record
 97            // at the beginning of the next block.
 98            if (!scratch->empty()) {
 99              ReportCorruption(scratch->size(), "partial record without end(1)");
100            }
101          }
102          prospective_record_offset = physical_record_offset;
103          scratch->clear();
104          *record = fragment;
105          last_record_offset_ = prospective_record_offset;
106          return true;
107  
108        case kFirstType:
109          if (in_fragmented_record) {
110            // Handle bug in earlier versions of log::Writer where
111            // it could emit an empty kFirstType record at the tail end
112            // of a block followed by a kFullType or kFirstType record
113            // at the beginning of the next block.
114            if (!scratch->empty()) {
115              ReportCorruption(scratch->size(), "partial record without end(2)");
116            }
117          }
118          prospective_record_offset = physical_record_offset;
119          scratch->assign(fragment.data(), fragment.size());
120          in_fragmented_record = true;
121          break;
122  
123        case kMiddleType:
124          if (!in_fragmented_record) {
125            ReportCorruption(fragment.size(),
126                             "missing start of fragmented record(1)");
127          } else {
128            scratch->append(fragment.data(), fragment.size());
129          }
130          break;
131  
132        case kLastType:
133          if (!in_fragmented_record) {
134            ReportCorruption(fragment.size(),
135                             "missing start of fragmented record(2)");
136          } else {
137            scratch->append(fragment.data(), fragment.size());
138            *record = Slice(*scratch);
139            last_record_offset_ = prospective_record_offset;
140            return true;
141          }
142          break;
143  
144        case kEof:
145          if (in_fragmented_record) {
146            // This can be caused by the writer dying immediately after
147            // writing a physical record but before completing the next; don't
148            // treat it as a corruption, just ignore the entire logical record.
149            scratch->clear();
150          }
151          return false;
152  
153        case kBadRecord:
154          if (in_fragmented_record) {
155            ReportCorruption(scratch->size(), "error in middle of record");
156            in_fragmented_record = false;
157            scratch->clear();
158          }
159          break;
160  
161        default: {
162          char buf[40];
163          snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
164          ReportCorruption(
165              (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
166              buf);
167          in_fragmented_record = false;
168          scratch->clear();
169          break;
170        }
171      }
172    }
173    return false;
174  }
175  
176  uint64_t Reader::LastRecordOffset() { return last_record_offset_; }
177  
178  void Reader::ReportCorruption(uint64_t bytes, const char* reason) {
179    ReportDrop(bytes, Status::Corruption(reason, file_->GetName()));
180  }
181  
182  void Reader::ReportDrop(uint64_t bytes, const Status& reason) {
183    if (reporter_ != nullptr &&
184        end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {
185      reporter_->Corruption(static_cast<size_t>(bytes), reason);
186    }
187  }
188  
189  unsigned int Reader::ReadPhysicalRecord(Slice* result) {
190    while (true) {
191      if (buffer_.size() < kHeaderSize) {
192        if (!eof_) {
193          // Last read was a full read, so this is a trailer to skip
194          buffer_.clear();
195          Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
196          end_of_buffer_offset_ += buffer_.size();
197          if (!status.ok()) {
198            buffer_.clear();
199            ReportDrop(kBlockSize, status);
200            eof_ = true;
201            return kEof;
202          } else if (buffer_.size() < kBlockSize) {
203            eof_ = true;
204          }
205          continue;
206        } else {
207          // Note that if buffer_ is non-empty, we have a truncated header at the
208          // end of the file, which can be caused by the writer crashing in the
209          // middle of writing the header. Instead of considering this an error,
210          // just report EOF.
211          buffer_.clear();
212          return kEof;
213        }
214      }
215  
216      // Parse the header
217      const char* header = buffer_.data();
218      const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
219      const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
220      const unsigned int type = header[6];
221      const uint32_t length = a | (b << 8);
222      if (kHeaderSize + length > buffer_.size()) {
223        size_t drop_size = buffer_.size();
224        buffer_.clear();
225        if (!eof_) {
226          ReportCorruption(drop_size, "bad record length");
227          return kBadRecord;
228        }
229        // If the end of the file has been reached without reading |length| bytes
230        // of payload, assume the writer died in the middle of writing the record.
231        // Don't report a corruption.
232        return kEof;
233      }
234  
235      if (type == kZeroType && length == 0) {
236        // Skip zero length record without reporting any drops since
237        // such records are produced by the mmap based writing code in
238        // env_posix.cc that preallocates file regions.
239        buffer_.clear();
240        return kBadRecord;
241      }
242  
243      // Check crc
244      if (checksum_) {
245        uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
246        uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
247        if (actual_crc != expected_crc) {
248          // Drop the rest of the buffer since "length" itself may have
249          // been corrupted and if we trust it, we could find some
250          // fragment of a real log record that just happens to look
251          // like a valid log record.
252          size_t drop_size = buffer_.size();
253          buffer_.clear();
254          ReportCorruption(drop_size, "checksum mismatch");
255          return kBadRecord;
256        }
257      }
258  
259      buffer_.remove_prefix(kHeaderSize + length);
260  
261      // Skip physical record that started before initial_offset_
262      if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
263          initial_offset_) {
264        result->clear();
265        return kBadRecord;
266      }
267  
268      *result = Slice(header + kHeaderSize, length);
269      return type;
270    }
271  }
272  
273  }  // namespace log
274  }  // namespace leveldb