ping_manager.go
1 package peer 2 3 import ( 4 "errors" 5 "fmt" 6 "sync" 7 "sync/atomic" 8 "time" 9 10 "github.com/lightningnetwork/lnd/fn/v2" 11 "github.com/lightningnetwork/lnd/lnwire" 12 ) 13 14 // PingManagerConfig is a structure containing various parameters that govern 15 // how the PingManager behaves. 16 type PingManagerConfig struct { 17 // NewPingPayload is a closure that returns the payload to be packaged 18 // in the Ping message. 19 NewPingPayload func() []byte 20 21 // NewPongSize is a closure that returns a random value between 22 // [0, lnwire.MaxPongBytes]. This random value helps to more effectively 23 // pair Pong messages with Ping. 24 NewPongSize func() uint16 25 26 // IntervalDuration is the Duration between attempted pings. 27 IntervalDuration time.Duration 28 29 // TimeoutDuration is the Duration we wait before declaring a ping 30 // attempt failed. 31 TimeoutDuration time.Duration 32 33 // SendPing is a closure that is responsible for sending the Ping 34 // message out to our peer 35 SendPing func(ping *lnwire.Ping) 36 37 // OnPongFailure is a closure that is responsible for executing the 38 // logic when a Pong message is either late or does not match our 39 // expectations for that Pong 40 OnPongFailure func(failureReason error, timeWaitedForPong time.Duration, 41 lastKnownRTT time.Duration) 42 } 43 44 // PingManager is a structure that is designed to manage the internal state 45 // of the ping pong lifecycle with the remote peer. We assume there is only one 46 // ping outstanding at once. 47 // 48 // NOTE: This structure MUST be initialized with NewPingManager. 49 type PingManager struct { 50 cfg *PingManagerConfig 51 52 // pingTime is a rough estimate of the RTT (round-trip-time) between us 53 // and the connected peer. 54 // To be used atomically. 55 // TODO(roasbeef): also use a WMA or EMA? 56 pingTime atomic.Pointer[time.Duration] 57 58 // pingLastSend is the time when we sent our last ping message. 59 // To be used atomically. 60 pingLastSend *time.Time 61 62 // outstandingPongSize is the current size of the requested pong 63 // payload. This value can only validly range from [0,65531]. Any 64 // value < 0 is interpreted as if there is no outstanding ping message. 65 outstandingPongSize int32 66 67 // pingTicker is a pointer to a Ticker that fires on every ping 68 // interval. 69 pingTicker *time.Ticker 70 71 // pingTimeout is a Timer that will fire when we want to time out a 72 // ping 73 pingTimeout *time.Timer 74 75 // pongChan is the channel on which the pingManager will write Pong 76 // messages it is evaluating 77 pongChan chan *lnwire.Pong 78 79 started sync.Once 80 stopped sync.Once 81 82 quit chan struct{} 83 wg sync.WaitGroup 84 } 85 86 // NewPingManager constructs a pingManager in a valid state. It must be started 87 // before it does anything useful, though. 88 func NewPingManager(cfg *PingManagerConfig) *PingManager { 89 m := PingManager{ 90 cfg: cfg, 91 outstandingPongSize: -1, 92 pongChan: make(chan *lnwire.Pong, 1), 93 quit: make(chan struct{}), 94 } 95 96 return &m 97 } 98 99 // Start launches the primary goroutine that is owned by the pingManager. 100 func (m *PingManager) Start() error { 101 var err error 102 m.started.Do(func() { 103 m.pingTicker = time.NewTicker(m.cfg.IntervalDuration) 104 m.pingTimeout = time.NewTimer(0) 105 106 m.wg.Add(1) 107 go m.pingHandler() 108 }) 109 110 return err 111 } 112 113 // getLastRTT safely retrieves the last known RTT, returning 0 if none exists. 114 func (m *PingManager) getLastRTT() time.Duration { 115 rttPtr := m.pingTime.Load() 116 if rttPtr == nil { 117 return 0 118 } 119 120 return *rttPtr 121 } 122 123 // pendingPingWait calculates the time waited since the last ping was sent. If 124 // no ping time is reported, None is returned. defaultDuration. 125 func (m *PingManager) pendingPingWait() fn.Option[time.Duration] { 126 if m.pingLastSend != nil { 127 return fn.Some(time.Since(*m.pingLastSend)) 128 } 129 130 return fn.None[time.Duration]() 131 } 132 133 // pingHandler is the main goroutine responsible for enforcing the ping/pong 134 // protocol. 135 func (m *PingManager) pingHandler() { 136 defer m.wg.Done() 137 defer m.pingTimeout.Stop() 138 139 // Ensure that the pingTimeout channel is empty. 140 if !m.pingTimeout.Stop() { 141 <-m.pingTimeout.C 142 } 143 144 // Because we don't know if the OnPingFailure callback actually 145 // disconnects a peer (dependent on user config), we should never return 146 // from this loop unless the ping manager is stopped explicitly (which 147 // happens on disconnect). 148 for { 149 select { 150 case <-m.pingTicker.C: 151 // If this occurs it means that the new ping cycle has 152 // begun while there is still an outstanding ping 153 // awaiting a pong response. This should never occur, 154 // but if it does, it implies a timeout. 155 if m.outstandingPongSize >= 0 { 156 // Ping was outstanding, meaning it timed out by 157 // the arrival of the next ping interval. 158 timeWaited := m.pendingPingWait().UnwrapOr( 159 m.cfg.IntervalDuration, 160 ) 161 lastRTT := m.getLastRTT() 162 163 m.cfg.OnPongFailure( 164 errors.New("ping timed "+ 165 "out by next interval"), 166 timeWaited, lastRTT, 167 ) 168 169 m.resetPingState() 170 } 171 172 pongSize := m.cfg.NewPongSize() 173 ping := &lnwire.Ping{ 174 NumPongBytes: pongSize, 175 PaddingBytes: m.cfg.NewPingPayload(), 176 } 177 178 // Set up our bookkeeping for the new Ping. 179 if err := m.setPingState(pongSize); err != nil { 180 // This is an internal error related to timer 181 // reset. Pass it to OnPongFailure as it's 182 // critical. Current and last RTT are not 183 // directly applicable here. 184 m.cfg.OnPongFailure(err, 0, 0) 185 186 m.resetPingState() 187 188 continue 189 } 190 191 m.cfg.SendPing(ping) 192 193 case <-m.pingTimeout.C: 194 timeWaited := m.pendingPingWait().UnwrapOr( 195 m.cfg.TimeoutDuration, 196 ) 197 lastRTT := m.getLastRTT() 198 199 m.cfg.OnPongFailure( 200 errors.New("timeout while waiting for "+ 201 "pong response"), 202 timeWaited, lastRTT, 203 ) 204 205 m.resetPingState() 206 207 case pong := <-m.pongChan: 208 pongSize := int32(len(pong.PongBytes)) 209 210 // Save off values we are about to override when we call 211 // resetPingState. 212 expected := m.outstandingPongSize 213 lastPingTime := m.pingLastSend 214 215 // This is an unexpected pong, we'll continue. 216 if lastPingTime == nil { 217 continue 218 } 219 220 actualRTT := time.Since(*lastPingTime) 221 222 // If the pong we receive doesn't match the ping we sent 223 // out, then we fail out. 224 if pongSize != expected { 225 e := fmt.Errorf("pong response does not match "+ 226 "expected size. Expected: %d, Got: %d", 227 expected, pongSize) 228 229 lastRTT := m.getLastRTT() 230 m.cfg.OnPongFailure(e, actualRTT, lastRTT) 231 232 m.resetPingState() 233 234 continue 235 } 236 237 // Pong is good, update RTT and reset state. 238 m.pingTime.Store(&actualRTT) 239 m.resetPingState() 240 241 case <-m.quit: 242 return 243 } 244 } 245 } 246 247 // Stop interrupts the goroutines that the PingManager owns. 248 func (m *PingManager) Stop() { 249 if m.pingTicker == nil { 250 return 251 } 252 253 m.stopped.Do(func() { 254 close(m.quit) 255 m.wg.Wait() 256 257 m.pingTicker.Stop() 258 m.pingTimeout.Stop() 259 }) 260 } 261 262 // setPingState is a private method to keep track of all of the fields we need 263 // to set when we send out a Ping. 264 func (m *PingManager) setPingState(pongSize uint16) error { 265 t := time.Now() 266 m.pingLastSend = &t 267 m.outstandingPongSize = int32(pongSize) 268 if m.pingTimeout.Reset(m.cfg.TimeoutDuration) { 269 return fmt.Errorf( 270 "impossible: ping timeout reset when already active", 271 ) 272 } 273 274 return nil 275 } 276 277 // resetPingState is a private method that resets all of the bookkeeping that 278 // is tracking a currently outstanding Ping. 279 func (m *PingManager) resetPingState() { 280 m.pingLastSend = nil 281 m.outstandingPongSize = -1 282 283 if !m.pingTimeout.Stop() { 284 select { 285 case <-m.pingTimeout.C: 286 default: 287 } 288 } 289 } 290 291 // GetPingTimeMicroSeconds reports back the RTT calculated by the pingManager. 292 func (m *PingManager) GetPingTimeMicroSeconds() int64 { 293 rtt := m.pingTime.Load() 294 295 if rtt == nil { 296 return -1 297 } 298 299 return rtt.Microseconds() 300 } 301 302 // ReceivedPong is called to evaluate a Pong message against the expectations 303 // we have for it. It will cause the PingManager to invoke the supplied 304 // OnPongFailure function if the Pong argument supplied violates expectations. 305 func (m *PingManager) ReceivedPong(msg *lnwire.Pong) { 306 select { 307 case m.pongChan <- msg: 308 case <-m.quit: 309 } 310 }