/ actor / future.go
future.go
  1  package actor
  2  
  3  import (
  4  	"context"
  5  	"sync"
  6  	"sync/atomic"
  7  
  8  	"github.com/lightningnetwork/lnd/fn/v2"
  9  )
 10  
 11  // promiseImpl is a structure that can be used to complete a Future. It provides
 12  // methods to set the result of an asynchronous operation and to obtain the
 13  // Future interface for consumers.
 14  // The promiseImpl itself is not typically exposed directly to consumers of the
 15  // future's result; they interact with the Future interface.
 16  type promiseImpl[T any] struct {
 17  	fut *futureImpl[T]
 18  }
 19  
 20  // NewPromise creates a new Promise. The associated Future, which consumers can
 21  // use to await the result, can be obtained via the Future() method. The Future
 22  // is completed by calling the Complete() method on this Promise.
 23  func NewPromise[T any]() Promise[T] {
 24  	return &promiseImpl[T]{
 25  		fut: &futureImpl[T]{
 26  			// done is a channel that will be closed when the future
 27  			// is completed.
 28  			done: make(chan struct{}),
 29  		},
 30  	}
 31  }
 32  
 33  // Future returns the Future interface associated with this Promise. Consumers
 34  // can use this to Await the result or register callbacks.
 35  func (p *promiseImpl[T]) Future() Future[T] {
 36  	return p.fut
 37  }
 38  
 39  // Complete attempts to set the result of the future. It returns true if this
 40  // call successfully set the result (i.e., it was the first to complete it),
 41  // and false if the future had already been completed. This ensures that a
 42  // future can only be completed once. The completion involves storing the result
 43  // and signaling any goroutines waiting on the future's done channel.
 44  func (p *promiseImpl[T]) Complete(result fn.Result[T]) bool {
 45  	var success bool
 46  	p.fut.completeOnce.Do(func() {
 47  		p.fut.resultCache.Store(&result)
 48  		close(p.fut.done)
 49  
 50  		success = true
 51  	})
 52  
 53  	return success
 54  }
 55  
 56  // futureImpl is the concrete implementation of the Future interface. It manages
 57  // the state of an asynchronous computation's result.
 58  type futureImpl[T any] struct {
 59  	// resultCache stores the fn.Result[T] after the future is completed.
 60  	// It's of type atomic.Pointer to allow lock-free reads after completion
 61  	// with improved type safety over atomic.Value.
 62  	resultCache atomic.Pointer[fn.Result[T]]
 63  
 64  	// done is closed once the future is completed, signaling any waiting
 65  	// Await calls.
 66  	done chan struct{}
 67  
 68  	// completeOnce ensures that the logic to set the result and close the
 69  	// done channel is executed only once.
 70  	completeOnce sync.Once
 71  }
 72  
 73  // Await blocks until the result is available or the passed context is
 74  // cancelled. If the future is already completed, it returns the result
 75  // immediately. Otherwise, it waits for either the future's completion or the
 76  // context's cancellation.
 77  func (f *futureImpl[T]) Await(ctx context.Context) fn.Result[T] {
 78  	// First, try a non-blocking load from the cache. If the future is
 79  	// already completed, this will return the result directly.
 80  	if resPtr := f.resultCache.Load(); resPtr != nil {
 81  		return *resPtr
 82  	}
 83  
 84  	// Wait for either the future to be done or the context to be cancelled.
 85  	select {
 86  	case <-f.done:
 87  		// The future has been completed. Load the result from the
 88  		// cache. It must be present now. Load and dereference.
 89  		// This load is safe because the 'done' channel is closed only
 90  		// after the resultCache is written (ensured by completeOnce).
 91  		resPtr := f.resultCache.Load()
 92  
 93  		// resPtr should not be nil here as <-f.done was signaled.
 94  		return *resPtr
 95  
 96  	case <-ctx.Done():
 97  		// The waiting context was cancelled before the future completed.
 98  		return fn.Err[T](ctx.Err())
 99  	}
100  }
101  
102  // ThenApply registers a function to transform the result of a future. The
103  // original future is not modified; a new Future instance representing the
104  // transformed result is returned. Once the original future completes
105  // successfully, the provided transformation function (fApply) is called with
106  // the result. The transformation is applied asynchronously in a new goroutine.
107  // If the passed context is cancelled while waiting for the
108  // original future to complete, the returned future will yield the context's
109  // error.
110  func (f *futureImpl[T]) ThenApply(ctx context.Context,
111  	fApply func(T) T) Future[T] {
112  
113  	// Create a new promise for the transformed result.
114  	transformedPromise := NewPromise[T]()
115  
116  	go func() {
117  		// Await the original future's result, respecting the passed
118  		// context for cancellation.
119  		originalResult := f.Await(ctx)
120  
121  		// If the original future completed with an error (or Await was
122  		// cancelled by its context), complete the transformed future
123  		// with the same error.
124  		// This also handles the case where originalResult.Await(ctx)
125  		// itself returned ctx.Err().
126  		if originalResult.IsErr() {
127  			transformedPromise.Complete(originalResult)
128  			return
129  		}
130  
131  		// Otherwise, the original future completed successfully. Apply the
132  		// transformation function to its result.
133  		originalResult.WhenOk(func(res T) {
134  			newValue := fApply(res)
135  			transformedPromise.Complete(fn.Ok(newValue))
136  		})
137  	}()
138  
139  	return transformedPromise.Future()
140  }
141  
142  // OnComplete registers a function to be called when the result is ready. If the
143  // passed context is cancelled before the future completes, the callback
144  // function (cFunc) will be invoked with the context's error. The callback is
145  // executed in a new goroutine, so it does not block the completion path of the
146  // original future.
147  func (f *futureImpl[T]) OnComplete(ctx context.Context,
148  	cFunc func(fn.Result[T])) {
149  
150  	go func() {
151  		// Await the original future's result, respecting the passed
152  		// context for cancellation.
153  		result := f.Await(ctx)
154  
155  		// Call the callback function with the result.
156  		cFunc(result)
157  	}()
158  }