// db/log_test.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include "db/log_reader.h" 6 #include "db/log_writer.h" 7 #include "leveldb/env.h" 8 #include "util/coding.h" 9 #include "util/crc32c.h" 10 #include "util/random.h" 11 #include "util/testharness.h" 12 13 namespace leveldb { 14 namespace log { 15 16 // Construct a string of the specified length made out of the supplied 17 // partial string. 18 static std::string BigString(const std::string& partial_string, size_t n) { 19 std::string result; 20 while (result.size() < n) { 21 result.append(partial_string); 22 } 23 result.resize(n); 24 return result; 25 } 26 27 // Construct a string from a number 28 static std::string NumberString(int n) { 29 char buf[50]; 30 snprintf(buf, sizeof(buf), "%d.", n); 31 return std::string(buf); 32 } 33 34 // Return a skewed potentially long string 35 static std::string RandomSkewedString(int i, Random* rnd) { 36 return BigString(NumberString(i), rnd->Skewed(17)); 37 } 38 39 class LogTest { 40 public: 41 LogTest() 42 : reading_(false), 43 writer_(new Writer(&dest_)), 44 reader_(new Reader(&source_, &report_, true /*checksum*/, 45 0 /*initial_offset*/)) {} 46 47 ~LogTest() { 48 delete writer_; 49 delete reader_; 50 } 51 52 void ReopenForAppend() { 53 delete writer_; 54 writer_ = new Writer(&dest_, dest_.contents_.size()); 55 } 56 57 void Write(const std::string& msg) { 58 ASSERT_TRUE(!reading_) << "Write() after starting to read"; 59 writer_->AddRecord(Slice(msg)); 60 } 61 62 size_t WrittenBytes() const { return dest_.contents_.size(); } 63 64 std::string Read() { 65 if (!reading_) { 66 reading_ = true; 67 source_.contents_ = Slice(dest_.contents_); 68 } 69 std::string scratch; 70 Slice record; 71 if (reader_->ReadRecord(&record, &scratch)) { 72 return record.ToString(); 73 } else { 74 return "EOF"; 75 } 76 } 77 78 void 
IncrementByte(int offset, int delta) { 79 dest_.contents_[offset] += delta; 80 } 81 82 void SetByte(int offset, char new_byte) { 83 dest_.contents_[offset] = new_byte; 84 } 85 86 void ShrinkSize(int bytes) { 87 dest_.contents_.resize(dest_.contents_.size() - bytes); 88 } 89 90 void FixChecksum(int header_offset, int len) { 91 // Compute crc of type/len/data 92 uint32_t crc = crc32c::Value(&dest_.contents_[header_offset + 6], 1 + len); 93 crc = crc32c::Mask(crc); 94 EncodeFixed32(&dest_.contents_[header_offset], crc); 95 } 96 97 void ForceError() { source_.force_error_ = true; } 98 99 size_t DroppedBytes() const { return report_.dropped_bytes_; } 100 101 std::string ReportMessage() const { return report_.message_; } 102 103 // Returns OK iff recorded error message contains "msg" 104 std::string MatchError(const std::string& msg) const { 105 if (report_.message_.find(msg) == std::string::npos) { 106 return report_.message_; 107 } else { 108 return "OK"; 109 } 110 } 111 112 void WriteInitialOffsetLog() { 113 for (int i = 0; i < num_initial_offset_records_; i++) { 114 std::string record(initial_offset_record_sizes_[i], 115 static_cast<char>('a' + i)); 116 Write(record); 117 } 118 } 119 120 void StartReadingAt(uint64_t initial_offset) { 121 delete reader_; 122 reader_ = new Reader(&source_, &report_, true /*checksum*/, initial_offset); 123 } 124 125 void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) { 126 WriteInitialOffsetLog(); 127 reading_ = true; 128 source_.contents_ = Slice(dest_.contents_); 129 Reader* offset_reader = new Reader(&source_, &report_, true /*checksum*/, 130 WrittenBytes() + offset_past_end); 131 Slice record; 132 std::string scratch; 133 ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch)); 134 delete offset_reader; 135 } 136 137 void CheckInitialOffsetRecord(uint64_t initial_offset, 138 int expected_record_offset) { 139 WriteInitialOffsetLog(); 140 reading_ = true; 141 source_.contents_ = Slice(dest_.contents_); 142 Reader* 
offset_reader = 143 new Reader(&source_, &report_, true /*checksum*/, initial_offset); 144 145 // Read all records from expected_record_offset through the last one. 146 ASSERT_LT(expected_record_offset, num_initial_offset_records_); 147 for (; expected_record_offset < num_initial_offset_records_; 148 ++expected_record_offset) { 149 Slice record; 150 std::string scratch; 151 ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch)); 152 ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset], 153 record.size()); 154 ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset], 155 offset_reader->LastRecordOffset()); 156 ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]); 157 } 158 delete offset_reader; 159 } 160 161 private: 162 class StringDest : public WritableFile { 163 public: 164 Status Close() override { return Status::OK(); } 165 Status Flush() override { return Status::OK(); } 166 Status Sync() override { return Status::OK(); } 167 Status Append(const Slice& slice) override { 168 contents_.append(slice.data(), slice.size()); 169 return Status::OK(); 170 } 171 std::string GetName() const override { return ""; } 172 173 std::string contents_; 174 }; 175 176 class StringSource : public SequentialFile { 177 public: 178 StringSource() : force_error_(false), returned_partial_(false) {} 179 180 Status Read(size_t n, Slice* result, char* scratch) override { 181 ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error"; 182 183 if (force_error_) { 184 force_error_ = false; 185 returned_partial_ = true; 186 return Status::Corruption("read error"); 187 } 188 189 if (contents_.size() < n) { 190 n = contents_.size(); 191 returned_partial_ = true; 192 } 193 *result = Slice(contents_.data(), n); 194 contents_.remove_prefix(n); 195 return Status::OK(); 196 } 197 198 Status Skip(uint64_t n) override { 199 if (n > contents_.size()) { 200 contents_.clear(); 201 return Status::NotFound("in-memory file skipped past end"); 202 } 203 204 
contents_.remove_prefix(n); 205 206 return Status::OK(); 207 } 208 std::string GetName() const { return ""; } 209 210 Slice contents_; 211 bool force_error_; 212 bool returned_partial_; 213 }; 214 215 class ReportCollector : public Reader::Reporter { 216 public: 217 ReportCollector() : dropped_bytes_(0) {} 218 void Corruption(size_t bytes, const Status& status) override { 219 dropped_bytes_ += bytes; 220 message_.append(status.ToString()); 221 } 222 223 size_t dropped_bytes_; 224 std::string message_; 225 }; 226 227 // Record metadata for testing initial offset functionality 228 static size_t initial_offset_record_sizes_[]; 229 static uint64_t initial_offset_last_record_offsets_[]; 230 static int num_initial_offset_records_; 231 232 StringDest dest_; 233 StringSource source_; 234 ReportCollector report_; 235 bool reading_; 236 Writer* writer_; 237 Reader* reader_; 238 }; 239 240 size_t LogTest::initial_offset_record_sizes_[] = { 241 10000, // Two sizable records in first block 242 10000, 243 2 * log::kBlockSize - 1000, // Span three blocks 244 1, 245 13716, // Consume all but two bytes of block 3. 246 log::kBlockSize - kHeaderSize, // Consume the entirety of block 4. 247 }; 248 249 uint64_t LogTest::initial_offset_last_record_offsets_[] = { 250 0, 251 kHeaderSize + 10000, 252 2 * (kHeaderSize + 10000), 253 2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize, 254 2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize + 255 kHeaderSize + 1, 256 3 * log::kBlockSize, 257 }; 258 259 // LogTest::initial_offset_last_record_offsets_ must be defined before this. 
260 int LogTest::num_initial_offset_records_ = 261 sizeof(LogTest::initial_offset_last_record_offsets_) / sizeof(uint64_t); 262 263 TEST(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } 264 265 TEST(LogTest, ReadWrite) { 266 Write("foo"); 267 Write("bar"); 268 Write(""); 269 Write("xxxx"); 270 ASSERT_EQ("foo", Read()); 271 ASSERT_EQ("bar", Read()); 272 ASSERT_EQ("", Read()); 273 ASSERT_EQ("xxxx", Read()); 274 ASSERT_EQ("EOF", Read()); 275 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work 276 } 277 278 TEST(LogTest, ManyBlocks) { 279 for (int i = 0; i < 100000; i++) { 280 Write(NumberString(i)); 281 } 282 for (int i = 0; i < 100000; i++) { 283 ASSERT_EQ(NumberString(i), Read()); 284 } 285 ASSERT_EQ("EOF", Read()); 286 } 287 288 TEST(LogTest, Fragmentation) { 289 Write("small"); 290 Write(BigString("medium", 50000)); 291 Write(BigString("large", 100000)); 292 ASSERT_EQ("small", Read()); 293 ASSERT_EQ(BigString("medium", 50000), Read()); 294 ASSERT_EQ(BigString("large", 100000), Read()); 295 ASSERT_EQ("EOF", Read()); 296 } 297 298 TEST(LogTest, MarginalTrailer) { 299 // Make a trailer that is exactly the same length as an empty record. 300 const int n = kBlockSize - 2 * kHeaderSize; 301 Write(BigString("foo", n)); 302 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes()); 303 Write(""); 304 Write("bar"); 305 ASSERT_EQ(BigString("foo", n), Read()); 306 ASSERT_EQ("", Read()); 307 ASSERT_EQ("bar", Read()); 308 ASSERT_EQ("EOF", Read()); 309 } 310 311 TEST(LogTest, MarginalTrailer2) { 312 // Make a trailer that is exactly the same length as an empty record. 
313 const int n = kBlockSize - 2 * kHeaderSize; 314 Write(BigString("foo", n)); 315 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes()); 316 Write("bar"); 317 ASSERT_EQ(BigString("foo", n), Read()); 318 ASSERT_EQ("bar", Read()); 319 ASSERT_EQ("EOF", Read()); 320 ASSERT_EQ(0, DroppedBytes()); 321 ASSERT_EQ("", ReportMessage()); 322 } 323 324 TEST(LogTest, ShortTrailer) { 325 const int n = kBlockSize - 2 * kHeaderSize + 4; 326 Write(BigString("foo", n)); 327 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes()); 328 Write(""); 329 Write("bar"); 330 ASSERT_EQ(BigString("foo", n), Read()); 331 ASSERT_EQ("", Read()); 332 ASSERT_EQ("bar", Read()); 333 ASSERT_EQ("EOF", Read()); 334 } 335 336 TEST(LogTest, AlignedEof) { 337 const int n = kBlockSize - 2 * kHeaderSize + 4; 338 Write(BigString("foo", n)); 339 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes()); 340 ASSERT_EQ(BigString("foo", n), Read()); 341 ASSERT_EQ("EOF", Read()); 342 } 343 344 TEST(LogTest, OpenForAppend) { 345 Write("hello"); 346 ReopenForAppend(); 347 Write("world"); 348 ASSERT_EQ("hello", Read()); 349 ASSERT_EQ("world", Read()); 350 ASSERT_EQ("EOF", Read()); 351 } 352 353 TEST(LogTest, RandomRead) { 354 const int N = 500; 355 Random write_rnd(301); 356 for (int i = 0; i < N; i++) { 357 Write(RandomSkewedString(i, &write_rnd)); 358 } 359 Random read_rnd(301); 360 for (int i = 0; i < N; i++) { 361 ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read()); 362 } 363 ASSERT_EQ("EOF", Read()); 364 } 365 366 // Tests of all the error paths in log_reader.cc follow: 367 368 TEST(LogTest, ReadError) { 369 Write("foo"); 370 ForceError(); 371 ASSERT_EQ("EOF", Read()); 372 ASSERT_EQ(kBlockSize, DroppedBytes()); 373 ASSERT_EQ("OK", MatchError("read error")); 374 } 375 376 TEST(LogTest, BadRecordType) { 377 Write("foo"); 378 // Type is stored in header[6] 379 IncrementByte(6, 100); 380 FixChecksum(0, 3); 381 ASSERT_EQ("EOF", Read()); 382 ASSERT_EQ(3, DroppedBytes()); 383 ASSERT_EQ("OK", MatchError("unknown record 
type")); 384 } 385 386 TEST(LogTest, TruncatedTrailingRecordIsIgnored) { 387 Write("foo"); 388 ShrinkSize(4); // Drop all payload as well as a header byte 389 ASSERT_EQ("EOF", Read()); 390 // Truncated last record is ignored, not treated as an error. 391 ASSERT_EQ(0, DroppedBytes()); 392 ASSERT_EQ("", ReportMessage()); 393 } 394 395 TEST(LogTest, BadLength) { 396 const int kPayloadSize = kBlockSize - kHeaderSize; 397 Write(BigString("bar", kPayloadSize)); 398 Write("foo"); 399 // Least significant size byte is stored in header[4]. 400 IncrementByte(4, 1); 401 ASSERT_EQ("foo", Read()); 402 ASSERT_EQ(kBlockSize, DroppedBytes()); 403 ASSERT_EQ("OK", MatchError("bad record length")); 404 } 405 406 TEST(LogTest, BadLengthAtEndIsIgnored) { 407 Write("foo"); 408 ShrinkSize(1); 409 ASSERT_EQ("EOF", Read()); 410 ASSERT_EQ(0, DroppedBytes()); 411 ASSERT_EQ("", ReportMessage()); 412 } 413 414 TEST(LogTest, ChecksumMismatch) { 415 Write("foo"); 416 IncrementByte(0, 10); 417 ASSERT_EQ("EOF", Read()); 418 ASSERT_EQ(10, DroppedBytes()); 419 ASSERT_EQ("OK", MatchError("checksum mismatch")); 420 } 421 422 TEST(LogTest, UnexpectedMiddleType) { 423 Write("foo"); 424 SetByte(6, kMiddleType); 425 FixChecksum(0, 3); 426 ASSERT_EQ("EOF", Read()); 427 ASSERT_EQ(3, DroppedBytes()); 428 ASSERT_EQ("OK", MatchError("missing start")); 429 } 430 431 TEST(LogTest, UnexpectedLastType) { 432 Write("foo"); 433 SetByte(6, kLastType); 434 FixChecksum(0, 3); 435 ASSERT_EQ("EOF", Read()); 436 ASSERT_EQ(3, DroppedBytes()); 437 ASSERT_EQ("OK", MatchError("missing start")); 438 } 439 440 TEST(LogTest, UnexpectedFullType) { 441 Write("foo"); 442 Write("bar"); 443 SetByte(6, kFirstType); 444 FixChecksum(0, 3); 445 ASSERT_EQ("bar", Read()); 446 ASSERT_EQ("EOF", Read()); 447 ASSERT_EQ(3, DroppedBytes()); 448 ASSERT_EQ("OK", MatchError("partial record without end")); 449 } 450 451 TEST(LogTest, UnexpectedFirstType) { 452 Write("foo"); 453 Write(BigString("bar", 100000)); 454 SetByte(6, kFirstType); 455 
FixChecksum(0, 3); 456 ASSERT_EQ(BigString("bar", 100000), Read()); 457 ASSERT_EQ("EOF", Read()); 458 ASSERT_EQ(3, DroppedBytes()); 459 ASSERT_EQ("OK", MatchError("partial record without end")); 460 } 461 462 TEST(LogTest, MissingLastIsIgnored) { 463 Write(BigString("bar", kBlockSize)); 464 // Remove the LAST block, including header. 465 ShrinkSize(14); 466 ASSERT_EQ("EOF", Read()); 467 ASSERT_EQ("", ReportMessage()); 468 ASSERT_EQ(0, DroppedBytes()); 469 } 470 471 TEST(LogTest, PartialLastIsIgnored) { 472 Write(BigString("bar", kBlockSize)); 473 // Cause a bad record length in the LAST block. 474 ShrinkSize(1); 475 ASSERT_EQ("EOF", Read()); 476 ASSERT_EQ("", ReportMessage()); 477 ASSERT_EQ(0, DroppedBytes()); 478 } 479 480 TEST(LogTest, SkipIntoMultiRecord) { 481 // Consider a fragmented record: 482 // first(R1), middle(R1), last(R1), first(R2) 483 // If initial_offset points to a record after first(R1) but before first(R2) 484 // incomplete fragment errors are not actual errors, and must be suppressed 485 // until a new first or full record is encountered. 486 Write(BigString("foo", 3 * kBlockSize)); 487 Write("correct"); 488 StartReadingAt(kBlockSize); 489 490 ASSERT_EQ("correct", Read()); 491 ASSERT_EQ("", ReportMessage()); 492 ASSERT_EQ(0, DroppedBytes()); 493 ASSERT_EQ("EOF", Read()); 494 } 495 496 TEST(LogTest, ErrorJoinsRecords) { 497 // Consider two fragmented records: 498 // first(R1) last(R1) first(R2) last(R2) 499 // where the middle two fragments disappear. We do not want 500 // first(R1),last(R2) to get joined and returned as a valid record. 
501 502 // Write records that span two blocks 503 Write(BigString("foo", kBlockSize)); 504 Write(BigString("bar", kBlockSize)); 505 Write("correct"); 506 507 // Wipe the middle block 508 for (int offset = kBlockSize; offset < 2 * kBlockSize; offset++) { 509 SetByte(offset, 'x'); 510 } 511 512 ASSERT_EQ("correct", Read()); 513 ASSERT_EQ("EOF", Read()); 514 const size_t dropped = DroppedBytes(); 515 ASSERT_LE(dropped, 2 * kBlockSize + 100); 516 ASSERT_GE(dropped, 2 * kBlockSize); 517 } 518 519 TEST(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); } 520 521 TEST(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); } 522 523 TEST(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); } 524 525 TEST(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); } 526 527 TEST(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); } 528 529 TEST(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); } 530 531 TEST(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); } 532 533 TEST(LogTest, ReadFourthFirstBlockTrailer) { 534 CheckInitialOffsetRecord(log::kBlockSize - 4, 3); 535 } 536 537 TEST(LogTest, ReadFourthMiddleBlock) { 538 CheckInitialOffsetRecord(log::kBlockSize + 1, 3); 539 } 540 541 TEST(LogTest, ReadFourthLastBlock) { 542 CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3); 543 } 544 545 TEST(LogTest, ReadFourthStart) { 546 CheckInitialOffsetRecord( 547 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize, 548 3); 549 } 550 551 TEST(LogTest, ReadInitialOffsetIntoBlockPadding) { 552 CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5); 553 } 554 555 TEST(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); } 556 557 TEST(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); } 558 559 } // namespace log 560 } // namespace leveldb 561 562 int main(int argc, char** argv) { return leveldb::test::RunAllTests(); }