main.ts
  1  import Koa from 'koa'
  2  import Router from '@koa/router'
  3  import cors from '@koa/cors'
  4  import bodyParser from 'koa-bodyparser'
  5  import { create as createKuboClient } from 'kubo-rpc-client'
  6  
  7  // Create Kubo (IPFS) RPC client pointing to local Kubo by default
  8  const IPFS_API_URL = process.env.IPFS_API_URL || 'http://127.0.0.1:5001/api/v0'
  9  const ipfs = createKuboClient({ url: IPFS_API_URL })
 10  
 11  const app = new Koa()
 12  const router = new Router({ prefix: '/api' })
 13  
 14  // Health check - ensures Kubo is reachable
 15  router.get('/health', async (ctx) => {
 16    try {
 17      const id = await ipfs.id()
 18      ctx.body = { ok: true, id }
 19    } catch (err: any) {
 20      ctx.status = 503
 21      ctx.body = { ok: false, error: err?.message || String(err) }
 22    }
 23  })
 24  
 25  // List pubsub topics
 26  router.get('/pubsub/topics', async (ctx) => {
 27    const topics = await ipfs.pubsub.ls()
 28    ctx.body = { topics }
 29  })
 30  
 31  // List peers for a topic
 32  router.get('/pubsub/peers/:topic', async (ctx) => {
 33    const { topic } = ctx.params
 34    const peers = await ipfs.pubsub.peers(topic)
 35    ctx.body = { topic, peers }
 36  })
 37  
 38  // Publish a message to a topic
 39  // body: { topic: string, data: string, encoding?: 'utf8'|'base64' }
 40  router.post('/pubsub/publish', async (ctx) => {
 41    const { topic, data, encoding } = ctx.request.body as { topic?: string, data?: string, encoding?: 'utf8' | 'base64' }
 42    if (!topic || typeof topic !== 'string') {
 43      ctx.status = 400
 44      ctx.body = { error: 'topic (string) is required' }
 45      return
 46    }
 47    if (typeof data !== 'string') {
 48      ctx.status = 400
 49      ctx.body = { error: 'data (string) is required' }
 50      return
 51    }
 52    const enc = encoding === 'base64' ? 'base64' : 'utf8'
 53    const bytes = Buffer.from(data, enc)
 54    await ipfs.pubsub.publish(topic, bytes)
 55    ctx.body = { ok: true }
 56  })
 57  
 58  // Subscribe using Server-Sent Events (SSE)
 59  router.get('/pubsub/subscribe/:topic', async (ctx) => {
 60    const { topic } = ctx.params
 61    // Set SSE headers
 62    ctx.req.setTimeout(0)
 63    ctx.set('Content-Type', 'text/event-stream')
 64    ctx.set('Cache-Control', 'no-cache')
 65    ctx.set('Connection', 'keep-alive')
 66    ctx.status = 200
 67  
 68    const res = ctx.res
 69  
 70    const writeEvent = (event: string, data: unknown) => {
 71      res.write(`event: ${event}\n`)
 72      res.write(`data: ${JSON.stringify(data)}\n\n`)
 73    }
 74  
 75    let aborted = false
 76    const onClose = () => { aborted = true }
 77    reqOnClose(ctx, onClose)
 78  
 79    try {
 80      writeEvent('open', { topic })
 81  
 82      // Use handler-based subscription API with AbortController
 83      const controller = new AbortController()
 84      const handler = (msg: any) => {
 85        if (aborted) return
 86        try {
 87          const from = msg.from?.toString?.() ?? msg.from
 88  
 89          const toHex = (val: any): string | undefined => {
 90            if (val == null) return undefined
 91            if (typeof val === 'bigint') return val.toString(16)
 92            if (typeof val === 'string') return val
 93            try {
 94              if (val instanceof Uint8Array || Array.isArray(val)) return Buffer.from(val).toString('hex')
 95              return Buffer.from(val as any).toString('hex')
 96            } catch {
 97              try { return String(val) } catch { return undefined }
 98            }
 99          }
100  
101          const toBase64 = (val: any): string => {
102            if (val == null) return ''
103            if (typeof val === 'string') return Buffer.from(val, 'utf8').toString('base64')
104            try {
105              if (val instanceof Uint8Array || Array.isArray(val)) return Buffer.from(val).toString('base64')
106              if (typeof val === 'object' && Array.isArray((val as any).data)) return Buffer.from((val as any).data).toString('base64')
107              return Buffer.from(val as any).toString('base64')
108            } catch {
109              return ''
110            }
111          }
112  
113          const seqno = toHex(msg.sequenceNumber ?? (msg as any).seqno)
114          const dataB64 = toBase64(msg.data)
115          writeEvent('message', { from, topic: msg.topic ?? topic, seqno, data: dataB64 })
116        } catch (e) {
117          writeEvent('error', { error: (e as any)?.message || String(e) })
118        }
119      }
120  
121      await ipfs.pubsub.subscribe(topic, handler, { signal: controller.signal } as any)
122  
123      // Keep the connection open until the client disconnects
124      await new Promise<void>((resolve) => {
125        reqOnClose(ctx, () => resolve())
126      })
127  
128      // Abort the subscription and attempt to unsubscribe
129      controller.abort()
130      try {
131        // not all clients implement unsubscribe; guard with optional chaining
132        await (ipfs.pubsub as any).unsubscribe?.(topic, handler)
133      } catch {}
134  
135      writeEvent('end', { topic })
136    } catch (err: any) {
137      writeEvent('error', { error: err?.message || String(err) })
138    } finally {
139      res.end()
140    }
141  })
142  
// IPFS cat by CID
// query: encoding=utf8|base64 — how the content is rendered in the JSON
// response; anything other than 'utf8' (including no value) means base64.
// Responds { cid, data, encoding }, or 500 with { error } when reading fails.
// NOTE(review): the whole object is buffered in memory before responding;
// consider a size limit if large CIDs may be requested.
router.get('/ipfs/cat/:cid', async (ctx) => {
  const { cid } = ctx.params as { cid: string }
  const encodingParam = String(ctx.query.encoding || 'base64')
  const encoding = encodingParam === 'utf8' ? 'utf8' : 'base64'
  if (!cid || typeof cid !== 'string') {
    ctx.status = 400
    ctx.body = { error: 'cid (string) is required' }
    return
  }
  try {
    const chunks: Uint8Array[] = []
    for await (const chunk of ipfs.cat(cid)) {
      chunks.push(chunk)
    }
    // Buffer.concat accepts Uint8Array chunks directly, so no per-chunk copy is needed
    const buf = Buffer.concat(chunks)
    ctx.body = { cid, data: buf.toString(encoding), encoding }
  } catch (err: any) {
    ctx.status = 500
    ctx.body = { error: err?.message || String(err) }
  }
})
166  
// Middleware, in registration order: CORS headers, request body parsing,
// the API routes, and the router's allowed-methods handling.
app.use(cors())
app.use(bodyParser())
app.use(router.routes())
app.use(router.allowedMethods())

// Listening port: taken from the PORT environment variable, 3399 when unset
const PORT = Number(process.env.PORT || 3399)

// Logs where the server listens and which Kubo endpoint it talks to
const announceStartup = () => {
  console.log(`IPFS HTTP interface listening on http://localhost:${PORT}`)
  console.log(`Using Kubo RPC at ${IPFS_API_URL}`)
}

app.listen(PORT, announceStartup)
178  
/**
 * Helper to detect when client disconnects.
 *
 * Calls `fn` exactly once, on the first of: request `close`, `end` or `error`,
 * or response `close`. The response is watched as well because the request
 * stream may already be consumed or closed when this is called, in which case
 * it emits nothing further. All listeners are removed once `fn` has been
 * triggered.
 *
 * If the response is already destroyed at registration time, `fn` is
 * scheduled on the microtask queue instead of waiting for an event that has
 * already happened.
 *
 * @param ctx Koa context whose underlying Node request/response are observed
 * @param fn  callback to run once the connection is considered closed
 */
function reqOnClose(ctx: Koa.ParameterizedContext, fn: () => void) {
  const { req, res } = ctx
  let fired = false
  const onClose = () => {
    // Several of the watched events usually fire for one disconnect
    if (fired) return
    fired = true
    req.removeListener('close', onClose)
    req.removeListener('end', onClose)
    req.removeListener('error', onClose)
    res.removeListener('close', onClose)
    fn()
  }
  req.on('close', onClose)
  req.on('end', onClose)
  req.on('error', onClose)
  res.on('close', onClose)
  // The connection may have gone away before anyone started listening
  if (res.destroyed) queueMicrotask(onClose)
}
191  }