sample.mjs
import * as irc from "./irc.mjs"
import * as util from "./util.mjs"
import * as scrape from "./scrape.mjs"

// Weekday labels in the order they appear in the weekday string (Monday first).
const WEEKDAYS = ["MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN"]

// A sample is only stored and scored when it ran for longer than this.
const MIN_SAMPLE_DURATION_MS = 60e3 * 25
// Delay between two viewer-count polls of a sampled channel.
const VIEWER_POLL_INTERVAL_MS = 30e3
// A sample is resolved as "inactive" after this long without a chat message.
const INACTIVITY_TIMEOUT_MS = 60e3 * 5

/**
 * Build a seven-slot, comma-separated weekday string (Monday to Sunday).
 * A slot holds the weekday label when at least one date in `arr` falls on that
 * day and is empty otherwise, e.g. "MON,,WED,,,,".
 *
 * @param {Array} arr - Values accepted by the `Date` constructor.
 * @returns {string} Six commas separating seven (possibly empty) slots.
 */
function makeWeekdayString(arr) {
  // Date#getDay() is Sunday-based (0 = Sunday) while WEEKDAYS is Monday-based,
  // so the index is rotated by six before the lookup. The day is taken in the
  // local time zone of the process.
  const daysStreamed = new Set(arr.map(d => WEEKDAYS[(new Date(d).getDay() + 6) % 7]))

  return WEEKDAYS.map(day => (daysStreamed.has(day) ? day : "")).join(",")
}

/**
 * Aggregate every stored sample of one channel into the parameter object that
 * is handed to `DB.addLead`.
 *
 * Only samples with a positive `duration` take part in the aggregation; the
 * channel id and user name are read from the first element of `samples`.
 *
 * @param {Array<object>} samples - Stored samples of a single channel.
 * @returns {object|false} The prepared `$`-prefixed fields, or `false` when
 *   `util.stackObjects` yields nothing to aggregate.
 */
function aggregateAndPrepareSamples(samples) {
  // presumably stackObjects turns an array of objects into an object of
  // per-key arrays — TODO confirm against util.mjs
  const agg = util.stackObjects(samples.filter(s => s.duration > 0))
  if (!agg) return false

  // Average of a per-sample counter divided by that sample's duration.
  const averagePerMinute = (counts) =>
    util.averageArray(counts.map((count, i) => count / agg.duration[i]))

  return {
    $channelId: samples[0].channelId,
    $userName: samples[0].userName,
    $content: util.getModeFromArray(agg.topic),
    $activeViewerPercentage: util.averageArray(agg.averageViewers.map((v, i) => agg.chatters[i] / v)),
    $averageViewers: util.averageArray(agg.averageViewers),
    $averageSampleLength: util.averageArray(agg.duration),
    $averageBitsPerSample: util.averageArray(agg.bits),
    $averageQuestionsPerMinute: averagePerMinute(agg.questions),
    $averageMessagesPerMinute: averagePerMinute(agg.messages),
    $averageChattersPerMinute: averagePerMinute(agg.chatters),
    $weekdays: makeWeekdayString(agg.start),
    $name: "",
    $status: 0,
  }
}

/**
 * Reduce a live sample to a plain snapshot of its current counters.
 *
 * @param {object} sample - A sample as created by `makeNewSample`.
 * @returns {object} `collectionStart`, `userName`, `channelId`, the latest
 *   viewer count and the chatter / message / question / bit totals.
 */
export function summarizeSample(sample) {
  return {
    collectionStart: sample.collectionStart,
    userName: sample.userName,
    channelId: sample.channelId,
    viewers: sample.viewers.at(-1),
    chatters: sample.uniqueChatters.size,
    messages: sample.totalMessages,
    questions: sample.validQuestions.length,
    bits: sample.totalBits,
  }
}

/**
 * Create the mutable state object for one sampling run.
 *
 * @param {object} stream - `{ channelId, userName, topic, title, viewers, start }`.
 * @param {number} length - Delay in milliseconds before the sample completes.
 * @param {number} maxViewers - Stored as `targetViewers`; the viewer average is
 *   later compared against it.
 * @returns {object} A copy of `stream` extended with counters, timer handles
 *   and an update buffer. `viewers` becomes a one-element history array.
 */
function makeNewSample(stream, length, maxViewers) {
  return {
    ...stream,
    timeoutMs: length,
    uniqueChatters: new Set(),
    validQuestions: [],
    totalMessages: 0,
    totalBits: 0,
    collectionStart: null,
    collectionEnd: null,
    viewers: [stream.viewers],
    targetViewers: maxViewers,
    inactiveTimeout: null,
    viewerCountTimeout: null,
    completionTimeout: null,
    updateBuffer: { bits: 0, chatters: 0, messages: 0, questions: 0 },
  }
}

/**
 * Build the sample collector.
 *
 * @param {object} DB - Uses `addSample`, `getAllSamplesByChannel` and `addLead`.
 * @param {object} RealTimeUpdates - Uses `buffer({ type, data })`.
 * @param {object} ActiveSamples - Uses `set` / `delete`, keyed by user name.
 * @param {object} SampleConfig - Reads `collectingSamples`, `maxViewers`,
 *   `seenChannels`, `sampleLength` and `maxActiveSamples`; `beginSampling`
 *   sets `collectingSamples` to `true`.
 * @param {object} ChatReader - Its `emitter` receives "start" and "end" events.
 * @param {object} Browser - Passed through to `scrape.GetTwitchStreamerInfo`.
 * @param {object} twitch - Uses `GetStreams` and `HelixGetStreams`.
 * @returns {Promise<object>} Collector exposing `beginSample(stream)` and
 *   `beginSampling()`.
 */
export async function MakeSampleCollector(DB, RealTimeUpdates, ActiveSamples, SampleConfig, ChatReader, Browser, twitch) {
  const SampleCollector = {}

  /**
   * Start sampling one stream. Does nothing unless
   * `SampleConfig.collectingSamples` is truthy.
   */
  SampleCollector.beginSample = (stream) => {
    const { collectingSamples, maxViewers, sampleLength } = SampleConfig
    if (!collectingSamples) return

    let sample = makeNewSample(stream, sampleLength, maxViewers)

    // Terminates the sample. `status` is one of "offline", "threshold",
    // "inactive" or "complete".
    const resolveSample = (status) => {
      if (!sample) return // already resolved; nothing left to tear down

      console.log(`[${status.toUpperCase()}] ${sample.userName} (${sample.channelId})`)

      // Re-read the configuration: it may have changed since the sample began.
      const {
        collectingSamples: stillCollecting,
        maxViewers: currentMaxViewers,
        seenChannels,
      } = SampleConfig

      sample.collectionEnd = Date.now()

      if (sample.collectionEnd - sample.collectionStart > MIN_SAMPLE_DURATION_MS) {
        sample.duration = Math.floor((sample.collectionEnd - sample.collectionStart) / 60e3)
        sample.averageViewers = sample.viewers.reduce((acc, vc) => acc + vc, 0) / sample.viewers.length
        sample.messages = sample.totalMessages
        sample.questions = sample.validQuestions.length
        sample.chatters = sample.uniqueChatters.size
        sample.bits = sample.totalBits

        DB.addSample(sample)

        const allSamplesForChannel = DB.getAllSamplesByChannel(sample.userName)
        const prepared = aggregateAndPrepareSamples(allSamplesForChannel)

        const isLead = Boolean(prepared) &&
          prepared.$averageViewers * prepared.$activeViewerPercentage >= 30 &&
          prepared.$activeViewerPercentage >= 0.12 &&
          prepared.$averageQuestionsPerMinute > 0 &&
          prepared.$averageBitsPerSample > 0

        if (isLead) {
          scrape.GetTwitchStreamerInfo(Browser, prepared.$userName)
            .then(({ email, links, followers }) => {
              prepared.$email = email
              prepared.$socials = links
              prepared.$followers = followers

              DB.addLead(prepared)
              RealTimeUpdates.buffer({ type: "lead", data: prepared })
            })
            // A failed scrape must not surface as an unhandled rejection.
            .catch(console.error)
        }
      }

      RealTimeUpdates.buffer({ type: "resolve", data: { userName: sample.userName, status } })
      ChatReader.emitter.emit("end", sample.userName)

      clearTimeout(sample.viewerCountTimeout)
      clearTimeout(sample.inactiveTimeout)
      clearTimeout(sample.completionTimeout)

      ActiveSamples.delete(sample.userName)

      sample.viewerCountTimeout = null
      sample.inactiveTimeout = null
      sample.completionTimeout = null
      // Nulling the closure variable is what the callbacks below test to find
      // out whether the sample is still live.
      sample = null

      // Replace the finished sample with one new stream.
      if (stillCollecting) {
        twitch.GetStreams(currentMaxViewers, 1, seenChannels)
          .then(streams => { for (const s of streams) { SampleCollector.beginSample(s) } })
          .catch(console.error)
      }
    }

    // Receives every raw chat message of the sampled channel.
    const messageConsumer = (message) => {
      if (!sample) return

      const parsed = irc.ParseMessage(message)
      const initialChatters = sample.uniqueChatters.size

      sample.totalBits += parsed.bits
      sample.uniqueChatters.add(parsed.user)
      sample.totalMessages++

      sample.updateBuffer.bits += parsed.bits
      // Adds 1 only when this message came from a chatter not seen before.
      sample.updateBuffer.chatters += sample.uniqueChatters.size - initialChatters
      sample.updateBuffer.messages++

      if (irc.FilterMessage(parsed) && irc.ValidateQuestion(parsed.message)) {
        sample.validQuestions.push({
          channel: sample.userName,
          channelId: sample.channelId,
          asker: parsed.user,
          text: parsed.message,
        })
        sample.updateBuffer.questions++
      }

      // Any message counts as activity and restarts the inactivity countdown.
      sample.inactiveTimeout.refresh()
    }

    // Polls the viewer count, resolves the sample when the stream went offline
    // or drifted away from the target audience size, and flushes buffered
    // counters to the real-time feed.
    const pollViewerCount = async () => {
      if (!sample) return // don't submit the API request if the sample is resolved

      try {
        const [streams] = await twitch.HelixGetStreams(sample.userName)
        if (!sample) return // the sample was resolved while the request was in flight

        if (streams.length === 0) {
          resolveSample("offline")
          return
        }

        sample.viewers.push(streams[0].viewers)

        if (sample.viewers.length > 10) {
          const viewerAverage = util.averageArray(sample.viewers)

          if (Math.abs(viewerAverage - sample.targetViewers) > sample.targetViewers * 0.2) {
            resolveSample("threshold")
            return
          }
        }

        const sampleUpdate = {
          userName: sample.userName,
          viewers: sample.viewers.length > 1 ? sample.viewers.at(-1) - sample.viewers.at(-2) : 0,
          ...sample.updateBuffer,
        }

        // NOTE(review): a negative viewer delta can cancel out positive
        // counters here and suppress the update — confirm this is intended.
        const updateSum = Object.entries(sampleUpdate)
          .filter(([key]) => key !== "userName")
          .reduce((sum, [, value]) => sum + value, 0)

        if (updateSum > 0) {
          RealTimeUpdates.buffer({ type: "sample", data: sampleUpdate })
          sample.updateBuffer = { bits: 0, chatters: 0, messages: 0, questions: 0 }
        }
      } catch (err) {
        console.error(err)
      }

      // Re-arm after failures too, so that one failed request does not end
      // viewer polling for the remainder of the sample.
      if (sample) sample.viewerCountTimeout.refresh()
    }

    sample.viewerCountTimeout = setTimeout(pollViewerCount, VIEWER_POLL_INTERVAL_MS)
    sample.inactiveTimeout = setTimeout(() => { resolveSample("inactive") }, INACTIVITY_TIMEOUT_MS)
    sample.completionTimeout = setTimeout(() => { resolveSample("complete") }, sample.timeoutMs)

    sample.collectionStart = Date.now()
    ActiveSamples.set(sample.userName, sample)
    RealTimeUpdates.buffer({ type: "begin", data: { sample: summarizeSample(sample) } })
    ChatReader.emitter.emit("start", sample.userName, messageConsumer)

    console.log(`[SAMPLE] ${sample.userName.padStart(28, " ")}`)
  }

  /**
   * Enable collection and start up to `maxActiveSamples` samples, waiting
   * seven seconds before each one.
   */
  SampleCollector.beginSampling = async () => {
    SampleConfig.collectingSamples = true
    const { maxViewers, maxActiveSamples, seenChannels } = SampleConfig
    const streams = await twitch.GetStreams(maxViewers, maxActiveSamples, seenChannels)
    for (const stream of streams) {
      await util.sleep(7e3)
      SampleCollector.beginSample(stream)
    }
  }

  return SampleCollector
}

/**
 * Log one colour-coded line with the running totals of a sample.
 *
 * The user name comes first, followed by the latest viewer count, unique
 * chatters, valid questions, total messages and total bits. Each number is
 * left-aligned in an eight-character column.
 *
 * @param {object} sample - Reads `userName`, `viewers`, `uniqueChatters`,
 *   `validQuestions`, `totalMessages` and `totalBits`.
 */
function reportSampleProgress(sample) {
  // Wraps text in an ANSI colour sequence followed by a reset.
  const paint = (code, text) => `\x1b[${code}m${text}\x1b[0m`
  const column = (value) => `${value}`.padEnd(8, " ")

  const cells = [
    paint(36, column(sample.viewers[sample.viewers.length - 1])),
    paint(31, column(sample.uniqueChatters.size)),
    paint(93, column(sample.validQuestions.length)),
    paint(32, column(sample.totalMessages)),
    paint(95, column(sample.totalBits)),
  ]

  // The padding width counts the escape characters as well as the name.
  const label = `${paint(33, sample.userName)} `.padStart(31)

  console.log(`${label}${cells.join(" ")}`)
}