// log_reader.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include "db/log_reader.h" 6 7 #include <stdio.h> 8 9 #include "leveldb/env.h" 10 #include "util/coding.h" 11 #include "util/crc32c.h" 12 13 namespace leveldb { 14 namespace log { 15 16 Reader::Reporter::~Reporter() = default; 17 18 Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum, 19 uint64_t initial_offset) 20 : file_(file), 21 reporter_(reporter), 22 checksum_(checksum), 23 backing_store_(new char[kBlockSize]), 24 buffer_(), 25 eof_(false), 26 last_record_offset_(0), 27 end_of_buffer_offset_(0), 28 initial_offset_(initial_offset), 29 resyncing_(initial_offset > 0) {} 30 31 Reader::~Reader() { delete[] backing_store_; } 32 33 bool Reader::SkipToInitialBlock() { 34 const size_t offset_in_block = initial_offset_ % kBlockSize; 35 uint64_t block_start_location = initial_offset_ - offset_in_block; 36 37 // Don't search a block if we'd be in the trailer 38 if (offset_in_block > kBlockSize - 6) { 39 block_start_location += kBlockSize; 40 } 41 42 end_of_buffer_offset_ = block_start_location; 43 44 // Skip to start of first block that can contain the initial record 45 if (block_start_location > 0) { 46 Status skip_status = file_->Skip(block_start_location); 47 if (!skip_status.ok()) { 48 ReportDrop(block_start_location, skip_status); 49 return false; 50 } 51 } 52 53 return true; 54 } 55 56 bool Reader::ReadRecord(Slice* record, std::string* scratch) { 57 if (last_record_offset_ < initial_offset_) { 58 if (!SkipToInitialBlock()) { 59 return false; 60 } 61 } 62 63 scratch->clear(); 64 record->clear(); 65 bool in_fragmented_record = false; 66 // Record offset of the logical record that we're reading 67 // 0 is a dummy value to make compilers happy 68 uint64_t prospective_record_offset = 0; 69 70 Slice fragment; 71 while (true) { 72 
const unsigned int record_type = ReadPhysicalRecord(&fragment); 73 74 // ReadPhysicalRecord may have only had an empty trailer remaining in its 75 // internal buffer. Calculate the offset of the next physical record now 76 // that it has returned, properly accounting for its header size. 77 uint64_t physical_record_offset = 78 end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size(); 79 80 if (resyncing_) { 81 if (record_type == kMiddleType) { 82 continue; 83 } else if (record_type == kLastType) { 84 resyncing_ = false; 85 continue; 86 } else { 87 resyncing_ = false; 88 } 89 } 90 91 switch (record_type) { 92 case kFullType: 93 if (in_fragmented_record) { 94 // Handle bug in earlier versions of log::Writer where 95 // it could emit an empty kFirstType record at the tail end 96 // of a block followed by a kFullType or kFirstType record 97 // at the beginning of the next block. 98 if (!scratch->empty()) { 99 ReportCorruption(scratch->size(), "partial record without end(1)"); 100 } 101 } 102 prospective_record_offset = physical_record_offset; 103 scratch->clear(); 104 *record = fragment; 105 last_record_offset_ = prospective_record_offset; 106 return true; 107 108 case kFirstType: 109 if (in_fragmented_record) { 110 // Handle bug in earlier versions of log::Writer where 111 // it could emit an empty kFirstType record at the tail end 112 // of a block followed by a kFullType or kFirstType record 113 // at the beginning of the next block. 
114 if (!scratch->empty()) { 115 ReportCorruption(scratch->size(), "partial record without end(2)"); 116 } 117 } 118 prospective_record_offset = physical_record_offset; 119 scratch->assign(fragment.data(), fragment.size()); 120 in_fragmented_record = true; 121 break; 122 123 case kMiddleType: 124 if (!in_fragmented_record) { 125 ReportCorruption(fragment.size(), 126 "missing start of fragmented record(1)"); 127 } else { 128 scratch->append(fragment.data(), fragment.size()); 129 } 130 break; 131 132 case kLastType: 133 if (!in_fragmented_record) { 134 ReportCorruption(fragment.size(), 135 "missing start of fragmented record(2)"); 136 } else { 137 scratch->append(fragment.data(), fragment.size()); 138 *record = Slice(*scratch); 139 last_record_offset_ = prospective_record_offset; 140 return true; 141 } 142 break; 143 144 case kEof: 145 if (in_fragmented_record) { 146 // This can be caused by the writer dying immediately after 147 // writing a physical record but before completing the next; don't 148 // treat it as a corruption, just ignore the entire logical record. 149 scratch->clear(); 150 } 151 return false; 152 153 case kBadRecord: 154 if (in_fragmented_record) { 155 ReportCorruption(scratch->size(), "error in middle of record"); 156 in_fragmented_record = false; 157 scratch->clear(); 158 } 159 break; 160 161 default: { 162 char buf[40]; 163 snprintf(buf, sizeof(buf), "unknown record type %u", record_type); 164 ReportCorruption( 165 (fragment.size() + (in_fragmented_record ? 
scratch->size() : 0)), 166 buf); 167 in_fragmented_record = false; 168 scratch->clear(); 169 break; 170 } 171 } 172 } 173 return false; 174 } 175 176 uint64_t Reader::LastRecordOffset() { return last_record_offset_; } 177 178 void Reader::ReportCorruption(uint64_t bytes, const char* reason) { 179 ReportDrop(bytes, Status::Corruption(reason, file_->GetName())); 180 } 181 182 void Reader::ReportDrop(uint64_t bytes, const Status& reason) { 183 if (reporter_ != nullptr && 184 end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) { 185 reporter_->Corruption(static_cast<size_t>(bytes), reason); 186 } 187 } 188 189 unsigned int Reader::ReadPhysicalRecord(Slice* result) { 190 while (true) { 191 if (buffer_.size() < kHeaderSize) { 192 if (!eof_) { 193 // Last read was a full read, so this is a trailer to skip 194 buffer_.clear(); 195 Status status = file_->Read(kBlockSize, &buffer_, backing_store_); 196 end_of_buffer_offset_ += buffer_.size(); 197 if (!status.ok()) { 198 buffer_.clear(); 199 ReportDrop(kBlockSize, status); 200 eof_ = true; 201 return kEof; 202 } else if (buffer_.size() < kBlockSize) { 203 eof_ = true; 204 } 205 continue; 206 } else { 207 // Note that if buffer_ is non-empty, we have a truncated header at the 208 // end of the file, which can be caused by the writer crashing in the 209 // middle of writing the header. Instead of considering this an error, 210 // just report EOF. 
211 buffer_.clear(); 212 return kEof; 213 } 214 } 215 216 // Parse the header 217 const char* header = buffer_.data(); 218 const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff; 219 const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; 220 const unsigned int type = header[6]; 221 const uint32_t length = a | (b << 8); 222 if (kHeaderSize + length > buffer_.size()) { 223 size_t drop_size = buffer_.size(); 224 buffer_.clear(); 225 if (!eof_) { 226 ReportCorruption(drop_size, "bad record length"); 227 return kBadRecord; 228 } 229 // If the end of the file has been reached without reading |length| bytes 230 // of payload, assume the writer died in the middle of writing the record. 231 // Don't report a corruption. 232 return kEof; 233 } 234 235 if (type == kZeroType && length == 0) { 236 // Skip zero length record without reporting any drops since 237 // such records are produced by the mmap based writing code in 238 // env_posix.cc that preallocates file regions. 239 buffer_.clear(); 240 return kBadRecord; 241 } 242 243 // Check crc 244 if (checksum_) { 245 uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); 246 uint32_t actual_crc = crc32c::Value(header + 6, 1 + length); 247 if (actual_crc != expected_crc) { 248 // Drop the rest of the buffer since "length" itself may have 249 // been corrupted and if we trust it, we could find some 250 // fragment of a real log record that just happens to look 251 // like a valid log record. 
252 size_t drop_size = buffer_.size(); 253 buffer_.clear(); 254 ReportCorruption(drop_size, "checksum mismatch"); 255 return kBadRecord; 256 } 257 } 258 259 buffer_.remove_prefix(kHeaderSize + length); 260 261 // Skip physical record that started before initial_offset_ 262 if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < 263 initial_offset_) { 264 result->clear(); 265 return kBadRecord; 266 } 267 268 *result = Slice(header + kHeaderSize, length); 269 return type; 270 } 271 } 272 273 } // namespace log 274 } // namespace leveldb