pin.go
1 package coreapi 2 3 import ( 4 "context" 5 "fmt" 6 "strings" 7 8 bserv "github.com/ipfs/boxo/blockservice" 9 offline "github.com/ipfs/boxo/exchange/offline" 10 "github.com/ipfs/boxo/ipld/merkledag" 11 "github.com/ipfs/boxo/path" 12 pin "github.com/ipfs/boxo/pinning/pinner" 13 "github.com/ipfs/go-cid" 14 coreiface "github.com/ipfs/kubo/core/coreiface" 15 caopts "github.com/ipfs/kubo/core/coreiface/options" 16 "go.opentelemetry.io/otel/attribute" 17 "go.opentelemetry.io/otel/trace" 18 19 "github.com/ipfs/kubo/tracing" 20 ) 21 22 type PinAPI CoreAPI 23 24 func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOption) error { 25 ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Add", trace.WithAttributes(attribute.String("path", p.String()))) 26 defer span.End() 27 28 dagNode, err := api.core().ResolveNode(ctx, p) 29 if err != nil { 30 return fmt.Errorf("pin: %s", err) 31 } 32 33 settings, err := caopts.PinAddOptions(opts...) 34 if err != nil { 35 return err 36 } 37 38 span.SetAttributes(attribute.Bool("recursive", settings.Recursive)) 39 40 defer api.blockstore.PinLock(ctx).Unlock(ctx) 41 42 err = api.pinning.Pin(ctx, dagNode, settings.Recursive, settings.Name) 43 if err != nil { 44 return fmt.Errorf("pin: %s", err) 45 } 46 47 return api.pinning.Flush(ctx) 48 } 49 50 func (api *PinAPI) Ls(ctx context.Context, pins chan<- coreiface.Pin, opts ...caopts.PinLsOption) error { 51 ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Ls") 52 defer span.End() 53 54 settings, err := caopts.PinLsOptions(opts...) 55 if err != nil { 56 close(pins) 57 return err 58 } 59 60 span.SetAttributes(attribute.String("type", settings.Type)) 61 62 switch settings.Type { 63 case "all", "direct", "indirect", "recursive": 64 default: 65 close(pins) 66 return fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type) 67 } 68 69 return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name, pins) 70 } 71 72 func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) { 73 ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "IsPinned", trace.WithAttributes(attribute.String("path", p.String()))) 74 defer span.End() 75 76 resolved, _, err := api.core().ResolvePath(ctx, p) 77 if err != nil { 78 return "", false, fmt.Errorf("error resolving path: %s", err) 79 } 80 81 settings, err := caopts.PinIsPinnedOptions(opts...) 82 if err != nil { 83 return "", false, err 84 } 85 86 span.SetAttributes(attribute.String("withtype", settings.WithType)) 87 88 mode, ok := pin.StringToMode(settings.WithType) 89 if !ok { 90 return "", false, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.WithType) 91 } 92 93 return api.pinning.IsPinnedWithType(ctx, resolved.RootCid(), mode) 94 } 95 96 // Rm pin rm api 97 func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOption) error { 98 ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Rm", trace.WithAttributes(attribute.String("path", p.String()))) 99 defer span.End() 100 101 rp, _, err := api.core().ResolvePath(ctx, p) 102 if err != nil { 103 return err 104 } 105 106 settings, err := caopts.PinRmOptions(opts...) 107 if err != nil { 108 return err 109 } 110 111 span.SetAttributes(attribute.Bool("recursive", settings.Recursive)) 112 113 // Note: after unpin the pin sets are flushed to the blockstore, so we need 114 // to take a lock to prevent a concurrent garbage collection 115 defer api.blockstore.PinLock(ctx).Unlock(ctx) 116 117 if err = api.pinning.Unpin(ctx, rp.RootCid(), settings.Recursive); err != nil { 118 return err 119 } 120 121 return api.pinning.Flush(ctx) 122 } 123 124 func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opts ...caopts.PinUpdateOption) error { 125 ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Update", trace.WithAttributes( 126 attribute.String("from", from.String()), 127 attribute.String("to", to.String()), 128 )) 129 defer span.End() 130 131 settings, err := caopts.PinUpdateOptions(opts...) 132 if err != nil { 133 return err 134 } 135 136 span.SetAttributes(attribute.Bool("unpin", settings.Unpin)) 137 138 fp, _, err := api.core().ResolvePath(ctx, from) 139 if err != nil { 140 return err 141 } 142 143 tp, _, err := api.core().ResolvePath(ctx, to) 144 if err != nil { 145 return err 146 } 147 148 defer api.blockstore.PinLock(ctx).Unlock(ctx) 149 150 err = api.pinning.Update(ctx, fp.RootCid(), tp.RootCid(), settings.Unpin) 151 if err != nil { 152 return err 153 } 154 155 return api.pinning.Flush(ctx) 156 } 157 158 type pinStatus struct { 159 err error 160 cid cid.Cid 161 ok bool 162 badNodes []coreiface.BadPinNode 163 } 164 165 // BadNode is used in PinVerifyRes 166 type badNode struct { 167 path path.ImmutablePath 168 err error 169 } 170 171 func (s *pinStatus) Ok() bool { 172 return s.ok 173 } 174 175 func (s *pinStatus) BadNodes() []coreiface.BadPinNode { 176 return s.badNodes 177 } 178 179 func (s *pinStatus) Err() error { 180 return s.err 181 } 182 183 func (n *badNode) Path() path.ImmutablePath { 184 return n.path 185 } 186 187 func (n *badNode) Err() error { 188 return n.err 189 } 190 191 func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) { 192 ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Verify") 193 defer span.End() 194 195 visited := make(map[cid.Cid]*pinStatus) 196 bs := api.blockstore 197 DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) 198 getLinks := merkledag.GetLinksWithDAG(DAG) 199 200 var checkPin func(root cid.Cid) *pinStatus 201 checkPin = func(root cid.Cid) *pinStatus { 202 ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Verify.CheckPin", trace.WithAttributes(attribute.String("cid", root.String()))) 203 defer span.End() 204 205 if status, ok := visited[root]; ok { 206 return status 207 } 208 209 links, err := getLinks(ctx, root) 210 if err != nil { 211 status := &pinStatus{ok: false, cid: root} 212 status.badNodes = []coreiface.BadPinNode{&badNode{path: path.FromCid(root), err: err}} 213 visited[root] = status 214 return status 215 } 216 217 status := &pinStatus{ok: true, cid: root} 218 for _, lnk := range links { 219 res := checkPin(lnk.Cid) 220 if !res.ok { 221 status.ok = false 222 status.badNodes = append(status.badNodes, res.badNodes...) 223 } 224 } 225 226 visited[root] = status 227 return status 228 } 229 230 out := make(chan coreiface.PinStatus) 231 232 go func() { 233 defer close(out) 234 for p := range api.pinning.RecursiveKeys(ctx, false) { 235 var res *pinStatus 236 if p.Err != nil { 237 res = &pinStatus{err: p.Err} 238 } else { 239 res = checkPin(p.Pin.Key) 240 } 241 select { 242 case <-ctx.Done(): 243 return 244 case out <- res: 245 } 246 } 247 }() 248 249 return out, nil 250 } 251 252 type pinInfo struct { 253 pinType string 254 path path.ImmutablePath 255 name string 256 } 257 258 func (p *pinInfo) Path() path.ImmutablePath { 259 return p.path 260 } 261 262 func (p *pinInfo) Type() string { 263 return p.pinType 264 } 265 266 func (p *pinInfo) Name() string { 267 return p.name 268 } 269 270 // pinLsAll is an internal function for returning a list of pins 271 // 272 // The caller must keep reading results until the channel is closed to prevent 273 // leaking the goroutine that is fetching pins. 274 func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string, out chan<- coreiface.Pin) error { 275 defer close(out) 276 emittedSet := cid.NewSet() 277 278 AddToResultKeys := func(c cid.Cid, pinName, typeStr string) error { 279 if emittedSet.Visit(c) && (name == "" || strings.Contains(pinName, name)) { 280 select { 281 case out <- &pinInfo{ 282 pinType: typeStr, 283 name: pinName, 284 path: path.FromCid(c), 285 }: 286 case <-ctx.Done(): 287 return ctx.Err() 288 } 289 } 290 return nil 291 } 292 293 var rkeys []cid.Cid 294 var err error 295 if typeStr == "recursive" || typeStr == "all" { 296 for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) { 297 if streamedCid.Err != nil { 298 return streamedCid.Err 299 } 300 if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil { 301 return err 302 } 303 rkeys = append(rkeys, streamedCid.Pin.Key) 304 } 305 } 306 if typeStr == "direct" || typeStr == "all" { 307 for streamedCid := range api.pinning.DirectKeys(ctx, detailed) { 308 if streamedCid.Err != nil { 309 return streamedCid.Err 310 } 311 if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil { 312 return err 313 } 314 } 315 } 316 if typeStr == "indirect" { 317 // We need to first visit the direct pins that have priority 318 // without emitting them 319 320 for streamedCid := range api.pinning.DirectKeys(ctx, detailed) { 321 if streamedCid.Err != nil { 322 return streamedCid.Err 323 } 324 emittedSet.Add(streamedCid.Pin.Key) 325 } 326 327 for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) { 328 if streamedCid.Err != nil { 329 return streamedCid.Err 330 } 331 emittedSet.Add(streamedCid.Pin.Key) 332 rkeys = append(rkeys, streamedCid.Pin.Key) 333 } 334 } 335 if typeStr == "indirect" || typeStr == "all" { 336 if len(rkeys) == 0 { 337 return nil 338 } 339 var addErr error 340 walkingSet := cid.NewSet() 341 for _, k := range rkeys { 342 err = merkledag.Walk( 343 ctx, merkledag.GetLinksWithDAG(api.dag), k, 344 func(c cid.Cid) bool { 345 if !walkingSet.Visit(c) { 346 return false 347 } 348 if emittedSet.Has(c) { 349 return true // skipped 350 } 351 addErr = AddToResultKeys(c, "", "indirect") 352 return addErr == nil 353 }, 354 merkledag.SkipRoot(), merkledag.Concurrent(), 355 ) 356 if err != nil { 357 return err 358 } 359 if addErr != nil { 360 return addErr 361 } 362 } 363 } 364 365 return nil 366 } 367 368 func (api *PinAPI) core() coreiface.CoreAPI { 369 return (*CoreAPI)(api) 370 }