/ core / coreapi / pin.go
pin.go
  1  package coreapi
  2  
  3  import (
  4  	"context"
  5  	"fmt"
  6  	"strings"
  7  
  8  	bserv "github.com/ipfs/boxo/blockservice"
  9  	offline "github.com/ipfs/boxo/exchange/offline"
 10  	"github.com/ipfs/boxo/ipld/merkledag"
 11  	"github.com/ipfs/boxo/path"
 12  	pin "github.com/ipfs/boxo/pinning/pinner"
 13  	"github.com/ipfs/go-cid"
 14  	coreiface "github.com/ipfs/kubo/core/coreiface"
 15  	caopts "github.com/ipfs/kubo/core/coreiface/options"
 16  	"go.opentelemetry.io/otel/attribute"
 17  	"go.opentelemetry.io/otel/trace"
 18  
 19  	"github.com/ipfs/kubo/tracing"
 20  )
 21  
 22  type PinAPI CoreAPI
 23  
 24  func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOption) error {
 25  	ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Add", trace.WithAttributes(attribute.String("path", p.String())))
 26  	defer span.End()
 27  
 28  	dagNode, err := api.core().ResolveNode(ctx, p)
 29  	if err != nil {
 30  		return fmt.Errorf("pin: %s", err)
 31  	}
 32  
 33  	settings, err := caopts.PinAddOptions(opts...)
 34  	if err != nil {
 35  		return err
 36  	}
 37  
 38  	span.SetAttributes(attribute.Bool("recursive", settings.Recursive))
 39  
 40  	defer api.blockstore.PinLock(ctx).Unlock(ctx)
 41  
 42  	err = api.pinning.Pin(ctx, dagNode, settings.Recursive, settings.Name)
 43  	if err != nil {
 44  		return fmt.Errorf("pin: %s", err)
 45  	}
 46  
 47  	return api.pinning.Flush(ctx)
 48  }
 49  
 50  func (api *PinAPI) Ls(ctx context.Context, pins chan<- coreiface.Pin, opts ...caopts.PinLsOption) error {
 51  	ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Ls")
 52  	defer span.End()
 53  
 54  	settings, err := caopts.PinLsOptions(opts...)
 55  	if err != nil {
 56  		close(pins)
 57  		return err
 58  	}
 59  
 60  	span.SetAttributes(attribute.String("type", settings.Type))
 61  
 62  	switch settings.Type {
 63  	case "all", "direct", "indirect", "recursive":
 64  	default:
 65  		close(pins)
 66  		return fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.Type)
 67  	}
 68  
 69  	return api.pinLsAll(ctx, settings.Type, settings.Detailed, settings.Name, pins)
 70  }
 71  
 72  func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) {
 73  	ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "IsPinned", trace.WithAttributes(attribute.String("path", p.String())))
 74  	defer span.End()
 75  
 76  	resolved, _, err := api.core().ResolvePath(ctx, p)
 77  	if err != nil {
 78  		return "", false, fmt.Errorf("error resolving path: %s", err)
 79  	}
 80  
 81  	settings, err := caopts.PinIsPinnedOptions(opts...)
 82  	if err != nil {
 83  		return "", false, err
 84  	}
 85  
 86  	span.SetAttributes(attribute.String("withtype", settings.WithType))
 87  
 88  	mode, ok := pin.StringToMode(settings.WithType)
 89  	if !ok {
 90  		return "", false, fmt.Errorf("invalid type '%s', must be one of {direct, indirect, recursive, all}", settings.WithType)
 91  	}
 92  
 93  	return api.pinning.IsPinnedWithType(ctx, resolved.RootCid(), mode)
 94  }
 95  
 96  // Rm pin rm api
 97  func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOption) error {
 98  	ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Rm", trace.WithAttributes(attribute.String("path", p.String())))
 99  	defer span.End()
100  
101  	rp, _, err := api.core().ResolvePath(ctx, p)
102  	if err != nil {
103  		return err
104  	}
105  
106  	settings, err := caopts.PinRmOptions(opts...)
107  	if err != nil {
108  		return err
109  	}
110  
111  	span.SetAttributes(attribute.Bool("recursive", settings.Recursive))
112  
113  	// Note: after unpin the pin sets are flushed to the blockstore, so we need
114  	// to take a lock to prevent a concurrent garbage collection
115  	defer api.blockstore.PinLock(ctx).Unlock(ctx)
116  
117  	if err = api.pinning.Unpin(ctx, rp.RootCid(), settings.Recursive); err != nil {
118  		return err
119  	}
120  
121  	return api.pinning.Flush(ctx)
122  }
123  
124  func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opts ...caopts.PinUpdateOption) error {
125  	ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Update", trace.WithAttributes(
126  		attribute.String("from", from.String()),
127  		attribute.String("to", to.String()),
128  	))
129  	defer span.End()
130  
131  	settings, err := caopts.PinUpdateOptions(opts...)
132  	if err != nil {
133  		return err
134  	}
135  
136  	span.SetAttributes(attribute.Bool("unpin", settings.Unpin))
137  
138  	fp, _, err := api.core().ResolvePath(ctx, from)
139  	if err != nil {
140  		return err
141  	}
142  
143  	tp, _, err := api.core().ResolvePath(ctx, to)
144  	if err != nil {
145  		return err
146  	}
147  
148  	defer api.blockstore.PinLock(ctx).Unlock(ctx)
149  
150  	err = api.pinning.Update(ctx, fp.RootCid(), tp.RootCid(), settings.Unpin)
151  	if err != nil {
152  		return err
153  	}
154  
155  	return api.pinning.Flush(ctx)
156  }
157  
158  type pinStatus struct {
159  	err      error
160  	cid      cid.Cid
161  	ok       bool
162  	badNodes []coreiface.BadPinNode
163  }
164  
165  // BadNode is used in PinVerifyRes
166  type badNode struct {
167  	path path.ImmutablePath
168  	err  error
169  }
170  
171  func (s *pinStatus) Ok() bool {
172  	return s.ok
173  }
174  
175  func (s *pinStatus) BadNodes() []coreiface.BadPinNode {
176  	return s.badNodes
177  }
178  
179  func (s *pinStatus) Err() error {
180  	return s.err
181  }
182  
183  func (n *badNode) Path() path.ImmutablePath {
184  	return n.path
185  }
186  
187  func (n *badNode) Err() error {
188  	return n.err
189  }
190  
191  func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) {
192  	ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Verify")
193  	defer span.End()
194  
195  	visited := make(map[cid.Cid]*pinStatus)
196  	bs := api.blockstore
197  	DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
198  	getLinks := merkledag.GetLinksWithDAG(DAG)
199  
200  	var checkPin func(root cid.Cid) *pinStatus
201  	checkPin = func(root cid.Cid) *pinStatus {
202  		ctx, span := tracing.Span(ctx, "CoreAPI.PinAPI", "Verify.CheckPin", trace.WithAttributes(attribute.String("cid", root.String())))
203  		defer span.End()
204  
205  		if status, ok := visited[root]; ok {
206  			return status
207  		}
208  
209  		links, err := getLinks(ctx, root)
210  		if err != nil {
211  			status := &pinStatus{ok: false, cid: root}
212  			status.badNodes = []coreiface.BadPinNode{&badNode{path: path.FromCid(root), err: err}}
213  			visited[root] = status
214  			return status
215  		}
216  
217  		status := &pinStatus{ok: true, cid: root}
218  		for _, lnk := range links {
219  			res := checkPin(lnk.Cid)
220  			if !res.ok {
221  				status.ok = false
222  				status.badNodes = append(status.badNodes, res.badNodes...)
223  			}
224  		}
225  
226  		visited[root] = status
227  		return status
228  	}
229  
230  	out := make(chan coreiface.PinStatus)
231  
232  	go func() {
233  		defer close(out)
234  		for p := range api.pinning.RecursiveKeys(ctx, false) {
235  			var res *pinStatus
236  			if p.Err != nil {
237  				res = &pinStatus{err: p.Err}
238  			} else {
239  				res = checkPin(p.Pin.Key)
240  			}
241  			select {
242  			case <-ctx.Done():
243  				return
244  			case out <- res:
245  			}
246  		}
247  	}()
248  
249  	return out, nil
250  }
251  
252  type pinInfo struct {
253  	pinType string
254  	path    path.ImmutablePath
255  	name    string
256  }
257  
258  func (p *pinInfo) Path() path.ImmutablePath {
259  	return p.path
260  }
261  
262  func (p *pinInfo) Type() string {
263  	return p.pinType
264  }
265  
266  func (p *pinInfo) Name() string {
267  	return p.name
268  }
269  
270  // pinLsAll is an internal function for returning a list of pins
271  //
272  // The caller must keep reading results until the channel is closed to prevent
273  // leaking the goroutine that is fetching pins.
274  func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string, detailed bool, name string, out chan<- coreiface.Pin) error {
275  	defer close(out)
276  	emittedSet := cid.NewSet()
277  
278  	AddToResultKeys := func(c cid.Cid, pinName, typeStr string) error {
279  		if emittedSet.Visit(c) && (name == "" || strings.Contains(pinName, name)) {
280  			select {
281  			case out <- &pinInfo{
282  				pinType: typeStr,
283  				name:    pinName,
284  				path:    path.FromCid(c),
285  			}:
286  			case <-ctx.Done():
287  				return ctx.Err()
288  			}
289  		}
290  		return nil
291  	}
292  
293  	var rkeys []cid.Cid
294  	var err error
295  	if typeStr == "recursive" || typeStr == "all" {
296  		for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
297  			if streamedCid.Err != nil {
298  				return streamedCid.Err
299  			}
300  			if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "recursive"); err != nil {
301  				return err
302  			}
303  			rkeys = append(rkeys, streamedCid.Pin.Key)
304  		}
305  	}
306  	if typeStr == "direct" || typeStr == "all" {
307  		for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
308  			if streamedCid.Err != nil {
309  				return streamedCid.Err
310  			}
311  			if err = AddToResultKeys(streamedCid.Pin.Key, streamedCid.Pin.Name, "direct"); err != nil {
312  				return err
313  			}
314  		}
315  	}
316  	if typeStr == "indirect" {
317  		// We need to first visit the direct pins that have priority
318  		// without emitting them
319  
320  		for streamedCid := range api.pinning.DirectKeys(ctx, detailed) {
321  			if streamedCid.Err != nil {
322  				return streamedCid.Err
323  			}
324  			emittedSet.Add(streamedCid.Pin.Key)
325  		}
326  
327  		for streamedCid := range api.pinning.RecursiveKeys(ctx, detailed) {
328  			if streamedCid.Err != nil {
329  				return streamedCid.Err
330  			}
331  			emittedSet.Add(streamedCid.Pin.Key)
332  			rkeys = append(rkeys, streamedCid.Pin.Key)
333  		}
334  	}
335  	if typeStr == "indirect" || typeStr == "all" {
336  		if len(rkeys) == 0 {
337  			return nil
338  		}
339  		var addErr error
340  		walkingSet := cid.NewSet()
341  		for _, k := range rkeys {
342  			err = merkledag.Walk(
343  				ctx, merkledag.GetLinksWithDAG(api.dag), k,
344  				func(c cid.Cid) bool {
345  					if !walkingSet.Visit(c) {
346  						return false
347  					}
348  					if emittedSet.Has(c) {
349  						return true // skipped
350  					}
351  					addErr = AddToResultKeys(c, "", "indirect")
352  					return addErr == nil
353  				},
354  				merkledag.SkipRoot(), merkledag.Concurrent(),
355  			)
356  			if err != nil {
357  				return err
358  			}
359  			if addErr != nil {
360  				return addErr
361  			}
362  		}
363  	}
364  
365  	return nil
366  }
367  
368  func (api *PinAPI) core() coreiface.CoreAPI {
369  	return (*CoreAPI)(api)
370  }