/ peer / ping_manager.go
ping_manager.go
  1  package peer
  2  
  3  import (
  4  	"errors"
  5  	"fmt"
  6  	"sync"
  7  	"sync/atomic"
  8  	"time"
  9  
 10  	"github.com/lightningnetwork/lnd/fn/v2"
 11  	"github.com/lightningnetwork/lnd/lnwire"
 12  )
 13  
 14  // PingManagerConfig is a structure containing various parameters that govern
 15  // how the PingManager behaves.
 16  type PingManagerConfig struct {
 17  	// NewPingPayload is a closure that returns the payload to be packaged
 18  	// in the Ping message.
 19  	NewPingPayload func() []byte
 20  
 21  	// NewPongSize is a closure that returns a random value between
 22  	// [0, lnwire.MaxPongBytes]. This random value helps to more effectively
 23  	// pair Pong messages with Ping.
 24  	NewPongSize func() uint16
 25  
 26  	// IntervalDuration is the Duration between attempted pings.
 27  	IntervalDuration time.Duration
 28  
 29  	// TimeoutDuration is the Duration we wait before declaring a ping
 30  	// attempt failed.
 31  	TimeoutDuration time.Duration
 32  
 33  	// SendPing is a closure that is responsible for sending the Ping
 34  	// message out to our peer
 35  	SendPing func(ping *lnwire.Ping)
 36  
 37  	// OnPongFailure is a closure that is responsible for executing the
 38  	// logic when a Pong message is either late or does not match our
 39  	// expectations for that Pong
 40  	OnPongFailure func(failureReason error, timeWaitedForPong time.Duration,
 41  		lastKnownRTT time.Duration)
 42  }
 43  
 44  // PingManager is a structure that is designed to manage the internal state
 45  // of the ping pong lifecycle with the remote peer. We assume there is only one
 46  // ping outstanding at once.
 47  //
 48  // NOTE: This structure MUST be initialized with NewPingManager.
 49  type PingManager struct {
 50  	cfg *PingManagerConfig
 51  
 52  	// pingTime is a rough estimate of the RTT (round-trip-time) between us
 53  	// and the connected peer.
 54  	// To be used atomically.
 55  	// TODO(roasbeef): also use a WMA or EMA?
 56  	pingTime atomic.Pointer[time.Duration]
 57  
 58  	// pingLastSend is the time when we sent our last ping message.
 59  	// To be used atomically.
 60  	pingLastSend *time.Time
 61  
 62  	// outstandingPongSize is the current size of the requested pong
 63  	// payload.  This value can only validly range from [0,65531]. Any
 64  	// value < 0 is interpreted as if there is no outstanding ping message.
 65  	outstandingPongSize int32
 66  
 67  	// pingTicker is a pointer to a Ticker that fires on every ping
 68  	// interval.
 69  	pingTicker *time.Ticker
 70  
 71  	// pingTimeout is a Timer that will fire when we want to time out a
 72  	// ping
 73  	pingTimeout *time.Timer
 74  
 75  	// pongChan is the channel on which the pingManager will write Pong
 76  	// messages it is evaluating
 77  	pongChan chan *lnwire.Pong
 78  
 79  	started sync.Once
 80  	stopped sync.Once
 81  
 82  	quit chan struct{}
 83  	wg   sync.WaitGroup
 84  }
 85  
 86  // NewPingManager constructs a pingManager in a valid state. It must be started
 87  // before it does anything useful, though.
 88  func NewPingManager(cfg *PingManagerConfig) *PingManager {
 89  	m := PingManager{
 90  		cfg:                 cfg,
 91  		outstandingPongSize: -1,
 92  		pongChan:            make(chan *lnwire.Pong, 1),
 93  		quit:                make(chan struct{}),
 94  	}
 95  
 96  	return &m
 97  }
 98  
 99  // Start launches the primary goroutine that is owned by the pingManager.
100  func (m *PingManager) Start() error {
101  	var err error
102  	m.started.Do(func() {
103  		m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
104  		m.pingTimeout = time.NewTimer(0)
105  
106  		m.wg.Add(1)
107  		go m.pingHandler()
108  	})
109  
110  	return err
111  }
112  
113  // getLastRTT safely retrieves the last known RTT, returning 0 if none exists.
114  func (m *PingManager) getLastRTT() time.Duration {
115  	rttPtr := m.pingTime.Load()
116  	if rttPtr == nil {
117  		return 0
118  	}
119  
120  	return *rttPtr
121  }
122  
123  // pendingPingWait calculates the time waited since the last ping was sent. If
124  // no ping time is reported, None is returned. defaultDuration.
125  func (m *PingManager) pendingPingWait() fn.Option[time.Duration] {
126  	if m.pingLastSend != nil {
127  		return fn.Some(time.Since(*m.pingLastSend))
128  	}
129  
130  	return fn.None[time.Duration]()
131  }
132  
133  // pingHandler is the main goroutine responsible for enforcing the ping/pong
134  // protocol.
135  func (m *PingManager) pingHandler() {
136  	defer m.wg.Done()
137  	defer m.pingTimeout.Stop()
138  
139  	// Ensure that the pingTimeout channel is empty.
140  	if !m.pingTimeout.Stop() {
141  		<-m.pingTimeout.C
142  	}
143  
144  	// Because we don't know if the OnPingFailure callback actually
145  	// disconnects a peer (dependent on user config), we should never return
146  	// from this loop unless the ping manager is stopped explicitly (which
147  	// happens on disconnect).
148  	for {
149  		select {
150  		case <-m.pingTicker.C:
151  			// If this occurs it means that the new ping cycle has
152  			// begun while there is still an outstanding ping
153  			// awaiting a pong response.  This should never occur,
154  			// but if it does, it implies a timeout.
155  			if m.outstandingPongSize >= 0 {
156  				// Ping was outstanding, meaning it timed out by
157  				// the arrival of the next ping interval.
158  				timeWaited := m.pendingPingWait().UnwrapOr(
159  					m.cfg.IntervalDuration,
160  				)
161  				lastRTT := m.getLastRTT()
162  
163  				m.cfg.OnPongFailure(
164  					errors.New("ping timed "+
165  						"out by next interval"),
166  					timeWaited, lastRTT,
167  				)
168  
169  				m.resetPingState()
170  			}
171  
172  			pongSize := m.cfg.NewPongSize()
173  			ping := &lnwire.Ping{
174  				NumPongBytes: pongSize,
175  				PaddingBytes: m.cfg.NewPingPayload(),
176  			}
177  
178  			// Set up our bookkeeping for the new Ping.
179  			if err := m.setPingState(pongSize); err != nil {
180  				// This is an internal error related to timer
181  				// reset. Pass it to OnPongFailure as it's
182  				// critical. Current and last RTT are not
183  				// directly applicable here.
184  				m.cfg.OnPongFailure(err, 0, 0)
185  
186  				m.resetPingState()
187  
188  				continue
189  			}
190  
191  			m.cfg.SendPing(ping)
192  
193  		case <-m.pingTimeout.C:
194  			timeWaited := m.pendingPingWait().UnwrapOr(
195  				m.cfg.TimeoutDuration,
196  			)
197  			lastRTT := m.getLastRTT()
198  
199  			m.cfg.OnPongFailure(
200  				errors.New("timeout while waiting for "+
201  					"pong response"),
202  				timeWaited, lastRTT,
203  			)
204  
205  			m.resetPingState()
206  
207  		case pong := <-m.pongChan:
208  			pongSize := int32(len(pong.PongBytes))
209  
210  			// Save off values we are about to override when we call
211  			// resetPingState.
212  			expected := m.outstandingPongSize
213  			lastPingTime := m.pingLastSend
214  
215  			// This is an unexpected pong, we'll continue.
216  			if lastPingTime == nil {
217  				continue
218  			}
219  
220  			actualRTT := time.Since(*lastPingTime)
221  
222  			// If the pong we receive doesn't match the ping we sent
223  			// out, then we fail out.
224  			if pongSize != expected {
225  				e := fmt.Errorf("pong response does not match "+
226  					"expected size. Expected: %d, Got: %d",
227  					expected, pongSize)
228  
229  				lastRTT := m.getLastRTT()
230  				m.cfg.OnPongFailure(e, actualRTT, lastRTT)
231  
232  				m.resetPingState()
233  
234  				continue
235  			}
236  
237  			// Pong is good, update RTT and reset state.
238  			m.pingTime.Store(&actualRTT)
239  			m.resetPingState()
240  
241  		case <-m.quit:
242  			return
243  		}
244  	}
245  }
246  
247  // Stop interrupts the goroutines that the PingManager owns.
248  func (m *PingManager) Stop() {
249  	if m.pingTicker == nil {
250  		return
251  	}
252  
253  	m.stopped.Do(func() {
254  		close(m.quit)
255  		m.wg.Wait()
256  
257  		m.pingTicker.Stop()
258  		m.pingTimeout.Stop()
259  	})
260  }
261  
262  // setPingState is a private method to keep track of all of the fields we need
263  // to set when we send out a Ping.
264  func (m *PingManager) setPingState(pongSize uint16) error {
265  	t := time.Now()
266  	m.pingLastSend = &t
267  	m.outstandingPongSize = int32(pongSize)
268  	if m.pingTimeout.Reset(m.cfg.TimeoutDuration) {
269  		return fmt.Errorf(
270  			"impossible: ping timeout reset when already active",
271  		)
272  	}
273  
274  	return nil
275  }
276  
277  // resetPingState is a private method that resets all of the bookkeeping that
278  // is tracking a currently outstanding Ping.
279  func (m *PingManager) resetPingState() {
280  	m.pingLastSend = nil
281  	m.outstandingPongSize = -1
282  
283  	if !m.pingTimeout.Stop() {
284  		select {
285  		case <-m.pingTimeout.C:
286  		default:
287  		}
288  	}
289  }
290  
291  // GetPingTimeMicroSeconds reports back the RTT calculated by the pingManager.
292  func (m *PingManager) GetPingTimeMicroSeconds() int64 {
293  	rtt := m.pingTime.Load()
294  
295  	if rtt == nil {
296  		return -1
297  	}
298  
299  	return rtt.Microseconds()
300  }
301  
302  // ReceivedPong is called to evaluate a Pong message against the expectations
303  // we have for it. It will cause the PingManager to invoke the supplied
304  // OnPongFailure function if the Pong argument supplied violates expectations.
305  func (m *PingManager) ReceivedPong(msg *lnwire.Pong) {
306  	select {
307  	case m.pongChan <- msg:
308  	case <-m.quit:
309  	}
310  }