pubsub.go
1 package rpc 2 3 import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "io" 8 9 iface "github.com/ipfs/kubo/core/coreiface" 10 caopts "github.com/ipfs/kubo/core/coreiface/options" 11 "github.com/libp2p/go-libp2p/core/peer" 12 mbase "github.com/multiformats/go-multibase" 13 ) 14 15 type PubsubAPI HttpApi 16 17 func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) { 18 var out struct { 19 Strings []string 20 } 21 22 if err := api.core().Request("pubsub/ls").Exec(ctx, &out); err != nil { 23 return nil, err 24 } 25 topics := make([]string, len(out.Strings)) 26 for n, mb := range out.Strings { 27 _, topic, err := mbase.Decode(mb) 28 if err != nil { 29 return nil, err 30 } 31 topics[n] = string(topic) 32 } 33 return topics, nil 34 } 35 36 func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) { 37 options, err := caopts.PubSubPeersOptions(opts...) 38 if err != nil { 39 return nil, err 40 } 41 42 var out struct { 43 Strings []string 44 } 45 46 var optionalTopic string 47 if len(options.Topic) > 0 { 48 optionalTopic = toMultibase([]byte(options.Topic)) 49 } 50 if err := api.core().Request("pubsub/peers", optionalTopic).Exec(ctx, &out); err != nil { 51 return nil, err 52 } 53 54 res := make([]peer.ID, len(out.Strings)) 55 for i, sid := range out.Strings { 56 id, err := peer.Decode(sid) 57 if err != nil { 58 return nil, err 59 } 60 res[i] = id 61 } 62 return res, nil 63 } 64 65 func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error { 66 return api.core().Request("pubsub/pub", toMultibase([]byte(topic))). 67 FileBody(bytes.NewReader(message)). 68 Exec(ctx, nil) 69 } 70 71 type pubsubSub struct { 72 messages chan pubsubMessage 73 74 done chan struct{} 75 rcloser func() error 76 } 77 78 type pubsubMessage struct { 79 JFrom string `json:"from,omitempty"` 80 JData string `json:"data,omitempty"` 81 JSeqno string `json:"seqno,omitempty"` 82 JTopicIDs []string `json:"topicIDs,omitempty"` 83 84 // real values after unpacking from text/multibase envelopes 85 from peer.ID 86 data []byte 87 seqno []byte 88 topics []string 89 90 err error 91 } 92 93 func (msg *pubsubMessage) From() peer.ID { 94 return msg.from 95 } 96 97 func (msg *pubsubMessage) Data() []byte { 98 return msg.data 99 } 100 101 func (msg *pubsubMessage) Seq() []byte { 102 return msg.seqno 103 } 104 105 // TODO: do we want to keep this interface as []string, 106 // or change to more correct [][]byte? 107 func (msg *pubsubMessage) Topics() []string { 108 return msg.topics 109 } 110 111 func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) { 112 select { 113 case msg, ok := <-s.messages: 114 if !ok { 115 return nil, io.EOF 116 } 117 if msg.err != nil { 118 return nil, msg.err 119 } 120 // unpack values from text/multibase envelopes 121 var err error 122 msg.from, err = peer.Decode(msg.JFrom) 123 if err != nil { 124 return nil, err 125 } 126 _, msg.data, err = mbase.Decode(msg.JData) 127 if err != nil { 128 return nil, err 129 } 130 _, msg.seqno, err = mbase.Decode(msg.JSeqno) 131 if err != nil { 132 return nil, err 133 } 134 for _, mbt := range msg.JTopicIDs { 135 _, topic, err := mbase.Decode(mbt) 136 if err != nil { 137 return nil, err 138 } 139 msg.topics = append(msg.topics, string(topic)) 140 } 141 return &msg, nil 142 case <-ctx.Done(): 143 return nil, ctx.Err() 144 } 145 } 146 147 func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) { 148 /* right now we have no options (discover got deprecated) 149 options, err := caopts.PubSubSubscribeOptions(opts...) 150 if err != nil { 151 return nil, err 152 } 153 */ 154 resp, err := api.core().Request("pubsub/sub", toMultibase([]byte(topic))).Send(ctx) 155 if err != nil { 156 return nil, err 157 } 158 if resp.Error != nil { 159 return nil, resp.Error 160 } 161 162 sub := &pubsubSub{ 163 messages: make(chan pubsubMessage), 164 done: make(chan struct{}), 165 rcloser: func() error { 166 return resp.Cancel() 167 }, 168 } 169 170 dec := json.NewDecoder(resp.Output) 171 172 go func() { 173 defer close(sub.messages) 174 175 for { 176 var msg pubsubMessage 177 if err := dec.Decode(&msg); err != nil { 178 if err == io.EOF { 179 return 180 } 181 msg.err = err 182 } 183 184 select { 185 case sub.messages <- msg: 186 case <-sub.done: 187 return 188 case <-ctx.Done(): 189 return 190 } 191 } 192 }() 193 194 return sub, nil 195 } 196 197 func (s *pubsubSub) Close() error { 198 if s.done != nil { 199 close(s.done) 200 s.done = nil 201 } 202 return s.rcloser() 203 } 204 205 func (api *PubsubAPI) core() *HttpApi { 206 return (*HttpApi)(api) 207 } 208 209 // Encodes bytes into URL-safe multibase that can be sent over HTTP RPC (URL or body). 210 func toMultibase(data []byte) string { 211 mb, _ := mbase.Encode(mbase.Base64url, data) 212 return mb 213 }