/ htlcswitch / payment_result.go
payment_result.go
  1  package htlcswitch
  2  
  3  import (
  4  	"bytes"
  5  	"encoding/binary"
  6  	"errors"
  7  	"io"
  8  	"sync"
  9  
 10  	"github.com/lightningnetwork/lnd/channeldb"
 11  	"github.com/lightningnetwork/lnd/kvdb"
 12  	"github.com/lightningnetwork/lnd/lnwire"
 13  	"github.com/lightningnetwork/lnd/multimutex"
 14  )
 15  
 16  var (
 17  
 18  	// networkResultStoreBucketKey is used for the root level bucket that
 19  	// stores the network result for each payment ID.
 20  	networkResultStoreBucketKey = []byte("network-result-store-bucket")
 21  
 22  	// ErrPaymentIDNotFound is an error returned if the given paymentID is
 23  	// not found.
 24  	ErrPaymentIDNotFound = errors.New("paymentID not found")
 25  
 26  	// ErrPaymentIDAlreadyExists is returned if we try to write a pending
 27  	// payment whose paymentID already exists.
 28  	ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")
 29  )
 30  
 31  // PaymentResult wraps a decoded result received from the network after a
 32  // payment attempt was made. This is what is eventually handed to the router
 33  // for processing.
 34  type PaymentResult struct {
 35  	// Preimage is set by the switch in case a sent HTLC was settled.
 36  	Preimage [32]byte
 37  
 38  	// Error is non-nil in case a HTLC send failed, and the HTLC is now
 39  	// irrevocably canceled. If the payment failed during forwarding, this
 40  	// error will be a *ForwardingError.
 41  	Error error
 42  }
 43  
 44  // networkResult is the raw result received from the network after a payment
 45  // attempt has been made. Since the switch doesn't always have the necessary
 46  // data to decode the raw message, we store it together with some meta data,
 47  // and decode it when the router query for the final result.
 48  type networkResult struct {
 49  	// msg is the received result. This should be of type UpdateFulfillHTLC
 50  	// or UpdateFailHTLC.
 51  	msg lnwire.Message
 52  
 53  	// unencrypted indicates whether the failure encoded in the message is
 54  	// unencrypted, and hence doesn't need to be decrypted.
 55  	unencrypted bool
 56  
 57  	// isResolution indicates whether this is a resolution message, in
 58  	// which the failure reason might not be included.
 59  	isResolution bool
 60  }
 61  
 62  // serializeNetworkResult serializes the networkResult.
 63  func serializeNetworkResult(w io.Writer, n *networkResult) error {
 64  	return channeldb.WriteElements(w, n.msg, n.unencrypted, n.isResolution)
 65  }
 66  
 67  // deserializeNetworkResult deserializes the networkResult.
 68  func deserializeNetworkResult(r io.Reader) (*networkResult, error) {
 69  	n := &networkResult{}
 70  
 71  	if err := channeldb.ReadElements(r,
 72  		&n.msg, &n.unencrypted, &n.isResolution,
 73  	); err != nil {
 74  		return nil, err
 75  	}
 76  
 77  	return n, nil
 78  }
 79  
 80  // networkResultStore is a persistent store that stores any results of HTLCs in
 81  // flight on the network. Since payment results are inherently asynchronous, it
 82  // is used as a common access point for senders of HTLCs, to know when a result
 83  // is back. The Switch will checkpoint any received result to the store, and
 84  // the store will keep results and notify the callers about them.
 85  type networkResultStore struct {
 86  	backend kvdb.Backend
 87  
 88  	// results is a map from paymentIDs to channels where subscribers to
 89  	// payment results will be notified.
 90  	results    map[uint64][]chan *networkResult
 91  	resultsMtx sync.Mutex
 92  
 93  	// attemptIDMtx is a multimutex used to make sure the database and
 94  	// result subscribers map is consistent for each attempt ID in case of
 95  	// concurrent callers.
 96  	attemptIDMtx *multimutex.Mutex[uint64]
 97  }
 98  
 99  func newNetworkResultStore(db kvdb.Backend) *networkResultStore {
100  	return &networkResultStore{
101  		backend:      db,
102  		results:      make(map[uint64][]chan *networkResult),
103  		attemptIDMtx: multimutex.NewMutex[uint64](),
104  	}
105  }
106  
107  // storeResult stores the networkResult for the given attemptID, and notifies
108  // any subscribers.
109  func (store *networkResultStore) storeResult(attemptID uint64,
110  	result *networkResult) error {
111  
112  	// We get a mutex for this attempt ID. This is needed to ensure
113  	// consistency between the database state and the subscribers in case
114  	// of concurrent calls.
115  	store.attemptIDMtx.Lock(attemptID)
116  	defer store.attemptIDMtx.Unlock(attemptID)
117  
118  	log.Debugf("Storing result for attemptID=%v", attemptID)
119  
120  	// Serialize the payment result.
121  	var b bytes.Buffer
122  	if err := serializeNetworkResult(&b, result); err != nil {
123  		return err
124  	}
125  
126  	var attemptIDBytes [8]byte
127  	binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)
128  
129  	err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
130  		networkResults, err := tx.CreateTopLevelBucket(
131  			networkResultStoreBucketKey,
132  		)
133  		if err != nil {
134  			return err
135  		}
136  
137  		return networkResults.Put(attemptIDBytes[:], b.Bytes())
138  	})
139  	if err != nil {
140  		return err
141  	}
142  
143  	// Now that the result is stored in the database, we can notify any
144  	// active subscribers.
145  	store.resultsMtx.Lock()
146  	for _, res := range store.results[attemptID] {
147  		res <- result
148  	}
149  	delete(store.results, attemptID)
150  	store.resultsMtx.Unlock()
151  
152  	return nil
153  }
154  
155  // subscribeResult is used to get the HTLC attempt result for the given attempt
156  // ID.  It returns a channel on which the result will be delivered when ready.
157  func (store *networkResultStore) subscribeResult(attemptID uint64) (
158  	<-chan *networkResult, error) {
159  
160  	// We get a mutex for this payment ID. This is needed to ensure
161  	// consistency between the database state and the subscribers in case
162  	// of concurrent calls.
163  	store.attemptIDMtx.Lock(attemptID)
164  	defer store.attemptIDMtx.Unlock(attemptID)
165  
166  	log.Debugf("Subscribing to result for attemptID=%v", attemptID)
167  
168  	var (
169  		result     *networkResult
170  		resultChan = make(chan *networkResult, 1)
171  	)
172  
173  	err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
174  		var err error
175  		result, err = fetchResult(tx, attemptID)
176  		switch {
177  
178  		// Result not yet available, we will notify once a result is
179  		// available.
180  		case err == ErrPaymentIDNotFound:
181  			return nil
182  
183  		case err != nil:
184  			return err
185  
186  		// The result was found, and will be returned immediately.
187  		default:
188  			return nil
189  		}
190  	}, func() {
191  		result = nil
192  	})
193  	if err != nil {
194  		return nil, err
195  	}
196  
197  	// If the result was found, we can send it on the result channel
198  	// imemdiately.
199  	if result != nil {
200  		resultChan <- result
201  		return resultChan, nil
202  	}
203  
204  	// Otherwise we store the result channel for when the result is
205  	// available.
206  	store.resultsMtx.Lock()
207  	store.results[attemptID] = append(
208  		store.results[attemptID], resultChan,
209  	)
210  	store.resultsMtx.Unlock()
211  
212  	return resultChan, nil
213  }
214  
215  // getResult attempts to immediately fetch the result for the given pid from
216  // the store. If no result is available, ErrPaymentIDNotFound is returned.
217  func (store *networkResultStore) getResult(pid uint64) (
218  	*networkResult, error) {
219  
220  	var result *networkResult
221  	err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
222  		var err error
223  		result, err = fetchResult(tx, pid)
224  		return err
225  	}, func() {
226  		result = nil
227  	})
228  	if err != nil {
229  		return nil, err
230  	}
231  
232  	return result, nil
233  }
234  
235  func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
236  	var attemptIDBytes [8]byte
237  	binary.BigEndian.PutUint64(attemptIDBytes[:], pid)
238  
239  	networkResults := tx.ReadBucket(networkResultStoreBucketKey)
240  	if networkResults == nil {
241  		return nil, ErrPaymentIDNotFound
242  	}
243  
244  	// Check whether a result is already available.
245  	resultBytes := networkResults.Get(attemptIDBytes[:])
246  	if resultBytes == nil {
247  		return nil, ErrPaymentIDNotFound
248  	}
249  
250  	// Decode the result we found.
251  	r := bytes.NewReader(resultBytes)
252  
253  	return deserializeNetworkResult(r)
254  }
255  
256  // cleanStore removes all entries from the store, except the payment IDs given.
257  // NOTE: Since every result not listed in the keep map will be deleted, care
258  // should be taken to ensure no new payment attempts are being made
259  // concurrently while this process is ongoing, as its result might end up being
260  // deleted.
261  func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
262  	return kvdb.Update(store.backend, func(tx kvdb.RwTx) error {
263  		networkResults, err := tx.CreateTopLevelBucket(
264  			networkResultStoreBucketKey,
265  		)
266  		if err != nil {
267  			return err
268  		}
269  
270  		// Iterate through the bucket, deleting all items not in the
271  		// keep map.
272  		var toClean [][]byte
273  		if err := networkResults.ForEach(func(k, _ []byte) error {
274  			pid := binary.BigEndian.Uint64(k)
275  			if _, ok := keep[pid]; ok {
276  				return nil
277  			}
278  
279  			toClean = append(toClean, k)
280  			return nil
281  		}); err != nil {
282  			return err
283  		}
284  
285  		for _, k := range toClean {
286  			err := networkResults.Delete(k)
287  			if err != nil {
288  				return err
289  			}
290  		}
291  
292  		if len(toClean) > 0 {
293  			log.Infof("Removed %d stale entries from network "+
294  				"result store", len(toClean))
295  		}
296  
297  		return nil
298  	}, func() {})
299  }