// db/log_test.cc
  1  // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2  // Use of this source code is governed by a BSD-style license that can be
  3  // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4  
  5  #include "db/log_reader.h"
  6  #include "db/log_writer.h"
  7  #include "leveldb/env.h"
  8  #include "util/coding.h"
  9  #include "util/crc32c.h"
 10  #include "util/random.h"
 11  #include "util/testharness.h"
 12  
 13  namespace leveldb {
 14  namespace log {
 15  
 16  // Construct a string of the specified length made out of the supplied
 17  // partial string.
 18  static std::string BigString(const std::string& partial_string, size_t n) {
 19    std::string result;
 20    while (result.size() < n) {
 21      result.append(partial_string);
 22    }
 23    result.resize(n);
 24    return result;
 25  }
 26  
 27  // Construct a string from a number
 28  static std::string NumberString(int n) {
 29    char buf[50];
 30    snprintf(buf, sizeof(buf), "%d.", n);
 31    return std::string(buf);
 32  }
 33  
 34  // Return a skewed potentially long string
// Return a skewed potentially long string made by repeating NumberString(i).
// NOTE(review): length comes from rnd->Skewed(17) — presumably a skewed
// distribution favoring short lengths but allowing multi-block records;
// confirm against util/random.h.
static std::string RandomSkewedString(int i, Random* rnd) {
  return BigString(NumberString(i), rnd->Skewed(17));
}
 38  
 39  class LogTest {
 40   public:
 41    LogTest()
 42        : reading_(false),
 43          writer_(new Writer(&dest_)),
 44          reader_(new Reader(&source_, &report_, true /*checksum*/,
 45                             0 /*initial_offset*/)) {}
 46  
 47    ~LogTest() {
 48      delete writer_;
 49      delete reader_;
 50    }
 51  
 52    void ReopenForAppend() {
 53      delete writer_;
 54      writer_ = new Writer(&dest_, dest_.contents_.size());
 55    }
 56  
 57    void Write(const std::string& msg) {
 58      ASSERT_TRUE(!reading_) << "Write() after starting to read";
 59      writer_->AddRecord(Slice(msg));
 60    }
 61  
 62    size_t WrittenBytes() const { return dest_.contents_.size(); }
 63  
 64    std::string Read() {
 65      if (!reading_) {
 66        reading_ = true;
 67        source_.contents_ = Slice(dest_.contents_);
 68      }
 69      std::string scratch;
 70      Slice record;
 71      if (reader_->ReadRecord(&record, &scratch)) {
 72        return record.ToString();
 73      } else {
 74        return "EOF";
 75      }
 76    }
 77  
 78    void IncrementByte(int offset, int delta) {
 79      dest_.contents_[offset] += delta;
 80    }
 81  
 82    void SetByte(int offset, char new_byte) {
 83      dest_.contents_[offset] = new_byte;
 84    }
 85  
 86    void ShrinkSize(int bytes) {
 87      dest_.contents_.resize(dest_.contents_.size() - bytes);
 88    }
 89  
 90    void FixChecksum(int header_offset, int len) {
 91      // Compute crc of type/len/data
 92      uint32_t crc = crc32c::Value(&dest_.contents_[header_offset + 6], 1 + len);
 93      crc = crc32c::Mask(crc);
 94      EncodeFixed32(&dest_.contents_[header_offset], crc);
 95    }
 96  
 97    void ForceError() { source_.force_error_ = true; }
 98  
 99    size_t DroppedBytes() const { return report_.dropped_bytes_; }
100  
101    std::string ReportMessage() const { return report_.message_; }
102  
103    // Returns OK iff recorded error message contains "msg"
104    std::string MatchError(const std::string& msg) const {
105      if (report_.message_.find(msg) == std::string::npos) {
106        return report_.message_;
107      } else {
108        return "OK";
109      }
110    }
111  
112    void WriteInitialOffsetLog() {
113      for (int i = 0; i < num_initial_offset_records_; i++) {
114        std::string record(initial_offset_record_sizes_[i],
115                           static_cast<char>('a' + i));
116        Write(record);
117      }
118    }
119  
120    void StartReadingAt(uint64_t initial_offset) {
121      delete reader_;
122      reader_ = new Reader(&source_, &report_, true /*checksum*/, initial_offset);
123    }
124  
125    void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
126      WriteInitialOffsetLog();
127      reading_ = true;
128      source_.contents_ = Slice(dest_.contents_);
129      Reader* offset_reader = new Reader(&source_, &report_, true /*checksum*/,
130                                         WrittenBytes() + offset_past_end);
131      Slice record;
132      std::string scratch;
133      ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
134      delete offset_reader;
135    }
136  
137    void CheckInitialOffsetRecord(uint64_t initial_offset,
138                                  int expected_record_offset) {
139      WriteInitialOffsetLog();
140      reading_ = true;
141      source_.contents_ = Slice(dest_.contents_);
142      Reader* offset_reader =
143          new Reader(&source_, &report_, true /*checksum*/, initial_offset);
144  
145      // Read all records from expected_record_offset through the last one.
146      ASSERT_LT(expected_record_offset, num_initial_offset_records_);
147      for (; expected_record_offset < num_initial_offset_records_;
148           ++expected_record_offset) {
149        Slice record;
150        std::string scratch;
151        ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
152        ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
153                  record.size());
154        ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
155                  offset_reader->LastRecordOffset());
156        ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
157      }
158      delete offset_reader;
159    }
160  
161   private:
162    class StringDest : public WritableFile {
163     public:
164      Status Close() override { return Status::OK(); }
165      Status Flush() override { return Status::OK(); }
166      Status Sync() override { return Status::OK(); }
167      Status Append(const Slice& slice) override {
168        contents_.append(slice.data(), slice.size());
169        return Status::OK();
170      }
171      std::string GetName() const override { return ""; }
172  
173      std::string contents_;
174    };
175  
176    class StringSource : public SequentialFile {
177     public:
178      StringSource() : force_error_(false), returned_partial_(false) {}
179  
180      Status Read(size_t n, Slice* result, char* scratch) override {
181        ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
182  
183        if (force_error_) {
184          force_error_ = false;
185          returned_partial_ = true;
186          return Status::Corruption("read error");
187        }
188  
189        if (contents_.size() < n) {
190          n = contents_.size();
191          returned_partial_ = true;
192        }
193        *result = Slice(contents_.data(), n);
194        contents_.remove_prefix(n);
195        return Status::OK();
196      }
197  
198      Status Skip(uint64_t n) override {
199        if (n > contents_.size()) {
200          contents_.clear();
201          return Status::NotFound("in-memory file skipped past end");
202        }
203  
204        contents_.remove_prefix(n);
205  
206        return Status::OK();
207      }
208      std::string GetName() const { return ""; }
209  
210      Slice contents_;
211      bool force_error_;
212      bool returned_partial_;
213    };
214  
215    class ReportCollector : public Reader::Reporter {
216     public:
217      ReportCollector() : dropped_bytes_(0) {}
218      void Corruption(size_t bytes, const Status& status) override {
219        dropped_bytes_ += bytes;
220        message_.append(status.ToString());
221      }
222  
223      size_t dropped_bytes_;
224      std::string message_;
225    };
226  
227    // Record metadata for testing initial offset functionality
228    static size_t initial_offset_record_sizes_[];
229    static uint64_t initial_offset_last_record_offsets_[];
230    static int num_initial_offset_records_;
231  
232    StringDest dest_;
233    StringSource source_;
234    ReportCollector report_;
235    bool reading_;
236    Writer* writer_;
237    Reader* reader_;
238  };
239  
// Payload sizes of the records written by WriteInitialOffsetLog().
size_t LogTest::initial_offset_record_sizes_[] = {
    10000,  // Two sizable records in first block
    10000,
    2 * log::kBlockSize - 1000,  // Span three blocks
    1,
    13716,                          // Consume all but two bytes of block 3.
    log::kBlockSize - kHeaderSize,  // Consume the entirety of block 4.
};

// Byte offset at which each record above begins in the log (each record
// adds a kHeaderSize header per fragment; the third record spans three
// blocks and therefore carries three headers).
uint64_t LogTest::initial_offset_last_record_offsets_[] = {
    0,
    kHeaderSize + 10000,
    2 * (kHeaderSize + 10000),
    2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
    2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize +
        kHeaderSize + 1,
    3 * log::kBlockSize,
};

// LogTest::initial_offset_last_record_offsets_ must be defined before this.
int LogTest::num_initial_offset_records_ =
    sizeof(LogTest::initial_offset_last_record_offsets_) / sizeof(uint64_t);
262  
// An empty log yields EOF immediately.
TEST(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }

// Records come back in write order, including the empty record.
TEST(LogTest, ReadWrite) {
  Write("foo");
  Write("bar");
  Write("");
  Write("xxxx");
  ASSERT_EQ("foo", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("xxxx", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}

// Many small records force the writer across many block boundaries.
TEST(LogTest, ManyBlocks) {
  for (int i = 0; i < 100000; i++) {
    Write(NumberString(i));
  }
  for (int i = 0; i < 100000; i++) {
    ASSERT_EQ(NumberString(i), Read());
  }
  ASSERT_EQ("EOF", Read());
}

// Records larger than a block are written as fragments and must be
// reassembled transparently on read.
TEST(LogTest, Fragmentation) {
  Write("small");
  Write(BigString("medium", 50000));
  Write(BigString("large", 100000));
  ASSERT_EQ("small", Read());
  ASSERT_EQ(BigString("medium", 50000), Read());
  ASSERT_EQ(BigString("large", 100000), Read());
  ASSERT_EQ("EOF", Read());
}

TEST(LogTest, MarginalTrailer) {
  // Make a trailer that is exactly the same length as an empty record.
  const int n = kBlockSize - 2 * kHeaderSize;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

TEST(LogTest, MarginalTrailer2) {
  // Make a trailer that is exactly the same length as an empty record.
  const int n = kBlockSize - 2 * kHeaderSize;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

// Leave a trailer shorter than a full header at the end of a block.
TEST(LogTest, ShortTrailer) {
  const int n = kBlockSize - 2 * kHeaderSize + 4;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

// The file ends exactly where a short trailer would begin.
TEST(LogTest, AlignedEof) {
  const int n = kBlockSize - 2 * kHeaderSize + 4;
  Write(BigString("foo", n));
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  ASSERT_EQ(BigString("foo", n), Read());
  ASSERT_EQ("EOF", Read());
}

// Records written before and after ReopenForAppend() are both readable.
TEST(LogTest, OpenForAppend) {
  Write("hello");
  ReopenForAppend();
  Write("world");
  ASSERT_EQ("hello", Read());
  ASSERT_EQ("world", Read());
  ASSERT_EQ("EOF", Read());
}

// Round-trip records of random skewed sizes; the two Random generators are
// seeded identically, so the read loop regenerates exactly what was written.
TEST(LogTest, RandomRead) {
  const int N = 500;
  Random write_rnd(301);
  for (int i = 0; i < N; i++) {
    Write(RandomSkewedString(i, &write_rnd));
  }
  Random read_rnd(301);
  for (int i = 0; i < N; i++) {
    ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
  }
  ASSERT_EQ("EOF", Read());
}
365  
366  // Tests of all the error paths in log_reader.cc follow:
367  
// A forced read error causes the whole in-flight block to be dropped.
TEST(LogTest, ReadError) {
  Write("foo");
  ForceError();
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}

// An unrecognized type byte (with a valid checksum) drops the record.
TEST(LogTest, BadRecordType) {
  Write("foo");
  // Type is stored in header[6]
  IncrementByte(6, 100);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("unknown record type"));
}

TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
  Write("foo");
  ShrinkSize(4);  // Drop all payload as well as a header byte
  ASSERT_EQ("EOF", Read());
  // Truncated last record is ignored, not treated as an error.
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

// A corrupted length field mid-log is reported; later records still read.
TEST(LogTest, BadLength) {
  const int kPayloadSize = kBlockSize - kHeaderSize;
  Write(BigString("bar", kPayloadSize));
  Write("foo");
  // Least significant size byte is stored in header[4].
  IncrementByte(4, 1);
  ASSERT_EQ("foo", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("bad record length"));
}

// A bad length in the very last record looks like truncation: ignored.
TEST(LogTest, BadLengthAtEndIsIgnored) {
  Write("foo");
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

// Corrupting the stored CRC makes the record fail its checksum.
TEST(LogTest, ChecksumMismatch) {
  Write("foo");
  IncrementByte(0, 10);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(10, DroppedBytes());
  ASSERT_EQ("OK", MatchError("checksum mismatch"));
}

// A MIDDLE fragment with no preceding FIRST fragment is corruption.
TEST(LogTest, UnexpectedMiddleType) {
  Write("foo");
  SetByte(6, kMiddleType);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

// A LAST fragment with no preceding FIRST fragment is corruption.
TEST(LogTest, UnexpectedLastType) {
  Write("foo");
  SetByte(6, kLastType);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

// A FIRST fragment followed by a FULL record means the partial record never
// ended; the partial bytes are dropped and the FULL record is returned.
TEST(LogTest, UnexpectedFullType) {
  Write("foo");
  Write("bar");
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}

// Same as above, but the interrupting record is itself fragmented.
TEST(LogTest, UnexpectedFirstType) {
  Write("foo");
  Write(BigString("bar", 100000));
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ(BigString("bar", 100000), Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}

// Losing the LAST fragment at the tail of the log looks like truncation.
TEST(LogTest, MissingLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Remove the LAST block, including header.
  ShrinkSize(14);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}

// A partially-written LAST fragment at the tail is likewise ignored.
TEST(LogTest, PartialLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Cause a bad record length in the LAST block.
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}

TEST(LogTest, SkipIntoMultiRecord) {
  // Consider a fragmented record:
  //    first(R1), middle(R1), last(R1), first(R2)
  // If initial_offset points to a record after first(R1) but before first(R2)
  // incomplete fragment errors are not actual errors, and must be suppressed
  // until a new first or full record is encountered.
  Write(BigString("foo", 3 * kBlockSize));
  Write("correct");
  StartReadingAt(kBlockSize);

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("EOF", Read());
}

TEST(LogTest, ErrorJoinsRecords) {
  // Consider two fragmented records:
  //    first(R1) last(R1) first(R2) last(R2)
  // where the middle two fragments disappear.  We do not want
  // first(R1),last(R2) to get joined and returned as a valid record.

  // Write records that span two blocks
  Write(BigString("foo", kBlockSize));
  Write(BigString("bar", kBlockSize));
  Write("correct");

  // Wipe the middle block
  for (int offset = kBlockSize; offset < 2 * kBlockSize; offset++) {
    SetByte(offset, 'x');
  }

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("EOF", Read());
  const size_t dropped = DroppedBytes();
  ASSERT_LE(dropped, 2 * kBlockSize + 100);
  ASSERT_GE(dropped, 2 * kBlockSize);
}
518  
// The tests below start the reader at various initial offsets within the
// log produced by WriteInitialOffsetLog() and verify which record comes
// back first (second argument = expected index into
// initial_offset_record_sizes_).  An offset that lands inside a record
// skips forward to the next record boundary.

TEST(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }

TEST(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }

TEST(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }

TEST(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); }

TEST(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); }

TEST(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }

TEST(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }

TEST(LogTest, ReadFourthFirstBlockTrailer) {
  CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
}

TEST(LogTest, ReadFourthMiddleBlock) {
  CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
}

TEST(LogTest, ReadFourthLastBlock) {
  CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
}
544  
545  TEST(LogTest, ReadFourthStart) {
546    CheckInitialOffsetRecord(
547        2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
548        3);
549  }
550  
// An offset inside the padding at the end of block 3 skips ahead to the
// record beginning at the next block boundary (index 5, at 3 * kBlockSize).
TEST(LogTest, ReadInitialOffsetIntoBlockPadding) {
  CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
}

// Starting at or beyond the end of the log must yield no records at all.
TEST(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }

TEST(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
558  
559  }  // namespace log
560  }  // namespace leveldb
561  
// Test entry point: runs every TEST(LogTest, ...) registered above.
int main(int argc, char** argv) { return leveldb::test::RunAllTests(); }