/ src / leveldb / table / merger.cc
merger.cc
  1  // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2  // Use of this source code is governed by a BSD-style license that can be
  3  // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4  
  5  #include "table/merger.h"
  6  
  7  #include "leveldb/comparator.h"
  8  #include "leveldb/iterator.h"
  9  #include "table/iterator_wrapper.h"
 10  
 11  namespace leveldb {
 12  
 13  namespace {
 14  class MergingIterator : public Iterator {
 15   public:
 16    MergingIterator(const Comparator* comparator, Iterator** children, int n)
 17        : comparator_(comparator),
 18          children_(new IteratorWrapper[n]),
 19          n_(n),
 20          current_(nullptr),
 21          direction_(kForward) {
 22      for (int i = 0; i < n; i++) {
 23        children_[i].Set(children[i]);
 24      }
 25    }
 26  
 27    ~MergingIterator() override { delete[] children_; }
 28  
 29    bool Valid() const override { return (current_ != nullptr); }
 30  
 31    void SeekToFirst() override {
 32      for (int i = 0; i < n_; i++) {
 33        children_[i].SeekToFirst();
 34      }
 35      FindSmallest();
 36      direction_ = kForward;
 37    }
 38  
 39    void SeekToLast() override {
 40      for (int i = 0; i < n_; i++) {
 41        children_[i].SeekToLast();
 42      }
 43      FindLargest();
 44      direction_ = kReverse;
 45    }
 46  
 47    void Seek(const Slice& target) override {
 48      for (int i = 0; i < n_; i++) {
 49        children_[i].Seek(target);
 50      }
 51      FindSmallest();
 52      direction_ = kForward;
 53    }
 54  
 55    void Next() override {
 56      assert(Valid());
 57  
 58      // Ensure that all children are positioned after key().
 59      // If we are moving in the forward direction, it is already
 60      // true for all of the non-current_ children since current_ is
 61      // the smallest child and key() == current_->key().  Otherwise,
 62      // we explicitly position the non-current_ children.
 63      if (direction_ != kForward) {
 64        for (int i = 0; i < n_; i++) {
 65          IteratorWrapper* child = &children_[i];
 66          if (child != current_) {
 67            child->Seek(key());
 68            if (child->Valid() &&
 69                comparator_->Compare(key(), child->key()) == 0) {
 70              child->Next();
 71            }
 72          }
 73        }
 74        direction_ = kForward;
 75      }
 76  
 77      current_->Next();
 78      FindSmallest();
 79    }
 80  
 81    void Prev() override {
 82      assert(Valid());
 83  
 84      // Ensure that all children are positioned before key().
 85      // If we are moving in the reverse direction, it is already
 86      // true for all of the non-current_ children since current_ is
 87      // the largest child and key() == current_->key().  Otherwise,
 88      // we explicitly position the non-current_ children.
 89      if (direction_ != kReverse) {
 90        for (int i = 0; i < n_; i++) {
 91          IteratorWrapper* child = &children_[i];
 92          if (child != current_) {
 93            child->Seek(key());
 94            if (child->Valid()) {
 95              // Child is at first entry >= key().  Step back one to be < key()
 96              child->Prev();
 97            } else {
 98              // Child has no entries >= key().  Position at last entry.
 99              child->SeekToLast();
100            }
101          }
102        }
103        direction_ = kReverse;
104      }
105  
106      current_->Prev();
107      FindLargest();
108    }
109  
110    Slice key() const override {
111      assert(Valid());
112      return current_->key();
113    }
114  
115    Slice value() const override {
116      assert(Valid());
117      return current_->value();
118    }
119  
120    Status status() const override {
121      Status status;
122      for (int i = 0; i < n_; i++) {
123        status = children_[i].status();
124        if (!status.ok()) {
125          break;
126        }
127      }
128      return status;
129    }
130  
131   private:
132    // Which direction is the iterator moving?
133    enum Direction { kForward, kReverse };
134  
135    void FindSmallest();
136    void FindLargest();
137  
138    // We might want to use a heap in case there are lots of children.
139    // For now we use a simple array since we expect a very small number
140    // of children in leveldb.
141    const Comparator* comparator_;
142    IteratorWrapper* children_;
143    int n_;
144    IteratorWrapper* current_;
145    Direction direction_;
146  };
147  
148  void MergingIterator::FindSmallest() {
149    IteratorWrapper* smallest = nullptr;
150    for (int i = 0; i < n_; i++) {
151      IteratorWrapper* child = &children_[i];
152      if (child->Valid()) {
153        if (smallest == nullptr) {
154          smallest = child;
155        } else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
156          smallest = child;
157        }
158      }
159    }
160    current_ = smallest;
161  }
162  
163  void MergingIterator::FindLargest() {
164    IteratorWrapper* largest = nullptr;
165    for (int i = n_ - 1; i >= 0; i--) {
166      IteratorWrapper* child = &children_[i];
167      if (child->Valid()) {
168        if (largest == nullptr) {
169          largest = child;
170        } else if (comparator_->Compare(child->key(), largest->key()) > 0) {
171          largest = child;
172        }
173      }
174    }
175    current_ = largest;
176  }
177  }  // namespace
178  
179  Iterator* NewMergingIterator(const Comparator* comparator, Iterator** children,
180                               int n) {
181    assert(n >= 0);
182    if (n == 0) {
183      return NewEmptyIterator();
184    } else if (n == 1) {
185      return children[0];
186    } else {
187      return new MergingIterator(comparator, children, n);
188    }
189  }
190  
191  }  // namespace leveldb