/ core / node / core.go
core.go
  1  package node
  2  
  3  import (
  4  	"context"
  5  	"errors"
  6  	"fmt"
  7  
  8  	"github.com/ipfs/boxo/blockservice"
  9  	blockstore "github.com/ipfs/boxo/blockstore"
 10  	exchange "github.com/ipfs/boxo/exchange"
 11  	offline "github.com/ipfs/boxo/exchange/offline"
 12  	"github.com/ipfs/boxo/fetcher"
 13  	bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
 14  	"github.com/ipfs/boxo/filestore"
 15  	"github.com/ipfs/boxo/ipld/merkledag"
 16  	"github.com/ipfs/boxo/ipld/unixfs"
 17  	"github.com/ipfs/boxo/mfs"
 18  	pathresolver "github.com/ipfs/boxo/path/resolver"
 19  	pin "github.com/ipfs/boxo/pinning/pinner"
 20  	"github.com/ipfs/boxo/pinning/pinner/dspinner"
 21  	"github.com/ipfs/go-cid"
 22  	"github.com/ipfs/go-datastore"
 23  	format "github.com/ipfs/go-ipld-format"
 24  	"github.com/ipfs/go-unixfsnode"
 25  	dagpb "github.com/ipld/go-codec-dagpb"
 26  	"go.uber.org/fx"
 27  
 28  	"github.com/ipfs/kubo/config"
 29  	"github.com/ipfs/kubo/core/node/helpers"
 30  	"github.com/ipfs/kubo/repo"
 31  )
 32  
 33  // FilesRootDatastoreKey is the datastore key for the MFS files root CID.
 34  var FilesRootDatastoreKey = datastore.NewKey("/local/filesroot")
 35  
 36  // BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
 37  func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
 38  	return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
 39  		bsvc := blockservice.New(bs, rem,
 40  			blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)),
 41  		)
 42  
 43  		lc.Append(fx.Hook{
 44  			OnStop: func(ctx context.Context) error {
 45  				return bsvc.Close()
 46  			},
 47  		})
 48  
 49  		return bsvc
 50  	}
 51  }
 52  
 53  // Pinning creates new pinner which tells GC which blocks should be kept
 54  func Pinning(strategy string) func(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo, prov DHTProvider) (pin.Pinner, error) {
 55  	// Parse strategy at function creation time (not inside the returned function)
 56  	// This happens before the provider is created, which is why we pass the strategy
 57  	// string and parse it here, rather than using fx-provided ProvidingStrategy.
 58  	strategyFlag := config.ParseProvideStrategy(strategy)
 59  
 60  	return func(bstore blockstore.Blockstore,
 61  		ds format.DAGService,
 62  		repo repo.Repo,
 63  		prov DHTProvider,
 64  	) (pin.Pinner, error) {
 65  		rootDS := repo.Datastore()
 66  
 67  		syncFn := func(ctx context.Context) error {
 68  			if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil {
 69  				return err
 70  			}
 71  			return rootDS.Sync(ctx, filestore.FilestorePrefix)
 72  		}
 73  		syncDs := &syncDagService{ds, syncFn}
 74  
 75  		ctx := context.TODO()
 76  
 77  		var opts []dspinner.Option
 78  		roots := (strategyFlag & config.ProvideStrategyRoots) != 0
 79  		pinned := (strategyFlag & config.ProvideStrategyPinned) != 0
 80  
 81  		// Important: Only one of WithPinnedProvider or WithRootsProvider should be active.
 82  		// Having both would cause duplicate root advertisements since "pinned" includes all
 83  		// pinned content (roots + children), while "roots" is just the root CIDs.
 84  		// We prioritize "pinned" if both are somehow set (though this shouldn't happen
 85  		// with proper strategy parsing).
 86  		if pinned {
 87  			opts = append(opts, dspinner.WithPinnedProvider(prov))
 88  		} else if roots {
 89  			opts = append(opts, dspinner.WithRootsProvider(prov))
 90  		}
 91  
 92  		pinning, err := dspinner.New(ctx, rootDS, syncDs, opts...)
 93  		if err != nil {
 94  			return nil, err
 95  		}
 96  
 97  		return pinning, nil
 98  	}
 99  }
100  
101  var (
102  	_ merkledag.SessionMaker = new(syncDagService)
103  	_ format.DAGService      = new(syncDagService)
104  )
105  
106  // syncDagService is used by the Pinner to ensure data gets persisted to the underlying datastore
107  type syncDagService struct {
108  	format.DAGService
109  	syncFn func(context.Context) error
110  }
111  
112  func (s *syncDagService) Sync(ctx context.Context) error {
113  	return s.syncFn(ctx)
114  }
115  
116  func (s *syncDagService) Session(ctx context.Context) format.NodeGetter {
117  	return merkledag.NewSession(ctx, s.DAGService)
118  }
119  
120  // FetchersOut allows injection of fetchers.
121  type FetchersOut struct {
122  	fx.Out
123  	IPLDFetcher          fetcher.Factory `name:"ipldFetcher"`
124  	UnixfsFetcher        fetcher.Factory `name:"unixfsFetcher"`
125  	OfflineIPLDFetcher   fetcher.Factory `name:"offlineIpldFetcher"`
126  	OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
127  }
128  
129  // FetchersIn allows using fetchers for other dependencies.
130  type FetchersIn struct {
131  	fx.In
132  	IPLDFetcher          fetcher.Factory `name:"ipldFetcher"`
133  	UnixfsFetcher        fetcher.Factory `name:"unixfsFetcher"`
134  	OfflineIPLDFetcher   fetcher.Factory `name:"offlineIpldFetcher"`
135  	OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
136  }
137  
138  // FetcherConfig returns a fetcher config that can build new fetcher instances
139  func FetcherConfig(bs blockservice.BlockService) FetchersOut {
140  	ipldFetcher := bsfetcher.NewFetcherConfig(bs)
141  	ipldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
142  	unixFSFetcher := ipldFetcher.WithReifier(unixfsnode.Reify)
143  
144  	// Construct offline versions which we can safely use in contexts where
145  	// path resolution should not fetch new blocks via exchange.
146  	offlineBs := blockservice.New(bs.Blockstore(), offline.Exchange(bs.Blockstore()))
147  	offlineIpldFetcher := bsfetcher.NewFetcherConfig(offlineBs)
148  	offlineIpldFetcher.SkipNotFound = true // carries onto the UnixFSFetcher below
149  	offlineIpldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
150  	offlineUnixFSFetcher := offlineIpldFetcher.WithReifier(unixfsnode.Reify)
151  
152  	return FetchersOut{
153  		IPLDFetcher:          ipldFetcher,
154  		UnixfsFetcher:        unixFSFetcher,
155  		OfflineIPLDFetcher:   offlineIpldFetcher,
156  		OfflineUnixfsFetcher: offlineUnixFSFetcher,
157  	}
158  }
159  
160  // PathResolversOut allows injection of path resolvers
161  type PathResolversOut struct {
162  	fx.Out
163  	IPLDPathResolver          pathresolver.Resolver `name:"ipldPathResolver"`
164  	UnixFSPathResolver        pathresolver.Resolver `name:"unixFSPathResolver"`
165  	OfflineIPLDPathResolver   pathresolver.Resolver `name:"offlineIpldPathResolver"`
166  	OfflineUnixFSPathResolver pathresolver.Resolver `name:"offlineUnixFSPathResolver"`
167  }
168  
169  // PathResolverConfig creates path resolvers with the given fetchers.
170  func PathResolverConfig(fetchers FetchersIn) PathResolversOut {
171  	return PathResolversOut{
172  		IPLDPathResolver:          pathresolver.NewBasicResolver(fetchers.IPLDFetcher),
173  		UnixFSPathResolver:        pathresolver.NewBasicResolver(fetchers.UnixfsFetcher),
174  		OfflineIPLDPathResolver:   pathresolver.NewBasicResolver(fetchers.OfflineIPLDFetcher),
175  		OfflineUnixFSPathResolver: pathresolver.NewBasicResolver(fetchers.OfflineUnixfsFetcher),
176  	}
177  }
178  
179  // Dag creates new DAGService
180  func Dag(bs blockservice.BlockService) format.DAGService {
181  	return merkledag.NewDAGService(bs)
182  }
183  
184  // Files loads persisted MFS root
185  func Files(strategy string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov DHTProvider) (*mfs.Root, error) {
186  	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov DHTProvider) (*mfs.Root, error) {
187  		pf := func(ctx context.Context, c cid.Cid) error {
188  			rootDS := repo.Datastore()
189  			if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil {
190  				return err
191  			}
192  			if err := rootDS.Sync(ctx, filestore.FilestorePrefix); err != nil {
193  				return err
194  			}
195  
196  			if err := rootDS.Put(ctx, FilesRootDatastoreKey, c.Bytes()); err != nil {
197  				return err
198  			}
199  			return rootDS.Sync(ctx, FilesRootDatastoreKey)
200  		}
201  
202  		var nd *merkledag.ProtoNode
203  		ctx := helpers.LifecycleCtx(mctx, lc)
204  		val, err := repo.Datastore().Get(ctx, FilesRootDatastoreKey)
205  
206  		switch {
207  		case errors.Is(err, datastore.ErrNotFound):
208  			nd = unixfs.EmptyDirNode()
209  			err := dag.Add(ctx, nd)
210  			if err != nil {
211  				return nil, fmt.Errorf("failure writing filesroot to dagstore: %s", err)
212  			}
213  		case err == nil:
214  			c, err := cid.Cast(val)
215  			if err != nil {
216  				return nil, err
217  			}
218  
219  			offlineDag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
220  			rnd, err := offlineDag.Get(ctx, c)
221  			if err != nil {
222  				return nil, fmt.Errorf("error loading filesroot from dagservice: %s", err)
223  			}
224  
225  			pbnd, ok := rnd.(*merkledag.ProtoNode)
226  			if !ok {
227  				return nil, merkledag.ErrNotProtobuf
228  			}
229  
230  			nd = pbnd
231  		default:
232  			return nil, err
233  		}
234  
235  		// MFS (Mutable File System) provider integration: Only pass the provider
236  		// to MFS when the strategy includes "mfs". MFS will call StartProviding()
237  		// on every DAGService.Add() operation, which is sufficient for the "mfs"
238  		// strategy - it ensures all MFS content gets announced as it's added or
239  		// modified. For non-mfs strategies, we set provider to nil to avoid
240  		// unnecessary providing.
241  		strategyFlag := config.ParseProvideStrategy(strategy)
242  		if strategyFlag&config.ProvideStrategyMFS == 0 {
243  			prov = nil
244  		}
245  
246  		// Get configured settings from Import config
247  		cfg, err := repo.Config()
248  		if err != nil {
249  			return nil, fmt.Errorf("failed to get config: %w", err)
250  		}
251  		chunkerGen := cfg.Import.UnixFSSplitterFunc()
252  		maxDirLinks := int(cfg.Import.UnixFSDirectoryMaxLinks.WithDefault(config.DefaultUnixFSDirectoryMaxLinks))
253  		maxHAMTFanout := int(cfg.Import.UnixFSHAMTDirectoryMaxFanout.WithDefault(config.DefaultUnixFSHAMTDirectoryMaxFanout))
254  		hamtShardingSize := int(cfg.Import.UnixFSHAMTDirectorySizeThreshold.WithDefault(config.DefaultUnixFSHAMTDirectorySizeThreshold))
255  		sizeEstimationMode := cfg.Import.HAMTSizeEstimationMode()
256  
257  		root, err := mfs.NewRoot(ctx, dag, nd, pf, prov,
258  			mfs.WithChunker(chunkerGen),
259  			mfs.WithMaxLinks(maxDirLinks),
260  			mfs.WithMaxHAMTFanout(maxHAMTFanout),
261  			mfs.WithHAMTShardingSize(hamtShardingSize),
262  			mfs.WithSizeEstimationMode(sizeEstimationMode),
263  		)
264  		if err != nil {
265  			return nil, fmt.Errorf("failed to initialize MFS root from %s stored at %s: %w. "+
266  				"If corrupted, use 'ipfs files chroot' to reset (see --help)", nd.Cid(), FilesRootDatastoreKey, err)
267  		}
268  
269  		lc.Append(fx.Hook{
270  			OnStop: func(ctx context.Context) error {
271  				return root.Close()
272  			},
273  		})
274  
275  		return root, err
276  	}
277  }