/ src / leveldb / db / db_iter.cc
db_iter.cc
  1  // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2  // Use of this source code is governed by a BSD-style license that can be
  3  // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4  
  5  #include "db/db_iter.h"
  6  
  7  #include "db/db_impl.h"
  8  #include "db/dbformat.h"
  9  #include "db/filename.h"
 10  #include "leveldb/env.h"
 11  #include "leveldb/iterator.h"
 12  #include "port/port.h"
 13  #include "util/logging.h"
 14  #include "util/mutexlock.h"
 15  #include "util/random.h"
 16  
 17  namespace leveldb {
 18  
 19  #if 0
 20  static void DumpInternalIter(Iterator* iter) {
 21    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
 22      ParsedInternalKey k;
 23      if (!ParseInternalKey(iter->key(), &k)) {
 24        fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
 25      } else {
 26        fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
 27      }
 28    }
 29  }
 30  #endif
 31  
 32  namespace {
 33  
 34  // Memtables and sstables that make the DB representation contain
 35  // (userkey,seq,type) => uservalue entries.  DBIter
 36  // combines multiple entries for the same userkey found in the DB
 37  // representation into a single entry while accounting for sequence
 38  // numbers, deletion markers, overwrites, etc.
 39  class DBIter : public Iterator {
 40   public:
 41    // Which direction is the iterator currently moving?
 42    // (1) When moving forward, the internal iterator is positioned at
 43    //     the exact entry that yields this->key(), this->value()
 44    // (2) When moving backwards, the internal iterator is positioned
 45    //     just before all entries whose user key == this->key().
 46    enum Direction { kForward, kReverse };
 47  
 48    DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
 49           uint32_t seed)
 50        : db_(db),
 51          user_comparator_(cmp),
 52          iter_(iter),
 53          sequence_(s),
 54          direction_(kForward),
 55          valid_(false),
 56          rnd_(seed),
 57          bytes_until_read_sampling_(RandomCompactionPeriod()) {}
 58  
 59    DBIter(const DBIter&) = delete;
 60    DBIter& operator=(const DBIter&) = delete;
 61  
 62    ~DBIter() override { delete iter_; }
 63    bool Valid() const override { return valid_; }
 64    Slice key() const override {
 65      assert(valid_);
 66      return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_;
 67    }
 68    Slice value() const override {
 69      assert(valid_);
 70      return (direction_ == kForward) ? iter_->value() : saved_value_;
 71    }
 72    Status status() const override {
 73      if (status_.ok()) {
 74        return iter_->status();
 75      } else {
 76        return status_;
 77      }
 78    }
 79  
 80    void Next() override;
 81    void Prev() override;
 82    void Seek(const Slice& target) override;
 83    void SeekToFirst() override;
 84    void SeekToLast() override;
 85  
 86   private:
 87    void FindNextUserEntry(bool skipping, std::string* skip);
 88    void FindPrevUserEntry();
 89    bool ParseKey(ParsedInternalKey* key);
 90  
 91    inline void SaveKey(const Slice& k, std::string* dst) {
 92      dst->assign(k.data(), k.size());
 93    }
 94  
 95    inline void ClearSavedValue() {
 96      if (saved_value_.capacity() > 1048576) {
 97        std::string empty;
 98        swap(empty, saved_value_);
 99      } else {
100        saved_value_.clear();
101      }
102    }
103  
104    // Picks the number of bytes that can be read until a compaction is scheduled.
105    size_t RandomCompactionPeriod() {
106      return rnd_.Uniform(2 * config::kReadBytesPeriod);
107    }
108  
109    DBImpl* db_;
110    const Comparator* const user_comparator_;
111    Iterator* const iter_;
112    SequenceNumber const sequence_;
113    Status status_;
114    std::string saved_key_;    // == current key when direction_==kReverse
115    std::string saved_value_;  // == current raw value when direction_==kReverse
116    Direction direction_;
117    bool valid_;
118    Random rnd_;
119    size_t bytes_until_read_sampling_;
120  };
121  
122  inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
123    Slice k = iter_->key();
124  
125    size_t bytes_read = k.size() + iter_->value().size();
126    while (bytes_until_read_sampling_ < bytes_read) {
127      bytes_until_read_sampling_ += RandomCompactionPeriod();
128      db_->RecordReadSample(k);
129    }
130    assert(bytes_until_read_sampling_ >= bytes_read);
131    bytes_until_read_sampling_ -= bytes_read;
132  
133    if (!ParseInternalKey(k, ikey)) {
134      status_ = Status::Corruption("corrupted internal key in DBIter");
135      return false;
136    } else {
137      return true;
138    }
139  }
140  
141  void DBIter::Next() {
142    assert(valid_);
143  
144    if (direction_ == kReverse) {  // Switch directions?
145      direction_ = kForward;
146      // iter_ is pointing just before the entries for this->key(),
147      // so advance into the range of entries for this->key() and then
148      // use the normal skipping code below.
149      if (!iter_->Valid()) {
150        iter_->SeekToFirst();
151      } else {
152        iter_->Next();
153      }
154      if (!iter_->Valid()) {
155        valid_ = false;
156        saved_key_.clear();
157        return;
158      }
159      // saved_key_ already contains the key to skip past.
160    } else {
161      // Store in saved_key_ the current key so we skip it below.
162      SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
163  
164      // iter_ is pointing to current key. We can now safely move to the next to
165      // avoid checking current key.
166      iter_->Next();
167      if (!iter_->Valid()) {
168        valid_ = false;
169        saved_key_.clear();
170        return;
171      }
172    }
173  
174    FindNextUserEntry(true, &saved_key_);
175  }
176  
177  void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
178    // Loop until we hit an acceptable entry to yield
179    assert(iter_->Valid());
180    assert(direction_ == kForward);
181    do {
182      ParsedInternalKey ikey;
183      if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
184        switch (ikey.type) {
185          case kTypeDeletion:
186            // Arrange to skip all upcoming entries for this key since
187            // they are hidden by this deletion.
188            SaveKey(ikey.user_key, skip);
189            skipping = true;
190            break;
191          case kTypeValue:
192            if (skipping &&
193                user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
194              // Entry hidden
195            } else {
196              valid_ = true;
197              saved_key_.clear();
198              return;
199            }
200            break;
201        }
202      }
203      iter_->Next();
204    } while (iter_->Valid());
205    saved_key_.clear();
206    valid_ = false;
207  }
208  
209  void DBIter::Prev() {
210    assert(valid_);
211  
212    if (direction_ == kForward) {  // Switch directions?
213      // iter_ is pointing at the current entry.  Scan backwards until
214      // the key changes so we can use the normal reverse scanning code.
215      assert(iter_->Valid());  // Otherwise valid_ would have been false
216      SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
217      while (true) {
218        iter_->Prev();
219        if (!iter_->Valid()) {
220          valid_ = false;
221          saved_key_.clear();
222          ClearSavedValue();
223          return;
224        }
225        if (user_comparator_->Compare(ExtractUserKey(iter_->key()), saved_key_) <
226            0) {
227          break;
228        }
229      }
230      direction_ = kReverse;
231    }
232  
233    FindPrevUserEntry();
234  }
235  
236  void DBIter::FindPrevUserEntry() {
237    assert(direction_ == kReverse);
238  
239    ValueType value_type = kTypeDeletion;
240    if (iter_->Valid()) {
241      do {
242        ParsedInternalKey ikey;
243        if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
244          if ((value_type != kTypeDeletion) &&
245              user_comparator_->Compare(ikey.user_key, saved_key_) < 0) {
246            // We encountered a non-deleted value in entries for previous keys,
247            break;
248          }
249          value_type = ikey.type;
250          if (value_type == kTypeDeletion) {
251            saved_key_.clear();
252            ClearSavedValue();
253          } else {
254            Slice raw_value = iter_->value();
255            if (saved_value_.capacity() > raw_value.size() + 1048576) {
256              std::string empty;
257              swap(empty, saved_value_);
258            }
259            SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
260            saved_value_.assign(raw_value.data(), raw_value.size());
261          }
262        }
263        iter_->Prev();
264      } while (iter_->Valid());
265    }
266  
267    if (value_type == kTypeDeletion) {
268      // End
269      valid_ = false;
270      saved_key_.clear();
271      ClearSavedValue();
272      direction_ = kForward;
273    } else {
274      valid_ = true;
275    }
276  }
277  
278  void DBIter::Seek(const Slice& target) {
279    direction_ = kForward;
280    ClearSavedValue();
281    saved_key_.clear();
282    AppendInternalKey(&saved_key_,
283                      ParsedInternalKey(target, sequence_, kValueTypeForSeek));
284    iter_->Seek(saved_key_);
285    if (iter_->Valid()) {
286      FindNextUserEntry(false, &saved_key_ /* temporary storage */);
287    } else {
288      valid_ = false;
289    }
290  }
291  
292  void DBIter::SeekToFirst() {
293    direction_ = kForward;
294    ClearSavedValue();
295    iter_->SeekToFirst();
296    if (iter_->Valid()) {
297      FindNextUserEntry(false, &saved_key_ /* temporary storage */);
298    } else {
299      valid_ = false;
300    }
301  }
302  
303  void DBIter::SeekToLast() {
304    direction_ = kReverse;
305    ClearSavedValue();
306    iter_->SeekToLast();
307    FindPrevUserEntry();
308  }
309  
310  }  // anonymous namespace
311  
312  Iterator* NewDBIterator(DBImpl* db, const Comparator* user_key_comparator,
313                          Iterator* internal_iter, SequenceNumber sequence,
314                          uint32_t seed) {
315    return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
316  }
317  
318  }  // namespace leveldb