/ rpcperms / interceptor.go
interceptor.go
   1  package rpcperms
   2  
   3  import (
   4  	"context"
   5  	"errors"
   6  	"fmt"
   7  	"sync"
   8  	"sync/atomic"
   9  
  10  	"github.com/btcsuite/btclog/v2"
  11  	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
  12  	"github.com/lightningnetwork/lnd/lnrpc"
  13  	"github.com/lightningnetwork/lnd/macaroons"
  14  	"github.com/lightningnetwork/lnd/monitoring"
  15  	"github.com/lightningnetwork/lnd/subscribe"
  16  	"google.golang.org/grpc"
  17  	"gopkg.in/macaroon-bakery.v2/bakery"
  18  )
  19  
// rpcState is an enum that we use to keep track of the current RPC service
// state. This will transition as we go from startup to unlocking the wallet,
// and finally fully active. The state decides which gRPC services may be
// called at any given time (see checkRPCState).
type rpcState uint8

const (
	// waitingToStart indicates that we're at the beginning of the startup
	// process. In a cluster environment this may mean that we're waiting to
	// become the leader in which case RPC calls will be disabled until
	// this instance has been elected as leader. Only the State service
	// can be called in this state.
	waitingToStart rpcState = iota

	// walletNotCreated is the starting state if the RPC server is active,
	// but the wallet is not yet created. In this state we'll only allow
	// calls to the WalletUnlockerService (and the State service).
	walletNotCreated

	// walletLocked indicates the RPC server is active, but the wallet is
	// locked. In this state we'll only allow calls to the
	// WalletUnlockerService (and the State service).
	walletLocked

	// walletUnlocked means that the wallet has been unlocked, but the full
	// RPC server is not yet ready. Every call other than to the State
	// service is rejected in this state.
	walletUnlocked

	// rpcActive means that the RPC server is ready to accept calls. Calls
	// to the WalletUnlockerService are rejected from this state onwards.
	rpcActive

	// serverActive means that the lnd server is ready to accept calls. The
	// same services as in the rpcActive state can be called.
	serverActive
)
  52  
  53  var (
  54  	// ErrWaitingToStart is returned if LND is still waiting to start,
  55  	// possibly blocked until elected as the leader.
  56  	ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " +
  57  		"available")
  58  
  59  	// ErrNoWallet is returned if the wallet does not exist.
  60  	ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " +
  61  		"full RPC access")
  62  
  63  	// ErrWalletLocked is returned if the wallet is locked and any service
  64  	// other than the WalletUnlocker is called.
  65  	ErrWalletLocked = fmt.Errorf("wallet locked, unlock it to enable " +
  66  		"full RPC access")
  67  
  68  	// ErrWalletUnlocked is returned if the WalletUnlocker service is
  69  	// called when the wallet already has been unlocked.
  70  	ErrWalletUnlocked = fmt.Errorf("wallet already unlocked, " +
  71  		"WalletUnlocker service is no longer available")
  72  
  73  	// ErrRPCStarting is returned if the wallet has been unlocked but the
  74  	// RPC server is not yet ready to accept calls.
  75  	ErrRPCStarting = fmt.Errorf("the RPC server is in the process of " +
  76  		"starting up, but not yet ready to accept calls")
  77  
  78  	// macaroonWhitelist defines methods that we don't require macaroons to
  79  	// access. We also allow these methods to be called even if not all
  80  	// mandatory middlewares are registered yet. If the wallet is locked
  81  	// then a middleware cannot register itself, creating an impossible
  82  	// situation. Also, a middleware might want to check the state of lnd
  83  	// by calling the State service before it registers itself. So we also
  84  	// need to exclude those calls from the mandatory middleware check.
  85  	macaroonWhitelist = map[string]struct{}{
  86  		// We allow all calls to the WalletUnlocker without macaroons.
  87  		"/lnrpc.WalletUnlocker/GenSeed":        {},
  88  		"/lnrpc.WalletUnlocker/InitWallet":     {},
  89  		"/lnrpc.WalletUnlocker/UnlockWallet":   {},
  90  		"/lnrpc.WalletUnlocker/ChangePassword": {},
  91  
  92  		// The State service must be available at all times, even
  93  		// before we can check macaroons, so we whitelist it.
  94  		"/lnrpc.State/SubscribeState": {},
  95  		"/lnrpc.State/GetState":       {},
  96  	}
  97  )
  98  
// InterceptorChain is a struct that can be added to the running GRPC server,
// intercepting API calls. This is useful for logging, enforcing permissions,
// supporting middleware etc. The following diagram shows the order of each
// interceptor in the chain and when exactly requests/responses are intercepted
// and forwarded to external middleware for approval/modification. Middleware in
// general can only intercept gRPC requests/responses that are sent by the
// client with a macaroon that contains a custom caveat that is supported by one
// of the registered middlewares.
//
//	    |
//	    | gRPC request from client
//	    |
//	+---v--------------------------------+
//	|   InterceptorChain                 |
//	+-+----------------------------------+
//	  | Log Interceptor                  |
//	  +----------------------------------+
//	  | RPC State Interceptor            |
//	  +----------------------------------+
//	  | Macaroon Interceptor             |
//	  +----------------------------------+--------> +---------------------+
//	  | RPC Macaroon Middleware Handler  |<-------- | External Middleware |
//	  +----------------------------------+          |   - modify request  |
//	  | Prometheus Interceptor           |          +---------------------+
//	  +-+--------------------------------+
//	    | validated gRPC request from client
//	+---v--------------------------------+
//	|   main gRPC server                 |
//	+---+--------------------------------+
//	    |
//	    | original gRPC response to client
//	    |
//	+---v--------------------------------+--------> +---------------------+
//	|   RPC Macaroon Middleware Handler  |<-------- | External Middleware |
//	+---+--------------------------------+          |   - modify response |
//	    |                                           +---------------------+
//	    | edited gRPC response to client
//	    v
type InterceptorChain struct {
	// lastRequestID is the ID of the last gRPC request or stream that was
	// intercepted by the middleware interceptor.
	//
	// NOTE: Must be used atomically!
	lastRequestID uint64

	// Required by the grpc-gateway/v2 library for forward compatibility.
	lnrpc.UnimplementedStateServer

	// started and stopped make sure the bodies of Start and Stop are each
	// executed at most once.
	started sync.Once
	stopped sync.Once

	// state is the current RPC state of our RPC server.
	state rpcState

	// ntfnServer is a subscription server we use to notify clients of the
	// State service when the state changes.
	ntfnServer *subscribe.Server

	// noMacaroons should be set true if we don't want to check macaroons.
	noMacaroons bool

	// svc is the macaroon service used to enforce permissions in case
	// macaroons are used. It is nil until AddMacaroonService is called.
	svc *macaroons.Service

	// permissionMap is the permissions to enforce if macaroons are used,
	// keyed by the full gRPC method URI.
	permissionMap map[string][]bakery.Op

	// rpcsLog is the logger used to log calls to the RPCs intercepted.
	rpcsLog btclog.Logger

	// registeredMiddleware is a slice of all macaroon permission based RPC
	// middleware clients that are currently registered. The
	// registeredMiddlewareNames can be used to find the index of a specific
	// interceptor within the registeredMiddleware slice using the name of
	// the interceptor as the key. The reason for using these two separate
	// structures is so that the order in which interceptors are run is
	// the same as the order in which they were registered.
	registeredMiddleware []*MiddlewareHandler

	// registeredMiddlewareNames is a map of registered middleware names
	// to the index at which they are stored in the registeredMiddleware
	// slice.
	registeredMiddlewareNames map[string]int

	// mandatoryMiddleware is a list of all middleware that is considered to
	// be mandatory. If any of them is not registered then all RPC requests
	// (except for the macaroon white listed methods and the middleware
	// registration itself) are blocked. This is a security feature to make
	// sure that requests can't just go through unobserved/unaudited if a
	// middleware crashes.
	mandatoryMiddleware []string

	// quit is closed by Stop to signal long running calls such as
	// SubscribeState that they should exit.
	quit chan struct{}
	// The embedded RWMutex guards the mutable fields above that are
	// accessed by the methods in this file while holding it: state, svc,
	// permissionMap, registeredMiddleware and registeredMiddlewareNames.
	sync.RWMutex
}
 195  
// A compile time check to ensure that InterceptorChain fully implements the
// StateServer gRPC service. The assignment to the blank identifier only
// exists to make the build fail if a method of the interface is missing.
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
 199  
 200  // NewInterceptorChain creates a new InterceptorChain.
 201  func NewInterceptorChain(log btclog.Logger, noMacaroons bool,
 202  	mandatoryMiddleware []string) *InterceptorChain {
 203  
 204  	return &InterceptorChain{
 205  		state:                     waitingToStart,
 206  		ntfnServer:                subscribe.NewServer(),
 207  		noMacaroons:               noMacaroons,
 208  		permissionMap:             make(map[string][]bakery.Op),
 209  		rpcsLog:                   log,
 210  		registeredMiddlewareNames: make(map[string]int),
 211  		mandatoryMiddleware:       mandatoryMiddleware,
 212  		quit:                      make(chan struct{}),
 213  	}
 214  }
 215  
 216  // Start starts the InterceptorChain, which is needed to start the state
 217  // subscription server it powers.
 218  func (r *InterceptorChain) Start() error {
 219  	var err error
 220  	r.started.Do(func() {
 221  		err = r.ntfnServer.Start()
 222  	})
 223  
 224  	return err
 225  }
 226  
 227  // Stop stops the InterceptorChain and its internal state subscription server.
 228  func (r *InterceptorChain) Stop() error {
 229  	var err error
 230  	r.stopped.Do(func() {
 231  		close(r.quit)
 232  		err = r.ntfnServer.Stop()
 233  	})
 234  
 235  	return err
 236  }
 237  
 238  // SetWalletNotCreated moves the RPC state from either waitingToStart to
 239  // walletNotCreated.
 240  func (r *InterceptorChain) SetWalletNotCreated() {
 241  	r.Lock()
 242  	defer r.Unlock()
 243  
 244  	r.state = walletNotCreated
 245  	_ = r.ntfnServer.SendUpdate(r.state)
 246  }
 247  
 248  // SetWalletLocked moves the RPC state from either walletNotCreated to
 249  // walletLocked.
 250  func (r *InterceptorChain) SetWalletLocked() {
 251  	r.Lock()
 252  	defer r.Unlock()
 253  
 254  	r.state = walletLocked
 255  	_ = r.ntfnServer.SendUpdate(r.state)
 256  }
 257  
 258  // SetWalletUnlocked moves the RPC state from either walletNotCreated or
 259  // walletLocked to walletUnlocked.
 260  func (r *InterceptorChain) SetWalletUnlocked() {
 261  	r.Lock()
 262  	defer r.Unlock()
 263  
 264  	r.state = walletUnlocked
 265  	_ = r.ntfnServer.SendUpdate(r.state)
 266  }
 267  
 268  // SetRPCActive moves the RPC state from walletUnlocked to rpcActive.
 269  func (r *InterceptorChain) SetRPCActive() {
 270  	r.Lock()
 271  	defer r.Unlock()
 272  
 273  	r.state = rpcActive
 274  	_ = r.ntfnServer.SendUpdate(r.state)
 275  }
 276  
 277  // SetServerActive moves the RPC state from walletUnlocked to rpcActive.
 278  func (r *InterceptorChain) SetServerActive() {
 279  	r.Lock()
 280  	defer r.Unlock()
 281  
 282  	r.state = serverActive
 283  	_ = r.ntfnServer.SendUpdate(r.state)
 284  }
 285  
 286  // rpcStateToWalletState converts rpcState to lnrpc.WalletState. Returns
 287  // WAITING_TO_START and an error on conversion error.
 288  func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) {
 289  	const defaultState = lnrpc.WalletState_WAITING_TO_START
 290  	var walletState lnrpc.WalletState
 291  
 292  	switch state {
 293  	case waitingToStart:
 294  		walletState = lnrpc.WalletState_WAITING_TO_START
 295  	case walletNotCreated:
 296  		walletState = lnrpc.WalletState_NON_EXISTING
 297  	case walletLocked:
 298  		walletState = lnrpc.WalletState_LOCKED
 299  	case walletUnlocked:
 300  		walletState = lnrpc.WalletState_UNLOCKED
 301  	case rpcActive:
 302  		walletState = lnrpc.WalletState_RPC_ACTIVE
 303  	case serverActive:
 304  		walletState = lnrpc.WalletState_SERVER_ACTIVE
 305  
 306  	default:
 307  		return defaultState, fmt.Errorf("unknown wallet state %v", state)
 308  	}
 309  
 310  	return walletState, nil
 311  }
 312  
 313  // SubscribeState subscribes to the state of the wallet. The current wallet
 314  // state will always be delivered immediately.
 315  //
 316  // NOTE: Part of the StateService interface.
 317  func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest,
 318  	stream lnrpc.State_SubscribeStateServer) error {
 319  
 320  	sendStateUpdate := func(state rpcState) error {
 321  		walletState, err := rpcStateToWalletState(state)
 322  		if err != nil {
 323  			return err
 324  		}
 325  
 326  		return stream.Send(&lnrpc.SubscribeStateResponse{
 327  			State: walletState,
 328  		})
 329  	}
 330  
 331  	// Subscribe to state updates.
 332  	client, err := r.ntfnServer.Subscribe()
 333  	if err != nil {
 334  		return err
 335  	}
 336  	defer client.Cancel()
 337  
 338  	// Always start by sending the current state.
 339  	r.RLock()
 340  	state := r.state
 341  	r.RUnlock()
 342  
 343  	if err := sendStateUpdate(state); err != nil {
 344  		return err
 345  	}
 346  
 347  	for {
 348  		select {
 349  		case e := <-client.Updates():
 350  			newState := e.(rpcState)
 351  
 352  			// Ignore already sent state.
 353  			if newState == state {
 354  				continue
 355  			}
 356  
 357  			state = newState
 358  			err := sendStateUpdate(state)
 359  			if err != nil {
 360  				return err
 361  			}
 362  
 363  		// The response stream's context for whatever reason has been
 364  		// closed. If context is closed by an exceeded deadline we will
 365  		// return an error.
 366  		case <-stream.Context().Done():
 367  			if errors.Is(stream.Context().Err(), context.Canceled) {
 368  				return nil
 369  			}
 370  			return stream.Context().Err()
 371  
 372  		case <-r.quit:
 373  			return fmt.Errorf("server exiting")
 374  		}
 375  	}
 376  }
 377  
 378  // GetState returns the current wallet state.
 379  func (r *InterceptorChain) GetState(_ context.Context,
 380  	_ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
 381  
 382  	r.RLock()
 383  	state := r.state
 384  	r.RUnlock()
 385  
 386  	walletState, err := rpcStateToWalletState(state)
 387  	if err != nil {
 388  		return nil, err
 389  	}
 390  
 391  	return &lnrpc.GetStateResponse{
 392  		State: walletState,
 393  	}, nil
 394  }
 395  
 396  // AddMacaroonService adds a macaroon service to the interceptor. After this is
 397  // done every RPC call made will have to pass a valid macaroon to be accepted.
 398  func (r *InterceptorChain) AddMacaroonService(svc *macaroons.Service) {
 399  	r.Lock()
 400  	defer r.Unlock()
 401  
 402  	r.svc = svc
 403  }
 404  
 405  // MacaroonService returns the currently registered macaroon service. This might
 406  // be nil if none was registered (yet).
 407  func (r *InterceptorChain) MacaroonService() *macaroons.Service {
 408  	r.RLock()
 409  	defer r.RUnlock()
 410  
 411  	return r.svc
 412  }
 413  
 414  // AddPermission adds a new macaroon rule for the given method.
 415  func (r *InterceptorChain) AddPermission(method string, ops []bakery.Op) error {
 416  	r.Lock()
 417  	defer r.Unlock()
 418  
 419  	if _, ok := r.permissionMap[method]; ok {
 420  		return fmt.Errorf("detected duplicate macaroon constraints "+
 421  			"for path: %v", method)
 422  	}
 423  
 424  	r.permissionMap[method] = ops
 425  	return nil
 426  }
 427  
 428  // Permissions returns the current set of macaroon permissions.
 429  func (r *InterceptorChain) Permissions() map[string][]bakery.Op {
 430  	r.RLock()
 431  	defer r.RUnlock()
 432  
 433  	// Make a copy under the read lock to avoid races.
 434  	c := make(map[string][]bakery.Op)
 435  	for k, v := range r.permissionMap {
 436  		s := make([]bakery.Op, len(v))
 437  		copy(s, v)
 438  		c[k] = s
 439  	}
 440  
 441  	return c
 442  }
 443  
 444  // RegisterMiddleware registers a new middleware that will handle request/
 445  // response interception for all RPC messages that are initiated with a custom
 446  // macaroon caveat. The name of the custom caveat a middleware is handling is
 447  // also its unique identifier. Only one middleware can be registered for each
 448  // custom caveat.
 449  func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error {
 450  	r.Lock()
 451  	defer r.Unlock()
 452  
 453  	// The name of the middleware is the unique identifier.
 454  	_, ok := r.registeredMiddlewareNames[mw.middlewareName]
 455  	if ok {
 456  		return fmt.Errorf("a middleware with the name '%s' is already "+
 457  			"registered", mw.middlewareName)
 458  	}
 459  
 460  	// For now, we only want one middleware per custom caveat name. If we
 461  	// allowed multiple middlewares handling the same caveat there would be
 462  	// a need for extra call chaining logic, and they could overwrite each
 463  	// other's responses.
 464  	for _, middleware := range r.registeredMiddleware {
 465  		if middleware.customCaveatName == mw.customCaveatName {
 466  			return fmt.Errorf("a middleware is already registered "+
 467  				"for the custom caveat name '%s': %v",
 468  				mw.customCaveatName, middleware.middlewareName)
 469  		}
 470  	}
 471  
 472  	r.registeredMiddleware = append(r.registeredMiddleware, mw)
 473  	index := len(r.registeredMiddleware) - 1
 474  	r.registeredMiddlewareNames[mw.middlewareName] = index
 475  
 476  	return nil
 477  }
 478  
 479  // RemoveMiddleware removes the middleware that handles the given custom caveat
 480  // name.
 481  func (r *InterceptorChain) RemoveMiddleware(middlewareName string) {
 482  	r.Lock()
 483  	defer r.Unlock()
 484  
 485  	log.Debugf("Removing middleware %s", middlewareName)
 486  
 487  	index, ok := r.registeredMiddlewareNames[middlewareName]
 488  	if !ok {
 489  		return
 490  	}
 491  	delete(r.registeredMiddlewareNames, middlewareName)
 492  
 493  	r.registeredMiddleware = append(
 494  		r.registeredMiddleware[:index],
 495  		r.registeredMiddleware[index+1:]...,
 496  	)
 497  
 498  	// Re-initialise the middleware look-up map with the updated indexes.
 499  	r.registeredMiddlewareNames = make(map[string]int)
 500  	for i, mw := range r.registeredMiddleware {
 501  		r.registeredMiddlewareNames[mw.middlewareName] = i
 502  	}
 503  }
 504  
 505  // CustomCaveatSupported makes sure a middleware that handles the given custom
 506  // caveat name is registered. If none is, an error is returned, signalling to
 507  // the macaroon bakery and its validator to reject macaroons that have a custom
 508  // caveat with that name.
 509  //
 510  // NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface.
 511  func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error {
 512  	r.RLock()
 513  	defer r.RUnlock()
 514  
 515  	// We only accept requests with a custom caveat if we also have a
 516  	// middleware registered that handles that custom caveat. That is
 517  	// crucial for security! Otherwise a request with an encumbered (=has
 518  	// restricted permissions based upon the custom caveat condition)
 519  	// macaroon would not be validated against the limitations that the
 520  	// custom caveat implicate. Since the map is keyed by the _name_ of the
 521  	// middleware, we need to loop through all of them to see if one has
 522  	// the given custom macaroon caveat name.
 523  	for _, middleware := range r.registeredMiddleware {
 524  		if middleware.customCaveatName == customCaveatName {
 525  			return nil
 526  		}
 527  	}
 528  
 529  	return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+
 530  		"no middleware registered to handle it", customCaveatName)
 531  }
 532  
 533  // CreateServerOpts creates the GRPC server options that can be added to a GRPC
 534  // server in order to add this InterceptorChain.
 535  func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
 536  	var unaryInterceptors []grpc.UnaryServerInterceptor
 537  	var strmInterceptors []grpc.StreamServerInterceptor
 538  
 539  	// The first interceptors we'll add to the chain is our logging
 540  	// interceptors, so we can automatically log all errors that happen
 541  	// during RPC calls.
 542  	unaryInterceptors = append(
 543  		unaryInterceptors, errorLogUnaryServerInterceptor(r.rpcsLog),
 544  	)
 545  	strmInterceptors = append(
 546  		strmInterceptors, errorLogStreamServerInterceptor(r.rpcsLog),
 547  	)
 548  
 549  	// Next we'll add our RPC state check interceptors, that will check
 550  	// whether the attempted call is allowed in the current state.
 551  	unaryInterceptors = append(
 552  		unaryInterceptors, r.rpcStateUnaryServerInterceptor(),
 553  	)
 554  	strmInterceptors = append(
 555  		strmInterceptors, r.rpcStateStreamServerInterceptor(),
 556  	)
 557  
 558  	// We'll add the macaroon interceptors. If macaroons aren't disabled,
 559  	// then these interceptors will enforce macaroon authentication.
 560  	unaryInterceptors = append(
 561  		unaryInterceptors, r.MacaroonUnaryServerInterceptor(),
 562  	)
 563  	strmInterceptors = append(
 564  		strmInterceptors, r.MacaroonStreamServerInterceptor(),
 565  	)
 566  
 567  	// Next, we'll add the interceptors for our custom macaroon caveat based
 568  	// middleware.
 569  	unaryInterceptors = append(
 570  		unaryInterceptors, r.middlewareUnaryServerInterceptor(),
 571  	)
 572  	strmInterceptors = append(
 573  		strmInterceptors, r.middlewareStreamServerInterceptor(),
 574  	)
 575  
 576  	// Get interceptors for Prometheus to gather gRPC performance metrics.
 577  	// If monitoring is disabled, GetPromInterceptors() will return empty
 578  	// slices.
 579  	promUnaryInterceptors, promStrmInterceptors :=
 580  		monitoring.GetPromInterceptors()
 581  
 582  	// Concatenate the slices of unary and stream interceptors respectively.
 583  	unaryInterceptors = append(unaryInterceptors, promUnaryInterceptors...)
 584  	strmInterceptors = append(strmInterceptors, promStrmInterceptors...)
 585  
 586  	// Create server options from the interceptors we just set up.
 587  	chainedUnary := grpc_middleware.WithUnaryServerChain(
 588  		unaryInterceptors...,
 589  	)
 590  	chainedStream := grpc_middleware.WithStreamServerChain(
 591  		strmInterceptors...,
 592  	)
 593  	serverOpts := []grpc.ServerOption{chainedUnary, chainedStream}
 594  
 595  	return serverOpts
 596  }
 597  
 598  // errorLogUnaryServerInterceptor is a simple UnaryServerInterceptor that will
 599  // automatically log any errors that occur when serving a client's unary
 600  // request.
 601  func errorLogUnaryServerInterceptor(logger btclog.Logger) grpc.UnaryServerInterceptor {
 602  	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
 603  		handler grpc.UnaryHandler) (interface{}, error) {
 604  
 605  		resp, err := handler(ctx, req)
 606  		if err != nil {
 607  			// TODO(roasbeef): also log request details?
 608  			logger.Errorf("[%v]: %v", info.FullMethod, err)
 609  		}
 610  
 611  		return resp, err
 612  	}
 613  }
 614  
 615  // errorLogStreamServerInterceptor is a simple StreamServerInterceptor that
 616  // will log any errors that occur while processing a client or server streaming
 617  // RPC.
 618  func errorLogStreamServerInterceptor(logger btclog.Logger) grpc.StreamServerInterceptor {
 619  	return func(srv interface{}, ss grpc.ServerStream,
 620  		info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
 621  
 622  		err := handler(srv, ss)
 623  		if err != nil {
 624  			logger.Errorf("[%v]: %v", info.FullMethod, err)
 625  		}
 626  
 627  		return err
 628  	}
 629  }
 630  
 631  // checkMacaroon validates that the context contains the macaroon needed to
 632  // invoke the given RPC method.
 633  func (r *InterceptorChain) checkMacaroon(ctx context.Context,
 634  	fullMethod string) error {
 635  
 636  	// If noMacaroons is set, we'll always allow the call.
 637  	if r.noMacaroons {
 638  		return nil
 639  	}
 640  
 641  	// Check whether the method is whitelisted, if so we'll allow it
 642  	// regardless of macaroons.
 643  	_, ok := macaroonWhitelist[fullMethod]
 644  	if ok {
 645  		return nil
 646  	}
 647  
 648  	r.RLock()
 649  	svc := r.svc
 650  	r.RUnlock()
 651  
 652  	// If the macaroon service is not yet active, we cannot allow
 653  	// the call.
 654  	if svc == nil {
 655  		return fmt.Errorf("unable to determine macaroon permissions")
 656  	}
 657  
 658  	r.RLock()
 659  	uriPermissions, ok := r.permissionMap[fullMethod]
 660  	r.RUnlock()
 661  	if !ok {
 662  		return fmt.Errorf("%s: unknown permissions required for method",
 663  			fullMethod)
 664  	}
 665  
 666  	// Find out if there is an external validator registered for
 667  	// this method. Fall back to the internal one if there isn't.
 668  	validator, ok := svc.ExternalValidators[fullMethod]
 669  	if !ok {
 670  		validator = svc
 671  	}
 672  
 673  	// Now that we know what validator to use, let it do its work.
 674  	return validator.ValidateMacaroon(ctx, uriPermissions, fullMethod)
 675  }
 676  
 677  // MacaroonUnaryServerInterceptor is a GRPC interceptor that checks whether the
 678  // request is authorized by the included macaroons.
 679  func (r *InterceptorChain) MacaroonUnaryServerInterceptor() grpc.UnaryServerInterceptor {
 680  	return func(ctx context.Context, req interface{},
 681  		info *grpc.UnaryServerInfo,
 682  		handler grpc.UnaryHandler) (interface{}, error) {
 683  
 684  		// Check macaroons.
 685  		if err := r.checkMacaroon(ctx, info.FullMethod); err != nil {
 686  			return nil, err
 687  		}
 688  
 689  		return handler(ctx, req)
 690  	}
 691  }
 692  
 693  // MacaroonStreamServerInterceptor is a GRPC interceptor that checks whether
 694  // the request is authorized by the included macaroons.
 695  func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerInterceptor {
 696  	return func(srv interface{}, ss grpc.ServerStream,
 697  		info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
 698  
 699  		// Check macaroons.
 700  		err := r.checkMacaroon(ss.Context(), info.FullMethod)
 701  		if err != nil {
 702  			return err
 703  		}
 704  
 705  		return handler(srv, ss)
 706  	}
 707  }
 708  
 709  // checkRPCState checks whether a call to the given server is allowed in the
 710  // current RPC state.
 711  func (r *InterceptorChain) checkRPCState(srv interface{}) error {
 712  	// The StateService is being accessed, we allow the call regardless of
 713  	// the current state.
 714  	_, ok := srv.(lnrpc.StateServer)
 715  	if ok {
 716  		return nil
 717  	}
 718  
 719  	r.RLock()
 720  	state := r.state
 721  	r.RUnlock()
 722  
 723  	switch state {
 724  	// Do not accept any RPC calls (unless to the state service) until LND
 725  	// has not started.
 726  	case waitingToStart:
 727  		return ErrWaitingToStart
 728  
 729  	// If the wallet does not exists, only calls to the WalletUnlocker are
 730  	// accepted.
 731  	case walletNotCreated:
 732  		_, ok := srv.(lnrpc.WalletUnlockerServer)
 733  		if !ok {
 734  			return ErrNoWallet
 735  		}
 736  
 737  	// If the wallet is locked, only calls to the WalletUnlocker are
 738  	// accepted.
 739  	case walletLocked:
 740  		_, ok := srv.(lnrpc.WalletUnlockerServer)
 741  		if !ok {
 742  			return ErrWalletLocked
 743  		}
 744  
 745  	// If the wallet is unlocked, but the RPC not yet active, we reject.
 746  	case walletUnlocked:
 747  		_, ok := srv.(lnrpc.WalletUnlockerServer)
 748  		if ok {
 749  			return ErrWalletUnlocked
 750  		}
 751  
 752  		return ErrRPCStarting
 753  
 754  	// If the RPC server or lnd server is active, we allow calls to any
 755  	// service except the WalletUnlocker.
 756  	case rpcActive, serverActive:
 757  		_, ok := srv.(lnrpc.WalletUnlockerServer)
 758  		if ok {
 759  			return ErrWalletUnlocked
 760  		}
 761  
 762  	default:
 763  		return fmt.Errorf("unknown RPC state: %v", state)
 764  	}
 765  
 766  	return nil
 767  }
 768  
 769  // rpcStateUnaryServerInterceptor is a GRPC interceptor that checks whether
 770  // calls to the given gGRPC server is allowed in the current rpc state.
 771  func (r *InterceptorChain) rpcStateUnaryServerInterceptor() grpc.UnaryServerInterceptor {
 772  	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
 773  		handler grpc.UnaryHandler) (interface{}, error) {
 774  
 775  		r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
 776  
 777  		if err := r.checkRPCState(info.Server); err != nil {
 778  			return nil, err
 779  		}
 780  
 781  		return handler(ctx, req)
 782  	}
 783  }
 784  
 785  // rpcStateStreamServerInterceptor is a GRPC interceptor that checks whether
 786  // calls to the given gGRPC server is allowed in the current rpc state.
 787  func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerInterceptor {
 788  	return func(srv interface{}, ss grpc.ServerStream,
 789  		info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
 790  
 791  		r.rpcsLog.Debugf("[%v] requested", info.FullMethod)
 792  
 793  		if err := r.checkRPCState(srv); err != nil {
 794  			return err
 795  		}
 796  
 797  		return handler(srv, ss)
 798  	}
 799  }
 800  
 801  // middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts
 802  // all requests and responses that are sent with a macaroon containing a custom
 803  // caveat condition that is handled by registered middleware.
 804  func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor {
 805  	return func(ctx context.Context,
 806  		req interface{}, info *grpc.UnaryServerInfo,
 807  		handler grpc.UnaryHandler) (interface{}, error) {
 808  
 809  		// Make sure we don't allow any requests through if one of the
 810  		// mandatory middlewares is missing.
 811  		fullMethod := info.FullMethod
 812  		if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
 813  			return nil, err
 814  		}
 815  
 816  		// If there is no middleware registered, we don't need to
 817  		// intercept anything.
 818  		if !r.middlewareRegistered() {
 819  			return handler(ctx, req)
 820  		}
 821  
 822  		requestID := atomic.AddUint64(&r.lastRequestID, 1)
 823  		req, err := r.interceptMessage(
 824  			ctx, TypeRequest, requestID, false, info.FullMethod,
 825  			req,
 826  		)
 827  		if err != nil {
 828  			return nil, err
 829  		}
 830  
 831  		// Call the handler, which executes the request against lnd.
 832  		lndResp, lndErr := handler(ctx, req)
 833  		if lndErr != nil {
 834  			// The call to lnd ended in an error and not a normal
 835  			// proto message response. Send the error to the
 836  			// interceptor as well to inform about the abnormal
 837  			// termination of the stream and to give the option to
 838  			// replace the error message with a custom one.
 839  			replacedErr, err := r.interceptMessage(
 840  				ctx, TypeResponse, requestID, false,
 841  				info.FullMethod, lndErr,
 842  			)
 843  			if err != nil {
 844  				return nil, err
 845  			}
 846  			return lndResp, replacedErr.(error)
 847  		}
 848  
 849  		return r.interceptMessage(
 850  			ctx, TypeResponse, requestID, false, info.FullMethod,
 851  			lndResp,
 852  		)
 853  	}
 854  }
 855  
 856  // middlewareStreamServerInterceptor is a streaming gRPC interceptor that
 857  // intercepts all requests and responses that are sent with a macaroon
 858  // containing a custom caveat condition that is handled by registered
 859  // middleware.
 860  func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor {
 861  	return func(srv interface{},
 862  		ss grpc.ServerStream, info *grpc.StreamServerInfo,
 863  		handler grpc.StreamHandler) error {
 864  
 865  		// Don't intercept the interceptor itself which is a streaming
 866  		// RPC too!
 867  		fullMethod := info.FullMethod
 868  		if fullMethod == lnrpc.RegisterRPCMiddlewareURI {
 869  			return handler(srv, ss)
 870  		}
 871  
 872  		// Make sure we don't allow any requests through if one of the
 873  		// mandatory middlewares is missing. We add this check here to
 874  		// make sure the middleware registration itself can still be
 875  		// called.
 876  		if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
 877  			return err
 878  		}
 879  
 880  		// If there is no middleware registered, we don't need to
 881  		// intercept anything.
 882  		if !r.middlewareRegistered() {
 883  			return handler(srv, ss)
 884  		}
 885  
 886  		// To give the middleware a chance to accept or reject the
 887  		// establishment of the stream itself (and not only when the
 888  		// first message is sent on the stream), we send an intercept
 889  		// request for the stream auth now:
 890  		msg, err := NewStreamAuthInterceptionRequest(
 891  			ss.Context(), info.FullMethod,
 892  		)
 893  		if err != nil {
 894  			return err
 895  		}
 896  
 897  		requestID := atomic.AddUint64(&r.lastRequestID, 1)
 898  		err = r.acceptStream(requestID, msg)
 899  		if err != nil {
 900  			return err
 901  		}
 902  
 903  		wrappedSS := &serverStreamWrapper{
 904  			ServerStream: ss,
 905  			requestID:    requestID,
 906  			fullMethod:   info.FullMethod,
 907  			interceptor:  r,
 908  		}
 909  
 910  		// Call the stream handler, which will block as long as the
 911  		// stream is alive.
 912  		lndErr := handler(srv, wrappedSS)
 913  		if lndErr != nil {
 914  			// This is an error being returned from lnd. Send it to
 915  			// the interceptor as well to inform about the abnormal
 916  			// termination of the stream and to give the option to
 917  			// replace the error message with a custom one.
 918  			replacedErr, err := r.interceptMessage(
 919  				ss.Context(), TypeResponse, requestID,
 920  				true, info.FullMethod, lndErr,
 921  			)
 922  			if err != nil {
 923  				return err
 924  			}
 925  
 926  			return replacedErr.(error)
 927  		}
 928  
 929  		// Normal/successful termination of the stream.
 930  		return nil
 931  	}
 932  }
 933  
 934  // checkMandatoryMiddleware makes sure that each of the middlewares declared as
 935  // mandatory is currently registered.
 936  func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error {
 937  	r.RLock()
 938  	defer r.RUnlock()
 939  
 940  	// Allow calls that are whitelisted for macaroons as well, otherwise we
 941  	// get into impossible situations where the wallet is locked but the
 942  	// unlock call is denied because the middleware isn't registered. But
 943  	// the middleware cannot register itself because the wallet is locked.
 944  	if _, ok := macaroonWhitelist[fullMethod]; ok {
 945  		return nil
 946  	}
 947  
 948  	// Not a white listed call so make sure every mandatory middleware is
 949  	// currently connected to lnd.
 950  	for _, name := range r.mandatoryMiddleware {
 951  		if _, ok := r.registeredMiddlewareNames[name]; !ok {
 952  			return fmt.Errorf("mandatory middleware '%s' is "+
 953  				"currently not registered, not allowing any "+
 954  				"RPC calls", name)
 955  		}
 956  	}
 957  
 958  	return nil
 959  }
 960  
 961  // middlewareRegistered returns true if there is at least one middleware
 962  // currently registered.
 963  func (r *InterceptorChain) middlewareRegistered() bool {
 964  	r.RLock()
 965  	defer r.RUnlock()
 966  
 967  	return len(r.registeredMiddleware) > 0
 968  }
 969  
 970  // acceptStream sends an intercept request to all middlewares that have
 971  // registered for it. This means either a middleware has requested read-only
 972  // access or the request actually has a macaroon with a caveat the middleware
 973  // registered for.
 974  func (r *InterceptorChain) acceptStream(requestID uint64,
 975  	msg *InterceptionRequest) error {
 976  
 977  	r.RLock()
 978  	defer r.RUnlock()
 979  
 980  	for _, middleware := range r.registeredMiddleware {
 981  		// If there is a custom caveat in the macaroon, make sure the
 982  		// middleware registered for it. Or if a middleware registered
 983  		// for read-only mode, it also gets the request.
 984  		hasCustomCaveat := macaroons.HasCustomCaveat(
 985  			msg.Macaroon, middleware.customCaveatName,
 986  		)
 987  		if !hasCustomCaveat && !middleware.readOnly {
 988  			continue
 989  		}
 990  
 991  		msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
 992  			msg.Macaroon, middleware.customCaveatName,
 993  		)
 994  
 995  		resp, err := middleware.intercept(requestID, msg)
 996  
 997  		// Error during interception itself.
 998  		if err != nil {
 999  			return err
1000  		}
1001  
1002  		// Error returned from middleware client.
1003  		if resp.err != nil {
1004  			return resp.err
1005  		}
1006  	}
1007  
1008  	return nil
1009  }
1010  
1011  // interceptMessage sends out an intercept request for an RPC response. Since
1012  // middleware that hasn't registered for the read-only mode has the option to
1013  // overwrite/replace the message, this needs to be handled differently than the
1014  // auth path above.
1015  func (r *InterceptorChain) interceptMessage(ctx context.Context,
1016  	interceptType InterceptType, requestID uint64, isStream bool,
1017  	fullMethod string, m interface{}) (interface{}, error) {
1018  
1019  	r.RLock()
1020  	defer r.RUnlock()
1021  
1022  	currentMessage := m
1023  	for _, middleware := range r.registeredMiddleware {
1024  		msg, err := NewMessageInterceptionRequest(
1025  			ctx, interceptType, isStream, fullMethod,
1026  			currentMessage,
1027  		)
1028  		if err != nil {
1029  			return nil, err
1030  		}
1031  
1032  		// If there is a custom caveat in the macaroon, make sure the
1033  		// middleware registered for it. Or if a middleware registered
1034  		// for read-only mode, it also gets the request.
1035  		hasCustomCaveat := macaroons.HasCustomCaveat(
1036  			msg.Macaroon, middleware.customCaveatName,
1037  		)
1038  		if !hasCustomCaveat && !middleware.readOnly {
1039  			continue
1040  		}
1041  
1042  		msg.CustomCaveatCondition = macaroons.GetCustomCaveatCondition(
1043  			msg.Macaroon, middleware.customCaveatName,
1044  		)
1045  
1046  		resp, err := middleware.intercept(requestID, msg)
1047  
1048  		// Error during interception itself.
1049  		if err != nil {
1050  			return nil, err
1051  		}
1052  
1053  		// Error returned from middleware client.
1054  		if resp.err != nil {
1055  			return nil, resp.err
1056  		}
1057  
1058  		// The message was replaced, make sure the next middleware in
1059  		// line receives the updated message.
1060  		if !middleware.readOnly && resp.replace {
1061  			currentMessage = resp.replacement
1062  		}
1063  	}
1064  
1065  	return currentMessage, nil
1066  }
1067  
1068  // serverStreamWrapper is a struct that wraps a server stream in a way that all
1069  // requests and responses can be intercepted individually.
1070  type serverStreamWrapper struct {
1071  	// ServerStream is the stream that's being wrapped.
1072  	grpc.ServerStream
1073  
1074  	requestID uint64
1075  
1076  	fullMethod string
1077  
1078  	interceptor *InterceptorChain
1079  }
1080  
1081  // SendMsg is called when lnd sends a message to the client. This is wrapped to
1082  // intercept streaming RPC responses.
1083  func (w *serverStreamWrapper) SendMsg(m interface{}) error {
1084  	newMsg, err := w.interceptor.interceptMessage(
1085  		w.ServerStream.Context(), TypeResponse, w.requestID, true,
1086  		w.fullMethod, m,
1087  	)
1088  	if err != nil {
1089  		return err
1090  	}
1091  
1092  	return w.ServerStream.SendMsg(newMsg)
1093  }
1094  
1095  // RecvMsg is called when lnd wants to receive a message from the client. This
1096  // is wrapped to intercept streaming RPC requests.
1097  func (w *serverStreamWrapper) RecvMsg(m interface{}) error {
1098  	err := w.ServerStream.RecvMsg(m)
1099  	if err != nil {
1100  		return err
1101  	}
1102  
1103  	req, err := w.interceptor.interceptMessage(
1104  		w.ServerStream.Context(), TypeRequest, w.requestID, true,
1105  		w.fullMethod, m,
1106  	)
1107  	if err != nil {
1108  		return err
1109  	}
1110  
1111  	return replaceProtoMsg(m, req)
1112  }