routing.go
1 package rpc 2 3 import ( 4 "bytes" 5 "context" 6 "encoding/base64" 7 "encoding/json" 8 9 "github.com/ipfs/boxo/path" 10 "github.com/ipfs/kubo/core/coreiface/options" 11 "github.com/libp2p/go-libp2p/core/peer" 12 "github.com/libp2p/go-libp2p/core/routing" 13 ) 14 15 type RoutingAPI HttpApi 16 17 func (api *RoutingAPI) Get(ctx context.Context, key string) ([]byte, error) { 18 resp, err := api.core().Request("routing/get", key).Send(ctx) 19 if err != nil { 20 return nil, err 21 } 22 if resp.Error != nil { 23 return nil, resp.Error 24 } 25 defer resp.Close() 26 27 var out routing.QueryEvent 28 29 dec := json.NewDecoder(resp.Output) 30 if err := dec.Decode(&out); err != nil { 31 return nil, err 32 } 33 34 res, err := base64.StdEncoding.DecodeString(out.Extra) 35 if err != nil { 36 return nil, err 37 } 38 39 return res, nil 40 } 41 42 func (api *RoutingAPI) Put(ctx context.Context, key string, value []byte, opts ...options.RoutingPutOption) error { 43 var cfg options.RoutingPutSettings 44 for _, o := range opts { 45 if err := o(&cfg); err != nil { 46 return err 47 } 48 } 49 50 resp, err := api.core().Request("routing/put", key). 51 Option("allow-offline", cfg.AllowOffline). 52 FileBody(bytes.NewReader(value)). 53 Send(ctx) 54 if err != nil { 55 return err 56 } 57 if resp.Error != nil { 58 return resp.Error 59 } 60 return nil 61 } 62 63 func (api *RoutingAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { 64 var out struct { 65 Type routing.QueryEventType 66 Responses []peer.AddrInfo 67 } 68 resp, err := api.core().Request("routing/findpeer", p.String()).Send(ctx) 69 if err != nil { 70 return peer.AddrInfo{}, err 71 } 72 if resp.Error != nil { 73 return peer.AddrInfo{}, resp.Error 74 } 75 defer resp.Close() 76 dec := json.NewDecoder(resp.Output) 77 for { 78 if err := dec.Decode(&out); err != nil { 79 return peer.AddrInfo{}, err 80 } 81 if out.Type == routing.FinalPeer { 82 return out.Responses[0], nil 83 } 84 } 85 } 86 87 func (api *RoutingAPI) FindProviders(ctx context.Context, p path.Path, opts ...options.RoutingFindProvidersOption) (<-chan peer.AddrInfo, error) { 88 options, err := options.RoutingFindProvidersOptions(opts...) 89 if err != nil { 90 return nil, err 91 } 92 93 rp, _, err := api.core().ResolvePath(ctx, p) 94 if err != nil { 95 return nil, err 96 } 97 98 resp, err := api.core().Request("routing/findprovs", rp.RootCid().String()). 99 Option("num-providers", options.NumProviders). 100 Send(ctx) 101 if err != nil { 102 return nil, err 103 } 104 if resp.Error != nil { 105 return nil, resp.Error 106 } 107 res := make(chan peer.AddrInfo) 108 109 go func() { 110 defer resp.Close() 111 defer close(res) 112 dec := json.NewDecoder(resp.Output) 113 114 for { 115 var out struct { 116 Extra string 117 Type routing.QueryEventType 118 Responses []peer.AddrInfo 119 } 120 121 if err := dec.Decode(&out); err != nil { 122 return // todo: handle this somehow 123 } 124 if out.Type == routing.QueryError { 125 return // usually a 'not found' error 126 // todo: handle other errors 127 } 128 if out.Type == routing.Provider { 129 for _, pi := range out.Responses { 130 select { 131 case res <- pi: 132 case <-ctx.Done(): 133 return 134 } 135 } 136 } 137 } 138 }() 139 140 return res, nil 141 } 142 143 func (api *RoutingAPI) Provide(ctx context.Context, p path.Path, opts ...options.RoutingProvideOption) error { 144 options, err := options.RoutingProvideOptions(opts...) 145 if err != nil { 146 return err 147 } 148 149 rp, _, err := api.core().ResolvePath(ctx, p) 150 if err != nil { 151 return err 152 } 153 154 return api.core().Request("routing/provide", rp.RootCid().String()). 155 Option("recursive", options.Recursive). 156 Exec(ctx, nil) 157 } 158 159 func (api *RoutingAPI) core() *HttpApi { 160 return (*HttpApi)(api) 161 }