core/node/provider.go
   1  package node
   2  
   3  import (
   4  	"context"
   5  	"errors"
   6  	"fmt"
   7  	"os"
   8  	"path/filepath"
   9  	"time"
  10  
  11  	"github.com/ipfs/boxo/blockstore"
  12  	"github.com/ipfs/boxo/fetcher"
  13  	"github.com/ipfs/boxo/mfs"
  14  	pin "github.com/ipfs/boxo/pinning/pinner"
  15  	"github.com/ipfs/boxo/pinning/pinner/dspinner"
  16  	"github.com/ipfs/boxo/provider"
  17  	"github.com/ipfs/go-cid"
  18  	"github.com/ipfs/go-datastore"
  19  	"github.com/ipfs/go-datastore/mount"
  20  	"github.com/ipfs/go-datastore/namespace"
  21  	"github.com/ipfs/go-datastore/query"
  22  	log "github.com/ipfs/go-log/v2"
  23  	"github.com/ipfs/kubo/config"
  24  	"github.com/ipfs/kubo/repo"
  25  	"github.com/ipfs/kubo/repo/fsrepo"
  26  	irouting "github.com/ipfs/kubo/routing"
  27  	dht "github.com/libp2p/go-libp2p-kad-dht"
  28  	"github.com/libp2p/go-libp2p-kad-dht/amino"
  29  	"github.com/libp2p/go-libp2p-kad-dht/dual"
  30  	"github.com/libp2p/go-libp2p-kad-dht/fullrt"
  31  	dht_pb "github.com/libp2p/go-libp2p-kad-dht/pb"
  32  	dhtprovider "github.com/libp2p/go-libp2p-kad-dht/provider"
  33  	"github.com/libp2p/go-libp2p-kad-dht/provider/buffered"
  34  	ddhtprovider "github.com/libp2p/go-libp2p-kad-dht/provider/dual"
  35  	"github.com/libp2p/go-libp2p-kad-dht/provider/keystore"
  36  	routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
  37  	"github.com/libp2p/go-libp2p/core/host"
  38  	peer "github.com/libp2p/go-libp2p/core/peer"
  39  	"github.com/libp2p/go-libp2p/core/routing"
  40  	ma "github.com/multiformats/go-multiaddr"
  41  	mh "github.com/multiformats/go-multihash"
  42  	"go.uber.org/fx"
  43  )
  44  
  45  const (
   46  	// The size of the batch used for calculating the average announcement
   47  	// time per CID, inside boxo/provider.ThroughputReport
   48  	// and in the 'ipfs stats provide' report.
   49  	// Used when Provide.DHT.SweepEnabled=false
  50  	sampledBatchSize = 1000
  51  
  52  	// Datastore key used to store previous reprovide strategy.
  53  	reprovideStrategyKey = "/reprovideStrategy"
  54  
  55  	// KeystoreDatastorePath is the base directory for the provider keystore datastores.
  56  	KeystoreDatastorePath = "provider-keystore"
  57  )
  58  
  59  var (
  60  	// Datastore namespace key for provider data.
  61  	providerDatastoreKey = datastore.NewKey("provider")
  62  	// Datastore namespace key for provider keystore data.
  63  	keystoreDatastoreKey = datastore.NewKey("keystore")
  64  )
  65  
  66  var errAcceleratedDHTNotReady = errors.New("AcceleratedDHTClient: routing table not ready")
  67  
  68  // validateKeystoreSuffix rejects any suffix other than "0" or "1".
  69  // The upstream library uses these two values as alternating namespace
  70  // identifiers. Validating here prevents accidental deletion of unrelated
  71  // directories via os.RemoveAll if the upstream ever changes its scheme.
  72  func validateKeystoreSuffix(suffix string) error {
  73  	if suffix != "0" && suffix != "1" {
  74  		return fmt.Errorf("unexpected keystore suffix %q, expected \"0\" or \"1\"", suffix)
  75  	}
  76  	return nil
  77  }
  78  
  79  // Interval between reprovide queue monitoring checks for slow reprovide alerts.
  80  // Used when Provide.DHT.SweepEnabled=true
  81  const reprovideAlertPollInterval = 15 * time.Minute
  82  
  83  // Number of consecutive polling intervals with sustained queue growth before
  84  // triggering a slow reprovide alert (3 intervals = 45 minutes).
  85  // Used when Provide.DHT.SweepEnabled=true
  86  const consecutiveAlertsThreshold = 3
  87  
   88  // DHTProvider is an interface for providing keys to a DHT swarm. It keeps
   89  // track of the keys to be advertised, and is responsible for periodically
   90  // publishing provider records for these keys to the DHT swarm before the
   91  // records expire.
  92  type DHTProvider interface {
  93  	// StartProviding ensures keys are periodically advertised to the DHT swarm.
  94  	//
  95  	// If the `keys` aren't currently being reprovided, they are added to the
  96  	// queue to be provided to the DHT swarm as soon as possible, and scheduled
  97  	// to be reprovided periodically. If `force` is set to true, all keys are
   98  	// provided to the DHT swarm, regardless of whether they were already
   99  	// being reprovided. `keys` keep being reprovided until `StopProviding`
 100  	// is called.
 101  	//
  102  	// This operation is asynchronous: it returns as soon as the `keys` are added
  103  	// to the provide queue, and the provides happen in the background.
 104  	//
 105  	// Returns an error if the keys couldn't be added to the provide queue. This
 106  	// can happen if the provider is closed or if the node is currently Offline
  107  	// (either never bootstrapped, or disconnected for more than `OfflineDelay`).
 108  	// The schedule and provide queue depend on the network size, hence recent
 109  	// network connectivity is essential.
 110  	StartProviding(force bool, keys ...mh.Multihash) error
 111  	// ProvideOnce sends provider records for the specified keys to the DHT swarm
 112  	// only once. It does not automatically reprovide those keys afterward.
 113  	//
  114  	// The supplied multihashes are added to the provide queue, and the call
  115  	// returns immediately; the provide operation happens asynchronously.
 116  	//
 117  	// Returns an error if the keys couldn't be added to the provide queue. This
 118  	// can happen if the provider is closed or if the node is currently Offline
  119  	// (either never bootstrapped, or disconnected for more than `OfflineDelay`).
 120  	// The schedule and provide queue depend on the network size, hence recent
 121  	// network connectivity is essential.
 122  	ProvideOnce(keys ...mh.Multihash) error
  123  	// Clear clears all the keys from the provide queue and returns the number
 124  	// of keys that were cleared.
 125  	//
 126  	// The keys are not deleted from the keystore, so they will continue to be
 127  	// reprovided as scheduled.
 128  	Clear() int
 129  	// RefreshSchedule scans the Keystore for any keys that are not currently
 130  	// scheduled for reproviding. If such keys are found, it schedules their
 131  	// associated keyspace region to be reprovided.
 132  	//
  133  	// This function doesn't remove from the schedule prefixes that have no keys.
  134  	// That cleanup happens automatically during the reprovide operation when a
  135  	// region has no keys.
 136  	//
 137  	// Returns an error if the provider is closed or if the node is currently
  138  	// Offline (either never bootstrapped, or disconnected for more than
 139  	// `OfflineDelay`). The schedule depends on the network size, hence recent
 140  	// network connectivity is essential.
 141  	RefreshSchedule() error
 142  	Close() error
 143  }
 144  
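// Compile-time checks that all provider implementations satisfy DHTProvider.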
 145  var (
 146  	_ DHTProvider = &ddhtprovider.SweepingProvider{}
 147  	_ DHTProvider = &dhtprovider.SweepingProvider{}
 148  	_ DHTProvider = &NoopProvider{}
 149  	_ DHTProvider = &LegacyProvider{}
 150  )
 151  
 152  // NoopProvider is a no-operation provider implementation that does nothing.
 153  // It is used when providing is disabled or when no DHT is available.
 154  // All methods return successfully without performing any actual operations.
 155  type NoopProvider struct{}
 156  
 157  func (r *NoopProvider) StartProviding(bool, ...mh.Multihash) error { return nil }
 158  func (r *NoopProvider) ProvideOnce(...mh.Multihash) error          { return nil }
 159  func (r *NoopProvider) Clear() int                                 { return 0 }
 160  func (r *NoopProvider) RefreshSchedule() error                     { return nil }
 161  func (r *NoopProvider) Close() error                               { return nil }
 162  
 163  // LegacyProvider is a wrapper around the boxo/provider.System that implements
 164  // the DHTProvider interface. This provider manages reprovides using a burst
 165  // strategy where it sequentially reprovides all keys at once during each
 166  // reprovide interval, rather than spreading the load over time.
 167  //
 168  // This is the legacy provider implementation that can cause resource spikes
 169  // during reprovide operations. For more efficient providing, consider using
 170  // the SweepingProvider which spreads the load over the reprovide interval.
 171  type LegacyProvider struct {
 172  	provider.System
 173  }
 174  
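// StartProviding queues a one-time provide for the given keys and ignores the
// force flag: periodic reprovides are handled by the wrapped provider.System,
// which re-enumerates the keys to reprovide through its KeyChanFunc.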
 175  func (r *LegacyProvider) StartProviding(force bool, keys ...mh.Multihash) error {
 176  	return r.ProvideOnce(keys...)
 177  }
 178  
 179  func (r *LegacyProvider) ProvideOnce(keys ...mh.Multihash) error {
 180  	if many, ok := r.System.(routinghelpers.ProvideManyRouter); ok {
 181  		return many.ProvideMany(context.Background(), keys)
 182  	}
 183  
 184  	for _, k := range keys {
 185  		if err := r.Provide(context.Background(), cid.NewCidV1(cid.Raw, k), true); err != nil {
 186  			return err
 187  		}
 188  	}
 189  	return nil
 190  }
 191  
 192  func (r *LegacyProvider) Clear() int {
 193  	return r.System.Clear()
 194  }
 195  
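// RefreshSchedule is a no-op: the wrapped provider.System keeps no per-key
// schedule, since it derives the reprovide set from its KeyChanFunc.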
 196  func (r *LegacyProvider) RefreshSchedule() error { return nil }
 197  
  198  // LegacyProviderOpt creates a LegacyProvider to be used as the provider in the
  199  // IpfsNode.
 200  func LegacyProviderOpt(reprovideInterval time.Duration, strategy string, acceleratedDHTClient bool, provideWorkerCount int) fx.Option {
 201  	system := fx.Provide(
 202  		fx.Annotate(func(lc fx.Lifecycle, cr irouting.ProvideManyRouter, repo repo.Repo) (*LegacyProvider, error) {
 203  			// Initialize provider.System first, before pinner/blockstore/etc.
 204  			// The KeyChanFunc will be set later via SetKeyProvider() once we have
 205  			// created the pinner, blockstore and other dependencies.
 206  			opts := []provider.Option{
 207  				provider.Online(cr),
 208  				provider.ReproviderInterval(reprovideInterval),
 209  				provider.ProvideWorkerCount(provideWorkerCount),
 210  			}
 211  			if !acceleratedDHTClient && reprovideInterval > 0 {
  212  				// The estimate is unreliable when the accelerated DHT client is in use,
  213  				// and since this message mainly exists to nudge people toward the
  214  				// accelerated DHT client, skip the throughput report when it is enabled.
 215  				opts = append(opts,
 216  					provider.ThroughputReport(func(reprovide bool, complete bool, keysProvided uint, duration time.Duration) bool {
 217  						avgProvideSpeed := duration / time.Duration(keysProvided)
 218  						count := uint64(keysProvided)
 219  
 220  						if !reprovide || !complete {
  221  							// We don't know how many CIDs we have to provide, so try to count them in the blockstore.
  222  							// But don't try for too long, as this might be very expensive if you have a huge datastore.
 223  							ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
 224  							defer cancel()
 225  
  226  							// FIXME: I want a running counter of blocks so the blockstore size can be an O(1) lookup.
  227  							// Note: talk to the datastore directly, so as not to depend on Blockstore here.
 228  							qr, err := repo.Datastore().Query(ctx, query.Query{
 229  								Prefix:   blockstore.BlockPrefix.String(),
 230  								KeysOnly: true,
 231  							})
 232  							if err != nil {
  233  								logger.Errorf("fetching AllKeysChan in provider ThroughputReport: %v", err)
 234  								return false
 235  							}
 236  							defer qr.Close()
 237  							count = 0
 238  						countLoop:
 239  							for {
 240  								select {
 241  								case _, ok := <-qr.Next():
 242  									if !ok {
 243  										break countLoop
 244  									}
 245  									count++
 246  								case <-ctx.Done():
 247  									// really big blockstore mode
 248  
 249  									// how many blocks would be in a 10TiB blockstore with 128KiB blocks.
 250  									const probableBigBlockstore = (10 * 1024 * 1024 * 1024 * 1024) / (128 * 1024)
  251  									// How much time per block that budget gives us.
 252  									expectedProvideSpeed := reprovideInterval / probableBigBlockstore
 253  									if avgProvideSpeed > expectedProvideSpeed {
 254  										logger.Errorf(`
 255  🔔🔔🔔 Reprovide Operations Too Slow 🔔🔔🔔
 256  
 257  Your node may be falling behind on DHT reprovides, which could affect content availability.
 258  
 259  Observed: %d keys at %v per key
 260  Estimated: Assuming 10TiB blockstore, would take %v to complete
 261  ⏰ Must finish within %v (Provide.DHT.Interval)
 262  
 263  Solutions (try in order):
 264  1. Enable Provide.DHT.SweepEnabled=true (recommended)
 265  2. Increase Provide.DHT.MaxWorkers if needed
 266  3. Enable Routing.AcceleratedDHTClient=true (last resort, resource intensive)
 267  
 268  Learn more: https://github.com/ipfs/kubo/blob/master/docs/config.md#provide`,
 269  											keysProvided, avgProvideSpeed, avgProvideSpeed*probableBigBlockstore, reprovideInterval)
 270  										return false
 271  									}
 272  								}
 273  							}
 274  						}
 275  
  276  						// How much time per block that gives us.
 277  						expectedProvideSpeed := reprovideInterval
 278  						if count > 0 {
 279  							expectedProvideSpeed = reprovideInterval / time.Duration(count)
 280  						}
 281  
 282  						if avgProvideSpeed > expectedProvideSpeed {
 283  							logger.Errorf(`
 284  🔔🔔🔔 Reprovide Operations Too Slow 🔔🔔🔔
 285  
 286  Your node is falling behind on DHT reprovides, which will affect content availability.
 287  
 288  Observed: %d keys at %v per key
 289  Confirmed: ~%d total CIDs requiring %v to complete
 290  ⏰ Must finish within %v (Provide.DHT.Interval)
 291  
 292  Solutions (try in order):
 293  1. Enable Provide.DHT.SweepEnabled=true (recommended)
 294  2. Increase Provide.DHT.MaxWorkers if needed
 295  3. Enable Routing.AcceleratedDHTClient=true (last resort, resource intensive)
 296  
 297  Learn more: https://github.com/ipfs/kubo/blob/master/docs/config.md#provide`,
 298  								keysProvided, avgProvideSpeed, count, avgProvideSpeed*time.Duration(count), reprovideInterval)
 299  						}
 300  						return false
 301  					}, sampledBatchSize))
 302  			}
 303  
 304  			sys, err := provider.New(repo.Datastore(), opts...)
 305  			if err != nil {
 306  				return nil, err
 307  			}
 308  			lc.Append(fx.Hook{
 309  				OnStop: func(ctx context.Context) error {
 310  					return sys.Close()
 311  				},
 312  			})
 313  
 314  			prov := &LegacyProvider{sys}
 315  			handleStrategyChange(strategy, prov, repo.Datastore())
 316  
 317  			return prov, nil
 318  		},
 319  			fx.As(new(provider.System)),
 320  			fx.As(new(DHTProvider)),
 321  		),
 322  	)
 323  	setKeyProvider := fx.Invoke(func(lc fx.Lifecycle, system provider.System, keyProvider provider.KeyChanFunc) {
 324  		lc.Append(fx.Hook{
 325  			OnStart: func(ctx context.Context) error {
 326  				// SetKeyProvider breaks the circular dependency between provider, blockstore, and pinner.
 327  				// We cannot create the blockstore without the provider (it needs to provide blocks),
 328  				// and we cannot determine the reproviding strategy without the pinner/blockstore.
 329  				// This deferred initialization allows us to create provider.System first,
 330  				// then set the actual key provider function after all dependencies are ready.
 331  				system.SetKeyProvider(keyProvider)
 332  				return nil
 333  			},
 334  		})
 335  	})
 336  	return fx.Options(
 337  		system,
 338  		setKeyProvider,
 339  	)
 340  }
 341  
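// dhtImpl is the minimal DHT client surface needed to build a sweeping
// provider: routing, closest-peer lookups, and access to the libp2p host and
// DHT message sender.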
 342  type dhtImpl interface {
 343  	routing.Routing
 344  	GetClosestPeers(context.Context, string) ([]peer.ID, error)
 345  	Host() host.Host
 346  	MessageSender() dht_pb.MessageSender
 347  }
 348  
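// fullrtRouter wraps fullrt.FullRT so that provide operations are held back
// until the accelerated DHT client's initial network crawl has completed.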
 349  type fullrtRouter struct {
 350  	*fullrt.FullRT
 351  	ready  bool
 352  	logger *log.ZapEventLogger
 353  }
 354  
 355  func newFullRTRouter(fr *fullrt.FullRT, loggerName string) *fullrtRouter {
 356  	return &fullrtRouter{
 357  		FullRT: fr,
 358  		ready:  true,
 359  		logger: log.Logger(loggerName),
 360  	}
 361  }
 362  
 363  // GetClosestPeers overrides fullrt.FullRT's GetClosestPeers and returns an
 364  // error if the fullrt's initial network crawl isn't complete yet.
 365  func (fr *fullrtRouter) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
 366  	if fr.ready {
 367  		if !fr.Ready() {
 368  			fr.ready = false
 369  			fr.logger.Info("AcceleratedDHTClient: waiting for routing table initialization (5-10 min, depends on DHT size and network) to complete before providing")
 370  			return nil, errAcceleratedDHTNotReady
 371  		}
 372  	} else {
 373  		if fr.Ready() {
 374  			fr.ready = true
 375  			fr.logger.Info("AcceleratedDHTClient: routing table ready, providing can begin")
 376  		} else {
 377  			return nil, errAcceleratedDHTNotReady
 378  		}
 379  	}
 380  	return fr.FullRT.GetClosestPeers(ctx, key)
 381  }
 382  
 383  var (
 384  	_ dhtImpl = &dht.IpfsDHT{}
 385  	_ dhtImpl = &fullrtRouter{}
 386  )
 387  
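// addrsFilter is implemented by DHT clients that expose a filtered set of
// self addresses to advertise in provider records.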
 388  type addrsFilter interface {
 389  	FilteredAddrs() []ma.Multiaddr
 390  }
 391  
 392  // findRootDatastoreSpec extracts the leaf datastore spec for the root ("/")
 393  // mount from the repo's Datastore.Spec config. It unwraps mount (picks the "/"
 394  // mountpoint), measure, and log wrappers to find the actual backend spec
 395  // (e.g., levelds, pebbleds).
 396  func findRootDatastoreSpec(spec map[string]any) map[string]any {
 397  	if spec == nil {
 398  		return nil
 399  	}
 400  	switch spec["type"] {
 401  	case "mount":
 402  		mounts, ok := spec["mounts"].([]any)
 403  		if !ok {
 404  			return spec
 405  		}
 406  		for _, m := range mounts {
 407  			mnt, ok := m.(map[string]any)
 408  			if !ok {
 409  				continue
 410  			}
 411  			if mnt["mountpoint"] == "/" {
 412  				return findRootDatastoreSpec(mnt)
 413  			}
 414  		}
 415  		// No root mount found; return nil so callers fall back gracefully
 416  		// (in-memory datastore or skip mounting) rather than passing a
 417  		// mount-type spec to openDatastoreAt which expects a leaf backend.
 418  		return nil
 419  	case "measure", "log":
 420  		if child, ok := spec["child"].(map[string]any); ok {
 421  			return findRootDatastoreSpec(child)
 422  		}
 423  		return spec
 424  	default:
 425  		if _, hasChild := spec["child"]; hasChild {
 426  			logger.Warnw("unrecognized datastore wrapper type, using as-is",
 427  				"type", spec["type"])
 428  		}
 429  		return spec
 430  	}
 431  }
 432  
 433  // MountKeystoreDatastores opens any provider keystore datastores that exist on
 434  // disk and returns them as mount.Mount entries ready to be combined with the
 435  // main repo datastore. The caller must call the returned cleanup function when
 436  // done. Returns nil mounts and a no-op closer if no keystores exist.
 437  func MountKeystoreDatastores(repo repo.Repo) ([]mount.Mount, func(), error) {
 438  	cfg, err := repo.Config()
 439  	if err != nil {
 440  		return nil, nil, fmt.Errorf("reading repo config: %w", err)
 441  	}
 442  
 443  	rootSpec := findRootDatastoreSpec(cfg.Datastore.Spec)
 444  	if rootSpec == nil {
 445  		return nil, func() {}, nil
 446  	}
 447  
 448  	keystoreBasePath := filepath.Join(repo.Path(), KeystoreDatastorePath)
 449  	var mounts []mount.Mount
 450  	var closers []func()
 451  
 452  	for _, suffix := range []string{"0", "1"} {
 453  		dir := filepath.Join(keystoreBasePath, suffix)
 454  		if _, err := os.Stat(dir); err != nil {
 455  			continue
 456  		}
 457  		ds, err := openDatastoreAt(rootSpec, dir)
 458  		if err != nil {
 459  			for _, c := range closers {
 460  				c()
 461  			}
 462  			return nil, nil, err
 463  		}
 464  		prefix := providerDatastoreKey.Child(keystoreDatastoreKey).ChildString(suffix)
 465  		mounts = append(mounts, mount.Mount{Prefix: prefix, Datastore: ds})
 466  		closers = append(closers, func() { ds.Close() })
 467  	}
 468  
 469  	closer := func() {
 470  		for _, c := range closers {
 471  			c()
 472  		}
 473  	}
 474  	return mounts, closer, nil
 475  }
 476  
 477  // openDatastoreAt opens a datastore using the given spec at the specified path.
 478  // It deep-copies the spec to avoid mutating the original.
 479  func openDatastoreAt(rootSpec map[string]any, path string) (datastore.Batching, error) {
 480  	spec := copySpec(rootSpec)
 481  	spec["path"] = path
 482  	dsc, err := fsrepo.AnyDatastoreConfig(spec)
 483  	if err != nil {
 484  		return nil, fmt.Errorf("creating datastore config for %s: %w", path, err)
 485  	}
 486  	return dsc.Create("")
 487  }
 488  
 489  // copySpec deep-copies a datastore spec map so modifications (e.g., changing
 490  // the path) don't affect the original.
 491  func copySpec(spec map[string]any) map[string]any {
 492  	if spec == nil {
 493  		return nil
 494  	}
 495  	cp := make(map[string]any, len(spec))
 496  	for k, v := range spec {
 497  		switch val := v.(type) {
 498  		case map[string]any:
 499  			cp[k] = copySpec(val)
 500  		case []any:
 501  			s := make([]any, len(val))
 502  			for i, elem := range val {
 503  				if m, ok := elem.(map[string]any); ok {
 504  					s[i] = copySpec(m)
 505  				} else {
 506  					s[i] = elem
 507  				}
 508  			}
 509  			cp[k] = s
 510  		default:
 511  			cp[k] = v
 512  		}
 513  	}
 514  	return cp
 515  }
 516  
 517  // purgeBatchSize is the number of keys deleted per batch commit during
 518  // orphaned keystore cleanup. Each commit is a cancellation checkpoint.
 519  const purgeBatchSize = 1 << 12 // 4096
 520  
 521  // purgeOrphanedKeystoreData deletes all keys under /provider/keystore/ from the
 522  // shared repo datastore. These were written by older Kubo versions that stored
 523  // provider keystore data inline in the shared datastore. The new code uses
 524  // separate filesystem datastores under <repo>/{KeystoreDatastorePath}/ instead.
 525  //
 526  // The operation is idempotent and safe to interrupt: partial completion is
 527  // fine because already-deleted keys are no-ops on re-run.
 528  func purgeOrphanedKeystoreData(ctx context.Context, ds datastore.Batching) error {
 529  	orphanedPrefix := providerDatastoreKey.Child(keystoreDatastoreKey).String()
 530  	syncKey := datastore.NewKey(orphanedPrefix)
 531  
 532  	results, err := ds.Query(ctx, query.Query{
 533  		Prefix:   orphanedPrefix,
 534  		KeysOnly: true,
 535  	})
 536  	if err != nil {
 537  		return fmt.Errorf("querying orphaned keystore data: %w", err)
 538  	}
 539  	defer results.Close()
 540  
 541  	var batch datastore.Batch
 542  	var count, pending int
 543  	for result := range results.Next() {
 544  		if ctx.Err() != nil {
 545  			return ctx.Err()
 546  		}
 547  		if result.Error != nil {
 548  			return fmt.Errorf("iterating orphaned keystore data: %w", result.Error)
 549  		}
 550  		if batch == nil {
 551  			batch, err = ds.Batch(ctx)
 552  			if err != nil {
 553  				return fmt.Errorf("creating batch for orphaned keystore cleanup: %w", err)
 554  			}
 555  		}
 556  		if err := batch.Delete(ctx, datastore.NewKey(result.Key)); err != nil {
 557  			return fmt.Errorf("batch deleting orphaned key %s: %w", result.Key, err)
 558  		}
 559  		count++
 560  		pending++
 561  		if pending >= purgeBatchSize {
 562  			if err := batch.Commit(ctx); err != nil {
 563  				return fmt.Errorf("committing orphaned keystore cleanup batch: %w", err)
 564  			}
 565  			if err := ds.Sync(ctx, syncKey); err != nil {
 566  				return fmt.Errorf("syncing orphaned keystore cleanup: %w", err)
 567  			}
 568  			batch = nil
 569  			pending = 0
 570  		}
 571  	}
 572  	if pending > 0 {
 573  		if err := batch.Commit(ctx); err != nil {
 574  			return fmt.Errorf("committing orphaned keystore cleanup batch: %w", err)
 575  		}
 576  		if err := ds.Sync(ctx, syncKey); err != nil {
 577  			return fmt.Errorf("syncing orphaned keystore cleanup: %w", err)
 578  		}
 579  	}
 580  	if count > 0 {
 581  		logger.Infow("purged orphaned provider keystore data from shared datastore", "keys", count)
 582  	}
 583  	return nil
 584  }
 585  
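// SweepingProviderOpt wires up the sweeping DHT provider used when
// Provide.DHT.SweepEnabled=true. It builds a resettable keystore backed by
// dedicated on-disk datastores, constructs a buffered sweeping provider for
// the active DHT client (plain, dual, or accelerated), and registers
// lifecycle hooks for keystore syncing, ordered shutdown, and slow-reprovide
// alerts.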
 586  func SweepingProviderOpt(cfg *config.Config) fx.Option {
 587  	reprovideInterval := cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval)
 588  	type providerInput struct {
 589  		fx.In
 590  		DHT  routing.Routing `name:"dhtc"`
 591  		Repo repo.Repo
 592  		Lc   fx.Lifecycle
 593  	}
 594  	sweepingReprovider := fx.Provide(func(in providerInput) (DHTProvider, *keystore.ResettableKeystore, error) {
 595  		ds := namespace.Wrap(in.Repo.Datastore(), providerDatastoreKey)
 596  
 597  		// Get repo path and config to determine datastore type
 598  		repoPath := in.Repo.Path()
 599  		repoCfg, err := in.Repo.Config()
 600  		if err != nil {
 601  			return nil, nil, fmt.Errorf("getting repo config: %w", err)
 602  		}
 603  
 604  		// Find the root datastore type (levelds, pebbleds, etc.)
 605  		rootSpec := findRootDatastoreSpec(repoCfg.Datastore.Spec)
 606  
 607  		// Keystore datastores live at <repo>/{KeystoreDatastorePath}/<suffix>
 608  		keystoreBasePath := filepath.Join(repoPath, KeystoreDatastorePath)
 609  
 610  		createDs := func(suffix string) (datastore.Batching, error) {
 611  			if err := validateKeystoreSuffix(suffix); err != nil {
 612  				return nil, err
 613  			}
 614  			// When no datastore spec is configured (e.g., test/mock repos),
 615  			// fall back to an in-memory datastore.
 616  			if rootSpec == nil {
 617  				return datastore.NewMapDatastore(), nil
 618  			}
 619  			if err := os.MkdirAll(keystoreBasePath, 0o755); err != nil {
 620  				return nil, fmt.Errorf("creating keystore base directory: %w", err)
 621  			}
 622  			ds, err := openDatastoreAt(rootSpec, filepath.Join(keystoreBasePath, suffix))
 623  			if err != nil {
 624  				return nil, err
 625  			}
 626  			logger.Infow("provider keystore: opened datastore", "suffix", suffix, "path", filepath.Join(keystoreBasePath, suffix))
 627  			return ds, nil
 628  		}
 629  
 630  		destroyDs := func(suffix string) error {
 631  			if err := validateKeystoreSuffix(suffix); err != nil {
 632  				return err
 633  			}
 634  			logger.Infow("provider keystore: removing datastore from disk", "suffix", suffix, "path", filepath.Join(keystoreBasePath, suffix))
 635  			return os.RemoveAll(filepath.Join(keystoreBasePath, suffix))
 636  		}
 637  
 638  		// One-time cleanup of stale keystore data left by older Kubo in the
 639  		// shared repo datastore under /provider/keystore/. New code stores
 640  		// bulk key data in separate filesystem datastores under
 641  		// <repo>/{KeystoreDatastorePath}/ while still using the same
 642  		// /provider/keystore/ namespace in the shared datastore for metadata.
 643  		//
 644  		// The absence of the keystoreBasePath directory signals a first run
 645  		// after upgrade: the directory is created later by createDs on first
 646  		// use, so it doubles as a "cleanup done" flag. If the process dies
 647  		// mid-purge the directory still won't exist and the cleanup re-runs
 648  		// on next start (it is idempotent). Must run synchronously before
 649  		// NewResettableKeystore to avoid racing with reads on the same
 650  		// namespace.
 651  		if _, statErr := os.Stat(keystoreBasePath); os.IsNotExist(statErr) {
 652  			logger.Infow("migrating provider keystore data from shared datastore to separate filesystem datastores", "path", keystoreBasePath)
 653  			// Create a cancellable context for the purge. The OnStop hook
 654  			// below calls purgeCancel when the node receives a shutdown
 655  			// signal (e.g., SIGINT), which interrupts the purge loop
 656  			// instead of blocking indefinitely.
 657  			purgeCtx, purgeCancel := context.WithCancel(context.Background())
 658  			in.Lc.Append(fx.Hook{
 659  				OnStop: func(_ context.Context) error {
 660  					purgeCancel()
 661  					return nil
 662  				},
 663  			})
 664  			if purgeErr := purgeOrphanedKeystoreData(purgeCtx, in.Repo.Datastore()); purgeErr != nil {
 665  				if purgeCtx.Err() != nil {
 666  					logger.Infow("provider keystore migration interrupted by shutdown, will resume on next start")
 667  				} else {
 668  					logger.Warnw("provider keystore migration failed, will retry on next start", "error", purgeErr)
 669  				}
 670  			} else {
 671  				logger.Infow("provider keystore migration completed")
 672  			}
 673  			purgeCancel()
 674  		}
 675  
 676  		keystoreDs := namespace.Wrap(ds, keystoreDatastoreKey)
 677  		ks, err := keystore.NewResettableKeystore(keystoreDs,
 678  			keystore.WithDatastoreFactory(createDs, destroyDs),
 679  			keystore.KeystoreOption(
 680  				keystore.WithPrefixBits(16),
 681  				keystore.WithBatchSize(int(cfg.Provide.DHT.KeystoreBatchSize.WithDefault(config.DefaultProvideDHTKeystoreBatchSize))),
 682  			),
 683  		)
 684  		if err != nil {
 685  			return nil, nil, err
 686  		}
 687  		// Constants for buffered provider configuration
 688  		// These values match the upstream defaults from go-libp2p-kad-dht and have been battle-tested
 689  		const (
 690  			// bufferedDsName is the datastore namespace used by the buffered provider.
 691  			// The dsqueue persists operations here to handle large data additions without
 692  			// being memory-bound, allowing operations on hardware with limited RAM and
 693  			// enabling core operations to return instantly while processing happens async.
 694  			bufferedDsName = "bprov"
 695  
 696  			// bufferedBatchSize controls how many operations are dequeued and processed
 697  			// together from the datastore queue. The worker processes up to this many
 698  			// operations at once, grouping them by type for efficiency.
 699  			bufferedBatchSize = 1 << 10 // 1024 items
 700  
 701  			// bufferedIdleWriteTime is an implementation detail of go-dsqueue that controls
 702  			// how long the datastore buffer waits for new multihashes to arrive before
 703  			// flushing in-memory items to the datastore. This does NOT affect providing speed -
 704  			// provides happen as fast as possible via a dedicated worker that continuously
 705  			// processes the queue regardless of this timing.
 706  			bufferedIdleWriteTime = time.Minute
 707  
 708  			// loggerName is the name of the go-log logger used by the provider.
 709  			loggerName = dhtprovider.DefaultLoggerName
 710  		)
 711  
 712  		bufferedProviderOpts := []buffered.Option{
 713  			buffered.WithBatchSize(bufferedBatchSize),
 714  			buffered.WithDsName(bufferedDsName),
 715  			buffered.WithIdleWriteTime(bufferedIdleWriteTime),
 716  		}
 717  		var impl dhtImpl
 718  		switch inDht := in.DHT.(type) {
 719  		case *dht.IpfsDHT:
 720  			if inDht != nil {
 721  				impl = inDht
 722  			}
 723  		case *dual.DHT:
 724  			if inDht != nil {
 725  				prov, err := ddhtprovider.New(inDht,
 726  					ddhtprovider.WithKeystore(ks),
 727  					ddhtprovider.WithDatastore(ds),
 728  					ddhtprovider.WithResumeCycle(cfg.Provide.DHT.ResumeEnabled.WithDefault(config.DefaultProvideDHTResumeEnabled)),
 729  
 730  					ddhtprovider.WithReprovideInterval(reprovideInterval),
 731  					ddhtprovider.WithMaxReprovideDelay(time.Hour),
 732  					ddhtprovider.WithOfflineDelay(cfg.Provide.DHT.OfflineDelay.WithDefault(config.DefaultProvideDHTOfflineDelay)),
 733  					ddhtprovider.WithConnectivityCheckOnlineInterval(1*time.Minute),
 734  
 735  					ddhtprovider.WithMaxWorkers(int(cfg.Provide.DHT.MaxWorkers.WithDefault(config.DefaultProvideDHTMaxWorkers))),
 736  					ddhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
 737  					ddhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
 738  					ddhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),
 739  
 740  					ddhtprovider.WithLoggerName(loggerName),
 741  				)
 742  				if err != nil {
 743  					return nil, nil, err
 744  				}
 745  				return buffered.New(prov, ds, bufferedProviderOpts...), ks, nil
 746  			}
 747  		case *fullrt.FullRT:
 748  			if inDht != nil {
 749  				impl = newFullRTRouter(inDht, loggerName)
 750  			}
 751  		}
 752  		if impl == nil {
 753  			return &NoopProvider{}, nil, nil
 754  		}
 755  
 756  		var selfAddrsFunc func() []ma.Multiaddr
  757  		if implFilter, ok := impl.(addrsFilter); ok {
  758  			selfAddrsFunc = implFilter.FilteredAddrs
 759  		} else {
 760  			selfAddrsFunc = func() []ma.Multiaddr { return impl.Host().Addrs() }
 761  		}
 762  		opts := []dhtprovider.Option{
 763  			dhtprovider.WithKeystore(ks),
 764  			dhtprovider.WithDatastore(ds),
 765  			dhtprovider.WithResumeCycle(cfg.Provide.DHT.ResumeEnabled.WithDefault(config.DefaultProvideDHTResumeEnabled)),
 766  			dhtprovider.WithHost(impl.Host()),
 767  			dhtprovider.WithRouter(impl),
 768  			dhtprovider.WithMessageSender(impl.MessageSender()),
 769  			dhtprovider.WithSelfAddrs(selfAddrsFunc),
 770  			dhtprovider.WithAddLocalRecord(func(h mh.Multihash) error {
 771  				return impl.Provide(context.Background(), cid.NewCidV1(cid.Raw, h), false)
 772  			}),
 773  
 774  			dhtprovider.WithReplicationFactor(amino.DefaultBucketSize),
 775  			dhtprovider.WithReprovideInterval(reprovideInterval),
 776  			dhtprovider.WithMaxReprovideDelay(time.Hour),
 777  			dhtprovider.WithOfflineDelay(cfg.Provide.DHT.OfflineDelay.WithDefault(config.DefaultProvideDHTOfflineDelay)),
 778  			dhtprovider.WithConnectivityCheckOnlineInterval(1 * time.Minute),
 779  
 780  			dhtprovider.WithMaxWorkers(int(cfg.Provide.DHT.MaxWorkers.WithDefault(config.DefaultProvideDHTMaxWorkers))),
 781  			dhtprovider.WithDedicatedPeriodicWorkers(int(cfg.Provide.DHT.DedicatedPeriodicWorkers.WithDefault(config.DefaultProvideDHTDedicatedPeriodicWorkers))),
 782  			dhtprovider.WithDedicatedBurstWorkers(int(cfg.Provide.DHT.DedicatedBurstWorkers.WithDefault(config.DefaultProvideDHTDedicatedBurstWorkers))),
 783  			dhtprovider.WithMaxProvideConnsPerWorker(int(cfg.Provide.DHT.MaxProvideConnsPerWorker.WithDefault(config.DefaultProvideDHTMaxProvideConnsPerWorker))),
 784  
 785  			dhtprovider.WithLoggerName(loggerName),
 786  		}
 787  
 788  		prov, err := dhtprovider.New(opts...)
 789  		if err != nil {
 790  			return nil, nil, err
 791  		}
 792  		return buffered.New(prov, ds, bufferedProviderOpts...), ks, nil
 793  	})
 794  
 795  	type keystoreInput struct {
 796  		fx.In
 797  		Provider    DHTProvider
 798  		Keystore    *keystore.ResettableKeystore
 799  		KeyProvider provider.KeyChanFunc
 800  	}
 801  	initKeystore := fx.Invoke(func(lc fx.Lifecycle, in keystoreInput) {
 802  		// Skip keystore initialization for NoopProvider
 803  		if _, ok := in.Provider.(*NoopProvider); ok {
 804  			return
 805  		}
 806  
 807  		var (
 808  			cancel context.CancelFunc
 809  			done   = make(chan struct{})
 810  		)
 811  
 812  		syncKeystore := func(ctx context.Context) error {
 813  			kcf, err := in.KeyProvider(ctx)
 814  			if err != nil {
 815  				return err
 816  			}
 817  			if err := in.Keystore.ResetCids(ctx, kcf); err != nil {
 818  				return err
 819  			}
 820  			if err := in.Provider.RefreshSchedule(); err != nil {
 821  				logger.Infow("refreshing provider schedule", "err", err)
 822  			}
 823  			return nil
 824  		}
 825  
 826  		lc.Append(fx.Hook{
 827  			OnStart: func(ctx context.Context) error {
  828  				// Set the KeyProvider as a garbage collection function for the
  829  				// keystore. Periodically purge all keys from the Keystore and
  830  				// replace them with the keys that need to be reprovided, coming from
  831  				// the KeyChanFunc. So far, this is the least bad way to remove CIDs
  832  				// that shouldn't be reprovided from the provider's state.
 833  				go func() {
 834  					// Sync the keystore once at startup. This operation is async since
 835  					// we need to walk the DAG of objects matching the provide strategy,
 836  					// which can take a while.
 837  					strategy := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)
 838  					logger.Infow("provider keystore sync started", "strategy", strategy)
 839  					if err := syncKeystore(ctx); err != nil {
 840  						if ctx.Err() == nil {
 841  							logger.Errorw("provider keystore sync failed", "err", err, "strategy", strategy)
 842  						} else {
 843  							logger.Debugw("provider keystore sync interrupted by shutdown", "err", err, "strategy", strategy)
 844  						}
 845  						return
 846  					}
 847  					logger.Infow("provider keystore sync completed", "strategy", strategy)
 848  				}()
 849  
 850  				gcCtx, c := context.WithCancel(context.Background())
 851  				cancel = c
 852  
 853  				go func() { // garbage collection loop for cids to reprovide
 854  					defer close(done)
 855  					ticker := time.NewTicker(reprovideInterval)
 856  					defer ticker.Stop()
 857  
 858  					for {
 859  						select {
 860  						case <-gcCtx.Done():
 861  							return
 862  						case <-ticker.C:
 863  							if err := syncKeystore(gcCtx); err != nil {
 864  								logger.Errorw("provider keystore sync", "err", err)
 865  							}
 866  						}
 867  					}
 868  				}()
 869  				return nil
 870  			},
 871  			OnStop: func(ctx context.Context) error {
 872  				if cancel != nil {
 873  					cancel()
 874  				}
 875  				select {
 876  				case <-done:
 877  				case <-ctx.Done():
 878  					return ctx.Err()
 879  				}
 880  				// Keystore will be closed by ensureProviderClosesBeforeKeystore hook
 881  				// to guarantee provider closes before keystore.
 882  				return nil
 883  			},
 884  		})
 885  	})
 886  
 887  	// ensureProviderClosesBeforeKeystore manages the shutdown order between
 888  	// provider and keystore to prevent race conditions.
 889  	//
 890  	// The provider's worker goroutines may call keystore methods during their
 891  	// operation. If keystore closes while these operations are in-flight, we get
 892  	// "keystore is closed" errors. By closing the provider first, we ensure all
 893  	// worker goroutines exit and complete any pending keystore operations before
 894  	// the keystore itself closes.
 895  	type providerKeystoreShutdownInput struct {
 896  		fx.In
 897  		Provider DHTProvider
 898  		Keystore *keystore.ResettableKeystore
 899  	}
 900  	ensureProviderClosesBeforeKeystore := fx.Invoke(func(lc fx.Lifecycle, in providerKeystoreShutdownInput) {
 901  		// Skip for NoopProvider
 902  		if _, ok := in.Provider.(*NoopProvider); ok {
 903  			return
 904  		}
 905  
 906  		lc.Append(fx.Hook{
 907  			OnStop: func(ctx context.Context) error {
 908  				// Close provider first - waits for all worker goroutines to exit.
 909  				// This ensures no code can access keystore after this returns.
 910  				if err := in.Provider.Close(); err != nil {
 911  					logger.Errorw("error closing provider during shutdown", "error", err)
 912  				}
 913  
 914  				// Close keystore - safe now, provider is fully shut down
 915  				return in.Keystore.Close()
 916  			},
 917  		})
 918  	})
 919  
 920  	// extractSweepingProvider extracts a SweepingProvider from the given provider interface.
 921  	// It handles unwrapping buffered and dual providers, always selecting WAN for dual DHT.
 922  	// Returns nil if the provider is not a sweeping provider type.
 923  	var extractSweepingProvider func(prov any) *dhtprovider.SweepingProvider
 924  	extractSweepingProvider = func(prov any) *dhtprovider.SweepingProvider {
 925  		switch p := prov.(type) {
 926  		case *dhtprovider.SweepingProvider:
 927  			return p
 928  		case *ddhtprovider.SweepingProvider:
 929  			return p.WAN
 930  		case *buffered.SweepingProvider:
 931  			// Recursively extract from the inner provider
 932  			return extractSweepingProvider(p.Provider)
 933  		default:
 934  			return nil
 935  		}
 936  	}
 937  
 938  	type alertInput struct {
 939  		fx.In
 940  		Provider DHTProvider
 941  	}
 942  	reprovideAlert := fx.Invoke(func(lc fx.Lifecycle, in alertInput) {
 943  		prov := extractSweepingProvider(in.Provider)
 944  		if prov == nil {
 945  			return
 946  		}
 947  
 948  		var (
 949  			cancel context.CancelFunc
 950  			done   = make(chan struct{})
 951  		)
 952  
 953  		lc.Append(fx.Hook{
 954  			OnStart: func(ctx context.Context) error {
 955  				gcCtx, c := context.WithCancel(context.Background())
 956  				cancel = c
 957  				go func() {
 958  					defer close(done)
 959  
 960  					ticker := time.NewTicker(reprovideAlertPollInterval)
 961  					defer ticker.Stop()
 962  
 963  					var (
 964  						queueSize, prevQueueSize         int64
 965  						queuedWorkers, prevQueuedWorkers bool
 966  						count                            int
 967  					)
 968  
 969  					for {
 970  						select {
 971  						case <-gcCtx.Done():
 972  							return
 973  						case <-ticker.C:
 974  						}
 975  
 976  						stats := prov.Stats()
 977  						queuedWorkers = stats.Workers.QueuedPeriodic > 0
 978  						queueSize = int64(stats.Queues.PendingRegionReprovides)
 979  
 980  						// Alert if reprovide queue keeps growing and all periodic workers are busy.
 981  						// Requires consecutiveAlertsThreshold intervals of sustained growth.
 982  						if prevQueuedWorkers && queuedWorkers && queueSize > prevQueueSize {
 983  							count++
 984  							if count >= consecutiveAlertsThreshold {
 985  								logger.Errorf(`
 986  🔔🔔🔔 Reprovide Operations Too Slow 🔔🔔🔔
 987  
 988  Your node is falling behind on DHT reprovides, which will affect content availability.
 989  
 990  Keyspace regions enqueued for reprovide:
  991    %s ago:  %d
  992    Now:     %d
 993  
 994  All periodic workers are busy!
  995    Active workers:       %d / %d (max)
  996    Active worker types:  %d periodic, %d burst
  997    Dedicated workers:    %d periodic, %d burst
 998  
 999  Solutions (try in order):
1000  1. Increase Provide.DHT.MaxWorkers (current %d)
1001  2. Increase Provide.DHT.DedicatedPeriodicWorkers (current %d)
1002  3. Set Provide.DHT.SweepEnabled=false and Routing.AcceleratedDHTClient=true (last resort, not recommended)
1003  
1004  See how the reprovide queue is processed in real-time with 'watch ipfs provide stat --all --compact'
1005  
1006  See docs: https://github.com/ipfs/kubo/blob/master/docs/config.md#providedhtmaxworkers`,
1007  									reprovideAlertPollInterval.Truncate(time.Minute).String(), prevQueueSize, queueSize,
1008  									stats.Workers.Active, stats.Workers.Max,
1009  									stats.Workers.ActivePeriodic, stats.Workers.ActiveBurst,
1010  									stats.Workers.DedicatedPeriodic, stats.Workers.DedicatedBurst,
1011  									stats.Workers.Max, stats.Workers.DedicatedPeriodic)
1012  							}
1013  						} else if !queuedWorkers {
1014  							count = 0
1015  						}
1016  
1017  						prevQueueSize, prevQueuedWorkers = queueSize, queuedWorkers
1018  					}
1019  				}()
1020  				return nil
1021  			},
1022  			OnStop: func(ctx context.Context) error {
1023  				// Cancel the alert loop
1024  				if cancel != nil {
1025  					cancel()
1026  				}
1027  				select {
1028  				case <-done:
1029  				case <-ctx.Done():
1030  					return ctx.Err()
1031  				}
1032  				return nil
1033  			},
1034  		})
1035  	})
1036  
1037  	return fx.Options(
1038  		sweepingReprovider,
1039  		initKeystore,
1040  		ensureProviderClosesBeforeKeystore,
1041  		reprovideAlert,
1042  	)
1043  }
1044  
1045  // ONLINE/OFFLINE
1046  
1047  // hasDHTRouting checks if the routing configuration includes a DHT component.
1048  // Returns false for HTTP-only custom routing configurations (e.g., Routing.Type="custom"
1049  // with only HTTP routers). This is used to determine whether SweepingProviderOpt
1050  // can be used, since it requires a DHT client.
1051  func hasDHTRouting(cfg *config.Config) bool {
1052  	routingType := cfg.Routing.Type.WithDefault(config.DefaultRoutingType)
1053  	switch routingType {
1054  	case "auto", "autoclient", "dht", "dhtclient", "dhtserver":
1055  		return true
1056  	case "custom":
1057  		// Check if any router in custom config is DHT-based
1058  		for _, router := range cfg.Routing.Routers {
1059  			if routerIncludesDHT(router, cfg) {
1060  				return true
1061  			}
1062  		}
1063  		return false
1064  	default: // "none", "delegated"
1065  		return false
1066  	}
1067  }
1068  
1069  // routerIncludesDHT recursively checks if a router configuration includes DHT.
1070  // Handles parallel and sequential composite routers by checking their children.
1071  func routerIncludesDHT(rp config.RouterParser, cfg *config.Config) bool {
1072  	switch rp.Type {
1073  	case config.RouterTypeDHT:
1074  		return true
1075  	case config.RouterTypeParallel, config.RouterTypeSequential:
1076  		if children, ok := rp.Parameters.(*config.ComposableRouterParams); ok {
1077  			for _, child := range children.Routers {
1078  				if childRouter, exists := cfg.Routing.Routers[child.RouterName]; exists {
1079  					if routerIncludesDHT(childRouter, cfg) {
1080  						return true
1081  					}
1082  				}
1083  			}
1084  		}
1085  	}
1086  	return false
1087  }
1088  
1089  // OnlineProviders groups units managing provide routing records online
1090  func OnlineProviders(provide bool, cfg *config.Config) fx.Option {
1091  	if !provide {
1092  		return OfflineProviders()
1093  	}
1094  
1095  	providerStrategy := cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy)
1096  
1097  	strategyFlag := config.ParseProvideStrategy(providerStrategy)
1098  	if strategyFlag == 0 {
1099  		return fx.Error(fmt.Errorf("provider: unknown strategy %q", providerStrategy))
1100  	}
1101  
1102  	opts := []fx.Option{
1103  		fx.Provide(setReproviderKeyProvider(providerStrategy)),
1104  	}
1105  
1106  	sweepEnabled := cfg.Provide.DHT.SweepEnabled.WithDefault(config.DefaultProvideDHTSweepEnabled)
1107  	dhtAvailable := hasDHTRouting(cfg)
1108  
1109  	// Use SweepingProvider only when both sweep is enabled AND DHT is available.
1110  	// For HTTP-only routing (e.g., Routing.Type="custom" with only HTTP routers),
1111  	// fall back to LegacyProvider which works with ProvideManyRouter.
1112  	// See https://github.com/ipfs/kubo/issues/11089
1113  	if sweepEnabled && dhtAvailable {
1114  		opts = append(opts, SweepingProviderOpt(cfg))
1115  	} else {
1116  		reprovideInterval := cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval)
1117  		acceleratedDHTClient := cfg.Routing.AcceleratedDHTClient.WithDefault(config.DefaultAcceleratedDHTClient)
1118  		provideWorkerCount := int(cfg.Provide.DHT.MaxWorkers.WithDefault(config.DefaultProvideDHTMaxWorkers))
1119  
1120  		opts = append(opts, LegacyProviderOpt(reprovideInterval, providerStrategy, acceleratedDHTClient, provideWorkerCount))
1121  	}
1122  
1123  	return fx.Options(opts...)
1124  }
1125  
1126  // OfflineProviders groups units managing provide routing records offline
1127  func OfflineProviders() fx.Option {
1128  	return fx.Provide(func() DHTProvider {
1129  		return &NoopProvider{}
1130  	})
1131  }
1132  
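// mfsProvider returns a KeyChanFunc that flushes the MFS root and then walks
// its DAG, yielding the CIDs reachable from the current MFS root.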
1133  func mfsProvider(mfsRoot *mfs.Root, fetcher fetcher.Factory) provider.KeyChanFunc {
1134  	return func(ctx context.Context) (<-chan cid.Cid, error) {
1135  		err := mfsRoot.FlushMemFree(ctx)
1136  		if err != nil {
1137  			return nil, fmt.Errorf("provider: error flushing MFS, cannot provide MFS: %w", err)
1138  		}
1139  		rootNode, err := mfsRoot.GetDirectory().GetNode()
1140  		if err != nil {
1141  			return nil, fmt.Errorf("provider: error loading MFS root, cannot provide MFS: %w", err)
1142  		}
1143  
1144  		kcf := provider.NewDAGProvider(rootNode.Cid(), fetcher)
1145  		return kcf(ctx)
1146  	}
1147  }
1148  
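// provStrategyIn bundles the dependencies needed to derive the providing
// strategy and its key source.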
1149  type provStrategyIn struct {
1150  	fx.In
1151  	Pinner               pin.Pinner
1152  	Blockstore           blockstore.Blockstore
1153  	OfflineIPLDFetcher   fetcher.Factory `name:"offlineIpldFetcher"`
1154  	OfflineUnixFSFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
1155  	MFSRoot              *mfs.Root
1156  	Repo                 repo.Repo
1157  }
1158  
1159  type provStrategyOut struct {
1160  	fx.Out
1161  	ProvidingStrategy    config.ProvideStrategy
1162  	ProvidingKeyChanFunc provider.KeyChanFunc
1163  }
1164  
1165  // createKeyProvider creates the appropriate KeyChanFunc based on strategy.
1166  // Each strategy has different behavior:
 1167  // - "roots": Only root CIDs of pinned content
 1168  // - "pinned": All pinned content (roots + children)
 1169  // - "mfs": Only MFS content ("pinned+mfs" combines pinned and MFS)
 1170  // - "all": All blocks in the blockstore
1171  func createKeyProvider(strategyFlag config.ProvideStrategy, in provStrategyIn) provider.KeyChanFunc {
1172  	switch strategyFlag {
1173  	case config.ProvideStrategyRoots:
1174  		return provider.NewBufferedProvider(dspinner.NewPinnedProvider(true, in.Pinner, in.OfflineIPLDFetcher))
1175  	case config.ProvideStrategyPinned:
1176  		return provider.NewBufferedProvider(dspinner.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher))
1177  	case config.ProvideStrategyPinned | config.ProvideStrategyMFS:
1178  		return provider.NewPrioritizedProvider(
1179  			provider.NewBufferedProvider(dspinner.NewPinnedProvider(false, in.Pinner, in.OfflineIPLDFetcher)),
1180  			mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher),
1181  		)
1182  	case config.ProvideStrategyMFS:
1183  		return mfsProvider(in.MFSRoot, in.OfflineUnixFSFetcher)
1184  	default: // "all", "", "flat" (compat)
1185  		return in.Blockstore.AllKeysChan
1186  	}
1187  }
1188  
1189  // detectStrategyChange checks if the reproviding strategy has changed from what's persisted.
1190  // Returns: (previousStrategy, hasChanged, error)
1191  func detectStrategyChange(ctx context.Context, strategy string, ds datastore.Datastore) (string, bool, error) {
1192  	strategyKey := datastore.NewKey(reprovideStrategyKey)
1193  
1194  	prev, err := ds.Get(ctx, strategyKey)
1195  	if err != nil {
1196  		if errors.Is(err, datastore.ErrNotFound) {
1197  			return "", strategy != "", nil
1198  		}
1199  		return "", false, err
1200  	}
1201  
1202  	previousStrategy := string(prev)
1203  	return previousStrategy, previousStrategy != strategy, nil
1204  }
1205  
1206  // persistStrategy saves the current reproviding strategy to the datastore.
1207  // Empty string strategies are deleted rather than stored.
1208  func persistStrategy(ctx context.Context, strategy string, ds datastore.Datastore) error {
1209  	strategyKey := datastore.NewKey(reprovideStrategyKey)
1210  
1211  	if strategy == "" {
1212  		return ds.Delete(ctx, strategyKey)
1213  	}
1214  	return ds.Put(ctx, strategyKey, []byte(strategy))
1215  }
1216  
1217  // handleStrategyChange manages strategy change detection and queue clearing.
1218  // Strategy change detection: when the reproviding strategy changes,
1219  // we clear the provide queue to avoid unexpected behavior from mixing
1220  // strategies. This ensures a clean transition between different providing modes.
1221  func handleStrategyChange(strategy string, provider DHTProvider, ds datastore.Datastore) {
1222  	ctx := context.Background()
1223  
1224  	previous, changed, err := detectStrategyChange(ctx, strategy, ds)
1225  	if err != nil {
 1226  		logger.Errorw("cannot read previous reprovide strategy", "err", err)
1227  		return
1228  	}
1229  
1230  	if !changed {
1231  		return
1232  	}
1233  
1234  	logger.Infow("Provide.Strategy changed, clearing provide queue", "previous", previous, "current", strategy)
1235  	provider.Clear()
1236  
1237  	if err := persistStrategy(ctx, strategy, ds); err != nil {
 1238  		logger.Errorw("cannot update reprovide strategy", "err", err)
1239  	}
1240  }
1241  
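// setReproviderKeyProvider returns an fx constructor that resolves the
// configured provide strategy into the KeyChanFunc used to enumerate the keys
// to (re)provide.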
1242  func setReproviderKeyProvider(strategy string) func(in provStrategyIn) provStrategyOut {
1243  	strategyFlag := config.ParseProvideStrategy(strategy)
1244  
1245  	return func(in provStrategyIn) provStrategyOut {
1246  		// Create the appropriate key provider based on strategy
1247  		kcf := createKeyProvider(strategyFlag, in)
1248  		return provStrategyOut{
1249  			ProvidingStrategy:    strategyFlag,
1250  			ProvidingKeyChanFunc: kcf,
1251  		}
1252  	}
1253  }