future.go
1 package actor 2 3 import ( 4 "context" 5 "sync" 6 "sync/atomic" 7 8 "github.com/lightningnetwork/lnd/fn/v2" 9 ) 10 11 // promiseImpl is a structure that can be used to complete a Future. It provides 12 // methods to set the result of an asynchronous operation and to obtain the 13 // Future interface for consumers. 14 // The promiseImpl itself is not typically exposed directly to consumers of the 15 // future's result; they interact with the Future interface. 16 type promiseImpl[T any] struct { 17 fut *futureImpl[T] 18 } 19 20 // NewPromise creates a new Promise. The associated Future, which consumers can 21 // use to await the result, can be obtained via the Future() method. The Future 22 // is completed by calling the Complete() method on this Promise. 23 func NewPromise[T any]() Promise[T] { 24 return &promiseImpl[T]{ 25 fut: &futureImpl[T]{ 26 // done is a channel that will be closed when the future 27 // is completed. 28 done: make(chan struct{}), 29 }, 30 } 31 } 32 33 // Future returns the Future interface associated with this Promise. Consumers 34 // can use this to Await the result or register callbacks. 35 func (p *promiseImpl[T]) Future() Future[T] { 36 return p.fut 37 } 38 39 // Complete attempts to set the result of the future. It returns true if this 40 // call successfully set the result (i.e., it was the first to complete it), 41 // and false if the future had already been completed. This ensures that a 42 // future can only be completed once. The completion involves storing the result 43 // and signaling any goroutines waiting on the future's done channel. 44 func (p *promiseImpl[T]) Complete(result fn.Result[T]) bool { 45 var success bool 46 p.fut.completeOnce.Do(func() { 47 p.fut.resultCache.Store(&result) 48 close(p.fut.done) 49 50 success = true 51 }) 52 53 return success 54 } 55 56 // futureImpl is the concrete implementation of the Future interface. It manages 57 // the state of an asynchronous computation's result. 58 type futureImpl[T any] struct { 59 // resultCache stores the fn.Result[T] after the future is completed. 60 // It's of type atomic.Pointer to allow lock-free reads after completion 61 // with improved type safety over atomic.Value. 62 resultCache atomic.Pointer[fn.Result[T]] 63 64 // done is closed once the future is completed, signaling any waiting 65 // Await calls. 66 done chan struct{} 67 68 // completeOnce ensures that the logic to set the result and close the 69 // done channel is executed only once. 70 completeOnce sync.Once 71 } 72 73 // Await blocks until the result is available or the passed context is 74 // cancelled. If the future is already completed, it returns the result 75 // immediately. Otherwise, it waits for either the future's completion or the 76 // context's cancellation. 77 func (f *futureImpl[T]) Await(ctx context.Context) fn.Result[T] { 78 // First, try a non-blocking load from the cache. If the future is 79 // already completed, this will return the result directly. 80 if resPtr := f.resultCache.Load(); resPtr != nil { 81 return *resPtr 82 } 83 84 // Wait for either the future to be done or the context to be cancelled. 85 select { 86 case <-f.done: 87 // The future has been completed. Load the result from the 88 // cache. It must be present now. Load and dereference. 89 // This load is safe because the 'done' channel is closed only 90 // after the resultCache is written (ensured by completeOnce). 91 resPtr := f.resultCache.Load() 92 93 // resPtr should not be nil here as <-f.done was signaled. 94 return *resPtr 95 96 case <-ctx.Done(): 97 // The waiting context was cancelled before the future completed. 98 return fn.Err[T](ctx.Err()) 99 } 100 } 101 102 // ThenApply registers a function to transform the result of a future. The 103 // original future is not modified; a new Future instance representing the 104 // transformed result is returned. Once the original future completes 105 // successfully, the provided transformation function (fApply) is called with 106 // the result. The transformation is applied asynchronously in a new goroutine. 107 // If the passed context is cancelled while waiting for the 108 // original future to complete, the returned future will yield the context's 109 // error. 110 func (f *futureImpl[T]) ThenApply(ctx context.Context, 111 fApply func(T) T) Future[T] { 112 113 // Create a new promise for the transformed result. 114 transformedPromise := NewPromise[T]() 115 116 go func() { 117 // Await the original future's result, respecting the passed 118 // context for cancellation. 119 originalResult := f.Await(ctx) 120 121 // If the original future completed with an error (or Await was 122 // cancelled by its context), complete the transformed future 123 // with the same error. 124 // This also handles the case where originalResult.Await(ctx) 125 // itself returned ctx.Err(). 126 if originalResult.IsErr() { 127 transformedPromise.Complete(originalResult) 128 return 129 } 130 131 // Otherwise, the original future completed successfully. Apply the 132 // transformation function to its result. 133 originalResult.WhenOk(func(res T) { 134 newValue := fApply(res) 135 transformedPromise.Complete(fn.Ok(newValue)) 136 }) 137 }() 138 139 return transformedPromise.Future() 140 } 141 142 // OnComplete registers a function to be called when the result is ready. If the 143 // passed context is cancelled before the future completes, the callback 144 // function (cFunc) will be invoked with the context's error. The callback is 145 // executed in a new goroutine, so it does not block the completion path of the 146 // original future. 147 func (f *futureImpl[T]) OnComplete(ctx context.Context, 148 cFunc func(fn.Result[T])) { 149 150 go func() { 151 // Await the original future's result, respecting the passed 152 // context for cancellation. 153 result := f.Await(ctx) 154 155 // Call the callback function with the result. 156 cFunc(result) 157 }() 158 }