/ app / back / index.mjs
index.mjs
  1  import path from "path"
  2  import uWS from "uWebSockets.js"
  3  import { readFile } from "node:fs/promises"
  4  import { ASK_CLIENT_SECRET } from "./secrets.mjs"
  5  
  6  import { uuidv4, verifyJWT } from "./util.mjs"
  7  import { Persist, Cache } from "./model.mjs"
  8  import { initEBSMessageHandler } from "./wsMsg.mjs"
  9  
 10  import * as constant from "./constants.mjs"
 11  
 12  const PORT = process.env.PORT || 1026
 13  ,     db = Persist({ dbName: "./askebs.sql" })
 14  ,     cache = Cache({ socket: { port: 6379 }})
 15  
 16  const loggingFlag = process.argv.indexOf("-logging")
 17  let LOGGING = loggingFlag > -1
 18  
 19  cache.initConnection()
 20  
 21  // @TODO: I would like to factor the logging helper code out but it's not critical
 22  let logHeader = ""
 23  logHeader += (`PID: ${process.pid}`).padEnd(24, " ")
 24  logHeader += "msg".padEnd(18, " ")
 25  logHeader += "channel id".padEnd(13, " ")
 26  logHeader += "user id".padEnd(13, " ")
 27  logHeader += "--------".padEnd(21, " ")
 28  logHeader += "\n" + (new Array(logHeader.length - 1).fill("=").join(""))
 29  
 30  logHeader = (new Array(logHeader.length/2 - 1).fill("=").join("")) + "\n" + logHeader
 31  
 32  let logCounter = 0
 33  let connCounter = 0
 34  
 35  function logLine(type, channel, user) {
 36    let line = new Date().toLocaleString().padEnd(24, " ")
 37  
 38    line += (`${type}   `).padEnd(18, " ")
 39    line += (`${channel}   `).padEnd(13, " ")
 40    line += (`${user}   `).padEnd(13, " ")
 41    line += (`_____`).padEnd(22, " ")
 42  
 43    return line
 44  }
 45  
 46  function reportTotalConnections() {
 47    if (!LOGGING) return
 48    console.log(new Date().toLocaleString().padEnd(24, " ") + `${connCounter} connections `.padEnd(logHeader.length/3 - 25, "-"))
 49  }
 50  
 51  function reportMessageAndBackbuffer(user, type, data) {
 52    if (!LOGGING) return
 53    let { channelId, userId } = user
 54  
 55    console.log(logLine(type, channelId ?? data.channelId ?? "---", userId ?? data.userId ?? "---"))
 56    logCounter++
 57  
 58    if (logCounter % 40 == 0) {
 59      console.log(logHeader)
 60    }
 61  }
 62  
 63  if (LOGGING) {
 64    console.log(logHeader)
 65    setInterval(reportTotalConnections, 5000)
 66  }
 67  
 68  // @NOTE: This is a helper function for processing the
 69  //        JSON body of a request to a uWS HTTP endpoint.
 70  function readJSON(res, cb) {
 71    let buffer
 72  
 73    res.onData((ab, isLast) => {
 74      let chunk = Buffer.from(ab)
 75      if (isLast) {
 76        if (buffer) {
 77          cb(JSON.parse(Buffer.concat([buffer, chunk])))
 78        } else {
 79          cb(JSON.parse(Buffer.concat([chunk])))
 80        }
 81      } else {
 82        if (buffer) {
 83          buffer = Buffer.concat([buffer, chunk])
 84        } else {
 85          buffer = Buffer.concat([chunk])
 86        }
 87      }
 88    })
 89  
 90    res.onAborted(() => { res.aborted = true })
 91  }
 92  
 93  function setCORSHeaders(req,res) {
 94    // https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS/Errors/CORSNotSupportingCredentials
 95    // @NOTE: Using the '*' value for this header doesn't work when credentials are present
 96    //        in request headers (as cookies, authorization headers, or client TLS). Although
 97    //        suddenly the requests are violating CORS policy even on pre-flight (without creds)
 98    //        this is a more appropriate way to respond to requests by explicitly targeting the req origin
 99    res.writeHeader("Access-Control-Allow-Origin", req.getHeader("origin"))
100    res.writeHeader("Access-Control-Allow-Headers", "Authorization, Content-Type")
101    res.writeHeader("Access-Control-Allow-Credentials", "true")
102    res.writeHeader("Vary", "Origin")
103  }
104  
// Per-channel bookkeeping handed to the websocket message handler.
const rankingBroadcastTimeouts = new Map() // channel IDs => {sessionIds, timeout IDs}
const openEndlessSessions = new Map() // channel IDs => { sessionIds, websocket connection objects }
const countdownTimeouts = new Map() // channel IDs => timeout IDs

// The TLS app that every websocket and HTTP route below is registered on.
// The key and certificate are given as relative file names.
const ASKEBS = uWS.SSLApp({
  key_file_name: "privkey.pem",
  cert_file_name: "fullchain.pem",
})
114  
// Websocket endpoint. Clients connect here, are prompted for credentials,
// and every later message is dispatched by the handler from ./wsMsg.mjs.
ASKEBS.ws("/connect", {
  compression: uWS.DEDICATED_COMPRESSOR_3KB,
  maxPayloadLength: 16 * 1024 * 1024,
  sendPingsAutomatically: true,
  idleTimeout: 120,
  // Complete the HTTP -> websocket upgrade, keeping the request URL as user data.
  upgrade: (res, req, context) => {
    const userData = { url: req.getUrl() }
    const key = req.getHeader("sec-websocket-key")
    const protocol = req.getHeader("sec-websocket-protocol")
    const extensions = req.getHeader("sec-websocket-extensions")

    res.upgrade(userData, key, protocol, extensions, context)
  },
  // Count the connection and ask the client to authenticate.
  open: (ws) => {
    connCounter += 1

    const greeting = {
      type: constant.AUTH,
      data: {
        message: "Connected to ASK EBS. Respond with authorization credentials."
      }
    }
    ws.send(JSON.stringify(greeting))
  },
  message: await initEBSMessageHandler(
    db, cache, ASK_CLIENT_SECRET, ASKEBS,
    reportMessageAndBackbuffer,
    rankingBroadcastTimeouts,
    openEndlessSessions,
    countdownTimeouts
  ),
  // Uncount the connection and, when logging, note who left.
  close: (ws, code, message) => {
    connCounter -= 1

    if (LOGGING) {
      const timestamp = new Date().toLocaleString().padEnd(24, " ")
      const notice = `${ws.userId}/${ws.channelId} closed their connection`.padEnd(logHeader.length/3 - 25, "-")
      console.log(timestamp + notice)
    }
  },
  drain: (ws) => {
    // do a better log here
    // console.log(`Backpressure is up: ${ws.getBufferedAmount()}b `)
  }
})
// @TODO: Add OPTIONS handlers to new endpoints
// CORS pre-flight for /config: advertises GET and POST, adds the shared
// CORS headers and finishes with an empty body.
ASKEBS.options("/config", async (res, req) => {
  const allowedMethods = "GET, POST"

  res.writeHeader("Access-Control-Allow-Methods", allowedMethods)
  setCORSHeaders(req, res)
  res.end()
})
// GET /config — returns the stored configuration for the caller's channel.
// Replies 401 unless the bearer token verifies and carries the "broadcaster"
// role, and 500 when the lookup fails.
ASKEBS.get("/config", async (res, req) => {
  // uWS requires an abort handler before the handler yields at its first await.
  res.onAborted(() => { res.aborted = true })

  // `req` is only usable until the first await, so read what is needed now.
  const origin = req.getHeader("origin")
  const token = req.getHeader("authorization").split(" ")[1]
  const payload = token ? verifyJWT(token, ASK_CLIENT_SECRET) : null

  if (!payload || payload.role !== "broadcaster") {
    // The status line must come before any header: uWS emits an implicit
    // "200 OK" on the first writeHeader and ignores a later writeStatus.
    res.writeStatus("401 Unauthorized")
    setCORSHeaders(req, res)
    res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space")
    res.end()
    return
  }

  // Writes status, CORS headers and JSON body in one corked batch.
  const reply = (status, body) => {
    if (res.aborted) return // the response must not be used after an abort
    res.cork(() => {
      res.writeStatus(status)
      // Replay the origin captured above; `req` itself is no longer valid here.
      setCORSHeaders({ getHeader: () => origin }, res)
      res.end(JSON.stringify(body))
    })
  }

  let config
  try {
    config = await db.getConfig(payload.channel_id)
  } catch (err) {
    console.error("GET /config failed:", err)
    reply("500 Internal Server Error", { success: false, data: "Unable to load configuration." })
    return
  }

  reply("200 OK", {
    success: Boolean(config),
    data: {
      config: config || `No existing configuration for your channel.`
    }
  })
})
// POST /config — stores the session configuration sent in the JSON body.
// Replies 401 without a valid bearer token, 403 when the caller is not the
// broadcaster of the channel being configured, 400 for an incomplete body and
// 500 when the write fails.
ASKEBS.post("/config", async (res, req) => {
  // `req` is only usable synchronously, so read what is needed now.
  const origin = req.getHeader("origin")
  const token = req.getHeader("authorization").split(" ")[1]
  const payload = token ? verifyJWT(token, ASK_CLIENT_SECRET) : null

  if (!payload) {
    // The status line must come before any header: uWS emits an implicit
    // "200 OK" on the first writeHeader and ignores a later writeStatus.
    res.writeStatus("401 Unauthorized")
    setCORSHeaders(req, res)
    res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space")
    res.end()
    return
  }

  // Writes status, CORS headers and JSON body in one corked batch.
  const reply = (status, body) => {
    if (res.aborted) return // the response must not be used after an abort
    res.cork(() => {
      res.writeStatus(status)
      // Replay the origin captured above; `req` itself is no longer valid here.
      setCORSHeaders({ getHeader: () => origin }, res)
      res.end(JSON.stringify(body))
    })
  }

  readJSON(res, async (data) => {
    const { channelId, userName, config } = data ?? {}

    // Only the broadcaster may write, and only to their own channel. This
    // mirrors the role check on GET /config and the channel check on PATCH /question.
    const ownsChannel = channelId != null && String(channelId) === String(payload.channel_id)
    if (payload.role !== "broadcaster" || !ownsChannel) {
      reply("403 Forbidden", { success: false, data: `You are not authorized to update this configuration!` })
      return
    }

    // The three list fields are joined below, so they have to be arrays.
    const listFields = [
      config?.enabled_submission_multipliers,
      config?.enabled_upvote_multipliers,
      config?.suspended,
    ]
    if (!config || typeof config !== "object" || !listFields.every(Array.isArray)) {
      reply("400 Bad Request", { success: false, data: `The submitted configuration is incomplete.` })
      return
    }

    const preparedConfigData = [
      channelId,
      userName,
      config.viewer_points,
      config.follower_points,
      config.subscriber_points,
      config.session_length,
      config.session_mode,
      config.enabled_submission_multipliers.join(" "),
      config.enabled_upvote_multipliers.join(" "),
      config.suspended.join(" "),
      config.exclude_viewers,
      config.exclude_followers,
    ]

    try {
      await db.setSessionConfig(preparedConfigData)
    } catch (err) {
      console.error("POST /config failed:", err)
      reply("500 Internal Server Error", { success: false, data: `Unable to save configuration.` })
      return
    }

    reply("200 OK", { success: true, data: `Config for ${userName} successfully updated!` })
  })
})
// CORS pre-flight for /question: PATCH is the only advertised method.
ASKEBS.options("/question", async (res, req) => {
  const allowedMethods = "PATCH"

  res.writeHeader("Allow", allowedMethods)
  res.writeHeader("Access-Control-Allow-Methods", allowedMethods)
  setCORSHeaders(req, res)
  res.end()
})
// PATCH /question — updates a question's rating, reveal timestamp and/or lock
// state for the caller's own channel, then invalidates the cached session data.
// Replies 401 without a valid bearer token, 403 for another channel's question
// and 500 when an update fails.
ASKEBS.patch("/question", async (res, req) => {
  // `req` is only usable synchronously, so read what is needed now.
  const origin = req.getHeader("origin")
  const token = req.getHeader("authorization").split(" ")[1]
  const payload = token ? verifyJWT(token, ASK_CLIENT_SECRET) : null

  if (!payload) {
    // The status line must come before any header: uWS emits an implicit
    // "200 OK" on the first writeHeader and ignores a later writeStatus.
    res.writeStatus("401 Unauthorized")
    setCORSHeaders(req, res)
    res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space")
    res.end()
    return
  }

  // Writes status, CORS headers and JSON body in one corked batch, then
  // closes the connection (second argument to end), as the endpoint always has.
  const reply = (status, body) => {
    if (res.aborted) return // the response must not be used after an abort
    res.cork(() => {
      res.writeStatus(status)
      // Replay the origin captured above; `req` itself is no longer valid here.
      setCORSHeaders({ getHeader: () => origin }, res)
      res.end(JSON.stringify(body), true)
    })
  }

  readJSON(res, async (data) => {
    const { channelId, sessionId, userId, questionId, rating, revealed, locked } = data ?? {}
    const responseData = { success: true }

    // A missing channelId must never count as a match.
    const ownsChannel = channelId != null && String(channelId) === String(payload.channel_id)
    if (!ownsChannel) {
      responseData.success = false
      responseData.data = `You are not authorized to update this question!`
      reply("403 Forbidden", responseData)
      return
    }

    // @NOTE: we should do some error handling in cases like this right?
    //        - the question id could be invalid
    //        - the types of the params could be invalid
    //        ofc the likelihood is low but it is something which could be exploited
    try {
      // NOTE(review): a rating of 0 is skipped by this truthiness check — confirm that is intended.
      if (rating) {
        await db.updateQuestionRating([rating, questionId])
        responseData.data = { questionId, rating }
      }
      if (revealed) {
        const timestamp = Date.now()
        await db.updateQuestionRevealed([timestamp, questionId])
        responseData.data = { questionId, revealed: timestamp }
      }
      if (locked != null) {
        if (locked) {
          await db.suspendUser(channelId, userId)
        } else {
          // The stored list is a space-separated string; tolerate a missing
          // or empty value instead of failing on `.split`.
          const suspendedUsers = await db.getSuspendedUsers(channelId)
          const suspendedUsersSet = new Set((suspendedUsers ?? "").split(" ").filter(Boolean))

          suspendedUsersSet.delete(String(userId))

          const updatedSuspendedUsers = [...suspendedUsersSet].join(" ")
          await db.updateSuspendedUsers(channelId, updatedSuspendedUsers)
        }
        // NOTE(review): the other update helpers take one array argument;
        // this one is called with positional arguments — confirm against model.mjs.
        await db.updateQuestionLock(+locked, questionId)
        responseData.data = { questionId, locked }
      }

      await cache.invalidateEBSSessionData(channelId, sessionId)
    } catch (err) {
      console.error("PATCH /question failed:", err)
      reply("500 Internal Server Error", { success: false, data: `Unable to update this question.` })
      return
    }

    reply("200 OK", responseData)
  })
})
// CORS pre-flight for /questions/:sessionId: GET only.
ASKEBS.options("/questions/:sessionId", async (res, req) => {
  const allowedMethods = "GET"

  res.writeHeader("Allow", allowedMethods)
  setCORSHeaders(req, res)
  res.end()
})
// GET /questions/:sessionId — returns the questions recorded for a session.
// Replies 401 without a valid bearer token, 403 when the questions belong to
// another channel and 500 when the lookup fails.
ASKEBS.get("/questions/:sessionId", async (res, req) => {
  // uWS requires an abort handler before the handler yields at its first await.
  res.onAborted(() => { res.aborted = true })

  // `req` is only usable until the first await, so read what is needed now.
  const origin = req.getHeader("origin")
  const token = req.getHeader("authorization").split(" ")[1]
  const payload = token ? verifyJWT(token, ASK_CLIENT_SECRET) : null

  if (!payload) {
    // The status line must come before any header: uWS emits an implicit
    // "200 OK" on the first writeHeader and ignores a later writeStatus.
    res.writeStatus("401 Unauthorized")
    setCORSHeaders(req, res)
    res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space")
    res.end()
    return
  }

  const sessionId = req.getParameter(0)

  // Writes status, CORS headers and JSON body in one corked batch, then
  // closes the connection (second argument to end), as the endpoint always has.
  const reply = (status, body) => {
    if (res.aborted) return // the response must not be used after an abort
    res.cork(() => {
      res.writeStatus(status)
      // Replay the origin captured above; `req` itself is no longer valid here.
      setCORSHeaders({ getHeader: () => origin }, res)
      res.end(JSON.stringify(body), true)
    })
  }

  let questionData
  try {
    // Awaited like every other db call; reading [0] off a pending promise
    // would yield undefined and crash on `.channel_id`.
    questionData = await db.getSessionQuestions(sessionId)
  } catch (err) {
    console.error("GET /questions failed:", err)
    reply("500 Internal Server Error", { success: false, data: `Unable to load these questions.` })
    return
  }

  // Ownership is judged from the first row. A session without questions
  // exposes nothing, so it is answered with an empty list.
  const owner = questionData?.[0]?.channel_id
  if (owner != null && String(owner) !== String(payload.channel_id)) {
    reply("403 Forbidden", { success: false, data: `You're not authorized to retrieve these questions!` })
    return
  }

  reply("200 OK", { success: true, data: questionData ?? [] })
})
// CORS pre-flight for /upvotes/:questionId: GET only.
ASKEBS.options("/upvotes/:questionId", async (res, req) => {
  const allowedMethods = "GET"

  res.writeHeader("Allow", allowedMethods)
  setCORSHeaders(req, res)
  res.end()
})
// GET /upvotes/:questionId — returns the upvotes recorded for a question.
// Replies 401 without a valid bearer token and 500 when the lookup fails.
// NOTE(review): nothing here ties the question to the caller's channel —
// confirm whether that is intended.
ASKEBS.get("/upvotes/:questionId", async (res, req) => {
  // uWS requires an abort handler before the handler yields at its first await.
  res.onAborted(() => { res.aborted = true })

  // `req` is only usable until the first await, so read what is needed now.
  const origin = req.getHeader("origin")
  const token = req.getHeader("authorization").split(" ")[1]
  const payload = token ? verifyJWT(token, ASK_CLIENT_SECRET) : null

  if (!payload) {
    // The status line must come before any header: uWS emits an implicit
    // "200 OK" on the first writeHeader and ignores a later writeStatus.
    res.writeStatus("401 Unauthorized")
    setCORSHeaders(req, res)
    res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space")
    res.end()
    return
  }

  const questionId = req.getParameter(0)

  // Writes status, CORS headers and JSON body in one corked batch, then
  // closes the connection (second argument to end), as the endpoint always has.
  const reply = (status, body) => {
    if (res.aborted) return // the response must not be used after an abort
    res.cork(() => {
      res.writeStatus(status)
      // Replay the origin captured above; `req` itself is no longer valid here.
      setCORSHeaders({ getHeader: () => origin }, res)
      res.end(JSON.stringify(body), true)
    })
  }

  let questionUpvotes
  try {
    questionUpvotes = await db.getQuestionUpvotes(questionId)
  } catch (err) {
    console.error("GET /upvotes failed:", err)
    reply("500 Internal Server Error", { success: false, data: `Unable to load upvotes for this question.` })
    return
  }

  reply("200 OK", { success: true, data: { questionId, upvotes: questionUpvotes } })
})
// CORS pre-flight for /sessions: GET only.
ASKEBS.options("/sessions", async (res, req) => {
  const allowedMethods = "GET"

  res.writeHeader("Allow", allowedMethods)
  setCORSHeaders(req, res)
  res.end()
})
// GET /sessions — lists previous sessions for the caller's channel
// (getPreviousSessions is asked for 10).
// Replies 401 without a valid bearer token and 500 when the lookup fails.
ASKEBS.get("/sessions", async (res, req) => {
  // uWS requires an abort handler before the handler yields at its first await.
  res.onAborted(() => { res.aborted = true })

  // `req` is only usable until the first await, so read what is needed now.
  const origin = req.getHeader("origin")
  const token = req.getHeader("authorization").split(" ")[1]
  const payload = token ? verifyJWT(token, ASK_CLIENT_SECRET) : null

  if (!payload) {
    // The status line must come before any header: uWS emits an implicit
    // "200 OK" on the first writeHeader and ignores a later writeStatus.
    res.writeStatus("401 Unauthorized")
    setCORSHeaders(req, res)
    res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space")
    res.end()
    return
  }

  // Writes status, CORS headers and JSON body in one corked batch.
  const reply = (status, body) => {
    if (res.aborted) return // the response must not be used after an abort
    res.cork(() => {
      res.writeStatus(status)
      // Replay the origin captured above; `req` itself is no longer valid here.
      setCORSHeaders({ getHeader: () => origin }, res)
      res.end(JSON.stringify(body))
    })
  }

  let sessions
  try {
    sessions = await db.getPreviousSessions(payload.channel_id, 10)
  } catch (err) {
    console.error("GET /sessions failed:", err)
    reply("500 Internal Server Error", { success: false, data: `Unable to load previous sessions.` })
    return
  }

  reply("200 OK", {
    success: Boolean(sessions),
    data: {
      sessions: sessions || `No previous sessions for your channel.`
    }
  })
})
// CORS pre-flight for /session/:id: GET only.
ASKEBS.options("/session/:id", async (res, req) => {
  const allowedMethods = "GET"

  res.writeHeader("Allow", allowedMethods)
  setCORSHeaders(req, res)
  res.end()
})
// GET /session/:id — returns the questions and purchases of one session,
// served from the cache when an entry exists for this channel and session.
// Replies 401 without a valid bearer token and 500 when a lookup fails.
// NOTE(review): the db queries are keyed by session id only; confirm that a
// caller cannot read another channel's session this way.
ASKEBS.get("/session/:id", async (res, req) => {
  // uWS requires an abort handler before the handler yields at its first await.
  res.onAborted(() => { res.aborted = true })

  // `req` is only usable until the first await, so read what is needed now.
  const origin = req.getHeader("origin")
  const token = req.getHeader("authorization").split(" ")[1]
  const payload = token ? verifyJWT(token, ASK_CLIENT_SECRET) : null

  if (!payload) {
    // The status line must come before any header: uWS emits an implicit
    // "200 OK" on the first writeHeader and ignores a later writeStatus.
    res.writeStatus("401 Unauthorized")
    setCORSHeaders(req, res)
    res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space")
    res.end()
    return
  }

  const sessionId = req.getParameter(0)
  const channelId = payload.channel_id

  // Writes status, CORS headers and an already-serialized body in one corked batch.
  const reply = (status, body) => {
    if (res.aborted) return // the response must not be used after an abort
    res.cork(() => {
      res.writeStatus(status)
      // Replay the origin captured above; `req` itself is no longer valid here.
      setCORSHeaders({ getHeader: () => origin }, res)
      res.end(body)
    })
  }

  let out = null
  try {
    out = await cache.getEBSSessionData(channelId, sessionId)

    if (!out) {
      const questions = await db.getAllQuestions(sessionId)
      const purchases = await db.getAllPurchases(sessionId)
      const found = Boolean(questions && purchases)

      out = JSON.stringify({
        success: found,
        data: {
          questions: questions || `No previous sessions for your channel.`,
          purchases: purchases || `No previous sessions for your channel.`
        }
      })

      // Only cache real results: the session id comes straight from the URL,
      // so caching misses would pin failures and let callers fill the cache
      // with arbitrary keys.
      if (found) {
        await cache.setEBSSessionData(channelId, sessionId, out)
      }
    }
  } catch (err) {
    console.error("GET /session failed:", err)
    reply("500 Internal Server Error", JSON.stringify({ success: false, data: `Unable to load this session.` }))
    return
  }

  reply("200 OK", out)
})
// CORS pre-flight for /overlay: GET only.
ASKEBS.options("/overlay", async (res, req) => {
  const allowedMethods = "GET"

  res.writeHeader("Allow", allowedMethods)
  setCORSHeaders(req, res)
  res.end()
})
// GET /overlay — serves the overlay app's index.html to an authenticated caller.
// Replies 401 without a valid bearer token and 500 when the file cannot be read.
ASKEBS.get("/overlay", async (res, req) => {
  // uWS requires an abort handler before the handler yields at its first await.
  res.onAborted(() => { res.aborted = true })

  // `req` is only usable until the first await, so read what is needed now.
  const origin = req.getHeader("origin")
  const token = req.getHeader("authorization").split(" ")[1]
  const payload = token ? verifyJWT(token, ASK_CLIENT_SECRET) : null

  if (!payload) {
    // The status line must come before any header: uWS emits an implicit
    // "200 OK" on the first writeHeader and ignores a later writeStatus.
    res.writeStatus("401 Unauthorized")
    setCORSHeaders(req, res)
    res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space")
    res.end()
    return
  }

  // Writes status, CORS headers and body in one corked batch.
  const reply = (status, body) => {
    if (res.aborted) return // the response must not be used after an abort
    res.cork(() => {
      res.writeStatus(status)
      // Replay the origin captured above; `req` itself is no longer valid here.
      setCORSHeaders({ getHeader: () => origin }, res)
      res.end(body)
    })
  }

  // @TODO: Reject subsequent requests for the same channel, add a cache field indicating
  //        whether channel overlay is open.
  //        - Label the ws connection for the overlay and add a redundant
  //          cache delete call when that connection is closed.

  let overlayApp
  try {
    overlayApp = await readFile("./overlay/index.html") // @TODO: Make path to overlay app a constant
  } catch (err) {
    console.error("GET /overlay failed:", err)
    reply("500 Internal Server Error", JSON.stringify({ success: false, data: `Unable to load the overlay.` }))
    return
  }

  reply("200 OK", overlayApp)
})
// @NOTE: Catch-all route. A request that matches none of the endpoints
//        registered above is treated as invalid and answered with an
//        empty 400 response.
ASKEBS.any("/*", (res, req) => {
  res.writeStatus("400").end()
})
// Start listening. uWS passes a falsy socket when the bind failed, which is
// treated as fatal here.
ASKEBS.listen(PORT, (listenSocket) => {
  if (!listenSocket) {
    throw new Error(`Failed to bind to this port: ${PORT}`)
  }

  const startupNotice = new Date().toLocaleString() + `   listening on port ${PORT}...`
  console.log(startupNotice.padEnd(24, " "))
})