/ src / leveldb / db / write_batch.cc
write_batch.cc
  1  // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2  // Use of this source code is governed by a BSD-style license that can be
  3  // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4  //
  5  // WriteBatch::rep_ :=
  6  //    sequence: fixed64
  7  //    count: fixed32
  8  //    data: record[count]
  9  // record :=
 10  //    kTypeValue varstring varstring         |
 11  //    kTypeDeletion varstring
 12  // varstring :=
 13  //    len: varint32
 14  //    data: uint8[len]
 15  
 16  #include "leveldb/write_batch.h"
 17  
 18  #include "db/dbformat.h"
 19  #include "db/memtable.h"
 20  #include "db/write_batch_internal.h"
 21  #include "leveldb/db.h"
 22  #include "util/coding.h"
 23  
 24  namespace leveldb {
 25  
 26  // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
 27  static const size_t kHeader = 12;
 28  
 29  WriteBatch::WriteBatch() { Clear(); }
 30  
 31  WriteBatch::~WriteBatch() = default;
 32  
 33  WriteBatch::Handler::~Handler() = default;
 34  
 35  void WriteBatch::Clear() {
 36    rep_.clear();
 37    rep_.resize(kHeader);
 38  }
 39  
 40  size_t WriteBatch::ApproximateSize() const { return rep_.size(); }
 41  
 42  Status WriteBatch::Iterate(Handler* handler) const {
 43    Slice input(rep_);
 44    if (input.size() < kHeader) {
 45      return Status::Corruption("malformed WriteBatch (too small)");
 46    }
 47  
 48    input.remove_prefix(kHeader);
 49    Slice key, value;
 50    int found = 0;
 51    while (!input.empty()) {
 52      found++;
 53      char tag = input[0];
 54      input.remove_prefix(1);
 55      switch (tag) {
 56        case kTypeValue:
 57          if (GetLengthPrefixedSlice(&input, &key) &&
 58              GetLengthPrefixedSlice(&input, &value)) {
 59            handler->Put(key, value);
 60          } else {
 61            return Status::Corruption("bad WriteBatch Put");
 62          }
 63          break;
 64        case kTypeDeletion:
 65          if (GetLengthPrefixedSlice(&input, &key)) {
 66            handler->Delete(key);
 67          } else {
 68            return Status::Corruption("bad WriteBatch Delete");
 69          }
 70          break;
 71        default:
 72          return Status::Corruption("unknown WriteBatch tag");
 73      }
 74    }
 75    if (found != WriteBatchInternal::Count(this)) {
 76      return Status::Corruption("WriteBatch has wrong count");
 77    } else {
 78      return Status::OK();
 79    }
 80  }
 81  
 82  int WriteBatchInternal::Count(const WriteBatch* b) {
 83    return DecodeFixed32(b->rep_.data() + 8);
 84  }
 85  
 86  void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
 87    EncodeFixed32(&b->rep_[8], n);
 88  }
 89  
 90  SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
 91    return SequenceNumber(DecodeFixed64(b->rep_.data()));
 92  }
 93  
 94  void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
 95    EncodeFixed64(&b->rep_[0], seq);
 96  }
 97  
 98  void WriteBatch::Put(const Slice& key, const Slice& value) {
 99    WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
100    rep_.push_back(static_cast<char>(kTypeValue));
101    PutLengthPrefixedSlice(&rep_, key);
102    PutLengthPrefixedSlice(&rep_, value);
103  }
104  
105  void WriteBatch::Delete(const Slice& key) {
106    WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
107    rep_.push_back(static_cast<char>(kTypeDeletion));
108    PutLengthPrefixedSlice(&rep_, key);
109  }
110  
111  void WriteBatch::Append(const WriteBatch& source) {
112    WriteBatchInternal::Append(this, &source);
113  }
114  
115  namespace {
116  class MemTableInserter : public WriteBatch::Handler {
117   public:
118    SequenceNumber sequence_;
119    MemTable* mem_;
120  
121    void Put(const Slice& key, const Slice& value) override {
122      mem_->Add(sequence_, kTypeValue, key, value);
123      sequence_++;
124    }
125    void Delete(const Slice& key) override {
126      mem_->Add(sequence_, kTypeDeletion, key, Slice());
127      sequence_++;
128    }
129  };
130  }  // namespace
131  
132  Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
133    MemTableInserter inserter;
134    inserter.sequence_ = WriteBatchInternal::Sequence(b);
135    inserter.mem_ = memtable;
136    return b->Iterate(&inserter);
137  }
138  
139  void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
140    assert(contents.size() >= kHeader);
141    b->rep_.assign(contents.data(), contents.size());
142  }
143  
144  void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
145    SetCount(dst, Count(dst) + Count(src));
146    assert(src->rep_.size() >= kHeader);
147    dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
148  }
149  
150  }  // namespace leveldb