/ core / commands / routing.go
routing.go
  1  package commands
  2  
  3  import (
  4  	"context"
  5  	"encoding/base64"
  6  	"errors"
  7  	"fmt"
  8  	"io"
  9  	"strings"
 10  	"time"
 11  
 12  	"github.com/ipfs/kubo/config"
 13  	cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
 14  	"github.com/ipfs/kubo/core/commands/cmdutils"
 15  	"github.com/ipfs/kubo/core/node"
 16  	mh "github.com/multiformats/go-multihash"
 17  
 18  	dag "github.com/ipfs/boxo/ipld/merkledag"
 19  	"github.com/ipfs/boxo/ipns"
 20  	"github.com/ipfs/boxo/provider"
 21  	cid "github.com/ipfs/go-cid"
 22  	cmds "github.com/ipfs/go-ipfs-cmds"
 23  	ipld "github.com/ipfs/go-ipld-format"
 24  	iface "github.com/ipfs/kubo/core/coreiface"
 25  	"github.com/ipfs/kubo/core/coreiface/options"
 26  	peer "github.com/libp2p/go-libp2p/core/peer"
 27  	routing "github.com/libp2p/go-libp2p/core/routing"
 28  )
 29  
// errAllowOffline is returned by the "put" subcommand in place of
// iface.ErrOffline, so the user is told about the --allow-offline flag.
var errAllowOffline = errors.New("can't put while offline: pass `--allow-offline` to override")

// Names of the options accepted by the routing subcommands in this file.
const (
	// dhtVerboseOptionName toggles the extra, timestamped event output.
	dhtVerboseOptionName = "verbose"
	// numProvidersOptionName is the provider count passed to "findprovs".
	numProvidersOptionName = "num-providers"
	// allowOfflineOptionName lets "put" proceed while the node is offline.
	allowOfflineOptionName = "allow-offline"
)
 37  
// RoutingCmd is the parent "routing" command. It has no behavior of its own;
// it only groups the subcommands registered in its Subcommands map.
var RoutingCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline:          "Issue routing commands.",
		ShortDescription: ``,
	},

	// Each key is the subcommand name as typed by the user.
	Subcommands: map[string]*cmds.Command{
		"findprovs": findProvidersRoutingCmd,
		"findpeer":  findPeerRoutingCmd,
		"get":       getValueRoutingCmd,
		"put":       putValueRoutingCmd,
		"provide":   provideRefRoutingCmd,
		"reprovide": reprovideRoutingCmd,
	},
}
 53  
 54  var findProvidersRoutingCmd = &cmds.Command{
 55  	Helptext: cmds.HelpText{
 56  		Tagline:          "Find peers that can provide a specific value, given a key.",
 57  		ShortDescription: "Outputs a list of newline-delimited provider Peer IDs.",
 58  	},
 59  
 60  	Arguments: []cmds.Argument{
 61  		cmds.StringArg("key", true, true, "The key to find providers for."),
 62  	},
 63  	Options: []cmds.Option{
 64  		cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."),
 65  		cmds.IntOption(numProvidersOptionName, "n", "The number of providers to find.").WithDefault(20),
 66  	},
 67  	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
 68  		n, err := cmdenv.GetNode(env)
 69  		if err != nil {
 70  			return err
 71  		}
 72  
 73  		if !n.IsOnline {
 74  			return ErrNotOnline
 75  		}
 76  
 77  		numProviders, _ := req.Options[numProvidersOptionName].(int)
 78  		if numProviders < 1 {
 79  			return errors.New("number of providers must be greater than 0")
 80  		}
 81  
 82  		c, err := cid.Parse(req.Arguments[0])
 83  		if err != nil {
 84  			return err
 85  		}
 86  
 87  		ctx, cancel := context.WithCancel(req.Context)
 88  		ctx, events := routing.RegisterForQueryEvents(ctx)
 89  
 90  		go func() {
 91  			defer cancel()
 92  			pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders)
 93  			for p := range pchan {
 94  				np := cmdutils.CloneAddrInfo(p)
 95  				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
 96  					Type:      routing.Provider,
 97  					Responses: []*peer.AddrInfo{&np},
 98  				})
 99  			}
100  		}()
101  		for e := range events {
102  			if err := res.Emit(e); err != nil {
103  				return err
104  			}
105  		}
106  
107  		return nil
108  	},
109  	Encoders: cmds.EncoderMap{
110  		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {
111  			pfm := pfuncMap{
112  				routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error {
113  					if verbose {
114  						fmt.Fprintf(out, "* closest peer %s\n", obj.ID)
115  					}
116  					return nil
117  				},
118  				routing.Provider: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error {
119  					prov := obj.Responses[0]
120  					if verbose {
121  						fmt.Fprintf(out, "provider: ")
122  					}
123  					fmt.Fprintf(out, "%s\n", prov.ID)
124  					if verbose {
125  						for _, a := range prov.Addrs {
126  							fmt.Fprintf(out, "\t%s\n", a)
127  						}
128  					}
129  					return nil
130  				},
131  			}
132  
133  			verbose, _ := req.Options[dhtVerboseOptionName].(bool)
134  			return printEvent(out, w, verbose, pfm)
135  		}),
136  	},
137  	Type: routing.QueryEvent{},
138  }
139  
const (
	// recursiveOptionName is the "provide" option that makes the command
	// announce the whole graph below each key instead of the key alone.
	recursiveOptionName = "recursive"
)
143  
144  var provideRefRoutingCmd = &cmds.Command{
145  	Status: cmds.Experimental,
146  	Helptext: cmds.HelpText{
147  		Tagline: "Announce to the network that you are providing given values.",
148  	},
149  
150  	Arguments: []cmds.Argument{
151  		cmds.StringArg("key", true, true, "The key[s] to send provide records for.").EnableStdin(),
152  	},
153  	Options: []cmds.Option{
154  		cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."),
155  		cmds.BoolOption(recursiveOptionName, "r", "Recursively provide entire graph."),
156  	},
157  	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
158  		nd, err := cmdenv.GetNode(env)
159  		if err != nil {
160  			return err
161  		}
162  
163  		if !nd.IsOnline {
164  			return ErrNotOnline
165  		}
166  		// respect global config
167  		cfg, err := nd.Repo.Config()
168  		if err != nil {
169  			return err
170  		}
171  		if !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) {
172  			return errors.New("invalid configuration: Provide.Enabled is set to 'false'")
173  		}
174  
175  		if len(nd.PeerHost.Network().Conns()) == 0 && !cfg.HasHTTPProviderConfigured() {
176  			// Node is depending on DHT for providing (no custom HTTP provider
177  			// configured) and currently has no connected peers.
178  			return errors.New("cannot provide, no connected peers")
179  		}
180  
181  		// If we reach here with no connections but HTTP provider configured,
182  		// we proceed with the provide operation via HTTP
183  
184  		// Needed to parse stdin args.
185  		// TODO: Lazy Load
186  		err = req.ParseBodyArgs()
187  		if err != nil {
188  			return err
189  		}
190  
191  		rec, _ := req.Options[recursiveOptionName].(bool)
192  
193  		var cids []cid.Cid
194  		for _, arg := range req.Arguments {
195  			c, err := cid.Decode(arg)
196  			if err != nil {
197  				return err
198  			}
199  
200  			has, err := nd.Blockstore.Has(req.Context, c)
201  			if err != nil {
202  				return err
203  			}
204  
205  			if !has {
206  				return fmt.Errorf("block %s not found locally, cannot provide", c)
207  			}
208  
209  			cids = append(cids, c)
210  		}
211  
212  		ctx, cancel := context.WithCancel(req.Context)
213  		ctx, events := routing.RegisterForQueryEvents(ctx)
214  
215  		var provideErr error
216  		// TODO: not sure if necessary to call StartProviding for `ipfs routing
217  		// provide <cid>`, since either cid is already being provided, or it will
218  		// be garbage collected and not reprovided anyway. So we may simply stick
219  		// with a single (optimistic) provide, and skip StartProviding call.
220  		go func() {
221  			defer cancel()
222  			if rec {
223  				provideErr = provideCidsRec(ctx, nd.Provider, nd.DAG, cids)
224  			} else {
225  				provideErr = provideCids(nd.Provider, cids)
226  			}
227  			if provideErr != nil {
228  				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
229  					Type:  routing.QueryError,
230  					Extra: provideErr.Error(),
231  				})
232  			}
233  		}()
234  
235  		if nd.HasActiveDHTClient() {
236  			// If node has a DHT client, provide immediately the supplied cids before
237  			// returning.
238  			for _, c := range cids {
239  				if err = provideCIDSync(req.Context, nd.DHTClient, c); err != nil {
240  					return fmt.Errorf("error providing cid: %w", err)
241  				}
242  			}
243  		}
244  
245  		for e := range events {
246  			if err := res.Emit(e); err != nil {
247  				return err
248  			}
249  		}
250  
251  		return provideErr
252  	},
253  	Encoders: cmds.EncoderMap{
254  		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {
255  			pfm := pfuncMap{
256  				routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error {
257  					if verbose {
258  						fmt.Fprintf(out, "sending provider record to peer %s\n", obj.ID)
259  					}
260  					return nil
261  				},
262  			}
263  
264  			verbose, _ := req.Options[dhtVerboseOptionName].(bool)
265  			return printEvent(out, w, verbose, pfm)
266  		}),
267  	},
268  	Type: routing.QueryEvent{},
269  }
270  
271  var reprovideRoutingCmd = &cmds.Command{
272  	Status: cmds.Experimental,
273  	Helptext: cmds.HelpText{
274  		Tagline: "Trigger reprovider.",
275  		ShortDescription: `
276  Trigger reprovider to announce our data to network.
277  `,
278  	},
279  	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
280  		nd, err := cmdenv.GetNode(env)
281  		if err != nil {
282  			return err
283  		}
284  
285  		if !nd.IsOnline {
286  			return ErrNotOnline
287  		}
288  
289  		// respect global config
290  		cfg, err := nd.Repo.Config()
291  		if err != nil {
292  			return err
293  		}
294  		if !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) {
295  			return errors.New("invalid configuration: Provide.Enabled is set to 'false'")
296  		}
297  		if cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0 {
298  			return errors.New("invalid configuration: Provide.DHT.Interval is set to '0'")
299  		}
300  		provideSys, ok := nd.Provider.(provider.Reprovider)
301  		if !ok {
302  			return errors.New("manual reprovide only available with legacy provider (Provide.DHT.SweepEnabled=false)")
303  		}
304  
305  		err = provideSys.Reprovide(req.Context)
306  		if err != nil {
307  			return err
308  		}
309  
310  		return nil
311  	},
312  }
313  
314  func provideCids(prov node.DHTProvider, cids []cid.Cid) error {
315  	mhs := make([]mh.Multihash, len(cids))
316  	for i, c := range cids {
317  		mhs[i] = c.Hash()
318  	}
319  	// providing happens asynchronously
320  	return prov.StartProviding(true, mhs...)
321  }
322  
323  func provideCidsRec(ctx context.Context, prov node.DHTProvider, dserv ipld.DAGService, cids []cid.Cid) error {
324  	for _, c := range cids {
325  		kset := cid.NewSet()
326  		err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
327  		if err != nil {
328  			return err
329  		}
330  		if err = provideCids(prov, kset.Keys()); err != nil {
331  			return err
332  		}
333  	}
334  	return nil
335  }
336  
337  var findPeerRoutingCmd = &cmds.Command{
338  	Helptext: cmds.HelpText{
339  		Tagline:          "Find the multiaddresses associated with a Peer ID.",
340  		ShortDescription: "Outputs a list of newline-delimited multiaddresses.",
341  	},
342  
343  	Arguments: []cmds.Argument{
344  		cmds.StringArg("peerID", true, true, "The ID of the peer to search for."),
345  	},
346  	Options: []cmds.Option{
347  		cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."),
348  	},
349  	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
350  		nd, err := cmdenv.GetNode(env)
351  		if err != nil {
352  			return err
353  		}
354  
355  		if !nd.IsOnline {
356  			return ErrNotOnline
357  		}
358  
359  		pid, err := peer.Decode(req.Arguments[0])
360  		if err != nil {
361  			return err
362  		}
363  
364  		if pid == nd.Identity {
365  			return ErrSelfUnsupported
366  		}
367  
368  		ctx, cancel := context.WithCancel(req.Context)
369  		ctx, events := routing.RegisterForQueryEvents(ctx)
370  
371  		var findPeerErr error
372  		go func() {
373  			defer cancel()
374  			var pi peer.AddrInfo
375  			pi, findPeerErr = nd.Routing.FindPeer(ctx, pid)
376  			if findPeerErr != nil {
377  				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
378  					Type:  routing.QueryError,
379  					Extra: findPeerErr.Error(),
380  				})
381  				return
382  			}
383  
384  			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
385  				Type:      routing.FinalPeer,
386  				Responses: []*peer.AddrInfo{&pi},
387  			})
388  		}()
389  
390  		for e := range events {
391  			if err := res.Emit(e); err != nil {
392  				return err
393  			}
394  		}
395  
396  		return findPeerErr
397  	},
398  	Encoders: cmds.EncoderMap{
399  		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {
400  			pfm := pfuncMap{
401  				routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error {
402  					pi := obj.Responses[0]
403  					for _, a := range pi.Addrs {
404  						fmt.Fprintf(out, "%s\n", a)
405  					}
406  					return nil
407  				},
408  			}
409  
410  			verbose, _ := req.Options[dhtVerboseOptionName].(bool)
411  			return printEvent(out, w, verbose, pfm)
412  		}),
413  	},
414  	Type: routing.QueryEvent{},
415  }
416  
417  var getValueRoutingCmd = &cmds.Command{
418  	Status: cmds.Experimental,
419  	Helptext: cmds.HelpText{
420  		Tagline: "Given a key, query the routing system for its best value.",
421  		ShortDescription: `
422  Outputs the best value for the given key.
423  
424  There may be several different values for a given key stored in the routing
425  system; in this context 'best' means the record that is most desirable. There is
426  no one metric for 'best': it depends entirely on the key type. For IPNS, 'best'
427  is the record that is both valid and has the highest sequence number (freshest).
428  Different key types can specify other 'best' rules.
429  `,
430  	},
431  
432  	Arguments: []cmds.Argument{
433  		cmds.StringArg("key", true, true, "The key to find a value for."),
434  	},
435  	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
436  		api, err := cmdenv.GetApi(env, req)
437  		if err != nil {
438  			return err
439  		}
440  
441  		r, err := api.Routing().Get(req.Context, req.Arguments[0])
442  		if err != nil {
443  			return err
444  		}
445  
446  		return res.Emit(routing.QueryEvent{
447  			Extra: base64.StdEncoding.EncodeToString(r),
448  			Type:  routing.Value,
449  		})
450  	},
451  	Encoders: cmds.EncoderMap{
452  		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, obj *routing.QueryEvent) error {
453  			res, err := base64.StdEncoding.DecodeString(obj.Extra)
454  			if err != nil {
455  				return err
456  			}
457  			_, err = w.Write(res)
458  			return err
459  		}),
460  	},
461  	Type: routing.QueryEvent{},
462  }
463  
464  var putValueRoutingCmd = &cmds.Command{
465  	Status: cmds.Experimental,
466  	Helptext: cmds.HelpText{
467  		Tagline: "Write a key/value pair to the routing system.",
468  		ShortDescription: `
469  Given a key of the form /foo/bar and a valid value for that key, this will write
470  that value to the routing system with that key.
471  
472  Keys have two parts: a keytype (foo) and the key name (bar). IPNS uses the
473  /ipns keytype, and expects the key name to be a Peer ID. IPNS entries are
474  specifically formatted (protocol buffer).
475  
476  You may only use keytypes that are supported in your ipfs binary: currently
477  this is only /ipns. Unless you have a relatively deep understanding of the
478  go-ipfs routing internals, you likely want to be using 'ipfs name publish' instead
479  of this.
480  
481  The value must be a valid value for the given key type. For example, if the key
482  is /ipns/QmFoo, the value must be IPNS record (protobuf) signed with the key
483  identified by QmFoo.
484  `,
485  	},
486  
487  	Arguments: []cmds.Argument{
488  		cmds.StringArg("key", true, false, "The key to store the value at."),
489  		cmds.FileArg("value-file", true, false, "A path to a file containing the value to store.").EnableStdin(),
490  	},
491  	Options: []cmds.Option{
492  		cmds.BoolOption(allowOfflineOptionName, "When offline, save the IPNS record to the local datastore without broadcasting to the network instead of simply failing."),
493  	},
494  	Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
495  		api, err := cmdenv.GetApi(env, req)
496  		if err != nil {
497  			return err
498  		}
499  
500  		file, err := cmdenv.GetFileArg(req.Files.Entries())
501  		if err != nil {
502  			return err
503  		}
504  		defer file.Close()
505  
506  		data, err := io.ReadAll(file)
507  		if err != nil {
508  			return err
509  		}
510  
511  		allowOffline, _ := req.Options[allowOfflineOptionName].(bool)
512  
513  		opts := []options.RoutingPutOption{
514  			options.Put.AllowOffline(allowOffline),
515  		}
516  
517  		ipnsName, err := ipns.NameFromString(req.Arguments[0])
518  		if err != nil {
519  			return err
520  		}
521  
522  		err = api.Routing().Put(req.Context, req.Arguments[0], data, opts...)
523  		if err != nil {
524  			if err == iface.ErrOffline {
525  				err = errAllowOffline
526  			}
527  			return err
528  		}
529  
530  		return res.Emit(routing.QueryEvent{
531  			Type: routing.Value,
532  			ID:   ipnsName.Peer(),
533  		})
534  	},
535  	Encoders: cmds.EncoderMap{
536  		cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {
537  			pfm := pfuncMap{
538  				routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error {
539  					if verbose {
540  						fmt.Fprintf(out, "* closest peer %s\n", obj.ID)
541  					}
542  					return nil
543  				},
544  				routing.Value: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error {
545  					fmt.Fprintf(out, "%s\n", obj.ID)
546  					return nil
547  				},
548  			}
549  
550  			verbose, _ := req.Options[dhtVerboseOptionName].(bool)
551  
552  			return printEvent(out, w, verbose, pfm)
553  		}),
554  	},
555  	Type: routing.QueryEvent{},
556  }
557  
type (
	// printFunc renders a single query event to out; verbose is the value of
	// the command's verbose option.
	printFunc func(obj *routing.QueryEvent, out io.Writer, verbose bool) error
	// pfuncMap maps an event type to the printFunc that printEvent should use
	// for it instead of its built-in rendering.
	pfuncMap map[routing.QueryEventType]printFunc
)
562  
563  func printEvent(obj *routing.QueryEvent, out io.Writer, verbose bool, override pfuncMap) error {
564  	if verbose {
565  		fmt.Fprintf(out, "%s: ", time.Now().Format("15:04:05.000"))
566  	}
567  
568  	if override != nil {
569  		if pf, ok := override[obj.Type]; ok {
570  			return pf(obj, out, verbose)
571  		}
572  	}
573  
574  	switch obj.Type {
575  	case routing.SendingQuery:
576  		if verbose {
577  			fmt.Fprintf(out, "* querying %s\n", obj.ID)
578  		}
579  	case routing.Value:
580  		if verbose {
581  			fmt.Fprintf(out, "got value: '%s'\n", obj.Extra)
582  		} else {
583  			fmt.Fprint(out, obj.Extra)
584  		}
585  	case routing.PeerResponse:
586  		if verbose {
587  			fmt.Fprintf(out, "* %s says use ", obj.ID)
588  			for _, p := range obj.Responses {
589  				fmt.Fprintf(out, "%s ", p.ID)
590  			}
591  			fmt.Fprintln(out)
592  		}
593  	case routing.QueryError:
594  		if verbose {
595  			fmt.Fprintf(out, "error: %s\n", obj.Extra)
596  		}
597  	case routing.DialingPeer:
598  		if verbose {
599  			fmt.Fprintf(out, "dialing peer: %s\n", obj.ID)
600  		}
601  	case routing.AddingPeer:
602  		if verbose {
603  			fmt.Fprintf(out, "adding peer to query: %s\n", obj.ID)
604  		}
605  	case routing.FinalPeer:
606  	default:
607  		if verbose {
608  			fmt.Fprintf(out, "unrecognized event type: %d\n", obj.Type)
609  		}
610  	}
611  	return nil
612  }
613  
614  func escapeDhtKey(s string) (string, error) {
615  	parts := strings.Split(s, "/")
616  	if len(parts) != 3 ||
617  		parts[0] != "" ||
618  		!(parts[1] == "ipns" || parts[1] == "pk") {
619  		return "", errors.New("invalid key")
620  	}
621  
622  	k, err := peer.Decode(parts[2])
623  	if err != nil {
624  		return "", err
625  	}
626  
627  	return strings.Join(append(parts[:2], string(k)), "/"), nil
628  }