/ src / pipeline / steps / download.ts
download.ts
  1  /**
  2   * Pipeline step: download — file download with concurrency and progress.
  3   *
  4   * Supports:
  5   * - Direct HTTP downloads (images, documents)
  6   * - yt-dlp integration for video platforms
  7   * - Browser cookie forwarding for authenticated downloads
  8   * - Filename templating and deduplication
  9   */
 10  
 11  import * as fs from 'node:fs';
 12  import * as path from 'node:path';
 13  import * as os from 'node:os';
 14  import type { IPage } from '../../types.js';
 15  import { render } from '../template.js';
 16  import { getErrorMessage } from '../../errors.js';
 17  import {
 18    httpDownload,
 19    ytdlpDownload,
 20    saveDocument,
 21    detectContentType,
 22    requiresYtdlp,
 23    sanitizeFilename,
 24    generateFilename,
 25    exportCookiesToNetscape,
 26    getTempDir,
 27    formatCookieHeader,
 28  } from '../../download/index.js';
 29  import { DownloadProgressTracker, formatBytes } from '../../download/progress.js';
 30  import { mapConcurrent } from '../../utils.js';
 31  
 32  export interface DownloadResult {
 33    status: 'success' | 'skipped' | 'failed';
 34    path?: string;
 35    size?: number;
 36    error?: string;
 37    duration?: number;
 38  }
 39  
 40  
 41  
 42  /**
 43   * Extract cookies from browser page.
 44   */
 45  async function extractBrowserCookies(page: IPage, domain: string): Promise<string> {
 46    try {
 47      const cookies = await page.getCookies({ domain });
 48      return formatCookieHeader(cookies);
 49    } catch {
 50      return '';
 51    }
 52  }
 53  
 54  /**
 55   * Extract cookies as array for yt-dlp Netscape format.
 56   */
 57  async function extractCookiesArray(
 58    page: IPage,
 59    domain: string,
 60  ): Promise<Array<{ name: string; value: string; domain: string; path: string; secure: boolean; httpOnly: boolean }>> {
 61    try {
 62      const cookies = await page.getCookies({ domain });
 63      return cookies
 64        .filter((cookie) => cookie.name)
 65        .map((cookie) => ({
 66          name: cookie.name,
 67          value: cookie.value,
 68          domain: cookie.domain,
 69          path: cookie.path ?? '/',
 70          secure: cookie.secure ?? false,
 71          httpOnly: cookie.httpOnly ?? false,
 72        }));
 73    } catch {
 74      return [];
 75    }
 76  }
 77  
 78  function dedupeCookies(
 79    cookies: Array<{ name: string; value: string; domain: string; path: string; secure: boolean; httpOnly: boolean }>,
 80  ): Array<{ name: string; value: string; domain: string; path: string; secure: boolean; httpOnly: boolean }> {
 81    const deduped = new Map<string, { name: string; value: string; domain: string; path: string; secure: boolean; httpOnly: boolean }>();
 82    for (const cookie of cookies) {
 83      deduped.set(`${cookie.domain}\t${cookie.path}\t${cookie.name}`, cookie);
 84    }
 85    return [...deduped.values()];
 86  }
 87  
 88  /**
 89   * Download step handler for YAML pipelines.
 90   *
 91   * Usage in YAML:
 92   * ```yaml
 93   * pipeline:
 94   *   - download:
 95   *       url: ${{ item.imageUrl }}
 96   *       dir: ./downloads
 97   *       filename: ${{ item.title }}.jpg
 98   *       concurrency: 5
 99   *       skip_existing: true
100   *       use_ytdlp: false
101   *       type: auto
102   * ```
103   */
104  interface DownloadParams {
105    url?: string;
106    dir?: string;
107    filename?: string;
108    concurrency?: number;
109    skip_existing?: boolean;
110    timeout?: number;
111    use_ytdlp?: boolean;
112    ytdlp_args?: unknown;
113    type?: string;
114    progress?: boolean;
115    content?: string;
116    metadata?: Record<string, unknown>;
117  }
118  
/**
 * Delete a file, ignoring every error (already gone, permissions, ...).
 * Used for best-effort cleanup of the temporary cookie file.
 */
function removeFileQuietly(file: string): void {
  try {
    fs.unlinkSync(file);
  } catch {
    // Ignore cleanup errors
  }
}

/**
 * Pipeline step `download`: produce one file per input item.
 *
 * For every item the URL (and optional filename) template is rendered and the
 * item is then handled in one of three ways, checked in this order:
 *   1. type resolves to `document` and `content` is configured → the rendered
 *      content is saved via `saveDocument` (format chosen from the extension);
 *   2. `use_ytdlp` is set, or the type resolves to `video` and the URL
 *      requires yt-dlp → `ytdlpDownload`;
 *   3. otherwise → `httpDownload`, forwarding the page's cookies for the
 *      URL's hostname when a page is available.
 *
 * @param page - Browser page used to read cookies; `null` disables cookie forwarding.
 * @param params - Step configuration object, or a bare string used as the URL template.
 * @param data - Upstream data: an array yields one item per element, any other
 *   truthy value is treated as a single item, a falsy value yields no items.
 * @param args - Pipeline arguments, exposed to templates as `args`.
 * @returns The input items, each extended with a `_download` result; `[]` when
 *   there are no items. The output directory is created even in that case.
 *
 * Failures inside a single download are captured in that item's `_download`.
 * Errors raised while rendering templates or preparing the destination are not
 * caught here; the temporary cookie file is still removed when that happens.
 */
export async function stepDownload(
  page: IPage | null,
  params: unknown,
  data: unknown,
  args: Record<string, unknown>,
): Promise<unknown> {
  // Parse parameters with defaults. A bare string is shorthand for the URL template.
  const p: DownloadParams =
    typeof params === 'object' && params !== null ? (params as DownloadParams) : {};
  const urlTemplate = typeof params === 'string' ? params : (p.url ?? '');
  const dirTemplate = p.dir ?? './downloads';
  const filenameTemplate = p.filename ?? '';
  // Normalise to an integer >= 1: how the concurrent mapper treats 0, negative,
  // fractional or NaN limits is not visible from here, so never pass one through.
  const concurrency =
    typeof p.concurrency === 'number' && Number.isFinite(p.concurrency)
      ? Math.max(1, Math.floor(p.concurrency))
      : 3;
  const skipExisting = p.skip_existing !== false;
  // `timeout` is configured in seconds; the HTTP downloader receives milliseconds.
  const timeout = typeof p.timeout === 'number' ? p.timeout * 1000 : 30000;
  const useYtdlp = p.use_ytdlp ?? false;
  const ytdlpArgs: string[] = Array.isArray(p.ytdlp_args)
    ? p.ytdlp_args.map((v) => String(v))
    : [];
  const contentType = p.type ?? 'auto';
  const showProgress = p.progress !== false;
  const contentTemplate = p.content;
  const metadataTemplate = p.metadata;

  // Resolve output directory
  const dir = String(render(dirTemplate, { args, data }));
  fs.mkdirSync(dir, { recursive: true });

  // Normalize data to array. Items are row records (string-keyed) produced by
  // upstream steps; we treat them as Record<string, unknown> and narrow per-use.
  const items: Array<Record<string, unknown>> =
    Array.isArray(data) ? (data as Array<Record<string, unknown>>)
    : data ? [data as Record<string, unknown>]
    : [];
  if (items.length === 0) {
    return [];
  }

  // Render the URL for one item. A template that evaluates to null/undefined
  // (e.g. a missing field) is treated as empty instead of being stringified
  // into the literal text "null"/"undefined" and handed to a downloader.
  const renderUrl = (item: Record<string, unknown>, index: number): string => {
    const rendered = render(urlTemplate, { args, data, item, index });
    return rendered == null ? '' : String(rendered);
  };

  // Create progress tracker
  const tracker = new DownloadProgressTracker(items.length, showProgress);

  // Cache cookie lookups per domain so mixed-domain batches stay isolated without repeated browser calls.
  const cookieHeaderCache = new Map<string, Promise<string>>();
  let cookiesFile: string | undefined;

  if (page) {
    // For yt-dlp, we need to export cookies to Netscape format
    const needsCookieFile =
      useYtdlp || items.some((item, index) => requiresYtdlp(renderUrl(item, index)));

    if (needsCookieFile) {
      let pendingFile: string | undefined;
      try {
        // Hostnames of every URL that will (or may) go through yt-dlp.
        const ytdlpDomains = [...new Set(items.flatMap((item, index) => {
          const url = renderUrl(item, index);
          if (!useYtdlp && !requiresYtdlp(url)) return [];
          try {
            return [new URL(url).hostname];
          } catch {
            return [];
          }
        }))];
        const cookiesArray = dedupeCookies(
          (await Promise.all(ytdlpDomains.map((domain) => extractCookiesArray(page, domain)))).flat(),
        );

        if (cookiesArray.length > 0) {
          const tempDir = getTempDir();
          fs.mkdirSync(tempDir, { recursive: true });
          // Random suffix: a timestamp alone collides when two runs start in the same millisecond.
          const uniqueSuffix = Math.random().toString(36).slice(2, 10);
          pendingFile = path.join(tempDir, `cookies_${Date.now()}_${uniqueSuffix}.txt`);
          exportCookiesToNetscape(cookiesArray, pendingFile);
          // Publish the path only after the export succeeded, so yt-dlp is
          // never pointed at a missing or partially written file.
          cookiesFile = pendingFile;
        }
      } catch {
        // Cookie forwarding is best-effort: continue without cookies, but do
        // not leave a partially written cookie file behind.
        if (pendingFile) {
          removeFileQuietly(pendingFile);
        }
      }
    }
  }

  // Process downloads with concurrency
  type DownloadedItem = Record<string, unknown> & { _download: DownloadResult };
  try {
    const results = await mapConcurrent(items, concurrency, async (item, index): Promise<DownloadedItem> => {
      const startTime = Date.now();

      // Render URL
      const url = renderUrl(item, index);
      if (!url) {
        tracker.onFileComplete(false);
        return {
          ...item,
          _download: { status: 'failed', error: 'Empty URL' } as DownloadResult,
        };
      }

      // Render filename (or derive one from the URL), then sanitize it
      const filename = sanitizeFilename(
        filenameTemplate
          ? String(render(filenameTemplate, { args, data, item, index }))
          : generateFilename(url, index),
      );

      const destPath = path.join(dir, filename);

      // Check if file exists and skip_existing is true
      if (skipExisting && fs.existsSync(destPath)) {
        tracker.onFileComplete(true, true);
        return {
          ...item,
          _download: {
            status: 'skipped',
            path: destPath,
            size: fs.statSync(destPath).size,
          } as DownloadResult,
        };
      }

      // Create progress bar for this file
      const progressBar = tracker.onFileStart(filename, index);

      // Determine download method
      const detectedType = contentType === 'auto' ? detectContentType(url) : contentType;
      const shouldUseYtdlp = useYtdlp || (detectedType === 'video' && requiresYtdlp(url));

      let result: { success: boolean; size: number; error?: string };

      try {
        if (detectedType === 'document' && contentTemplate) {
          // Save extracted content as document
          const content = String(render(contentTemplate, { args, data, item, index }));
          const metadata = metadataTemplate
            ? Object.fromEntries(
                Object.entries(metadataTemplate).map(([k, v]) => [k, render(v, { args, data, item, index })]),
              )
            : undefined;

          // Output format follows the file extension; anything else is markdown.
          const ext = path.extname(filename).toLowerCase();
          const format = ext === '.json' ? 'json' : ext === '.html' ? 'html' : 'markdown';
          result = await saveDocument(content, destPath, format, metadata);
        } else if (shouldUseYtdlp) {
          // Use yt-dlp for video downloads
          result = await ytdlpDownload(url, destPath, {
            cookiesFile,
            extraArgs: ytdlpArgs,
            onProgress: (percent) => {
              if (progressBar) {
                progressBar.update(percent, 100);
              }
            },
          });
        } else {
          // Direct HTTP download
          let cookies = '';
          if (page) {
            try {
              const targetDomain = new URL(url).hostname;
              // Cache the promise (not the value) so concurrent items for the
              // same domain share a single browser lookup.
              let cookiePromise = cookieHeaderCache.get(targetDomain);
              if (!cookiePromise) {
                cookiePromise = extractBrowserCookies(page, targetDomain);
                cookieHeaderCache.set(targetDomain, cookiePromise);
              }
              cookies = await cookiePromise;
            } catch {
              cookies = '';
            }
          }

          result = await httpDownload(url, destPath, {
            cookies,
            timeout,
            onProgress: (received, total) => {
              if (progressBar) {
                progressBar.update(received, total);
              }
            },
          });
        }

        // Common to all three paths; kept inside the try so a failure here is
        // reported the same way as a failed download.
        if (progressBar) {
          progressBar.complete(result.success, result.success ? formatBytes(result.size) : undefined);
        }
      } catch (err) {
        const msg = getErrorMessage(err);
        result = { success: false, size: 0, error: msg };
        if (progressBar) {
          progressBar.fail(msg);
        }
      }

      tracker.onFileComplete(result.success);

      const duration = Date.now() - startTime;

      return {
        ...item,
        _download: {
          status: result.success ? 'success' : 'failed',
          path: result.success ? destPath : undefined,
          size: result.size,
          error: result.error,
          duration,
        } as DownloadResult,
      };
    });

    // Show summary
    tracker.finish();

    return results;
  } finally {
    // Cleanup temp cookie file. Runs on failure too: the file holds browser
    // session cookies and must not outlive the step.
    if (cookiesFile) {
      removeFileQuietly(cookiesFile);
    }
  }
}