/ htlcswitch / payment_result.go
payment_result.go
1 package htlcswitch 2 3 import ( 4 "bytes" 5 "encoding/binary" 6 "errors" 7 "io" 8 "sync" 9 10 "github.com/lightningnetwork/lnd/channeldb" 11 "github.com/lightningnetwork/lnd/kvdb" 12 "github.com/lightningnetwork/lnd/lnwire" 13 "github.com/lightningnetwork/lnd/multimutex" 14 ) 15 16 var ( 17 18 // networkResultStoreBucketKey is used for the root level bucket that 19 // stores the network result for each payment ID. 20 networkResultStoreBucketKey = []byte("network-result-store-bucket") 21 22 // ErrPaymentIDNotFound is an error returned if the given paymentID is 23 // not found. 24 ErrPaymentIDNotFound = errors.New("paymentID not found") 25 26 // ErrPaymentIDAlreadyExists is returned if we try to write a pending 27 // payment whose paymentID already exists. 28 ErrPaymentIDAlreadyExists = errors.New("paymentID already exists") 29 ) 30 31 // PaymentResult wraps a decoded result received from the network after a 32 // payment attempt was made. This is what is eventually handed to the router 33 // for processing. 34 type PaymentResult struct { 35 // Preimage is set by the switch in case a sent HTLC was settled. 36 Preimage [32]byte 37 38 // Error is non-nil in case a HTLC send failed, and the HTLC is now 39 // irrevocably canceled. If the payment failed during forwarding, this 40 // error will be a *ForwardingError. 41 Error error 42 } 43 44 // networkResult is the raw result received from the network after a payment 45 // attempt has been made. Since the switch doesn't always have the necessary 46 // data to decode the raw message, we store it together with some meta data, 47 // and decode it when the router query for the final result. 48 type networkResult struct { 49 // msg is the received result. This should be of type UpdateFulfillHTLC 50 // or UpdateFailHTLC. 51 msg lnwire.Message 52 53 // unencrypted indicates whether the failure encoded in the message is 54 // unencrypted, and hence doesn't need to be decrypted. 55 unencrypted bool 56 57 // isResolution indicates whether this is a resolution message, in 58 // which the failure reason might not be included. 59 isResolution bool 60 } 61 62 // serializeNetworkResult serializes the networkResult. 63 func serializeNetworkResult(w io.Writer, n *networkResult) error { 64 return channeldb.WriteElements(w, n.msg, n.unencrypted, n.isResolution) 65 } 66 67 // deserializeNetworkResult deserializes the networkResult. 68 func deserializeNetworkResult(r io.Reader) (*networkResult, error) { 69 n := &networkResult{} 70 71 if err := channeldb.ReadElements(r, 72 &n.msg, &n.unencrypted, &n.isResolution, 73 ); err != nil { 74 return nil, err 75 } 76 77 return n, nil 78 } 79 80 // networkResultStore is a persistent store that stores any results of HTLCs in 81 // flight on the network. Since payment results are inherently asynchronous, it 82 // is used as a common access point for senders of HTLCs, to know when a result 83 // is back. The Switch will checkpoint any received result to the store, and 84 // the store will keep results and notify the callers about them. 85 type networkResultStore struct { 86 backend kvdb.Backend 87 88 // results is a map from paymentIDs to channels where subscribers to 89 // payment results will be notified. 90 results map[uint64][]chan *networkResult 91 resultsMtx sync.Mutex 92 93 // attemptIDMtx is a multimutex used to make sure the database and 94 // result subscribers map is consistent for each attempt ID in case of 95 // concurrent callers. 96 attemptIDMtx *multimutex.Mutex[uint64] 97 } 98 99 func newNetworkResultStore(db kvdb.Backend) *networkResultStore { 100 return &networkResultStore{ 101 backend: db, 102 results: make(map[uint64][]chan *networkResult), 103 attemptIDMtx: multimutex.NewMutex[uint64](), 104 } 105 } 106 107 // storeResult stores the networkResult for the given attemptID, and notifies 108 // any subscribers. 109 func (store *networkResultStore) storeResult(attemptID uint64, 110 result *networkResult) error { 111 112 // We get a mutex for this attempt ID. This is needed to ensure 113 // consistency between the database state and the subscribers in case 114 // of concurrent calls. 115 store.attemptIDMtx.Lock(attemptID) 116 defer store.attemptIDMtx.Unlock(attemptID) 117 118 log.Debugf("Storing result for attemptID=%v", attemptID) 119 120 // Serialize the payment result. 121 var b bytes.Buffer 122 if err := serializeNetworkResult(&b, result); err != nil { 123 return err 124 } 125 126 var attemptIDBytes [8]byte 127 binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID) 128 129 err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error { 130 networkResults, err := tx.CreateTopLevelBucket( 131 networkResultStoreBucketKey, 132 ) 133 if err != nil { 134 return err 135 } 136 137 return networkResults.Put(attemptIDBytes[:], b.Bytes()) 138 }) 139 if err != nil { 140 return err 141 } 142 143 // Now that the result is stored in the database, we can notify any 144 // active subscribers. 145 store.resultsMtx.Lock() 146 for _, res := range store.results[attemptID] { 147 res <- result 148 } 149 delete(store.results, attemptID) 150 store.resultsMtx.Unlock() 151 152 return nil 153 } 154 155 // subscribeResult is used to get the HTLC attempt result for the given attempt 156 // ID. It returns a channel on which the result will be delivered when ready. 157 func (store *networkResultStore) subscribeResult(attemptID uint64) ( 158 <-chan *networkResult, error) { 159 160 // We get a mutex for this payment ID. This is needed to ensure 161 // consistency between the database state and the subscribers in case 162 // of concurrent calls. 163 store.attemptIDMtx.Lock(attemptID) 164 defer store.attemptIDMtx.Unlock(attemptID) 165 166 log.Debugf("Subscribing to result for attemptID=%v", attemptID) 167 168 var ( 169 result *networkResult 170 resultChan = make(chan *networkResult, 1) 171 ) 172 173 err := kvdb.View(store.backend, func(tx kvdb.RTx) error { 174 var err error 175 result, err = fetchResult(tx, attemptID) 176 switch { 177 178 // Result not yet available, we will notify once a result is 179 // available. 180 case err == ErrPaymentIDNotFound: 181 return nil 182 183 case err != nil: 184 return err 185 186 // The result was found, and will be returned immediately. 187 default: 188 return nil 189 } 190 }, func() { 191 result = nil 192 }) 193 if err != nil { 194 return nil, err 195 } 196 197 // If the result was found, we can send it on the result channel 198 // imemdiately. 199 if result != nil { 200 resultChan <- result 201 return resultChan, nil 202 } 203 204 // Otherwise we store the result channel for when the result is 205 // available. 206 store.resultsMtx.Lock() 207 store.results[attemptID] = append( 208 store.results[attemptID], resultChan, 209 ) 210 store.resultsMtx.Unlock() 211 212 return resultChan, nil 213 } 214 215 // getResult attempts to immediately fetch the result for the given pid from 216 // the store. If no result is available, ErrPaymentIDNotFound is returned. 217 func (store *networkResultStore) getResult(pid uint64) ( 218 *networkResult, error) { 219 220 var result *networkResult 221 err := kvdb.View(store.backend, func(tx kvdb.RTx) error { 222 var err error 223 result, err = fetchResult(tx, pid) 224 return err 225 }, func() { 226 result = nil 227 }) 228 if err != nil { 229 return nil, err 230 } 231 232 return result, nil 233 } 234 235 func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) { 236 var attemptIDBytes [8]byte 237 binary.BigEndian.PutUint64(attemptIDBytes[:], pid) 238 239 networkResults := tx.ReadBucket(networkResultStoreBucketKey) 240 if networkResults == nil { 241 return nil, ErrPaymentIDNotFound 242 } 243 244 // Check whether a result is already available. 245 resultBytes := networkResults.Get(attemptIDBytes[:]) 246 if resultBytes == nil { 247 return nil, ErrPaymentIDNotFound 248 } 249 250 // Decode the result we found. 251 r := bytes.NewReader(resultBytes) 252 253 return deserializeNetworkResult(r) 254 } 255 256 // cleanStore removes all entries from the store, except the payment IDs given. 257 // NOTE: Since every result not listed in the keep map will be deleted, care 258 // should be taken to ensure no new payment attempts are being made 259 // concurrently while this process is ongoing, as its result might end up being 260 // deleted. 261 func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error { 262 return kvdb.Update(store.backend, func(tx kvdb.RwTx) error { 263 networkResults, err := tx.CreateTopLevelBucket( 264 networkResultStoreBucketKey, 265 ) 266 if err != nil { 267 return err 268 } 269 270 // Iterate through the bucket, deleting all items not in the 271 // keep map. 272 var toClean [][]byte 273 if err := networkResults.ForEach(func(k, _ []byte) error { 274 pid := binary.BigEndian.Uint64(k) 275 if _, ok := keep[pid]; ok { 276 return nil 277 } 278 279 toClean = append(toClean, k) 280 return nil 281 }); err != nil { 282 return err 283 } 284 285 for _, k := range toClean { 286 err := networkResults.Delete(k) 287 if err != nil { 288 return err 289 } 290 } 291 292 if len(toClean) > 0 { 293 log.Infof("Removed %d stale entries from network "+ 294 "result store", len(toClean)) 295 } 296 297 return nil 298 }, func() {}) 299 }