/ server / cron / summarize.ts
summarize.ts
  1  import { defineCronHandler } from "#nuxt/cron";
  2  import { PrismaClient } from "@prisma/client";
  3  import { processSummariesByDate } from "~~/server/utils/summarize";
  4  import { handleLog } from "../utils/logging";
  5  
  6  const prisma = new PrismaClient({
  7    log: ['warn', 'error'],
  8  });
  9  
 10  export default defineCronHandler(
 11    "daily",
 12    async () => {
 13      try {
 14        const now = new Date();
 15        now.setHours(0, 0, 0, 0);
 16        const nowTimestamp = BigInt(now.getTime());
 17  
 18        const BATCH_SIZE = 5000;
 19        let processedCount = 0;
 20        let hasMore = true;
 21        
 22        while (hasMore) {
 23          const heartbeatsToSummarize = await prisma.heartbeats.findMany({
 24            where: {
 25              timestamp: { lt: nowTimestamp },
 26              summariesId: null,
 27            },
 28            orderBy: {
 29              timestamp: "asc",
 30            },
 31            include: {
 32              user: {
 33                select: {
 34                  keystrokeTimeout: true,
 35                },
 36              },
 37            },
 38            take: BATCH_SIZE,
 39          });
 40          
 41          if (heartbeatsToSummarize.length === 0) {
 42            hasMore = false;
 43            break;
 44          }
 45          
 46          processedCount += heartbeatsToSummarize.length;
 47  
 48          const userHeartbeats: Record<
 49            string,
 50            Array<(typeof heartbeatsToSummarize)[0]>
 51          > = {};
 52  
 53          heartbeatsToSummarize.forEach((heartbeat) => {
 54            const userId = heartbeat.userId;
 55  
 56            if (!userHeartbeats[userId]) {
 57              userHeartbeats[userId] = [];
 58            }
 59  
 60            userHeartbeats[userId].push(heartbeat);
 61          });
 62  
 63          for (const userId in userHeartbeats) {
 64            await processSummariesByDate(userId, userHeartbeats[userId]);
 65          }
 66          
 67          if (heartbeatsToSummarize.length < BATCH_SIZE) {
 68            hasMore = false;
 69          }
 70        }
 71  
 72        await generatePublicStats(now);
 73  
 74        handleLog(
 75          `Summarization complete. Processed ${processedCount} heartbeats.`
 76        );
 77      } catch (error) {
 78        console.error("Error in summarization cron job", error);
 79      }
 80    },
 81    {
 82      timeZone: "UTC",
 83      runOnInit: true,
 84    }
 85  );
 86  
 87  async function generatePublicStats(date: Date) {
 88    try {
 89      const statsDate = new Date(date);
 90      statsDate.setHours(0, 0, 0, 0);
 91  
 92      const existingStats = await prisma.stats.findUnique({
 93        where: { date: statsDate },
 94      });
 95  
 96      if (existingStats) {
 97        return;
 98      }
 99  
100      const totalUsers = await prisma.user.count();
101      const totalHeartbeats = await prisma.heartbeats.count();
102  
103      const summariesAggregate = await prisma.summaries.aggregate({
104        _sum: {
105          totalMinutes: true,
106        },
107      });
108  
109      const totalHours = Math.floor(
110        Number(summariesAggregate._sum.totalMinutes || 0) / 60
111      );
112  
113      const topEditorResult = await prisma.heartbeats.groupBy({
114        by: ["editor"],
115        _count: {
116          editor: true,
117        },
118        where: {
119          editor: {
120            not: null,
121          },
122        },
123        orderBy: {
124          _count: {
125            editor: "desc",
126          },
127        },
128        take: 1,
129      });
130  
131      const topLanguageResult = await prisma.heartbeats.groupBy({
132        by: ["language"],
133        _count: {
134          language: true,
135        },
136        where: {
137          language: {
138            not: null,
139          },
140        },
141        orderBy: {
142          _count: {
143            language: "desc",
144          },
145        },
146        take: 1,
147      });
148  
149      const topOSResult = await prisma.heartbeats.groupBy({
150        by: ["os"],
151        _count: {
152          os: true,
153        },
154        where: {
155          os: {
156            not: null,
157          },
158        },
159        orderBy: {
160          _count: {
161            os: "desc",
162          },
163        },
164        take: 1,
165      });
166  
167      const topEditor = topEditorResult[0]?.editor || "Unknown";
168      const topLanguage = topLanguageResult[0]?.language || "Unknown";
169      const topOS = topOSResult[0]?.os || "Unknown";
170  
171      await prisma.stats.create({
172        data: {
173          date: statsDate,
174          totalHours,
175          totalUsers: BigInt(totalUsers),
176          totalHeartbeats,
177          topEditor,
178          topLanguage,
179          topOS,
180        },
181      });
182  
183      handleLog(
184        `Generated public stats for ${statsDate.toISOString().split("T")[0]}`
185      );
186    } catch (error) {
187      console.error("Error generating public stats:", error);
188    }
189  }