/ client / rpc / routing.go
routing.go
  1  package rpc
  2  
import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"

	"github.com/ipfs/boxo/path"
	"github.com/ipfs/kubo/core/coreiface/options"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/routing"
)
 14  
// RoutingAPI is a distinct named type whose underlying type is HttpApi. The
// methods declared on it in this file send requests to the "routing/..."
// endpoints by converting the receiver back to *HttpApi (see core).
type RoutingAPI HttpApi
 16  
 17  func (api *RoutingAPI) Get(ctx context.Context, key string) ([]byte, error) {
 18  	resp, err := api.core().Request("routing/get", key).Send(ctx)
 19  	if err != nil {
 20  		return nil, err
 21  	}
 22  	if resp.Error != nil {
 23  		return nil, resp.Error
 24  	}
 25  	defer resp.Close()
 26  
 27  	var out routing.QueryEvent
 28  
 29  	dec := json.NewDecoder(resp.Output)
 30  	if err := dec.Decode(&out); err != nil {
 31  		return nil, err
 32  	}
 33  
 34  	res, err := base64.StdEncoding.DecodeString(out.Extra)
 35  	if err != nil {
 36  		return nil, err
 37  	}
 38  
 39  	return res, nil
 40  }
 41  
 42  func (api *RoutingAPI) Put(ctx context.Context, key string, value []byte, opts ...options.RoutingPutOption) error {
 43  	var cfg options.RoutingPutSettings
 44  	for _, o := range opts {
 45  		if err := o(&cfg); err != nil {
 46  			return err
 47  		}
 48  	}
 49  
 50  	resp, err := api.core().Request("routing/put", key).
 51  		Option("allow-offline", cfg.AllowOffline).
 52  		FileBody(bytes.NewReader(value)).
 53  		Send(ctx)
 54  	if err != nil {
 55  		return err
 56  	}
 57  	if resp.Error != nil {
 58  		return resp.Error
 59  	}
 60  	return nil
 61  }
 62  
 63  func (api *RoutingAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
 64  	var out struct {
 65  		Type      routing.QueryEventType
 66  		Responses []peer.AddrInfo
 67  	}
 68  	resp, err := api.core().Request("routing/findpeer", p.String()).Send(ctx)
 69  	if err != nil {
 70  		return peer.AddrInfo{}, err
 71  	}
 72  	if resp.Error != nil {
 73  		return peer.AddrInfo{}, resp.Error
 74  	}
 75  	defer resp.Close()
 76  	dec := json.NewDecoder(resp.Output)
 77  	for {
 78  		if err := dec.Decode(&out); err != nil {
 79  			return peer.AddrInfo{}, err
 80  		}
 81  		if out.Type == routing.FinalPeer {
 82  			return out.Responses[0], nil
 83  		}
 84  	}
 85  }
 86  
 87  func (api *RoutingAPI) FindProviders(ctx context.Context, p path.Path, opts ...options.RoutingFindProvidersOption) (<-chan peer.AddrInfo, error) {
 88  	options, err := options.RoutingFindProvidersOptions(opts...)
 89  	if err != nil {
 90  		return nil, err
 91  	}
 92  
 93  	rp, _, err := api.core().ResolvePath(ctx, p)
 94  	if err != nil {
 95  		return nil, err
 96  	}
 97  
 98  	resp, err := api.core().Request("routing/findprovs", rp.RootCid().String()).
 99  		Option("num-providers", options.NumProviders).
100  		Send(ctx)
101  	if err != nil {
102  		return nil, err
103  	}
104  	if resp.Error != nil {
105  		return nil, resp.Error
106  	}
107  	res := make(chan peer.AddrInfo)
108  
109  	go func() {
110  		defer resp.Close()
111  		defer close(res)
112  		dec := json.NewDecoder(resp.Output)
113  
114  		for {
115  			var out struct {
116  				Extra     string
117  				Type      routing.QueryEventType
118  				Responses []peer.AddrInfo
119  			}
120  
121  			if err := dec.Decode(&out); err != nil {
122  				return // todo: handle this somehow
123  			}
124  			if out.Type == routing.QueryError {
125  				return // usually a 'not found' error
126  				// todo: handle other errors
127  			}
128  			if out.Type == routing.Provider {
129  				for _, pi := range out.Responses {
130  					select {
131  					case res <- pi:
132  					case <-ctx.Done():
133  						return
134  					}
135  				}
136  			}
137  		}
138  	}()
139  
140  	return res, nil
141  }
142  
143  func (api *RoutingAPI) Provide(ctx context.Context, p path.Path, opts ...options.RoutingProvideOption) error {
144  	options, err := options.RoutingProvideOptions(opts...)
145  	if err != nil {
146  		return err
147  	}
148  
149  	rp, _, err := api.core().ResolvePath(ctx, p)
150  	if err != nil {
151  		return err
152  	}
153  
154  	return api.core().Request("routing/provide", rp.RootCid().String()).
155  		Option("recursive", options.Recursive).
156  		Exec(ctx, nil)
157  }
158  
159  func (api *RoutingAPI) core() *HttpApi {
160  	return (*HttpApi)(api)
161  }