// merger.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. See the AUTHORS file for names of contributors. 4 5 #include "table/merger.h" 6 7 #include "leveldb/comparator.h" 8 #include "leveldb/iterator.h" 9 #include "table/iterator_wrapper.h" 10 11 namespace leveldb { 12 13 namespace { 14 class MergingIterator : public Iterator { 15 public: 16 MergingIterator(const Comparator* comparator, Iterator** children, int n) 17 : comparator_(comparator), 18 children_(new IteratorWrapper[n]), 19 n_(n), 20 current_(nullptr), 21 direction_(kForward) { 22 for (int i = 0; i < n; i++) { 23 children_[i].Set(children[i]); 24 } 25 } 26 27 ~MergingIterator() override { delete[] children_; } 28 29 bool Valid() const override { return (current_ != nullptr); } 30 31 void SeekToFirst() override { 32 for (int i = 0; i < n_; i++) { 33 children_[i].SeekToFirst(); 34 } 35 FindSmallest(); 36 direction_ = kForward; 37 } 38 39 void SeekToLast() override { 40 for (int i = 0; i < n_; i++) { 41 children_[i].SeekToLast(); 42 } 43 FindLargest(); 44 direction_ = kReverse; 45 } 46 47 void Seek(const Slice& target) override { 48 for (int i = 0; i < n_; i++) { 49 children_[i].Seek(target); 50 } 51 FindSmallest(); 52 direction_ = kForward; 53 } 54 55 void Next() override { 56 assert(Valid()); 57 58 // Ensure that all children are positioned after key(). 59 // If we are moving in the forward direction, it is already 60 // true for all of the non-current_ children since current_ is 61 // the smallest child and key() == current_->key(). Otherwise, 62 // we explicitly position the non-current_ children. 
63 if (direction_ != kForward) { 64 for (int i = 0; i < n_; i++) { 65 IteratorWrapper* child = &children_[i]; 66 if (child != current_) { 67 child->Seek(key()); 68 if (child->Valid() && 69 comparator_->Compare(key(), child->key()) == 0) { 70 child->Next(); 71 } 72 } 73 } 74 direction_ = kForward; 75 } 76 77 current_->Next(); 78 FindSmallest(); 79 } 80 81 void Prev() override { 82 assert(Valid()); 83 84 // Ensure that all children are positioned before key(). 85 // If we are moving in the reverse direction, it is already 86 // true for all of the non-current_ children since current_ is 87 // the largest child and key() == current_->key(). Otherwise, 88 // we explicitly position the non-current_ children. 89 if (direction_ != kReverse) { 90 for (int i = 0; i < n_; i++) { 91 IteratorWrapper* child = &children_[i]; 92 if (child != current_) { 93 child->Seek(key()); 94 if (child->Valid()) { 95 // Child is at first entry >= key(). Step back one to be < key() 96 child->Prev(); 97 } else { 98 // Child has no entries >= key(). Position at last entry. 99 child->SeekToLast(); 100 } 101 } 102 } 103 direction_ = kReverse; 104 } 105 106 current_->Prev(); 107 FindLargest(); 108 } 109 110 Slice key() const override { 111 assert(Valid()); 112 return current_->key(); 113 } 114 115 Slice value() const override { 116 assert(Valid()); 117 return current_->value(); 118 } 119 120 Status status() const override { 121 Status status; 122 for (int i = 0; i < n_; i++) { 123 status = children_[i].status(); 124 if (!status.ok()) { 125 break; 126 } 127 } 128 return status; 129 } 130 131 private: 132 // Which direction is the iterator moving? 133 enum Direction { kForward, kReverse }; 134 135 void FindSmallest(); 136 void FindLargest(); 137 138 // We might want to use a heap in case there are lots of children. 139 // For now we use a simple array since we expect a very small number 140 // of children in leveldb. 
141 const Comparator* comparator_; 142 IteratorWrapper* children_; 143 int n_; 144 IteratorWrapper* current_; 145 Direction direction_; 146 }; 147 148 void MergingIterator::FindSmallest() { 149 IteratorWrapper* smallest = nullptr; 150 for (int i = 0; i < n_; i++) { 151 IteratorWrapper* child = &children_[i]; 152 if (child->Valid()) { 153 if (smallest == nullptr) { 154 smallest = child; 155 } else if (comparator_->Compare(child->key(), smallest->key()) < 0) { 156 smallest = child; 157 } 158 } 159 } 160 current_ = smallest; 161 } 162 163 void MergingIterator::FindLargest() { 164 IteratorWrapper* largest = nullptr; 165 for (int i = n_ - 1; i >= 0; i--) { 166 IteratorWrapper* child = &children_[i]; 167 if (child->Valid()) { 168 if (largest == nullptr) { 169 largest = child; 170 } else if (comparator_->Compare(child->key(), largest->key()) > 0) { 171 largest = child; 172 } 173 } 174 } 175 current_ = largest; 176 } 177 } // namespace 178 179 Iterator* NewMergingIterator(const Comparator* comparator, Iterator** children, 180 int n) { 181 assert(n >= 0); 182 if (n == 0) { 183 return NewEmptyIterator(); 184 } else if (n == 1) { 185 return children[0]; 186 } else { 187 return new MergingIterator(comparator, children, n); 188 } 189 } 190 191 } // namespace leveldb