// support/ebsSupport/sample.mjs
  1  import * as irc from "./irc.mjs"
  2  import * as util from "./util.mjs"
  3  import * as scrape from "./scrape.mjs"
  4  
  5  const WEEKDAYS = ["MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN"]
  6  
  7  function makeWeekdayString(arr) {
  8    const daysStreamed = new Set(arr.map(d => WEEKDAYS[new Date(d).getDay()]))
  9    let dateStr = ""
 10  
 11    for (const day of WEEKDAYS) { dateStr += daysStreamed.has(day) ? `${day},` : "," }
 12    if (dateStr) { dateStr = dateStr.substr(0,dateStr.length-1) }
 13  
 14    return dateStr
 15  }
 16  
 17  function aggregateAndPrepareSamples(samples) {
 18    const agg = util.stackObjects(samples.filter(s => s.duration > 0))
 19    let prepared = false
 20  
 21    if (agg) {
 22      prepared = {
 23        $channelId: samples[0].channelId, 
 24        $userName: samples[0].userName,
 25        $content: util.getModeFromArray(agg.topic),
 26        $activeViewerPercentage: util.averageArray(agg.averageViewers.map((v, i) => agg.chatters[i] / v)),
 27        $averageViewers: util.averageArray(agg.averageViewers),
 28        $averageSampleLength: util.averageArray(agg.duration),
 29        $averageBitsPerSample: util.averageArray(agg.bits),
 30        $averageQuestionsPerMinute: util.averageArray(agg.questions.map((q,i) => q / agg.duration[i])),
 31        $averageMessagesPerMinute: util.averageArray(agg.messages.map((m, i) => m / agg.duration[i])),
 32        $averageChattersPerMinute: util.averageArray(agg.chatters.map((c, i) => c / agg.duration[i])),
 33        $weekdays: makeWeekdayString(agg.start),
 34        $name: "",
 35        $status: 0,
 36      } 
 37    }
 38  
 39    return prepared
 40  }
 41  
 42  export function summarizeSample(sample) {
 43    return Object.assign({},{
 44      collectionStart: sample.collectionStart,
 45      userName: sample.userName,
 46      channelId: sample.channelId,
 47      viewers: sample.viewers[sample.viewers.length-1],
 48      chatters: sample.uniqueChatters.size,
 49      messages: sample.totalMessages,
 50      questions: sample.validQuestions.length,
 51      bits: sample.totalBits,
 52    })
 53  }
 54  
 55  function makeNewSample(stream, length, maxViewers) {
 56    // stream: { channelId, userName, topic, title, viewers, start }
 57    return Object.assign({}, stream, {
 58      timeoutMs: length,
 59      uniqueChatters: new Set(),
 60      validQuestions: new Array(),
 61      totalMessages: 0,
 62      totalBits: 0,
 63      collectionStart: null,
 64      collectionEnd: null,
 65      viewers: [stream.viewers],
 66      targetViewers: maxViewers,
 67      inactiveTimeout: null,
 68      viewerCountTimeout: null,
 69      completionTimeout: null,
 70      updateBuffer: { bits: 0, chatters: 0, messages: 0, questions: 0 },
 71    })
 72  }
 73  
 74  export async function MakeSampleCollector(DB, RealTimeUpdates, ActiveSamples, SampleConfig, ChatReader, Browser, twitch) {
 75    const SampleCollector = {}
 76  
 77    SampleCollector.beginSample = (stream) => {
 78      const { collectingSamples, maxViewers, seenChannels, sampleLength } = SampleConfig
 79  
 80      if (collectingSamples) {
 81        let sample = makeNewSample(stream, sampleLength, maxViewers)
 82  
 83        // Define function to call when sample should terminate
 84        const resolveSample = (status) => {
 85          console.log(`[${status.toUpperCase()}] ${sample.userName} (${sample.channelId})`)
 86          const { collectingSamples, maxViewers, seenChannels, sampleLength, maxActiveSamples } = SampleConfig
 87  
 88          sample.collectionEnd = Date.now()
 89  
 90          if (sample.collectionEnd - sample.collectionStart > 60e3*25) {
 91            sample.duration = Math.floor((sample.collectionEnd - sample.collectionStart)/60e3)
 92            sample.averageViewers = sample.viewers.reduce((acc,vc) => acc+vc) / sample.viewers.length
 93            sample.messages = sample.totalMessages
 94            sample.questions = sample.validQuestions.length
 95            sample.chatters = sample.uniqueChatters.size
 96            sample.bits = sample.totalBits
 97  
 98            DB.addSample(sample)
 99  
100            const allSamplesForChannel = DB.getAllSamplesByChannel(sample.userName)
101            const prepared = aggregateAndPrepareSamples(allSamplesForChannel)
102            
103            if ((prepared) &&
104                (prepared.$averageViewers * prepared.$activeViewerPercentage >= 30) &&
105                (prepared.$activeViewerPercentage >= 0.12) &&
106                (prepared.$averageQuestionsPerMinute > 0) && 
107                (prepared.$averageBitsPerSample > 0))
108            {
109              scrape.GetTwitchStreamerInfo(Browser, prepared.$userName)
110              .then(({ email, links, followers }) => {
111                prepared.$email = email
112                prepared.$socials = links
113                prepared.$followers = followers
114  
115                DB.addLead(prepared)
116                RealTimeUpdates.buffer({ type: "lead", data: prepared })
117              })
118            }
119          }
120  
121          RealTimeUpdates.buffer({ type: "resolve", data: { userName: sample.userName, status }})
122          ChatReader.emitter.emit("end", sample.userName)
123  
124          clearTimeout(sample.viewerCountTimeout)
125          clearTimeout(sample.inactiveTimeout)
126          clearTimeout(sample.completionTimeout)
127          
128          ActiveSamples.delete(sample.userName)
129  
130          sample.viewerCountTimeout = null
131          sample.inactiveTimeout = null
132          sample.completionTimeout = null
133          sample = null
134  
135          if (collectingSamples) {
136            twitch.GetStreams(maxViewers, 1, seenChannels)
137              .then(streams => { for (const s of streams) { SampleCollector.beginSample(s) } })
138          }
139        }
140  
141        const messageConsumer = (message) => {
142          if (!sample) return
143  
144          const parsed = irc.ParseMessage(message)
145  
146          const initialChatters = sample.uniqueChatters.size
147  
148          sample.totalBits += parsed.bits
149          sample.uniqueChatters.add(parsed.user)
150          sample.totalMessages++
151  
152          sample.updateBuffer.bits += parsed.bits
153          sample.updateBuffer.chatters += sample.uniqueChatters.size - initialChatters
154          sample.updateBuffer.messages++
155  
156          if (irc.FilterMessage(parsed) && irc.ValidateQuestion(parsed.message)) {
157            sample.validQuestions.push({
158              channel: sample.userName,
159              channelId: sample.channelId,
160              asker: parsed.user,
161              text: parsed.message,
162            })
163            sample.updateBuffer.questions++ 
164          }
165  
166          sample.inactiveTimeout.refresh()
167        }
168        
169        sample.viewerCountTimeout = setTimeout(() => {
170          if (!sample) return // don't submit the API request if sample is resolved
171          twitch.HelixGetStreams(sample.userName)
172            .then(([streams, cursor]) => {
173              if (!sample) return // don't attempt to update sample after receiving response is sample is resolved
174              if (streams.length == 0) {
175                resolveSample("offline")
176                return
177              }
178  
179              const viewerCount = streams[0].viewers
180              sample.viewers.push(viewerCount)
181  
182              if (sample.viewers.length > 10) {
183                const viewerAverage = util.averageArray(sample.viewers)
184  
185                if (Math.abs(viewerAverage-sample.targetViewers) > (sample.targetViewers * 0.2)) {
186                  resolveSample("threshold")
187                  return
188                }
189              }
190  
191              const sampleUpdate = Object.assign({}, {
192                userName: sample.userName,
193                viewers: sample.viewers.length > 1 ? sample.viewers.at(-1)-sample.viewers.at(-2) : 0, 
194              }, sample.updateBuffer)
195  
196              let updateSum = 0
197              for (const [key, value] of Object.entries(sampleUpdate)) {
198                if (key == "userName") continue
199                updateSum += value
200              }
201  
202              if (updateSum > 0) {
203                RealTimeUpdates.buffer({ type: "sample", data: sampleUpdate })
204                sample.updateBuffer = { bits: 0, chatters: 0, messages: 0, questions: 0 }
205              }
206  
207              sample.viewerCountTimeout.refresh()
208            })
209           .catch(console.error) 
210        }, 30e3)
211  
212        sample.inactiveTimeout   = setTimeout(() => { resolveSample("inactive") }, 60e3*5) 
213        sample.completionTimeout = setTimeout(() => { resolveSample("complete") }, sample.timeoutMs)
214  
215        sample.collectionStart = Date.now()
216        ActiveSamples.set(sample.userName, sample)
217        RealTimeUpdates.buffer({ type: "begin", data: { sample: summarizeSample(sample) }})
218        ChatReader.emitter.emit("start", sample.userName, messageConsumer)
219  
220        console.log(`[SAMPLE] ${sample.userName.padStart(28, " ")}`)
221      }
222    }
223  
224    SampleCollector.beginSampling = async () => {
225      SampleConfig.collectingSamples = true
226      const { maxViewers, maxActiveSamples, seenChannels } = SampleConfig
227      const streams = await twitch.GetStreams(maxViewers, maxActiveSamples, seenChannels)
228      for (const stream of streams) {
229        await util.sleep(7e3)
230        SampleCollector.beginSample(stream)
231      }
232    }
233   
234    return SampleCollector
235  }
236  
/**
 * Logs one ANSI-coloured console line with a sample's current totals:
 * user name (right-aligned), latest viewer count, unique chatters, valid
 * questions, messages and bits, each value padded to eight characters.
 *
 * @param {object} sample - A live sample as created by makeNewSample.
 * @returns {void}
 */
function reportSampleProgress(sample) {
  // Render a value as text padded on the right to a fixed column width.
  const cell = (value) => `${value}`.padEnd(8, " ")

  const segments = [
    `\x1b[33m${sample.userName}\x1b[0m `.padStart(31),
    `\x1b[36m${cell(sample.viewers[sample.viewers.length-1])}\x1b[0m `,
    `\x1b[31m${cell(sample.uniqueChatters.size)}\x1b[0m `,
    `\x1b[93m${cell(sample.validQuestions.length)}\x1b[0m `,
    `\x1b[32m${cell(sample.totalMessages)}\x1b[0m `,
    `\x1b[95m${cell(sample.totalBits)}\x1b[0m`,
  ]

  console.log(segments.join(""))
}