index.mjs
1 import path from "path" 2 import uWS from "uWebSockets.js" 3 import { readFile } from "node:fs/promises" 4 import { ASK_CLIENT_SECRET } from "./secrets.mjs" 5 6 import { uuidv4, verifyJWT } from "./util.mjs" 7 import { Persist, Cache } from "./model.mjs" 8 import { initEBSMessageHandler } from "./wsMsg.mjs" 9 10 import * as constant from "./constants.mjs" 11 12 const PORT = process.env.PORT || 1026 13 , db = Persist({ dbName: "./askebs.sql" }) 14 , cache = Cache({ socket: { port: 6379 }}) 15 16 const loggingFlag = process.argv.indexOf("-logging") 17 let LOGGING = loggingFlag > -1 18 19 cache.initConnection() 20 21 // @TODO: I would like to factor the logging helper code out but it's not critical 22 let logHeader = "" 23 logHeader += (`PID: ${process.pid}`).padEnd(24, " ") 24 logHeader += "msg".padEnd(18, " ") 25 logHeader += "channel id".padEnd(13, " ") 26 logHeader += "user id".padEnd(13, " ") 27 logHeader += "--------".padEnd(21, " ") 28 logHeader += "\n" + (new Array(logHeader.length - 1).fill("=").join("")) 29 30 logHeader = (new Array(logHeader.length/2 - 1).fill("=").join("")) + "\n" + logHeader 31 32 let logCounter = 0 33 let connCounter = 0 34 35 function logLine(type, channel, user) { 36 let line = new Date().toLocaleString().padEnd(24, " ") 37 38 line += (`${type} `).padEnd(18, " ") 39 line += (`${channel} `).padEnd(13, " ") 40 line += (`${user} `).padEnd(13, " ") 41 line += (`_____`).padEnd(22, " ") 42 43 return line 44 } 45 46 function reportTotalConnections() { 47 if (!LOGGING) return 48 console.log(new Date().toLocaleString().padEnd(24, " ") + `${connCounter} connections `.padEnd(logHeader.length/3 - 25, "-")) 49 } 50 51 function reportMessageAndBackbuffer(user, type, data) { 52 if (!LOGGING) return 53 let { channelId, userId } = user 54 55 console.log(logLine(type, channelId ?? data.channelId ?? "---", userId ?? data.userId ?? "---")) 56 logCounter++ 57 58 if (logCounter % 40 == 0) { 59 console.log(logHeader) 60 } 61 } 62 63 if (LOGGING) { 64 console.log(logHeader) 65 setInterval(reportTotalConnections, 5000) 66 } 67 68 // @NOTE: This is a helper function for processing the 69 // JSON body of a request to a uWS HTTP endpoint. 70 function readJSON(res, cb) { 71 let buffer 72 73 res.onData((ab, isLast) => { 74 let chunk = Buffer.from(ab) 75 if (isLast) { 76 if (buffer) { 77 cb(JSON.parse(Buffer.concat([buffer, chunk]))) 78 } else { 79 cb(JSON.parse(Buffer.concat([chunk]))) 80 } 81 } else { 82 if (buffer) { 83 buffer = Buffer.concat([buffer, chunk]) 84 } else { 85 buffer = Buffer.concat([chunk]) 86 } 87 } 88 }) 89 90 res.onAborted(() => { res.aborted = true }) 91 } 92 93 function setCORSHeaders(req,res) { 94 // https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS/Errors/CORSNotSupportingCredentials 95 // @NOTE: Using the '*' value for this header doesn't work when credentials are present 96 // in request headers (as cookies, authorization headers, or client TLS). Although 97 // suddenly the requests are violating CORS policy even on pre-flight (without creds) 98 // this is a more appropriate way to respond to requests by explicitly targeting the req origin 99 res.writeHeader("Access-Control-Allow-Origin", req.getHeader("origin")) 100 res.writeHeader("Access-Control-Allow-Headers", "Authorization, Content-Type") 101 res.writeHeader("Access-Control-Allow-Credentials", "true") 102 res.writeHeader("Vary", "Origin") 103 } 104 105 const rankingBroadcastTimeouts = new Map() // channel IDs => {sessionIds, timeout IDs} 106 , openEndlessSessions = new Map() // channel IDs => { sessionIds, websocket connection objects } 107 , countdownTimeouts = new Map() // channel IDs => timeout IDs 108 109 110 const ASKEBS = uWS.SSLApp({ 111 key_file_name: "privkey.pem", 112 cert_file_name: "fullchain.pem", 113 }) 114 115 ASKEBS.ws("/connect", { 116 compression: uWS.DEDICATED_COMPRESSOR_3KB, 117 maxPayloadLength: 16 * 1024 * 1024, 118 sendPingsAutomatically: true, 119 idleTimeout: 120, 120 upgrade: (res, req, context) => { 121 res.upgrade({ url: req.getUrl() }, 122 req.getHeader("sec-websocket-key"), 123 req.getHeader("sec-websocket-protocol"), 124 req.getHeader("sec-websocket-extensions"), context) 125 }, 126 open: (ws) => { 127 connCounter++ 128 const ok = ws.send(JSON.stringify({ 129 type: constant.AUTH, 130 data: { 131 message: "Connected to ASK EBS. Respond with authorization credentials." 132 } 133 })) 134 }, 135 message: await initEBSMessageHandler( 136 db, cache, ASK_CLIENT_SECRET, ASKEBS, 137 reportMessageAndBackbuffer, 138 rankingBroadcastTimeouts, 139 openEndlessSessions, 140 countdownTimeouts 141 ), 142 close: (ws, code, message) => { 143 connCounter-- 144 if (LOGGING) console.log(new Date().toLocaleString().padEnd(24, " ") + `${ws.userId}/${ws.channelId} closed their connection`.padEnd(logHeader.length/3 - 25, "-")) 145 146 }, 147 drain: (ws) => { 148 // do a better log here 149 // console.log(`Backpressure is up: ${ws.getBufferedAmount()}b `) 150 } 151 }) 152 // @TODO: Add OPTIONS handlers to new endpoints 153 ASKEBS.options("/config", async (res, req) => { 154 res.writeHeader("Access-Control-Allow-Methods", "GET, POST") 155 setCORSHeaders(req,res) 156 res.end() 157 return 158 }) 159 ASKEBS.get("/config", async (res, req) => { 160 setCORSHeaders(req,res) 161 res.onAborted(() => { res.aborted = true }) 162 163 if (!req.getHeader("authorization")) { 164 res.writeStatus("401") 165 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 166 res.end() 167 return 168 } 169 170 const token = req.getHeader("authorization").split(" ")[1] 171 , payload = verifyJWT(token, ASK_CLIENT_SECRET) 172 173 if (!payload || payload.role != "broadcaster") { 174 res.writeStatus("401") 175 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 176 res.end() 177 return 178 } 179 180 const config = await db.getConfig(payload.channel_id) 181 182 res.cork(() => { 183 res.end(JSON.stringify({ 184 success: Boolean(config), 185 data: { 186 config: config || `No existing configuration for your channel.` 187 } 188 })) 189 }) 190 }) 191 ASKEBS.post("/config", async (res, req) => { 192 setCORSHeaders(req,res) 193 194 if (!req.getHeader("authorization")) { 195 res.writeStatus("401") 196 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 197 res.end() 198 return 199 } 200 201 const token = req.getHeader("authorization").split(" ")[1] 202 , payload = verifyJWT(token, ASK_CLIENT_SECRET) 203 204 if (!payload) { 205 res.writeStatus("401") 206 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 207 res.end() 208 return 209 } 210 211 readJSON(res, async (data) => { 212 const { channelId, userName, config } = data 213 , out = { success: true, data: `Config for ${userName} successfully updated!`} 214 215 const preparedConfigData = [ 216 channelId, 217 userName, 218 config.viewer_points, 219 config.follower_points, 220 config.subscriber_points, 221 config.session_length, 222 config.session_mode, 223 config.enabled_submission_multipliers.join(" "), 224 config.enabled_upvote_multipliers.join(" "), 225 config.suspended.join(" "), 226 config.exclude_viewers, 227 config.exclude_followers, 228 ] 229 230 // @TODO: Add some error handling here? 231 await db.setSessionConfig(preparedConfigData) 232 233 res.cork(() => { 234 res.end(JSON.stringify(out)) 235 }) 236 }) 237 }) 238 ASKEBS.options("/question", async (res, req) => { 239 res.writeHeader("Allow", "PATCH") 240 res.writeHeader("Access-Control-Allow-Methods", "PATCH") 241 setCORSHeaders(req,res) 242 res.end() 243 return 244 }) 245 ASKEBS.patch("/question", async (res, req) => { 246 setCORSHeaders(req,res) 247 248 if (!req.getHeader("authorization")) { 249 res.writeStatus("401") 250 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 251 res.end() 252 return 253 } 254 255 const token = req.getHeader("authorization").split(" ")[1] 256 , payload = verifyJWT(token, ASK_CLIENT_SECRET) 257 258 if (!payload) { 259 res.writeStatus("401") 260 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 261 res.end() 262 return 263 } 264 265 readJSON(res, async (data) => { 266 const { channelId, sessionId, userId, questionId, rating, revealed, locked } = data 267 , responseData = { success: true } 268 269 if (channelId == payload.channel_id) { 270 // @NOTE: we should do some error handling in cases like this right? 271 // - the question id could be invalid 272 // - the types of the params could be invalid 273 // ofc the likelihood is low but it is something which could be exploited 274 275 if (rating) { 276 await db.updateQuestionRating([rating, questionId]) 277 responseData.data = { questionId, rating } 278 } 279 if (revealed) { 280 const timestamp = Date.now() 281 await db.updateQuestionRevealed([timestamp, questionId]) 282 responseData.data = { questionId, revealed: timestamp } 283 } 284 if (locked != undefined) { 285 if (locked) { 286 await db.suspendUser(channelId, userId) 287 } else { 288 const suspendedUsers = await db.getSuspendedUsers(channelId) 289 , suspendedUsersSet = new Set(suspendedUsers.split(" ")) 290 291 suspendedUsersSet.delete(userId) 292 293 const updatedSuspendedUsers = [...suspendedUsersSet].join(" ") 294 await db.updateSuspendedUsers(channelId, updatedSuspendedUsers) 295 } 296 await db.updateQuestionLock(+locked, questionId) 297 responseData.data = { questionId, locked } 298 } 299 } else { 300 responseData.success = false 301 responseData.data = `You are not authorized to update this question!` 302 } 303 304 if (responseData.success) { 305 await cache.invalidateEBSSessionData(channelId, sessionId) 306 } 307 308 res.cork(() => { 309 res.writeStatus(responseData.success ? "200":"403").end(JSON.stringify(responseData), true) 310 }) 311 }) 312 }) 313 ASKEBS.options("/questions/:sessionId", async (res, req) => { 314 res.writeHeader("Allow", "GET") 315 setCORSHeaders(req,res) 316 res.end() 317 return 318 }) 319 ASKEBS.get("/questions/:sessionId", async (res, req) => { 320 setCORSHeaders(req,res) 321 322 if (!req.getHeader("authorization")) { 323 res.writeStatus("401") 324 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 325 res.end() 326 return 327 } 328 329 const token = req.getHeader("authorization").split(" ")[1] 330 , payload = verifyJWT(token, ASK_CLIENT_SECRET) 331 332 if (!payload) { 333 res.writeStatus("401") 334 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 335 res.end() 336 return 337 } 338 339 const sessionId = req.getParameter(0) 340 , responseData = { success: true } 341 342 const questionData = db.getSessionQuestions(sessionId) 343 responseData.data = questionData 344 345 if (questionData[0].channel_id != payload.channel_id) { 346 responseData.success = false 347 responseData.data = `You're not authorized to retrieve these questions!` 348 } 349 350 res.cork(() => { 351 res.writeStatus(responseData.success ? "200":"403").end(JSON.stringify(responseData), true) 352 }) 353 }) 354 ASKEBS.options("/upvotes/:questionId", async (res, req) => { 355 res.writeHeader("Allow", "GET") 356 setCORSHeaders(req,res) 357 res.end() 358 return 359 }) 360 ASKEBS.get("/upvotes/:questionId", async (res, req) => { 361 setCORSHeaders(req,res) 362 363 if (!req.getHeader("authorization")) { 364 res.writeStatus("401") 365 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 366 res.end() 367 return 368 } 369 370 const token = req.getHeader("authorization").split(" ")[1] 371 , payload = verifyJWT(token, ASK_CLIENT_SECRET) 372 373 if (!payload) { 374 res.writeStatus("401") 375 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 376 res.end() 377 return 378 } 379 380 const questionId = req.getParameter(0) 381 , responseData = { success: true } 382 383 const questionUpvotes = await db.getQuestionUpvotes(questionId) 384 responseData.data = { questionId, upvotes: questionUpvotes } 385 386 res.cork(() => { 387 res.writeStatus(responseData.success ? "200":"403").end(JSON.stringify(responseData), true) 388 }) 389 }) 390 ASKEBS.options("/sessions", async (res, req) => { 391 res.writeHeader("Allow", "GET") 392 setCORSHeaders(req,res) 393 res.end() 394 return 395 }) 396 ASKEBS.get("/sessions", async (res, req) => { 397 setCORSHeaders(req,res) 398 res.onAborted(() => { res.aborted = true }) 399 400 if (!req.getHeader("authorization")) { 401 res.writeStatus("401") 402 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 403 res.end() 404 return 405 } 406 407 const token = req.getHeader("authorization").split(" ")[1] 408 , payload = verifyJWT(token, ASK_CLIENT_SECRET) 409 410 if (!payload) { 411 res.writeStatus("401") 412 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 413 res.end() 414 return 415 } 416 417 const sessions = await db.getPreviousSessions(payload.channel_id, 10) 418 419 res.cork(() => { 420 res.end(JSON.stringify({ 421 success: Boolean(sessions), 422 data: { 423 sessions: sessions || `No previous sessions for your channel.` 424 } 425 })) 426 }) 427 }) 428 ASKEBS.options("/session/:id", async (res, req) => { 429 res.writeHeader("Allow", "GET") 430 setCORSHeaders(req,res) 431 res.end() 432 return 433 }) 434 ASKEBS.get("/session/:id", async (res, req) => { 435 setCORSHeaders(req,res) 436 res.onAborted(() => { res.aborted = true }) 437 438 if (!req.getHeader("authorization")) { 439 res.writeStatus("401") 440 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 441 res.end() 442 return 443 } 444 445 const token = req.getHeader("authorization").split(" ")[1] 446 , payload = verifyJWT(token, ASK_CLIENT_SECRET) 447 448 if (!payload) { 449 res.writeStatus("401") 450 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 451 res.end() 452 return 453 } 454 455 const sessionId = req.getParameter(0) 456 , channelId = payload.channel_id 457 , cachedSessionData = await cache.getEBSSessionData(channelId, sessionId) 458 459 let out = null 460 461 if (cachedSessionData) { 462 out = cachedSessionData 463 } else { 464 const questions = await db.getAllQuestions(sessionId) 465 , purchases = await db.getAllPurchases(sessionId) 466 467 out = JSON.stringify({ 468 success: Boolean(questions && purchases), 469 data: { 470 questions: questions || `No previous sessions for your channel.`, 471 purchases: purchases || `No previous sessions for your channel.` 472 } 473 }) 474 475 await cache.setEBSSessionData(channelId, sessionId, out) 476 } 477 478 res.cork(() => { 479 res.end(out) 480 }) 481 }) 482 ASKEBS.options("/overlay", async (res, req) => { 483 res.writeHeader("Allow", "GET") 484 setCORSHeaders(req,res) 485 res.end() 486 return 487 }) 488 ASKEBS.get("/overlay", async (res, req) => { 489 setCORSHeaders(req,res) 490 491 if (!req.getHeader("authorization")) { 492 res.writeStatus("401") 493 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 494 res.end() 495 return 496 } 497 498 const token = req.getHeader("authorization").split(" ")[1] 499 , payload = verifyJWT(token, ASK_CLIENT_SECRET) 500 501 if (!payload) { 502 res.writeStatus("401") 503 res.writeHeader("WWW-Authenticate", "Bearer https://ask.offbeat.space") 504 res.end() 505 return 506 } 507 508 // @TODO: Reject subsequent requests for the same channel, add a cache field indicating 509 // whether channel overlay is open. 510 // - Label the ws connection for the overlay and add a redundant 511 // cache delete call when that connection is closed. 512 513 const overlayApp = await readFile("./overlay/index.html") // @TODO: Make path to overlay app a constant 514 res.cork(() => { 515 res.end(overlayApp) 516 }) 517 }) 518 // @NOTE: This is the default response. Any requests other than 519 // the above endpoints is considered an invalid request 520 ASKEBS.any("/*", (res, req) => { 521 res.writeStatus("400") 522 res.end() 523 }) 524 ASKEBS.listen(PORT, (listenSocket) => { 525 if (listenSocket) { 526 console.log((new Date().toLocaleString() + ` listening on port ${PORT}...`).padEnd(24, " ")) 527 } else { 528 throw new Error(`Failed to bind to this port: ${PORT}`) 529 } 530 })