main.ts
1 import Koa from 'koa' 2 import Router from '@koa/router' 3 import cors from '@koa/cors' 4 import bodyParser from 'koa-bodyparser' 5 import { create as createKuboClient } from 'kubo-rpc-client' 6 7 // Create Kubo (IPFS) RPC client pointing to local Kubo by default 8 const IPFS_API_URL = process.env.IPFS_API_URL || 'http://127.0.0.1:5001/api/v0' 9 const ipfs = createKuboClient({ url: IPFS_API_URL }) 10 11 const app = new Koa() 12 const router = new Router({ prefix: '/api' }) 13 14 // Health check - ensures Kubo is reachable 15 router.get('/health', async (ctx) => { 16 try { 17 const id = await ipfs.id() 18 ctx.body = { ok: true, id } 19 } catch (err: any) { 20 ctx.status = 503 21 ctx.body = { ok: false, error: err?.message || String(err) } 22 } 23 }) 24 25 // List pubsub topics 26 router.get('/pubsub/topics', async (ctx) => { 27 const topics = await ipfs.pubsub.ls() 28 ctx.body = { topics } 29 }) 30 31 // List peers for a topic 32 router.get('/pubsub/peers/:topic', async (ctx) => { 33 const { topic } = ctx.params 34 const peers = await ipfs.pubsub.peers(topic) 35 ctx.body = { topic, peers } 36 }) 37 38 // Publish a message to a topic 39 // body: { topic: string, data: string, encoding?: 'utf8'|'base64' } 40 router.post('/pubsub/publish', async (ctx) => { 41 const { topic, data, encoding } = ctx.request.body as { topic?: string, data?: string, encoding?: 'utf8' | 'base64' } 42 if (!topic || typeof topic !== 'string') { 43 ctx.status = 400 44 ctx.body = { error: 'topic (string) is required' } 45 return 46 } 47 if (typeof data !== 'string') { 48 ctx.status = 400 49 ctx.body = { error: 'data (string) is required' } 50 return 51 } 52 const enc = encoding === 'base64' ? 'base64' : 'utf8' 53 const bytes = Buffer.from(data, enc) 54 await ipfs.pubsub.publish(topic, bytes) 55 ctx.body = { ok: true } 56 }) 57 58 // Subscribe using Server-Sent Events (SSE) 59 router.get('/pubsub/subscribe/:topic', async (ctx) => { 60 const { topic } = ctx.params 61 // Set SSE headers 62 ctx.req.setTimeout(0) 63 ctx.set('Content-Type', 'text/event-stream') 64 ctx.set('Cache-Control', 'no-cache') 65 ctx.set('Connection', 'keep-alive') 66 ctx.status = 200 67 68 const res = ctx.res 69 70 const writeEvent = (event: string, data: unknown) => { 71 res.write(`event: ${event}\n`) 72 res.write(`data: ${JSON.stringify(data)}\n\n`) 73 } 74 75 let aborted = false 76 const onClose = () => { aborted = true } 77 reqOnClose(ctx, onClose) 78 79 try { 80 writeEvent('open', { topic }) 81 82 // Use handler-based subscription API with AbortController 83 const controller = new AbortController() 84 const handler = (msg: any) => { 85 if (aborted) return 86 try { 87 const from = msg.from?.toString?.() ?? msg.from 88 89 const toHex = (val: any): string | undefined => { 90 if (val == null) return undefined 91 if (typeof val === 'bigint') return val.toString(16) 92 if (typeof val === 'string') return val 93 try { 94 if (val instanceof Uint8Array || Array.isArray(val)) return Buffer.from(val).toString('hex') 95 return Buffer.from(val as any).toString('hex') 96 } catch { 97 try { return String(val) } catch { return undefined } 98 } 99 } 100 101 const toBase64 = (val: any): string => { 102 if (val == null) return '' 103 if (typeof val === 'string') return Buffer.from(val, 'utf8').toString('base64') 104 try { 105 if (val instanceof Uint8Array || Array.isArray(val)) return Buffer.from(val).toString('base64') 106 if (typeof val === 'object' && Array.isArray((val as any).data)) return Buffer.from((val as any).data).toString('base64') 107 return Buffer.from(val as any).toString('base64') 108 } catch { 109 return '' 110 } 111 } 112 113 const seqno = toHex(msg.sequenceNumber ?? (msg as any).seqno) 114 const dataB64 = toBase64(msg.data) 115 writeEvent('message', { from, topic: msg.topic ?? topic, seqno, data: dataB64 }) 116 } catch (e) { 117 writeEvent('error', { error: (e as any)?.message || String(e) }) 118 } 119 } 120 121 await ipfs.pubsub.subscribe(topic, handler, { signal: controller.signal } as any) 122 123 // Keep the connection open until the client disconnects 124 await new Promise<void>((resolve) => { 125 reqOnClose(ctx, () => resolve()) 126 }) 127 128 // Abort the subscription and attempt to unsubscribe 129 controller.abort() 130 try { 131 // not all clients implement unsubscribe; guard with optional chaining 132 await (ipfs.pubsub as any).unsubscribe?.(topic, handler) 133 } catch {} 134 135 writeEvent('end', { topic }) 136 } catch (err: any) { 137 writeEvent('error', { error: err?.message || String(err) }) 138 } finally { 139 res.end() 140 } 141 }) 142 143 // IPFS cat by CID 144 router.get('/ipfs/cat/:cid', async (ctx) => { 145 const { cid } = ctx.params as { cid: string } 146 const encodingParam = String(ctx.query.encoding || 'base64') 147 const encoding = encodingParam === 'utf8' ? 'utf8' : 'base64' 148 if (!cid || typeof cid !== 'string') { 149 ctx.status = 400 150 ctx.body = { error: 'cid (string) is required' } 151 return 152 } 153 try { 154 const chunks: Uint8Array[] = [] 155 for await (const chunk of ipfs.cat(cid)) { 156 chunks.push(chunk) 157 } 158 const buf = Buffer.concat(chunks.map((c) => Buffer.from(c))) 159 const data = encoding === 'utf8' ? buf.toString('utf8') : buf.toString('base64') 160 ctx.body = { cid, data, encoding } 161 } catch (err: any) { 162 ctx.status = 500 163 ctx.body = { error: err?.message || String(err) } 164 } 165 }) 166 167 app 168 .use(cors()) 169 .use(bodyParser()) 170 .use(router.routes()) 171 .use(router.allowedMethods()) 172 173 const PORT = Number(process.env.PORT || 3399) 174 app.listen(PORT, () => { 175 console.log(`IPFS HTTP interface listening on http://localhost:${PORT}`) 176 console.log(`Using Kubo RPC at ${IPFS_API_URL}`) 177 }) 178 179 // Helper to detect when client disconnects 180 function reqOnClose(ctx: Koa.ParameterizedContext, fn: () => void) { 181 const req = ctx.req 182 const onClose = () => { 183 req.removeListener('close', onClose) 184 req.removeListener('end', onClose) 185 req.removeListener('error', onClose) 186 fn() 187 } 188 req.on('close', onClose) 189 req.on('end', onClose) 190 req.on('error', onClose) 191 }