routing.go
1 package commands 2 3 import ( 4 "context" 5 "encoding/base64" 6 "errors" 7 "fmt" 8 "io" 9 "strings" 10 "time" 11 12 "github.com/ipfs/kubo/config" 13 cmdenv "github.com/ipfs/kubo/core/commands/cmdenv" 14 "github.com/ipfs/kubo/core/commands/cmdutils" 15 "github.com/ipfs/kubo/core/node" 16 mh "github.com/multiformats/go-multihash" 17 18 dag "github.com/ipfs/boxo/ipld/merkledag" 19 "github.com/ipfs/boxo/ipns" 20 "github.com/ipfs/boxo/provider" 21 cid "github.com/ipfs/go-cid" 22 cmds "github.com/ipfs/go-ipfs-cmds" 23 ipld "github.com/ipfs/go-ipld-format" 24 iface "github.com/ipfs/kubo/core/coreiface" 25 "github.com/ipfs/kubo/core/coreiface/options" 26 peer "github.com/libp2p/go-libp2p/core/peer" 27 routing "github.com/libp2p/go-libp2p/core/routing" 28 ) 29 30 var errAllowOffline = errors.New("can't put while offline: pass `--allow-offline` to override") 31 32 const ( 33 dhtVerboseOptionName = "verbose" 34 numProvidersOptionName = "num-providers" 35 allowOfflineOptionName = "allow-offline" 36 ) 37 38 var RoutingCmd = &cmds.Command{ 39 Helptext: cmds.HelpText{ 40 Tagline: "Issue routing commands.", 41 ShortDescription: ``, 42 }, 43 44 Subcommands: map[string]*cmds.Command{ 45 "findprovs": findProvidersRoutingCmd, 46 "findpeer": findPeerRoutingCmd, 47 "get": getValueRoutingCmd, 48 "put": putValueRoutingCmd, 49 "provide": provideRefRoutingCmd, 50 "reprovide": reprovideRoutingCmd, 51 }, 52 } 53 54 var findProvidersRoutingCmd = &cmds.Command{ 55 Helptext: cmds.HelpText{ 56 Tagline: "Find peers that can provide a specific value, given a key.", 57 ShortDescription: "Outputs a list of newline-delimited provider Peer IDs.", 58 }, 59 60 Arguments: []cmds.Argument{ 61 cmds.StringArg("key", true, true, "The key to find providers for."), 62 }, 63 Options: []cmds.Option{ 64 cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), 65 cmds.IntOption(numProvidersOptionName, "n", "The number of providers to find.").WithDefault(20), 66 }, 67 Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { 68 n, err := cmdenv.GetNode(env) 69 if err != nil { 70 return err 71 } 72 73 if !n.IsOnline { 74 return ErrNotOnline 75 } 76 77 numProviders, _ := req.Options[numProvidersOptionName].(int) 78 if numProviders < 1 { 79 return errors.New("number of providers must be greater than 0") 80 } 81 82 c, err := cid.Parse(req.Arguments[0]) 83 if err != nil { 84 return err 85 } 86 87 ctx, cancel := context.WithCancel(req.Context) 88 ctx, events := routing.RegisterForQueryEvents(ctx) 89 90 go func() { 91 defer cancel() 92 pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders) 93 for p := range pchan { 94 np := cmdutils.CloneAddrInfo(p) 95 routing.PublishQueryEvent(ctx, &routing.QueryEvent{ 96 Type: routing.Provider, 97 Responses: []*peer.AddrInfo{&np}, 98 }) 99 } 100 }() 101 for e := range events { 102 if err := res.Emit(e); err != nil { 103 return err 104 } 105 } 106 107 return nil 108 }, 109 Encoders: cmds.EncoderMap{ 110 cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { 111 pfm := pfuncMap{ 112 routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { 113 if verbose { 114 fmt.Fprintf(out, "* closest peer %s\n", obj.ID) 115 } 116 return nil 117 }, 118 routing.Provider: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { 119 prov := obj.Responses[0] 120 if verbose { 121 fmt.Fprintf(out, "provider: ") 122 } 123 fmt.Fprintf(out, "%s\n", prov.ID) 124 if verbose { 125 for _, a := range prov.Addrs { 126 fmt.Fprintf(out, "\t%s\n", a) 127 } 128 } 129 return nil 130 }, 131 } 132 133 verbose, _ := req.Options[dhtVerboseOptionName].(bool) 134 return printEvent(out, w, verbose, pfm) 135 }), 136 }, 137 Type: routing.QueryEvent{}, 138 } 139 140 const ( 141 recursiveOptionName = "recursive" 142 ) 143 144 var provideRefRoutingCmd = &cmds.Command{ 145 Status: cmds.Experimental, 146 Helptext: cmds.HelpText{ 147 Tagline: "Announce to the network that you are providing given values.", 148 }, 149 150 Arguments: []cmds.Argument{ 151 cmds.StringArg("key", true, true, "The key[s] to send provide records for.").EnableStdin(), 152 }, 153 Options: []cmds.Option{ 154 cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), 155 cmds.BoolOption(recursiveOptionName, "r", "Recursively provide entire graph."), 156 }, 157 Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { 158 nd, err := cmdenv.GetNode(env) 159 if err != nil { 160 return err 161 } 162 163 if !nd.IsOnline { 164 return ErrNotOnline 165 } 166 // respect global config 167 cfg, err := nd.Repo.Config() 168 if err != nil { 169 return err 170 } 171 if !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) { 172 return errors.New("invalid configuration: Provide.Enabled is set to 'false'") 173 } 174 175 if len(nd.PeerHost.Network().Conns()) == 0 && !cfg.HasHTTPProviderConfigured() { 176 // Node is depending on DHT for providing (no custom HTTP provider 177 // configured) and currently has no connected peers. 178 return errors.New("cannot provide, no connected peers") 179 } 180 181 // If we reach here with no connections but HTTP provider configured, 182 // we proceed with the provide operation via HTTP 183 184 // Needed to parse stdin args. 185 // TODO: Lazy Load 186 err = req.ParseBodyArgs() 187 if err != nil { 188 return err 189 } 190 191 rec, _ := req.Options[recursiveOptionName].(bool) 192 193 var cids []cid.Cid 194 for _, arg := range req.Arguments { 195 c, err := cid.Decode(arg) 196 if err != nil { 197 return err 198 } 199 200 has, err := nd.Blockstore.Has(req.Context, c) 201 if err != nil { 202 return err 203 } 204 205 if !has { 206 return fmt.Errorf("block %s not found locally, cannot provide", c) 207 } 208 209 cids = append(cids, c) 210 } 211 212 ctx, cancel := context.WithCancel(req.Context) 213 ctx, events := routing.RegisterForQueryEvents(ctx) 214 215 var provideErr error 216 // TODO: not sure if necessary to call StartProviding for `ipfs routing 217 // provide <cid>`, since either cid is already being provided, or it will 218 // be garbage collected and not reprovided anyway. So we may simply stick 219 // with a single (optimistic) provide, and skip StartProviding call. 220 go func() { 221 defer cancel() 222 if rec { 223 provideErr = provideCidsRec(ctx, nd.Provider, nd.DAG, cids) 224 } else { 225 provideErr = provideCids(nd.Provider, cids) 226 } 227 if provideErr != nil { 228 routing.PublishQueryEvent(ctx, &routing.QueryEvent{ 229 Type: routing.QueryError, 230 Extra: provideErr.Error(), 231 }) 232 } 233 }() 234 235 if nd.HasActiveDHTClient() { 236 // If node has a DHT client, provide immediately the supplied cids before 237 // returning. 238 for _, c := range cids { 239 if err = provideCIDSync(req.Context, nd.DHTClient, c); err != nil { 240 return fmt.Errorf("error providing cid: %w", err) 241 } 242 } 243 } 244 245 for e := range events { 246 if err := res.Emit(e); err != nil { 247 return err 248 } 249 } 250 251 return provideErr 252 }, 253 Encoders: cmds.EncoderMap{ 254 cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { 255 pfm := pfuncMap{ 256 routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { 257 if verbose { 258 fmt.Fprintf(out, "sending provider record to peer %s\n", obj.ID) 259 } 260 return nil 261 }, 262 } 263 264 verbose, _ := req.Options[dhtVerboseOptionName].(bool) 265 return printEvent(out, w, verbose, pfm) 266 }), 267 }, 268 Type: routing.QueryEvent{}, 269 } 270 271 var reprovideRoutingCmd = &cmds.Command{ 272 Status: cmds.Experimental, 273 Helptext: cmds.HelpText{ 274 Tagline: "Trigger reprovider.", 275 ShortDescription: ` 276 Trigger reprovider to announce our data to network. 277 `, 278 }, 279 Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { 280 nd, err := cmdenv.GetNode(env) 281 if err != nil { 282 return err 283 } 284 285 if !nd.IsOnline { 286 return ErrNotOnline 287 } 288 289 // respect global config 290 cfg, err := nd.Repo.Config() 291 if err != nil { 292 return err 293 } 294 if !cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) { 295 return errors.New("invalid configuration: Provide.Enabled is set to 'false'") 296 } 297 if cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) == 0 { 298 return errors.New("invalid configuration: Provide.DHT.Interval is set to '0'") 299 } 300 provideSys, ok := nd.Provider.(provider.Reprovider) 301 if !ok { 302 return errors.New("manual reprovide only available with legacy provider (Provide.DHT.SweepEnabled=false)") 303 } 304 305 err = provideSys.Reprovide(req.Context) 306 if err != nil { 307 return err 308 } 309 310 return nil 311 }, 312 } 313 314 func provideCids(prov node.DHTProvider, cids []cid.Cid) error { 315 mhs := make([]mh.Multihash, len(cids)) 316 for i, c := range cids { 317 mhs[i] = c.Hash() 318 } 319 // providing happens asynchronously 320 return prov.StartProviding(true, mhs...) 321 } 322 323 func provideCidsRec(ctx context.Context, prov node.DHTProvider, dserv ipld.DAGService, cids []cid.Cid) error { 324 for _, c := range cids { 325 kset := cid.NewSet() 326 err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, kset.Visit) 327 if err != nil { 328 return err 329 } 330 if err = provideCids(prov, kset.Keys()); err != nil { 331 return err 332 } 333 } 334 return nil 335 } 336 337 var findPeerRoutingCmd = &cmds.Command{ 338 Helptext: cmds.HelpText{ 339 Tagline: "Find the multiaddresses associated with a Peer ID.", 340 ShortDescription: "Outputs a list of newline-delimited multiaddresses.", 341 }, 342 343 Arguments: []cmds.Argument{ 344 cmds.StringArg("peerID", true, true, "The ID of the peer to search for."), 345 }, 346 Options: []cmds.Option{ 347 cmds.BoolOption(dhtVerboseOptionName, "v", "Print extra information."), 348 }, 349 Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { 350 nd, err := cmdenv.GetNode(env) 351 if err != nil { 352 return err 353 } 354 355 if !nd.IsOnline { 356 return ErrNotOnline 357 } 358 359 pid, err := peer.Decode(req.Arguments[0]) 360 if err != nil { 361 return err 362 } 363 364 if pid == nd.Identity { 365 return ErrSelfUnsupported 366 } 367 368 ctx, cancel := context.WithCancel(req.Context) 369 ctx, events := routing.RegisterForQueryEvents(ctx) 370 371 var findPeerErr error 372 go func() { 373 defer cancel() 374 var pi peer.AddrInfo 375 pi, findPeerErr = nd.Routing.FindPeer(ctx, pid) 376 if findPeerErr != nil { 377 routing.PublishQueryEvent(ctx, &routing.QueryEvent{ 378 Type: routing.QueryError, 379 Extra: findPeerErr.Error(), 380 }) 381 return 382 } 383 384 routing.PublishQueryEvent(ctx, &routing.QueryEvent{ 385 Type: routing.FinalPeer, 386 Responses: []*peer.AddrInfo{&pi}, 387 }) 388 }() 389 390 for e := range events { 391 if err := res.Emit(e); err != nil { 392 return err 393 } 394 } 395 396 return findPeerErr 397 }, 398 Encoders: cmds.EncoderMap{ 399 cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { 400 pfm := pfuncMap{ 401 routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { 402 pi := obj.Responses[0] 403 for _, a := range pi.Addrs { 404 fmt.Fprintf(out, "%s\n", a) 405 } 406 return nil 407 }, 408 } 409 410 verbose, _ := req.Options[dhtVerboseOptionName].(bool) 411 return printEvent(out, w, verbose, pfm) 412 }), 413 }, 414 Type: routing.QueryEvent{}, 415 } 416 417 var getValueRoutingCmd = &cmds.Command{ 418 Status: cmds.Experimental, 419 Helptext: cmds.HelpText{ 420 Tagline: "Given a key, query the routing system for its best value.", 421 ShortDescription: ` 422 Outputs the best value for the given key. 423 424 There may be several different values for a given key stored in the routing 425 system; in this context 'best' means the record that is most desirable. There is 426 no one metric for 'best': it depends entirely on the key type. For IPNS, 'best' 427 is the record that is both valid and has the highest sequence number (freshest). 428 Different key types can specify other 'best' rules. 429 `, 430 }, 431 432 Arguments: []cmds.Argument{ 433 cmds.StringArg("key", true, true, "The key to find a value for."), 434 }, 435 Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { 436 api, err := cmdenv.GetApi(env, req) 437 if err != nil { 438 return err 439 } 440 441 r, err := api.Routing().Get(req.Context, req.Arguments[0]) 442 if err != nil { 443 return err 444 } 445 446 return res.Emit(routing.QueryEvent{ 447 Extra: base64.StdEncoding.EncodeToString(r), 448 Type: routing.Value, 449 }) 450 }, 451 Encoders: cmds.EncoderMap{ 452 cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, obj *routing.QueryEvent) error { 453 res, err := base64.StdEncoding.DecodeString(obj.Extra) 454 if err != nil { 455 return err 456 } 457 _, err = w.Write(res) 458 return err 459 }), 460 }, 461 Type: routing.QueryEvent{}, 462 } 463 464 var putValueRoutingCmd = &cmds.Command{ 465 Status: cmds.Experimental, 466 Helptext: cmds.HelpText{ 467 Tagline: "Write a key/value pair to the routing system.", 468 ShortDescription: ` 469 Given a key of the form /foo/bar and a valid value for that key, this will write 470 that value to the routing system with that key. 471 472 Keys have two parts: a keytype (foo) and the key name (bar). IPNS uses the 473 /ipns keytype, and expects the key name to be a Peer ID. IPNS entries are 474 specifically formatted (protocol buffer). 475 476 You may only use keytypes that are supported in your ipfs binary: currently 477 this is only /ipns. Unless you have a relatively deep understanding of the 478 go-ipfs routing internals, you likely want to be using 'ipfs name publish' instead 479 of this. 480 481 The value must be a valid value for the given key type. For example, if the key 482 is /ipns/QmFoo, the value must be IPNS record (protobuf) signed with the key 483 identified by QmFoo. 484 `, 485 }, 486 487 Arguments: []cmds.Argument{ 488 cmds.StringArg("key", true, false, "The key to store the value at."), 489 cmds.FileArg("value-file", true, false, "A path to a file containing the value to store.").EnableStdin(), 490 }, 491 Options: []cmds.Option{ 492 cmds.BoolOption(allowOfflineOptionName, "When offline, save the IPNS record to the local datastore without broadcasting to the network instead of simply failing."), 493 }, 494 Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { 495 api, err := cmdenv.GetApi(env, req) 496 if err != nil { 497 return err 498 } 499 500 file, err := cmdenv.GetFileArg(req.Files.Entries()) 501 if err != nil { 502 return err 503 } 504 defer file.Close() 505 506 data, err := io.ReadAll(file) 507 if err != nil { 508 return err 509 } 510 511 allowOffline, _ := req.Options[allowOfflineOptionName].(bool) 512 513 opts := []options.RoutingPutOption{ 514 options.Put.AllowOffline(allowOffline), 515 } 516 517 ipnsName, err := ipns.NameFromString(req.Arguments[0]) 518 if err != nil { 519 return err 520 } 521 522 err = api.Routing().Put(req.Context, req.Arguments[0], data, opts...) 523 if err != nil { 524 if err == iface.ErrOffline { 525 err = errAllowOffline 526 } 527 return err 528 } 529 530 return res.Emit(routing.QueryEvent{ 531 Type: routing.Value, 532 ID: ipnsName.Peer(), 533 }) 534 }, 535 Encoders: cmds.EncoderMap{ 536 cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { 537 pfm := pfuncMap{ 538 routing.FinalPeer: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { 539 if verbose { 540 fmt.Fprintf(out, "* closest peer %s\n", obj.ID) 541 } 542 return nil 543 }, 544 routing.Value: func(obj *routing.QueryEvent, out io.Writer, verbose bool) error { 545 fmt.Fprintf(out, "%s\n", obj.ID) 546 return nil 547 }, 548 } 549 550 verbose, _ := req.Options[dhtVerboseOptionName].(bool) 551 552 return printEvent(out, w, verbose, pfm) 553 }), 554 }, 555 Type: routing.QueryEvent{}, 556 } 557 558 type ( 559 printFunc func(obj *routing.QueryEvent, out io.Writer, verbose bool) error 560 pfuncMap map[routing.QueryEventType]printFunc 561 ) 562 563 func printEvent(obj *routing.QueryEvent, out io.Writer, verbose bool, override pfuncMap) error { 564 if verbose { 565 fmt.Fprintf(out, "%s: ", time.Now().Format("15:04:05.000")) 566 } 567 568 if override != nil { 569 if pf, ok := override[obj.Type]; ok { 570 return pf(obj, out, verbose) 571 } 572 } 573 574 switch obj.Type { 575 case routing.SendingQuery: 576 if verbose { 577 fmt.Fprintf(out, "* querying %s\n", obj.ID) 578 } 579 case routing.Value: 580 if verbose { 581 fmt.Fprintf(out, "got value: '%s'\n", obj.Extra) 582 } else { 583 fmt.Fprint(out, obj.Extra) 584 } 585 case routing.PeerResponse: 586 if verbose { 587 fmt.Fprintf(out, "* %s says use ", obj.ID) 588 for _, p := range obj.Responses { 589 fmt.Fprintf(out, "%s ", p.ID) 590 } 591 fmt.Fprintln(out) 592 } 593 case routing.QueryError: 594 if verbose { 595 fmt.Fprintf(out, "error: %s\n", obj.Extra) 596 } 597 case routing.DialingPeer: 598 if verbose { 599 fmt.Fprintf(out, "dialing peer: %s\n", obj.ID) 600 } 601 case routing.AddingPeer: 602 if verbose { 603 fmt.Fprintf(out, "adding peer to query: %s\n", obj.ID) 604 } 605 case routing.FinalPeer: 606 default: 607 if verbose { 608 fmt.Fprintf(out, "unrecognized event type: %d\n", obj.Type) 609 } 610 } 611 return nil 612 } 613 614 func escapeDhtKey(s string) (string, error) { 615 parts := strings.Split(s, "/") 616 if len(parts) != 3 || 617 parts[0] != "" || 618 !(parts[1] == "ipns" || parts[1] == "pk") { 619 return "", errors.New("invalid key") 620 } 621 622 k, err := peer.Decode(parts[2]) 623 if err != nil { 624 return "", err 625 } 626 627 return strings.Join(append(parts[:2], string(k)), "/"), nil 628 }