/ client / rpc / pubsub.go
pubsub.go
  1  package rpc
  2  
  3  import (
  4  	"bytes"
  5  	"context"
  6  	"encoding/json"
  7  	"io"
  8  
  9  	iface "github.com/ipfs/kubo/core/coreiface"
 10  	caopts "github.com/ipfs/kubo/core/coreiface/options"
 11  	"github.com/libp2p/go-libp2p/core/peer"
 12  	mbase "github.com/multiformats/go-multibase"
 13  )
 14  
// PubsubAPI exposes the pubsub RPC commands ("pubsub/ls", "pubsub/peers",
// "pubsub/pub", "pubsub/sub"). It shares its underlying type with HttpApi so
// that a *PubsubAPI can be converted back to *HttpApi (see core) to issue
// requests.
type PubsubAPI HttpApi
 16  
 17  func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) {
 18  	var out struct {
 19  		Strings []string
 20  	}
 21  
 22  	if err := api.core().Request("pubsub/ls").Exec(ctx, &out); err != nil {
 23  		return nil, err
 24  	}
 25  	topics := make([]string, len(out.Strings))
 26  	for n, mb := range out.Strings {
 27  		_, topic, err := mbase.Decode(mb)
 28  		if err != nil {
 29  			return nil, err
 30  		}
 31  		topics[n] = string(topic)
 32  	}
 33  	return topics, nil
 34  }
 35  
 36  func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
 37  	options, err := caopts.PubSubPeersOptions(opts...)
 38  	if err != nil {
 39  		return nil, err
 40  	}
 41  
 42  	var out struct {
 43  		Strings []string
 44  	}
 45  
 46  	var optionalTopic string
 47  	if len(options.Topic) > 0 {
 48  		optionalTopic = toMultibase([]byte(options.Topic))
 49  	}
 50  	if err := api.core().Request("pubsub/peers", optionalTopic).Exec(ctx, &out); err != nil {
 51  		return nil, err
 52  	}
 53  
 54  	res := make([]peer.ID, len(out.Strings))
 55  	for i, sid := range out.Strings {
 56  		id, err := peer.Decode(sid)
 57  		if err != nil {
 58  			return nil, err
 59  		}
 60  		res[i] = id
 61  	}
 62  	return res, nil
 63  }
 64  
 65  func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error {
 66  	return api.core().Request("pubsub/pub", toMultibase([]byte(topic))).
 67  		FileBody(bytes.NewReader(message)).
 68  		Exec(ctx, nil)
 69  }
 70  
// pubsubSub is the subscription handle created by PubsubAPI.Subscribe.
// Messages are pulled from it with Next and it is shut down with Close.
type pubsubSub struct {
	// messages carries entries read from the response stream by the goroutine
	// started in Subscribe; that goroutine closes the channel when it exits.
	messages chan pubsubMessage

	// done is closed by Close to tell the reader goroutine to stop.
	done    chan struct{}
	// rcloser is invoked by Close; Subscribe sets it to cancel the response.
	rcloser func() error
}
 77  
// pubsubMessage is a single entry of the "pubsub/sub" response stream.
//
// The exported J* fields hold the values exactly as they appear in the JSON
// stream; the unexported fields are filled in by pubsubSub.Next, which decodes
// them before handing the message to the caller.
type pubsubMessage struct {
	// JFrom is the sender's peer ID in text form (parsed with peer.Decode).
	JFrom     string   `json:"from,omitempty"`
	// JData is the multibase-encoded message payload.
	JData     string   `json:"data,omitempty"`
	// JSeqno is the multibase-encoded sequence number.
	JSeqno    string   `json:"seqno,omitempty"`
	// JTopicIDs holds the multibase-encoded topic names.
	JTopicIDs []string `json:"topicIDs,omitempty"`

	// real values after unpacking from text/multibase envelopes
	from   peer.ID
	data   []byte
	seqno  []byte
	topics []string

	// err is set by the reader goroutine in Subscribe when decoding a stream
	// entry fails; Next returns it instead of a message.
	err error
}
 92  
// From returns the sender's peer ID, as parsed from the "from" JSON field by
// pubsubSub.Next.
func (msg *pubsubMessage) From() peer.ID {
	return msg.from
}
 96  
// Data returns the message payload, as multibase-decoded from the "data" JSON
// field by pubsubSub.Next.
func (msg *pubsubMessage) Data() []byte {
	return msg.data
}
100  
// Seq returns the message sequence number, as multibase-decoded from the
// "seqno" JSON field by pubsubSub.Next.
func (msg *pubsubMessage) Seq() []byte {
	return msg.seqno
}
104  
// Topics returns the topic names, as multibase-decoded from the "topicIDs"
// JSON field by pubsubSub.Next and converted to strings.
//
// TODO: do we want to keep this interface as []string,
// or change to more correct [][]byte?
func (msg *pubsubMessage) Topics() []string {
	return msg.topics
}
110  
111  func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
112  	select {
113  	case msg, ok := <-s.messages:
114  		if !ok {
115  			return nil, io.EOF
116  		}
117  		if msg.err != nil {
118  			return nil, msg.err
119  		}
120  		// unpack values from text/multibase envelopes
121  		var err error
122  		msg.from, err = peer.Decode(msg.JFrom)
123  		if err != nil {
124  			return nil, err
125  		}
126  		_, msg.data, err = mbase.Decode(msg.JData)
127  		if err != nil {
128  			return nil, err
129  		}
130  		_, msg.seqno, err = mbase.Decode(msg.JSeqno)
131  		if err != nil {
132  			return nil, err
133  		}
134  		for _, mbt := range msg.JTopicIDs {
135  			_, topic, err := mbase.Decode(mbt)
136  			if err != nil {
137  				return nil, err
138  			}
139  			msg.topics = append(msg.topics, string(topic))
140  		}
141  		return &msg, nil
142  	case <-ctx.Done():
143  		return nil, ctx.Err()
144  	}
145  }
146  
147  func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) {
148  	/* right now we have no options (discover got deprecated)
149  	options, err := caopts.PubSubSubscribeOptions(opts...)
150  	if err != nil {
151  		return nil, err
152  	}
153  	*/
154  	resp, err := api.core().Request("pubsub/sub", toMultibase([]byte(topic))).Send(ctx)
155  	if err != nil {
156  		return nil, err
157  	}
158  	if resp.Error != nil {
159  		return nil, resp.Error
160  	}
161  
162  	sub := &pubsubSub{
163  		messages: make(chan pubsubMessage),
164  		done:     make(chan struct{}),
165  		rcloser: func() error {
166  			return resp.Cancel()
167  		},
168  	}
169  
170  	dec := json.NewDecoder(resp.Output)
171  
172  	go func() {
173  		defer close(sub.messages)
174  
175  		for {
176  			var msg pubsubMessage
177  			if err := dec.Decode(&msg); err != nil {
178  				if err == io.EOF {
179  					return
180  				}
181  				msg.err = err
182  			}
183  
184  			select {
185  			case sub.messages <- msg:
186  			case <-sub.done:
187  				return
188  			case <-ctx.Done():
189  				return
190  			}
191  		}
192  	}()
193  
194  	return sub, nil
195  }
196  
197  func (s *pubsubSub) Close() error {
198  	if s.done != nil {
199  		close(s.done)
200  		s.done = nil
201  	}
202  	return s.rcloser()
203  }
204  
// core converts the receiver back to *HttpApi so that requests can be built
// with it. PubsubAPI is declared with HttpApi as its underlying type, which is
// what makes this pointer conversion valid.
func (api *PubsubAPI) core() *HttpApi {
	return (*HttpApi)(api)
}
208  
209  // Encodes bytes into URL-safe multibase that can be sent over HTTP RPC (URL or body).
210  func toMultibase(data []byte) string {
211  	mb, _ := mbase.Encode(mbase.Base64url, data)
212  	return mb
213  }