/ kvdb / sqlbase / readwrite_cursor.go
readwrite_cursor.go
  1  //go:build kvdb_postgres || (kvdb_sqlite && !(windows && (arm || 386)) && !(linux && (ppc64 || mips || mipsle || mips64)))
  2  
  3  package sqlbase
  4  
import (
	"database/sql"
	"errors"

	"github.com/btcsuite/btcwallet/walletdb"
)
 10  
// readWriteCursor is a cursor used to iterate over the rows that belong to a
// bucket. It holds a reference to that bucket and the key the cursor is
// currently positioned at.
type readWriteCursor struct {
	// bucket is the bucket this cursor was created for. Its table, id and
	// tx fields are used to build and run the cursor's queries.
	bucket *readWriteBucket

	// currKey holds the current key of the cursor. It is only set once a
	// positioning method (First, Last, Next, Prev or Seek) has found a row.
	currKey []byte
}
 19  
 20  func newReadWriteCursor(b *readWriteBucket) *readWriteCursor {
 21  	return &readWriteCursor{
 22  		bucket: b,
 23  	}
 24  }
 25  
 26  // First positions the cursor at the first key/value pair and returns
 27  // the pair.
 28  func (c *readWriteCursor) First() ([]byte, []byte) {
 29  	var (
 30  		key   []byte
 31  		value []byte
 32  	)
 33  	row, cancel := c.bucket.tx.QueryRow(
 34  		"SELECT key, value FROM " + c.bucket.table + " WHERE " +
 35  			parentSelector(c.bucket.id) +
 36  			" ORDER BY key LIMIT 1",
 37  	)
 38  	defer cancel()
 39  	err := row.Scan(&key, &value)
 40  
 41  	switch {
 42  	case err == sql.ErrNoRows:
 43  		return nil, nil
 44  
 45  	case err != nil:
 46  		panic(err)
 47  	}
 48  
 49  	// Copy current key to prevent modification by the caller.
 50  	c.currKey = make([]byte, len(key))
 51  	copy(c.currKey, key)
 52  
 53  	return key, value
 54  }
 55  
 56  // Last positions the cursor at the last key/value pair and returns the
 57  // pair.
 58  func (c *readWriteCursor) Last() ([]byte, []byte) {
 59  	var (
 60  		key   []byte
 61  		value []byte
 62  	)
 63  	row, cancel := c.bucket.tx.QueryRow(
 64  		"SELECT key, value FROM " + c.bucket.table + " WHERE " +
 65  			parentSelector(c.bucket.id) +
 66  			" ORDER BY key DESC LIMIT 1",
 67  	)
 68  	defer cancel()
 69  	err := row.Scan(&key, &value)
 70  
 71  	switch {
 72  	case err == sql.ErrNoRows:
 73  		return nil, nil
 74  
 75  	case err != nil:
 76  		panic(err)
 77  	}
 78  
 79  	// Copy current key to prevent modification by the caller.
 80  	c.currKey = make([]byte, len(key))
 81  	copy(c.currKey, key)
 82  
 83  	return key, value
 84  }
 85  
 86  // Next moves the cursor one key/value pair forward and returns the new
 87  // pair.
 88  func (c *readWriteCursor) Next() ([]byte, []byte) {
 89  	var (
 90  		key   []byte
 91  		value []byte
 92  	)
 93  	row, cancel := c.bucket.tx.QueryRow(
 94  		"SELECT key, value FROM "+c.bucket.table+" WHERE "+
 95  			parentSelector(c.bucket.id)+
 96  			" AND key>$1 ORDER BY key LIMIT 1",
 97  		c.currKey,
 98  	)
 99  	defer cancel()
100  	err := row.Scan(&key, &value)
101  
102  	switch {
103  	case err == sql.ErrNoRows:
104  		return nil, nil
105  
106  	case err != nil:
107  		panic(err)
108  	}
109  
110  	// Copy current key to prevent modification by the caller.
111  	c.currKey = make([]byte, len(key))
112  	copy(c.currKey, key)
113  
114  	return key, value
115  }
116  
117  // Prev moves the cursor one key/value pair backward and returns the new
118  // pair.
119  func (c *readWriteCursor) Prev() ([]byte, []byte) {
120  	var (
121  		key   []byte
122  		value []byte
123  	)
124  	row, cancel := c.bucket.tx.QueryRow(
125  		"SELECT key, value FROM "+c.bucket.table+" WHERE "+
126  			parentSelector(c.bucket.id)+
127  			" AND key<$1 ORDER BY key DESC LIMIT 1",
128  		c.currKey,
129  	)
130  	defer cancel()
131  	err := row.Scan(&key, &value)
132  
133  	switch {
134  	case err == sql.ErrNoRows:
135  		return nil, nil
136  
137  	case err != nil:
138  		panic(err)
139  	}
140  
141  	// Copy current key to prevent modification by the caller.
142  	c.currKey = make([]byte, len(key))
143  	copy(c.currKey, key)
144  
145  	return key, value
146  }
147  
148  // Seek positions the cursor at the passed seek key.  If the key does
149  // not exist, the cursor is moved to the next key after seek.  Returns
150  // the new pair.
151  func (c *readWriteCursor) Seek(seek []byte) ([]byte, []byte) {
152  	// Convert nil to empty slice, otherwise sql mapping won't be correct
153  	// and no keys are found.
154  	if seek == nil {
155  		seek = []byte{}
156  	}
157  
158  	var (
159  		key   []byte
160  		value []byte
161  	)
162  	row, cancel := c.bucket.tx.QueryRow(
163  		"SELECT key, value FROM "+c.bucket.table+" WHERE "+
164  			parentSelector(c.bucket.id)+
165  			" AND key>=$1 ORDER BY key LIMIT 1",
166  		seek,
167  	)
168  	defer cancel()
169  	err := row.Scan(&key, &value)
170  
171  	switch {
172  	case err == sql.ErrNoRows:
173  		return nil, nil
174  
175  	case err != nil:
176  		panic(err)
177  	}
178  
179  	// Copy current key to prevent modification by the caller.
180  	c.currKey = make([]byte, len(key))
181  	copy(c.currKey, key)
182  
183  	return key, value
184  }
185  
186  // Delete removes the current key/value pair the cursor is at without
187  // invalidating the cursor.  Returns ErrIncompatibleValue if attempted
188  // when the cursor points to a nested bucket.
189  func (c *readWriteCursor) Delete() error {
190  	// Get first record at or after cursor.
191  	var key []byte
192  	row, cancel := c.bucket.tx.QueryRow(
193  		"SELECT key FROM "+c.bucket.table+" WHERE "+
194  			parentSelector(c.bucket.id)+
195  			" AND key>=$1 ORDER BY key LIMIT 1",
196  		c.currKey,
197  	)
198  	defer cancel()
199  	err := row.Scan(&key)
200  
201  	switch {
202  	case err == sql.ErrNoRows:
203  		return nil
204  
205  	case err != nil:
206  		panic(err)
207  	}
208  
209  	// Delete record.
210  	result, err := c.bucket.tx.Exec(
211  		"DELETE FROM "+c.bucket.table+" WHERE "+
212  			parentSelector(c.bucket.id)+
213  			" AND key=$1 AND value IS NOT NULL",
214  		key,
215  	)
216  	if err != nil {
217  		panic(err)
218  	}
219  
220  	rows, err := result.RowsAffected()
221  	if err != nil {
222  		return err
223  	}
224  
225  	// The key exists but nothing has been deleted. This means that the key
226  	// must have been a bucket key.
227  	if rows != 1 {
228  		return walletdb.ErrIncompatibleValue
229  	}
230  
231  	return err
232  }