interceptor.go
1 package rpcperms 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 "sync" 8 "sync/atomic" 9 10 "github.com/btcsuite/btclog/v2" 11 grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" 12 "github.com/lightningnetwork/lnd/lnrpc" 13 "github.com/lightningnetwork/lnd/macaroons" 14 "github.com/lightningnetwork/lnd/monitoring" 15 "github.com/lightningnetwork/lnd/subscribe" 16 "google.golang.org/grpc" 17 "gopkg.in/macaroon-bakery.v2/bakery" 18 ) 19 20 // rpcState is an enum that we use to keep track of the current RPC service 21 // state. This will transition as we go from startup to unlocking the wallet, 22 // and finally fully active. 23 type rpcState uint8 24 25 const ( 26 // waitingToStart indicates that we're at the beginning of the startup 27 // process. In a cluster environment this may mean that we're waiting to 28 // become the leader in which case RPC calls will be disabled until 29 // this instance has been elected as leader. 30 waitingToStart rpcState = iota 31 32 // walletNotCreated is the starting state if the RPC server is active, 33 // but the wallet is not yet created. In this state we'll only allow 34 // calls to the WalletUnlockerService. 35 walletNotCreated 36 37 // walletLocked indicates the RPC server is active, but the wallet is 38 // locked. In this state we'll only allow calls to the 39 // WalletUnlockerService. 40 walletLocked 41 42 // walletUnlocked means that the wallet has been unlocked, but the full 43 // RPC server is not yet ready. 44 walletUnlocked 45 46 // rpcActive means that the RPC server is ready to accept calls. 47 rpcActive 48 49 // serverActive means that the lnd server is ready to accept calls. 50 serverActive 51 ) 52 53 var ( 54 // ErrWaitingToStart is returned if LND is still waiting to start, 55 // possibly blocked until elected as the leader. 56 ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " + 57 "available") 58 59 // ErrNoWallet is returned if the wallet does not exist. 60 ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " + 61 "full RPC access") 62 63 // ErrWalletLocked is returned if the wallet is locked and any service 64 // other than the WalletUnlocker is called. 65 ErrWalletLocked = fmt.Errorf("wallet locked, unlock it to enable " + 66 "full RPC access") 67 68 // ErrWalletUnlocked is returned if the WalletUnlocker service is 69 // called when the wallet already has been unlocked. 70 ErrWalletUnlocked = fmt.Errorf("wallet already unlocked, " + 71 "WalletUnlocker service is no longer available") 72 73 // ErrRPCStarting is returned if the wallet has been unlocked but the 74 // RPC server is not yet ready to accept calls. 75 ErrRPCStarting = fmt.Errorf("the RPC server is in the process of " + 76 "starting up, but not yet ready to accept calls") 77 78 // macaroonWhitelist defines methods that we don't require macaroons to 79 // access. We also allow these methods to be called even if not all 80 // mandatory middlewares are registered yet. If the wallet is locked 81 // then a middleware cannot register itself, creating an impossible 82 // situation. Also, a middleware might want to check the state of lnd 83 // by calling the State service before it registers itself. So we also 84 // need to exclude those calls from the mandatory middleware check. 85 macaroonWhitelist = map[string]struct{}{ 86 // We allow all calls to the WalletUnlocker without macaroons. 87 "/lnrpc.WalletUnlocker/GenSeed": {}, 88 "/lnrpc.WalletUnlocker/InitWallet": {}, 89 "/lnrpc.WalletUnlocker/UnlockWallet": {}, 90 "/lnrpc.WalletUnlocker/ChangePassword": {}, 91 92 // The State service must be available at all times, even 93 // before we can check macaroons, so we whitelist it. 94 "/lnrpc.State/SubscribeState": {}, 95 "/lnrpc.State/GetState": {}, 96 } 97 ) 98 99 // InterceptorChain is a struct that can be added to the running GRPC server, 100 // intercepting API calls. This is useful for logging, enforcing permissions, 101 // supporting middleware etc. The following diagram shows the order of each 102 // interceptor in the chain and when exactly requests/responses are intercepted 103 // and forwarded to external middleware for approval/modification. Middleware in 104 // general can only intercept gRPC requests/responses that are sent by the 105 // client with a macaroon that contains a custom caveat that is supported by one 106 // of the registered middlewares. 107 // 108 // | 109 // | gRPC request from client 110 // | 111 // +---v--------------------------------+ 112 // | InterceptorChain | 113 // +-+----------------------------------+ 114 // | Log Interceptor | 115 // +----------------------------------+ 116 // | RPC State Interceptor | 117 // +----------------------------------+ 118 // | Macaroon Interceptor | 119 // +----------------------------------+--------> +---------------------+ 120 // | RPC Macaroon Middleware Handler |<-------- | External Middleware | 121 // +----------------------------------+ | - modify request | 122 // | Prometheus Interceptor | +---------------------+ 123 // +-+--------------------------------+ 124 // | validated gRPC request from client 125 // +---v--------------------------------+ 126 // | main gRPC server | 127 // +---+--------------------------------+ 128 // | 129 // | original gRPC request to client 130 // | 131 // +---v--------------------------------+--------> +---------------------+ 132 // | RPC Macaroon Middleware Handler |<-------- | External Middleware | 133 // +---+--------------------------------+ | - modify response | 134 // | +---------------------+ 135 // | edited gRPC request to client 136 // v 137 type InterceptorChain struct { 138 // lastRequestID is the ID of the last gRPC request or stream that was 139 // intercepted by the middleware interceptor. 140 // 141 // NOTE: Must be used atomically! 142 lastRequestID uint64 143 144 // Required by the grpc-gateway/v2 library for forward compatibility. 145 lnrpc.UnimplementedStateServer 146 147 started sync.Once 148 stopped sync.Once 149 150 // state is the current RPC state of our RPC server. 151 state rpcState 152 153 // ntfnServer is a subscription server we use to notify clients of the 154 // State service when the state changes. 155 ntfnServer *subscribe.Server 156 157 // noMacaroons should be set true if we don't want to check macaroons. 158 noMacaroons bool 159 160 // svc is the macaroon service used to enforce permissions in case 161 // macaroons are used. 162 svc *macaroons.Service 163 164 // permissionMap is the permissions to enforce if macaroons are used. 165 permissionMap map[string][]bakery.Op 166 167 // rpcsLog is the logger used to log calls to the RPCs intercepted. 168 rpcsLog btclog.Logger 169 170 // registeredMiddleware is a slice of all macaroon permission based RPC 171 // middleware clients that are currently registered. The 172 // registeredMiddlewareNames can be used to find the index of a specific 173 // interceptor within the registeredMiddleware slide using the name of 174 // the interceptor as the key. The reason for using these two separate 175 // structures is so that the order in which interceptors are run is 176 // the same as the order in which they were registered. 177 registeredMiddleware []*MiddlewareHandler 178 179 // registeredMiddlewareNames is a map of registered middleware names 180 // to the index at which they are stored in the registeredMiddleware 181 // map. 182 registeredMiddlewareNames map[string]int 183 184 // mandatoryMiddleware is a list of all middleware that is considered to 185 // be mandatory. If any of them is not registered then all RPC requests 186 // (except for the macaroon white listed methods and the middleware 187 // registration itself) are blocked. This is a security feature to make 188 // sure that requests can't just go through unobserved/unaudited if a 189 // middleware crashes. 190 mandatoryMiddleware []string 191 192 quit chan struct{} 193 sync.RWMutex 194 } 195 196 // A compile time check to ensure that InterceptorChain fully implements the 197 // StateServer gRPC service. 198 var _ lnrpc.StateServer = (*InterceptorChain)(nil) 199 200 // NewInterceptorChain creates a new InterceptorChain. 201 func NewInterceptorChain(log btclog.Logger, noMacaroons bool, 202 mandatoryMiddleware []string) *InterceptorChain { 203 204 return &InterceptorChain{ 205 state: waitingToStart, 206 ntfnServer: subscribe.NewServer(), 207 noMacaroons: noMacaroons, 208 permissionMap: make(map[string][]bakery.Op), 209 rpcsLog: log, 210 registeredMiddlewareNames: make(map[string]int), 211 mandatoryMiddleware: mandatoryMiddleware, 212 quit: make(chan struct{}), 213 } 214 } 215 216 // Start starts the InterceptorChain, which is needed to start the state 217 // subscription server it powers. 218 func (r *InterceptorChain) Start() error { 219 var err error 220 r.started.Do(func() { 221 err = r.ntfnServer.Start() 222 }) 223 224 return err 225 } 226 227 // Stop stops the InterceptorChain and its internal state subscription server. 228 func (r *InterceptorChain) Stop() error { 229 var err error 230 r.stopped.Do(func() { 231 close(r.quit) 232 err = r.ntfnServer.Stop() 233 }) 234 235 return err 236 } 237 238 // SetWalletNotCreated moves the RPC state from either waitingToStart to 239 // walletNotCreated. 240 func (r *InterceptorChain) SetWalletNotCreated() { 241 r.Lock() 242 defer r.Unlock() 243 244 r.state = walletNotCreated 245 _ = r.ntfnServer.SendUpdate(r.state) 246 } 247 248 // SetWalletLocked moves the RPC state from either walletNotCreated to 249 // walletLocked. 250 func (r *InterceptorChain) SetWalletLocked() { 251 r.Lock() 252 defer r.Unlock() 253 254 r.state = walletLocked 255 _ = r.ntfnServer.SendUpdate(r.state) 256 } 257 258 // SetWalletUnlocked moves the RPC state from either walletNotCreated or 259 // walletLocked to walletUnlocked. 260 func (r *InterceptorChain) SetWalletUnlocked() { 261 r.Lock() 262 defer r.Unlock() 263 264 r.state = walletUnlocked 265 _ = r.ntfnServer.SendUpdate(r.state) 266 } 267 268 // SetRPCActive moves the RPC state from walletUnlocked to rpcActive. 269 func (r *InterceptorChain) SetRPCActive() { 270 r.Lock() 271 defer r.Unlock() 272 273 r.state = rpcActive 274 _ = r.ntfnServer.SendUpdate(r.state) 275 } 276 277 // SetServerActive moves the RPC state from walletUnlocked to rpcActive. 278 func (r *InterceptorChain) SetServerActive() { 279 r.Lock() 280 defer r.Unlock() 281 282 r.state = serverActive 283 _ = r.ntfnServer.SendUpdate(r.state) 284 } 285 286 // rpcStateToWalletState converts rpcState to lnrpc.WalletState. Returns 287 // WAITING_TO_START and an error on conversion error. 288 func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) { 289 const defaultState = lnrpc.WalletState_WAITING_TO_START 290 var walletState lnrpc.WalletState 291 292 switch state { 293 case waitingToStart: 294 walletState = lnrpc.WalletState_WAITING_TO_START 295 case walletNotCreated: 296 walletState = lnrpc.WalletState_NON_EXISTING 297 case walletLocked: 298 walletState = lnrpc.WalletState_LOCKED 299 case walletUnlocked: 300 walletState = lnrpc.WalletState_UNLOCKED 301 case rpcActive: 302 walletState = lnrpc.WalletState_RPC_ACTIVE 303 case serverActive: 304 walletState = lnrpc.WalletState_SERVER_ACTIVE 305 306 default: 307 return defaultState, fmt.Errorf("unknown wallet state %v", state) 308 } 309 310 return walletState, nil 311 } 312 313 // SubscribeState subscribes to the state of the wallet. The current wallet 314 // state will always be delivered immediately. 315 // 316 // NOTE: Part of the StateService interface. 317 func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest, 318 stream lnrpc.State_SubscribeStateServer) error { 319 320 sendStateUpdate := func(state rpcState) error { 321 walletState, err := rpcStateToWalletState(state) 322 if err != nil { 323 return err 324 } 325 326 return stream.Send(&lnrpc.SubscribeStateResponse{ 327 State: walletState, 328 }) 329 } 330 331 // Subscribe to state updates. 332 client, err := r.ntfnServer.Subscribe() 333 if err != nil { 334 return err 335 } 336 defer client.Cancel() 337 338 // Always start by sending the current state. 339 r.RLock() 340 state := r.state 341 r.RUnlock() 342 343 if err := sendStateUpdate(state); err != nil { 344 return err 345 } 346 347 for { 348 select { 349 case e := <-client.Updates(): 350 newState := e.(rpcState) 351 352 // Ignore already sent state. 353 if newState == state { 354 continue 355 } 356 357 state = newState 358 err := sendStateUpdate(state) 359 if err != nil { 360 return err 361 } 362 363 // The response stream's context for whatever reason has been 364 // closed. If context is closed by an exceeded deadline we will 365 // return an error. 366 case <-stream.Context().Done(): 367 if errors.Is(stream.Context().Err(), context.Canceled) { 368 return nil 369 } 370 return stream.Context().Err() 371 372 case <-r.quit: 373 return fmt.Errorf("server exiting") 374 } 375 } 376 } 377 378 // GetState returns the current wallet state. 379 func (r *InterceptorChain) GetState(_ context.Context, 380 _ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) { 381 382 r.RLock() 383 state := r.state 384 r.RUnlock() 385 386 walletState, err := rpcStateToWalletState(state) 387 if err != nil { 388 return nil, err 389 } 390 391 return &lnrpc.GetStateResponse{ 392 State: walletState, 393 }, nil 394 } 395 396 // AddMacaroonService adds a macaroon service to the interceptor. After this is 397 // done every RPC call made will have to pass a valid macaroon to be accepted. 398 func (r *InterceptorChain) AddMacaroonService(svc *macaroons.Service) { 399 r.Lock() 400 defer r.Unlock() 401 402 r.svc = svc 403 } 404 405 // MacaroonService returns the currently registered macaroon service. This might 406 // be nil if none was registered (yet). 407 func (r *InterceptorChain) MacaroonService() *macaroons.Service { 408 r.RLock() 409 defer r.RUnlock() 410 411 return r.svc 412 } 413 414 // AddPermission adds a new macaroon rule for the given method. 415 func (r *InterceptorChain) AddPermission(method string, ops []bakery.Op) error { 416 r.Lock() 417 defer r.Unlock() 418 419 if _, ok := r.permissionMap[method]; ok { 420 return fmt.Errorf("detected duplicate macaroon constraints "+ 421 "for path: %v", method) 422 } 423 424 r.permissionMap[method] = ops 425 return nil 426 } 427 428 // Permissions returns the current set of macaroon permissions. 429 func (r *InterceptorChain) Permissions() map[string][]bakery.Op { 430 r.RLock() 431 defer r.RUnlock() 432 433 // Make a copy under the read lock to avoid races. 434 c := make(map[string][]bakery.Op) 435 for k, v := range r.permissionMap { 436 s := make([]bakery.Op, len(v)) 437 copy(s, v) 438 c[k] = s 439 } 440 441 return c 442 } 443 444 // RegisterMiddleware registers a new middleware that will handle request/ 445 // response interception for all RPC messages that are initiated with a custom 446 // macaroon caveat. The name of the custom caveat a middleware is handling is 447 // also its unique identifier. Only one middleware can be registered for each 448 // custom caveat. 449 func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error { 450 r.Lock() 451 defer r.Unlock() 452 453 // The name of the middleware is the unique identifier. 454 _, ok := r.registeredMiddlewareNames[mw.middlewareName] 455 if ok { 456 return fmt.Errorf("a middleware with the name '%s' is already "+ 457 "registered", mw.middlewareName) 458 } 459 460 // For now, we only want one middleware per custom caveat name. If we 461 // allowed multiple middlewares handling the same caveat there would be 462 // a need for extra call chaining logic, and they could overwrite each 463 // other's responses. 464 for _, middleware := range r.registeredMiddleware { 465 if middleware.customCaveatName == mw.customCaveatName { 466 return fmt.Errorf("a middleware is already registered "+ 467 "for the custom caveat name '%s': %v", 468 mw.customCaveatName, middleware.middlewareName) 469 } 470 } 471 472 r.registeredMiddleware = append(r.registeredMiddleware, mw) 473 index := len(r.registeredMiddleware) - 1 474 r.registeredMiddlewareNames[mw.middlewareName] = index 475 476 return nil 477 } 478 479 // RemoveMiddleware removes the middleware that handles the given custom caveat 480 // name. 481 func (r *InterceptorChain) RemoveMiddleware(middlewareName string) { 482 r.Lock() 483 defer r.Unlock() 484 485 log.Debugf("Removing middleware %s", middlewareName) 486 487 index, ok := r.registeredMiddlewareNames[middlewareName] 488 if !ok { 489 return 490 } 491 delete(r.registeredMiddlewareNames, middlewareName) 492 493 r.registeredMiddleware = append( 494 r.registeredMiddleware[:index], 495 r.registeredMiddleware[index+1:]..., 496 ) 497 498 // Re-initialise the middleware look-up map with the updated indexes. 499 r.registeredMiddlewareNames = make(map[string]int) 500 for i, mw := range r.registeredMiddleware { 501 r.registeredMiddlewareNames[mw.middlewareName] = i 502 } 503 } 504 505 // CustomCaveatSupported makes sure a middleware that handles the given custom 506 // caveat name is registered. If none is, an error is returned, signalling to 507 // the macaroon bakery and its validator to reject macaroons that have a custom 508 // caveat with that name. 509 // 510 // NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface. 511 func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error { 512 r.RLock() 513 defer r.RUnlock() 514 515 // We only accept requests with a custom caveat if we also have a 516 // middleware registered that handles that custom caveat. That is 517 // crucial for security! Otherwise a request with an encumbered (=has 518 // restricted permissions based upon the custom caveat condition) 519 // macaroon would not be validated against the limitations that the 520 // custom caveat implicate. Since the map is keyed by the _name_ of the 521 // middleware, we need to loop through all of them to see if one has 522 // the given custom macaroon caveat name. 523 for _, middleware := range r.registeredMiddleware { 524 if middleware.customCaveatName == customCaveatName { 525 return nil 526 } 527 } 528 529 return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+ 530 "no middleware registered to handle it", customCaveatName) 531 } 532 533 // CreateServerOpts creates the GRPC server options that can be added to a GRPC 534 // server in order to add this InterceptorChain. 535 func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption { 536 var unaryInterceptors []grpc.UnaryServerInterceptor 537 var strmInterceptors []grpc.StreamServerInterceptor 538 539 // The first interceptors we'll add to the chain is our logging 540 // interceptors, so we can automatically log all errors that happen 541 // during RPC calls. 542 unaryInterceptors = append( 543 unaryInterceptors, errorLogUnaryServerInterceptor(r.rpcsLog), 544 ) 545 strmInterceptors = append( 546 strmInterceptors, errorLogStreamServerInterceptor(r.rpcsLog), 547 ) 548 549 // Next we'll add our RPC state check interceptors, that will check 550 // whether the attempted call is allowed in the current state. 551 unaryInterceptors = append( 552 unaryInterceptors, r.rpcStateUnaryServerInterceptor(), 553 ) 554 strmInterceptors = append( 555 strmInterceptors, r.rpcStateStreamServerInterceptor(), 556 ) 557 558 // We'll add the macaroon interceptors. If macaroons aren't disabled, 559 // then these interceptors will enforce macaroon authentication. 560 unaryInterceptors = append( 561 unaryInterceptors, r.MacaroonUnaryServerInterceptor(), 562 ) 563 strmInterceptors = append( 564 strmInterceptors, r.MacaroonStreamServerInterceptor(), 565 ) 566 567 // Next, we'll add the interceptors for our custom macaroon caveat based 568 // middleware. 569 unaryInterceptors = append( 570 unaryInterceptors, r.middlewareUnaryServerInterceptor(), 571 ) 572 strmInterceptors = append( 573 strmInterceptors, r.middlewareStreamServerInterceptor(), 574 ) 575 576 // Get interceptors for Prometheus to gather gRPC performance metrics. 577 // If monitoring is disabled, GetPromInterceptors() will return empty 578 // slices. 579 promUnaryInterceptors, promStrmInterceptors := 580 monitoring.GetPromInterceptors() 581 582 // Concatenate the slices of unary and stream interceptors respectively. 583 unaryInterceptors = append(unaryInterceptors, promUnaryInterceptors...) 584 strmInterceptors = append(strmInterceptors, promStrmInterceptors...) 585 586 // Create server options from the interceptors we just set up. 587 chainedUnary := grpc_middleware.WithUnaryServerChain( 588 unaryInterceptors..., 589 ) 590 chainedStream := grpc_middleware.WithStreamServerChain( 591 strmInterceptors..., 592 ) 593 serverOpts := []grpc.ServerOption{chainedUnary, chainedStream} 594 595 return serverOpts 596 } 597 598 // errorLogUnaryServerInterceptor is a simple UnaryServerInterceptor that will 599 // automatically log any errors that occur when serving a client's unary 600 // request. 601 func errorLogUnaryServerInterceptor(logger btclog.Logger) grpc.UnaryServerInterceptor { 602 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, 603 handler grpc.UnaryHandler) (interface{}, error) { 604 605 resp, err := handler(ctx, req) 606 if err != nil { 607 // TODO(roasbeef): also log request details? 608 logger.Errorf("[%v]: %v", info.FullMethod, err) 609 } 610 611 return resp, err 612 } 613 } 614 615 // errorLogStreamServerInterceptor is a simple StreamServerInterceptor that 616 // will log any errors that occur while processing a client or server streaming 617 // RPC. 618 func errorLogStreamServerInterceptor(logger btclog.Logger) grpc.StreamServerInterceptor { 619 return func(srv interface{}, ss grpc.ServerStream, 620 info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 621 622 err := handler(srv, ss) 623 if err != nil { 624 logger.Errorf("[%v]: %v", info.FullMethod, err) 625 } 626 627 return err 628 } 629 } 630 631 // checkMacaroon validates that the context contains the macaroon needed to 632 // invoke the given RPC method. 633 func (r *InterceptorChain) checkMacaroon(ctx context.Context, 634 fullMethod string) error { 635 636 // If noMacaroons is set, we'll always allow the call. 637 if r.noMacaroons { 638 return nil 639 } 640 641 // Check whether the method is whitelisted, if so we'll allow it 642 // regardless of macaroons. 643 _, ok := macaroonWhitelist[fullMethod] 644 if ok { 645 return nil 646 } 647 648 r.RLock() 649 svc := r.svc 650 r.RUnlock() 651 652 // If the macaroon service is not yet active, we cannot allow 653 // the call. 654 if svc == nil { 655 return fmt.Errorf("unable to determine macaroon permissions") 656 } 657 658 r.RLock() 659 uriPermissions, ok := r.permissionMap[fullMethod] 660 r.RUnlock() 661 if !ok { 662 return fmt.Errorf("%s: unknown permissions required for method", 663 fullMethod) 664 } 665 666 // Find out if there is an external validator registered for 667 // this method. Fall back to the internal one if there isn't. 668 validator, ok := svc.ExternalValidators[fullMethod] 669 if !ok { 670 validator = svc 671 } 672 673 // Now that we know what validator to use, let it do its work. 674 return validator.ValidateMacaroon(ctx, uriPermissions, fullMethod) 675 } 676 677 // MacaroonUnaryServerInterceptor is a GRPC interceptor that checks whether the 678 // request is authorized by the included macaroons. 679 func (r *InterceptorChain) MacaroonUnaryServerInterceptor() grpc.UnaryServerInterceptor { 680 return func(ctx context.Context, req interface{}, 681 info *grpc.UnaryServerInfo, 682 handler grpc.UnaryHandler) (interface{}, error) { 683 684 // Check macaroons. 685 if err := r.checkMacaroon(ctx, info.FullMethod); err != nil { 686 return nil, err 687 } 688 689 return handler(ctx, req) 690 } 691 } 692 693 // MacaroonStreamServerInterceptor is a GRPC interceptor that checks whether 694 // the request is authorized by the included macaroons. 695 func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerInterceptor { 696 return func(srv interface{}, ss grpc.ServerStream, 697 info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 698 699 // Check macaroons. 700 err := r.checkMacaroon(ss.Context(), info.FullMethod) 701 if err != nil { 702 return err 703 } 704 705 return handler(srv, ss) 706 } 707 } 708 709 // checkRPCState checks whether a call to the given server is allowed in the 710 // current RPC state. 711 func (r *InterceptorChain) checkRPCState(srv interface{}) error { 712 // The StateService is being accessed, we allow the call regardless of 713 // the current state. 714 _, ok := srv.(lnrpc.StateServer) 715 if ok { 716 return nil 717 } 718 719 r.RLock() 720 state := r.state 721 r.RUnlock() 722 723 switch state { 724 // Do not accept any RPC calls (unless to the state service) until LND 725 // has not started. 726 case waitingToStart: 727 return ErrWaitingToStart 728 729 // If the wallet does not exists, only calls to the WalletUnlocker are 730 // accepted. 731 case walletNotCreated: 732 _, ok := srv.(lnrpc.WalletUnlockerServer) 733 if !ok { 734 return ErrNoWallet 735 } 736 737 // If the wallet is locked, only calls to the WalletUnlocker are 738 // accepted. 739 case walletLocked: 740 _, ok := srv.(lnrpc.WalletUnlockerServer) 741 if !ok { 742 return ErrWalletLocked 743 } 744 745 // If the wallet is unlocked, but the RPC not yet active, we reject. 746 case walletUnlocked: 747 _, ok := srv.(lnrpc.WalletUnlockerServer) 748 if ok { 749 return ErrWalletUnlocked 750 } 751 752 return ErrRPCStarting 753 754 // If the RPC server or lnd server is active, we allow calls to any 755 // service except the WalletUnlocker. 756 case rpcActive, serverActive: 757 _, ok := srv.(lnrpc.WalletUnlockerServer) 758 if ok { 759 return ErrWalletUnlocked 760 } 761 762 default: 763 return fmt.Errorf("unknown RPC state: %v", state) 764 } 765 766 return nil 767 } 768 769 // rpcStateUnaryServerInterceptor is a GRPC interceptor that checks whether 770 // calls to the given gGRPC server is allowed in the current rpc state. 771 func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInterceptor { 772 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, 773 handler grpc.UnaryHandler) (interface{}, error) { 774 775 r.rpcsLog.Debugf("[%v] requested", info.FullMethod) 776 777 if err := r.checkRPCState(info.Server); err != nil { 778 return nil, err 779 } 780 781 return handler(ctx, req) 782 } 783 } 784 785 // rpcStateStreamServerInterceptor is a GRPC interceptor that checks whether 786 // calls to the given gGRPC server is allowed in the current rpc state. 787 func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerInterceptor { 788 return func(srv interface{}, ss grpc.ServerStream, 789 info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { 790 791 r.rpcsLog.Debugf("[%v] requested", info.FullMethod) 792 793 if err := r.checkRPCState(srv); err != nil { 794 return err 795 } 796 797 return handler(srv, ss) 798 } 799 } 800 801 // middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts 802 // all requests and responses that are sent with a macaroon containing a custom 803 // caveat condition that is handled by registered middleware. 804 func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor { 805 return func(ctx context.Context, 806 req interface{}, info *grpc.UnaryServerInfo, 807 handler grpc.UnaryHandler) (interface{}, error) { 808 809 // Make sure we don't allow any requests through if one of the 810 // mandatory middlewares is missing. 811 fullMethod := info.FullMethod 812 if err := r.checkMandatoryMiddleware(fullMethod); err != nil { 813 return nil, err 814 } 815 816 // If there is no middleware registered, we don't need to 817 // intercept anything. 818 if !r.middlewareRegistered() { 819 return handler(ctx, req) 820 } 821 822 requestID := atomic.AddUint64(&r.lastRequestID, 1) 823 req, err := r.interceptMessage( 824 ctx, TypeRequest, requestID, false, info.FullMethod, 825 req, 826 ) 827 if err != nil { 828 return nil, err 829 } 830 831 // Call the handler, which executes the request against lnd. 832 lndResp, lndErr := handler(ctx, req) 833 if lndErr != nil { 834 // The call to lnd ended in an error and not a normal 835 // proto message response. Send the error to the 836 // interceptor as well to inform about the abnormal 837 // termination of the stream and to give the option to 838 // replace the error message with a custom one. 839 replacedErr, err := r.interceptMessage( 840 ctx, TypeResponse, requestID, false, 841 info.FullMethod, lndErr, 842 ) 843 if err != nil { 844 return nil, err 845 } 846 return lndResp, replacedErr.(error) 847 } 848 849 return r.interceptMessage( 850 ctx, TypeResponse, requestID, false, info.FullMethod, 851 lndResp, 852 ) 853 } 854 } 855 856 // middlewareStreamServerInterceptor is a streaming gRPC interceptor that 857 // intercepts all requests and responses that are sent with a macaroon 858 // containing a custom caveat condition that is handled by registered 859 // middleware. 860 func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor { 861 return func(srv interface{}, 862 ss grpc.ServerStream, info *grpc.StreamServerInfo, 863 handler grpc.StreamHandler) error { 864 865 // Don't intercept the interceptor itself which is a streaming 866 // RPC too! 867 fullMethod := info.FullMethod 868 if fullMethod == lnrpc.RegisterRPCMiddlewareURI { 869 return handler(srv, ss) 870 } 871 872 // Make sure we don't allow any requests through if one of the 873 // mandatory middlewares is missing. We add this check here to 874 // make sure the middleware registration itself can still be 875 // called. 876 if err := r.checkMandatoryMiddleware(fullMethod); err != nil { 877 return err 878 } 879 880 // If there is no middleware registered, we don't need to 881 // intercept anything. 882 if !r.middlewareRegistered() { 883 return handler(srv, ss) 884 } 885 886 // To give the middleware a chance to accept or reject the 887 // establishment of the stream itself (and not only when the 888 // first message is sent on the stream), we send an intercept 889 // request for the stream auth now: 890 msg, err := NewStreamAuthInterceptionRequest( 891 ss.Context(), info.FullMethod, 892 ) 893 if err != nil { 894 return err 895 } 896 897 requestID := atomic.AddUint64(&r.lastRequestID, 1) 898 err = r.acceptStream(requestID, msg) 899 if err != nil { 900 return err 901 } 902 903 wrappedSS := &serverStreamWrapper{ 904 ServerStream: ss, 905 requestID: requestID, 906 fullMethod: info.FullMethod, 907 interceptor: r, 908 } 909 910 // Call the stream handler, which will block as long as the 911 // stream is alive. 912 lndErr := handler(srv, wrappedSS) 913 if lndErr != nil { 914 // This is an error being returned from lnd. Send it to 915 // the interceptor as well to inform about the abnormal 916 // termination of the stream and to give the option to 917 // replace the error message with a custom one. 918 replacedErr, err := r.interceptMessage( 919 ss.Context(), TypeResponse, requestID, 920 true, info.FullMethod, lndErr, 921 ) 922 if err != nil { 923 return err 924 } 925 926 return replacedErr.(error) 927 } 928 929 // Normal/successful termination of the stream. 930 return nil 931 } 932 } 933 934 // checkMandatoryMiddleware makes sure that each of the middlewares declared as 935 // mandatory is currently registered. 936 func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error { 937 r.RLock() 938 defer r.RUnlock() 939 940 // Allow calls that are whitelisted for macaroons as well, otherwise we 941 // get into impossible situations where the wallet is locked but the 942 // unlock call is denied because the middleware isn't registered. But 943 // the middleware cannot register itself because the wallet is locked. 944 if _, ok := macaroonWhitelist[fullMethod]; ok { 945 return nil 946 } 947 948 // Not a white listed call so make sure every mandatory middleware is 949 // currently connected to lnd. 950 for _, name := range r.mandatoryMiddleware { 951 if _, ok := r.registeredMiddlewareNames[name]; !ok { 952 return fmt.Errorf("mandatory middleware '%s' is "+ 953 "currently not registered, not allowing any "+ 954 "RPC calls", name) 955 } 956 } 957 958 return nil 959 } 960 961 // middlewareRegistered returns true if there is at least one middleware 962 // currently registered. 963 func (r *InterceptorChain) middlewareRegistered() bool { 964 r.RLock() 965 defer r.RUnlock() 966 967 return len(r.registeredMiddleware) > 0 968 } 969 970 // acceptStream sends an intercept request to all middlewares that have 971 // registered for it. This means either a middleware has requested read-only 972 // access or the request actually has a macaroon with a caveat the middleware 973 // registered for. 974 func (r *InterceptorChain) acceptStream(requestID uint64, 975 msg *InterceptionRequest) error { 976 977 r.RLock() 978 defer r.RUnlock() 979 980 for _, middleware := range r.registeredMiddleware { 981 // If there is a custom caveat in the macaroon, make sure the 982 // middleware registered for it. Or if a middleware registered 983 // for read-only mode, it also gets the request. 984 hasCustomCaveat := macaroons.HasCustomCaveat( 985 msg.Macaroon, middleware.customCaveatName, 986 ) 987 if !hasCustomCaveat && !middleware.readOnly { 988 continue 989 } 990 991 msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition( 992 msg.Macaroon, middleware.customCaveatName, 993 ) 994 995 resp, err := middleware.intercept(requestID, msg) 996 997 // Error during interception itself. 998 if err != nil { 999 return err 1000 } 1001 1002 // Error returned from middleware client. 1003 if resp.err != nil { 1004 return resp.err 1005 } 1006 } 1007 1008 return nil 1009 } 1010 1011 // interceptMessage sends out an intercept request for an RPC response. Since 1012 // middleware that hasn't registered for the read-only mode has the option to 1013 // overwrite/replace the message, this needs to be handled differently than the 1014 // auth path above. 1015 func (r *InterceptorChain) interceptMessage(ctx context.Context, 1016 interceptType InterceptType, requestID uint64, isStream bool, 1017 fullMethod string, m interface{}) (interface{}, error) { 1018 1019 r.RLock() 1020 defer r.RUnlock() 1021 1022 currentMessage := m 1023 for _, middleware := range r.registeredMiddleware { 1024 msg, err := NewMessageInterceptionRequest( 1025 ctx, interceptType, isStream, fullMethod, 1026 currentMessage, 1027 ) 1028 if err != nil { 1029 return nil, err 1030 } 1031 1032 // If there is a custom caveat in the macaroon, make sure the 1033 // middleware registered for it. Or if a middleware registered 1034 // for read-only mode, it also gets the request. 1035 hasCustomCaveat := macaroons.HasCustomCaveat( 1036 msg.Macaroon, middleware.customCaveatName, 1037 ) 1038 if !hasCustomCaveat && !middleware.readOnly { 1039 continue 1040 } 1041 1042 msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition( 1043 msg.Macaroon, middleware.customCaveatName, 1044 ) 1045 1046 resp, err := middleware.intercept(requestID, msg) 1047 1048 // Error during interception itself. 1049 if err != nil { 1050 return nil, err 1051 } 1052 1053 // Error returned from middleware client. 1054 if resp.err != nil { 1055 return nil, resp.err 1056 } 1057 1058 // The message was replaced, make sure the next middleware in 1059 // line receives the updated message. 1060 if !middleware.readOnly && resp.replace { 1061 currentMessage = resp.replacement 1062 } 1063 } 1064 1065 return currentMessage, nil 1066 } 1067 1068 // serverStreamWrapper is a struct that wraps a server stream in a way that all 1069 // requests and responses can be intercepted individually. 1070 type serverStreamWrapper struct { 1071 // ServerStream is the stream that's being wrapped. 1072 grpc.ServerStream 1073 1074 requestID uint64 1075 1076 fullMethod string 1077 1078 interceptor *InterceptorChain 1079 } 1080 1081 // SendMsg is called when lnd sends a message to the client. This is wrapped to 1082 // intercept streaming RPC responses. 1083 func (w *serverStreamWrapper) SendMsg(m interface{}) error { 1084 newMsg, err := w.interceptor.interceptMessage( 1085 w.ServerStream.Context(), TypeResponse, w.requestID, true, 1086 w.fullMethod, m, 1087 ) 1088 if err != nil { 1089 return err 1090 } 1091 1092 return w.ServerStream.SendMsg(newMsg) 1093 } 1094 1095 // RecvMsg is called when lnd wants to receive a message from the client. This 1096 // is wrapped to intercept streaming RPC requests. 1097 func (w *serverStreamWrapper) RecvMsg(m interface{}) error { 1098 err := w.ServerStream.RecvMsg(m) 1099 if err != nil { 1100 return err 1101 } 1102 1103 req, err := w.interceptor.interceptMessage( 1104 w.ServerStream.Context(), TypeRequest, w.requestID, true, 1105 w.fullMethod, m, 1106 ) 1107 if err != nil { 1108 return err 1109 } 1110 1111 return replaceProtoMsg(m, req) 1112 }