updates.mjs
1 const MAX_UPDATES = 20 2 3 function aggregateSampleStats(target, newData) { 4 target.viewers += newData.viewers 5 target.bits += newData.bits 6 target.chatters += newData.chatters 7 target.messages += newData.messages 8 target.questions += newData.questions 9 } 10 11 function makeUpdateMessage(begin, resolve, sample, live, lead) { 12 const message = { type: "UPDATE" } 13 14 message.data = { 15 lead: Object.fromEntries(lead), 16 sample: Object.fromEntries(sample), 17 live: Array.from(live.entries()), 18 resolve: Object.fromEntries(resolve), 19 begin: Object.fromEntries(begin) 20 } 21 22 return JSON.stringify(message) 23 } 24 25 export function MakeRealTimeUpdates(ws) { 26 const scheduledUpdates = new Map() 27 const sample = new Map() 28 const begin = new Map() 29 const live = new Map() 30 const lead = new Map() 31 const resolve = new Map() 32 33 let updateCount = 0 34 35 function buffer({ type, data }) { 36 switch (type) { 37 case "lead": { 38 lead.set(data.$userName, data) 39 break 40 } 41 case "begin": { 42 const { sample } = data 43 begin.set(sample.userName, sample) 44 break 45 } 46 case "resolve": { 47 const { userName, status } = data 48 resolve.set(userName, status) 49 break 50 } 51 case "sample": { 52 const { userName } = data 53 if (!sample.has(userName)) { 54 sample.set(userName, data) 55 } else { 56 aggregateSampleStats(sample.get(userName), data) 57 } 58 break 59 } 60 case "live": { 61 live.clear() 62 for (const [userName, channelId] of data.entries()) { 63 live.set(userName, channelId) 64 } 65 break 66 } 67 default: {} 68 } 69 70 if (++updateCount > MAX_UPDATES) { 71 ws.publish("updates", makeUpdateMessage(begin, resolve, sample, live, lead)) 72 73 resolve.clear() 74 sample.clear() 75 begin.clear() 76 lead.clear() 77 78 updateCount = 0 79 } 80 } 81 82 return { 83 buffer, 84 async schedule(type, interval, dataFn) { 85 const cb = async () => { 86 const data = await dataFn() 87 buffer({ type, data }) 88 } 89 await cb() // perform scheduled update immediately 90 scheduledUpdates.set(type, setInterval(async () => { await cb() }, interval)) 91 }, 92 unschedule(id) { 93 clearInterval(scheduledUpdates.get(id)) 94 scheduledUpdates.delete(id) 95 } 96 } 97 }