core.go
1 package node 2 3 import ( 4 "context" 5 "errors" 6 "fmt" 7 8 "github.com/ipfs/boxo/blockservice" 9 blockstore "github.com/ipfs/boxo/blockstore" 10 exchange "github.com/ipfs/boxo/exchange" 11 offline "github.com/ipfs/boxo/exchange/offline" 12 "github.com/ipfs/boxo/fetcher" 13 bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" 14 "github.com/ipfs/boxo/filestore" 15 "github.com/ipfs/boxo/ipld/merkledag" 16 "github.com/ipfs/boxo/ipld/unixfs" 17 "github.com/ipfs/boxo/mfs" 18 pathresolver "github.com/ipfs/boxo/path/resolver" 19 pin "github.com/ipfs/boxo/pinning/pinner" 20 "github.com/ipfs/boxo/pinning/pinner/dspinner" 21 "github.com/ipfs/go-cid" 22 "github.com/ipfs/go-datastore" 23 format "github.com/ipfs/go-ipld-format" 24 "github.com/ipfs/go-unixfsnode" 25 dagpb "github.com/ipld/go-codec-dagpb" 26 "go.uber.org/fx" 27 28 "github.com/ipfs/kubo/config" 29 "github.com/ipfs/kubo/core/node/helpers" 30 "github.com/ipfs/kubo/repo" 31 ) 32 33 // FilesRootDatastoreKey is the datastore key for the MFS files root CID. 34 var FilesRootDatastoreKey = datastore.NewKey("/local/filesroot") 35 36 // BlockService creates new blockservice which provides an interface to fetch content-addressable blocks 37 func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService { 38 return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService { 39 bsvc := blockservice.New(bs, rem, 40 blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(config.DefaultWriteThrough)), 41 ) 42 43 lc.Append(fx.Hook{ 44 OnStop: func(ctx context.Context) error { 45 return bsvc.Close() 46 }, 47 }) 48 49 return bsvc 50 } 51 } 52 53 // Pinning creates new pinner which tells GC which blocks should be kept 54 func Pinning(strategy string) func(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo, prov DHTProvider) (pin.Pinner, error) { 55 // Parse strategy at function creation time (not inside the returned function) 56 // This happens before the provider is created, which is why we pass the strategy 57 // string and parse it here, rather than using fx-provided ProvidingStrategy. 58 strategyFlag := config.ParseProvideStrategy(strategy) 59 60 return func(bstore blockstore.Blockstore, 61 ds format.DAGService, 62 repo repo.Repo, 63 prov DHTProvider, 64 ) (pin.Pinner, error) { 65 rootDS := repo.Datastore() 66 67 syncFn := func(ctx context.Context) error { 68 if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil { 69 return err 70 } 71 return rootDS.Sync(ctx, filestore.FilestorePrefix) 72 } 73 syncDs := &syncDagService{ds, syncFn} 74 75 ctx := context.TODO() 76 77 var opts []dspinner.Option 78 roots := (strategyFlag & config.ProvideStrategyRoots) != 0 79 pinned := (strategyFlag & config.ProvideStrategyPinned) != 0 80 81 // Important: Only one of WithPinnedProvider or WithRootsProvider should be active. 82 // Having both would cause duplicate root advertisements since "pinned" includes all 83 // pinned content (roots + children), while "roots" is just the root CIDs. 84 // We prioritize "pinned" if both are somehow set (though this shouldn't happen 85 // with proper strategy parsing). 86 if pinned { 87 opts = append(opts, dspinner.WithPinnedProvider(prov)) 88 } else if roots { 89 opts = append(opts, dspinner.WithRootsProvider(prov)) 90 } 91 92 pinning, err := dspinner.New(ctx, rootDS, syncDs, opts...) 93 if err != nil { 94 return nil, err 95 } 96 97 return pinning, nil 98 } 99 } 100 101 var ( 102 _ merkledag.SessionMaker = new(syncDagService) 103 _ format.DAGService = new(syncDagService) 104 ) 105 106 // syncDagService is used by the Pinner to ensure data gets persisted to the underlying datastore 107 type syncDagService struct { 108 format.DAGService 109 syncFn func(context.Context) error 110 } 111 112 func (s *syncDagService) Sync(ctx context.Context) error { 113 return s.syncFn(ctx) 114 } 115 116 func (s *syncDagService) Session(ctx context.Context) format.NodeGetter { 117 return merkledag.NewSession(ctx, s.DAGService) 118 } 119 120 // FetchersOut allows injection of fetchers. 121 type FetchersOut struct { 122 fx.Out 123 IPLDFetcher fetcher.Factory `name:"ipldFetcher"` 124 UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"` 125 OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"` 126 OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"` 127 } 128 129 // FetchersIn allows using fetchers for other dependencies. 130 type FetchersIn struct { 131 fx.In 132 IPLDFetcher fetcher.Factory `name:"ipldFetcher"` 133 UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"` 134 OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"` 135 OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"` 136 } 137 138 // FetcherConfig returns a fetcher config that can build new fetcher instances 139 func FetcherConfig(bs blockservice.BlockService) FetchersOut { 140 ipldFetcher := bsfetcher.NewFetcherConfig(bs) 141 ipldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser) 142 unixFSFetcher := ipldFetcher.WithReifier(unixfsnode.Reify) 143 144 // Construct offline versions which we can safely use in contexts where 145 // path resolution should not fetch new blocks via exchange. 146 offlineBs := blockservice.New(bs.Blockstore(), offline.Exchange(bs.Blockstore())) 147 offlineIpldFetcher := bsfetcher.NewFetcherConfig(offlineBs) 148 offlineIpldFetcher.SkipNotFound = true // carries onto the UnixFSFetcher below 149 offlineIpldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser) 150 offlineUnixFSFetcher := offlineIpldFetcher.WithReifier(unixfsnode.Reify) 151 152 return FetchersOut{ 153 IPLDFetcher: ipldFetcher, 154 UnixfsFetcher: unixFSFetcher, 155 OfflineIPLDFetcher: offlineIpldFetcher, 156 OfflineUnixfsFetcher: offlineUnixFSFetcher, 157 } 158 } 159 160 // PathResolversOut allows injection of path resolvers 161 type PathResolversOut struct { 162 fx.Out 163 IPLDPathResolver pathresolver.Resolver `name:"ipldPathResolver"` 164 UnixFSPathResolver pathresolver.Resolver `name:"unixFSPathResolver"` 165 OfflineIPLDPathResolver pathresolver.Resolver `name:"offlineIpldPathResolver"` 166 OfflineUnixFSPathResolver pathresolver.Resolver `name:"offlineUnixFSPathResolver"` 167 } 168 169 // PathResolverConfig creates path resolvers with the given fetchers. 170 func PathResolverConfig(fetchers FetchersIn) PathResolversOut { 171 return PathResolversOut{ 172 IPLDPathResolver: pathresolver.NewBasicResolver(fetchers.IPLDFetcher), 173 UnixFSPathResolver: pathresolver.NewBasicResolver(fetchers.UnixfsFetcher), 174 OfflineIPLDPathResolver: pathresolver.NewBasicResolver(fetchers.OfflineIPLDFetcher), 175 OfflineUnixFSPathResolver: pathresolver.NewBasicResolver(fetchers.OfflineUnixfsFetcher), 176 } 177 } 178 179 // Dag creates new DAGService 180 func Dag(bs blockservice.BlockService) format.DAGService { 181 return merkledag.NewDAGService(bs) 182 } 183 184 // Files loads persisted MFS root 185 func Files(strategy string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov DHTProvider) (*mfs.Root, error) { 186 return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService, bs blockstore.Blockstore, prov DHTProvider) (*mfs.Root, error) { 187 pf := func(ctx context.Context, c cid.Cid) error { 188 rootDS := repo.Datastore() 189 if err := rootDS.Sync(ctx, blockstore.BlockPrefix); err != nil { 190 return err 191 } 192 if err := rootDS.Sync(ctx, filestore.FilestorePrefix); err != nil { 193 return err 194 } 195 196 if err := rootDS.Put(ctx, FilesRootDatastoreKey, c.Bytes()); err != nil { 197 return err 198 } 199 return rootDS.Sync(ctx, FilesRootDatastoreKey) 200 } 201 202 var nd *merkledag.ProtoNode 203 ctx := helpers.LifecycleCtx(mctx, lc) 204 val, err := repo.Datastore().Get(ctx, FilesRootDatastoreKey) 205 206 switch { 207 case errors.Is(err, datastore.ErrNotFound): 208 nd = unixfs.EmptyDirNode() 209 err := dag.Add(ctx, nd) 210 if err != nil { 211 return nil, fmt.Errorf("failure writing filesroot to dagstore: %s", err) 212 } 213 case err == nil: 214 c, err := cid.Cast(val) 215 if err != nil { 216 return nil, err 217 } 218 219 offlineDag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) 220 rnd, err := offlineDag.Get(ctx, c) 221 if err != nil { 222 return nil, fmt.Errorf("error loading filesroot from dagservice: %s", err) 223 } 224 225 pbnd, ok := rnd.(*merkledag.ProtoNode) 226 if !ok { 227 return nil, merkledag.ErrNotProtobuf 228 } 229 230 nd = pbnd 231 default: 232 return nil, err 233 } 234 235 // MFS (Mutable File System) provider integration: Only pass the provider 236 // to MFS when the strategy includes "mfs". MFS will call StartProviding() 237 // on every DAGService.Add() operation, which is sufficient for the "mfs" 238 // strategy - it ensures all MFS content gets announced as it's added or 239 // modified. For non-mfs strategies, we set provider to nil to avoid 240 // unnecessary providing. 241 strategyFlag := config.ParseProvideStrategy(strategy) 242 if strategyFlag&config.ProvideStrategyMFS == 0 { 243 prov = nil 244 } 245 246 // Get configured settings from Import config 247 cfg, err := repo.Config() 248 if err != nil { 249 return nil, fmt.Errorf("failed to get config: %w", err) 250 } 251 chunkerGen := cfg.Import.UnixFSSplitterFunc() 252 maxDirLinks := int(cfg.Import.UnixFSDirectoryMaxLinks.WithDefault(config.DefaultUnixFSDirectoryMaxLinks)) 253 maxHAMTFanout := int(cfg.Import.UnixFSHAMTDirectoryMaxFanout.WithDefault(config.DefaultUnixFSHAMTDirectoryMaxFanout)) 254 hamtShardingSize := int(cfg.Import.UnixFSHAMTDirectorySizeThreshold.WithDefault(config.DefaultUnixFSHAMTDirectorySizeThreshold)) 255 sizeEstimationMode := cfg.Import.HAMTSizeEstimationMode() 256 257 root, err := mfs.NewRoot(ctx, dag, nd, pf, prov, 258 mfs.WithChunker(chunkerGen), 259 mfs.WithMaxLinks(maxDirLinks), 260 mfs.WithMaxHAMTFanout(maxHAMTFanout), 261 mfs.WithHAMTShardingSize(hamtShardingSize), 262 mfs.WithSizeEstimationMode(sizeEstimationMode), 263 ) 264 if err != nil { 265 return nil, fmt.Errorf("failed to initialize MFS root from %s stored at %s: %w. "+ 266 "If corrupted, use 'ipfs files chroot' to reset (see --help)", nd.Cid(), FilesRootDatastoreKey, err) 267 } 268 269 lc.Append(fx.Hook{ 270 OnStop: func(ctx context.Context) error { 271 return root.Close() 272 }, 273 }) 274 275 return root, err 276 } 277 }