download.ts
/**
 * Pipeline step: download — file download with concurrency and progress.
 *
 * Supports:
 * - Direct HTTP downloads (images, documents)
 * - yt-dlp integration for video platforms
 * - Browser cookie forwarding for authenticated downloads
 * - Filename templating and deduplication
 */

import * as fs from 'node:fs';
import * as path from 'node:path';
import * as os from 'node:os';
import type { IPage } from '../../types.js';
import { render } from '../template.js';
import { getErrorMessage } from '../../errors.js';
import {
  httpDownload,
  ytdlpDownload,
  saveDocument,
  detectContentType,
  requiresYtdlp,
  sanitizeFilename,
  generateFilename,
  exportCookiesToNetscape,
  getTempDir,
  formatCookieHeader,
} from '../../download/index.js';
import { DownloadProgressTracker, formatBytes } from '../../download/progress.js';
import { mapConcurrent } from '../../utils.js';

/** Per-item outcome attached to every row returned by {@link stepDownload}. */
export interface DownloadResult {
  status: 'success' | 'skipped' | 'failed';
  path?: string;
  size?: number;
  error?: string;
  duration?: number;
}

/** Cookie shape written to the Netscape-format cookie file handed to yt-dlp. */
type CookieRecord = {
  name: string;
  value: string;
  domain: string;
  path: string;
  secure: boolean;
  httpOnly: boolean;
};

/** Concurrency used when the step does not specify a usable value. */
const DEFAULT_CONCURRENCY = 3;

/**
 * Extract cookies from the browser page as a `Cookie` header value.
 *
 * Best effort: any failure while reading or formatting cookies yields an
 * empty string so the download proceeds unauthenticated instead of aborting.
 */
async function extractBrowserCookies(page: IPage, domain: string): Promise<string> {
  try {
    const cookies = await page.getCookies({ domain });
    return formatCookieHeader(cookies);
  } catch {
    return '';
  }
}

/**
 * Extract cookies as an array for the yt-dlp Netscape cookie file.
 *
 * Cookies without a name are dropped; missing `path`, `secure` and `httpOnly`
 * fall back to `'/'`, `false` and `false`. Best effort: returns `[]` on error.
 */
async function extractCookiesArray(page: IPage, domain: string): Promise<CookieRecord[]> {
  try {
    const cookies = await page.getCookies({ domain });
    return cookies
      .filter((cookie) => cookie.name)
      .map((cookie) => ({
        name: cookie.name,
        value: cookie.value,
        domain: cookie.domain,
        path: cookie.path ?? '/',
        secure: cookie.secure ?? false,
        httpOnly: cookie.httpOnly ?? false,
      }));
  } catch {
    return [];
  }
}

/**
 * Collapse cookies sharing the same (domain, path, name) triple.
 * The last occurrence wins; first-seen order of the keys is preserved.
 */
function dedupeCookies(cookies: CookieRecord[]): CookieRecord[] {
  const deduped = new Map<string, CookieRecord>();
  for (const cookie of cookies) {
    deduped.set(`${cookie.domain}\t${cookie.path}\t${cookie.name}`, cookie);
  }
  return [...deduped.values()];
}

/**
 * Turn the user-supplied `concurrency` into a usable worker count.
 *
 * Non-numbers and NaN fall back to {@link DEFAULT_CONCURRENCY}; anything
 * below 1 is clamped to 1 and fractions are floored, so the worker pool is
 * never asked to run with zero or a negative number of workers.
 */
function normalizeConcurrency(value: unknown): number {
  if (typeof value !== 'number' || Number.isNaN(value)) {
    return DEFAULT_CONCURRENCY;
  }
  return Math.max(1, Math.floor(value));
}

/** Delete a file, ignoring every error (including the file not existing). */
function removeFileQuietly(file: string): void {
  try {
    fs.unlinkSync(file);
  } catch {
    // Ignore cleanup errors
  }
}

/** Parameters accepted by the `download` step (all optional). */
interface DownloadParams {
  /** URL template, rendered per item. */
  url?: string;
  /** Output directory template. Defaults to `./downloads`. */
  dir?: string;
  /** Filename template; when empty a name is generated from the URL and index. */
  filename?: string;
  /** Maximum parallel downloads. Defaults to 3. */
  concurrency?: number;
  /** Skip items whose destination already exists. Defaults to true. */
  skip_existing?: boolean;
  /** HTTP timeout in seconds. Defaults to 30. */
  timeout?: number;
  /** Force yt-dlp for every item. Defaults to false. */
  use_ytdlp?: boolean;
  /** Extra yt-dlp arguments; used only when it is an array (entries are stringified). */
  ytdlp_args?: unknown;
  /** Content type, or `auto` (default) to detect it from the URL. */
  type?: string;
  /** Show progress output. Defaults to true. */
  progress?: boolean;
  /** Content template saved as a document when the type is `document`. */
  content?: string;
  /** Metadata templates rendered per item and stored with a saved document. */
  metadata?: Record<string, unknown>;
}

/**
 * Download step handler for YAML pipelines.
 *
 * Usage in YAML:
 * ```yaml
 * pipeline:
 *   - download:
 *       url: ${{ item.imageUrl }}
 *       dir: ./downloads
 *       filename: ${{ item.title }}.jpg
 *       concurrency: 5
 *       skip_existing: true
 *       use_ytdlp: false
 *       type: auto
 * ```
 *
 * @param page - Browser page used to forward cookies, or `null` for anonymous downloads.
 * @param params - Either a URL template string or a {@link DownloadParams} object.
 * @param data - Upstream rows; a single truthy value is treated as a one-element list.
 * @param args - Pipeline arguments made available to templates.
 * @returns The result of mapping every input row to itself plus a `_download`
 *   {@link DownloadResult}; `[]` when there is no input.
 * @throws Whatever template rendering or directory creation throws. Download
 *   failures themselves are reported per item, not thrown.
 */
export async function stepDownload(
  page: IPage | null,
  params: unknown,
  data: unknown,
  args: Record<string, unknown>,
): Promise<unknown> {
  // Parse parameters with defaults
  const p: DownloadParams =
    typeof params === 'object' && params !== null ? (params as DownloadParams) : {};
  const urlTemplate = typeof params === 'string' ? params : (p.url ?? '');
  const dirTemplate = p.dir ?? './downloads';
  const filenameTemplate = p.filename ?? '';
  const concurrency = normalizeConcurrency(p.concurrency);
  const skipExisting = p.skip_existing !== false;
  const timeout = typeof p.timeout === 'number' ? p.timeout * 1000 : 30000;
  const useYtdlp = p.use_ytdlp ?? false;
  const ytdlpArgs: string[] = Array.isArray(p.ytdlp_args)
    ? p.ytdlp_args.map((v) => String(v))
    : [];
  const contentType = p.type ?? 'auto';
  const showProgress = p.progress !== false;
  const contentTemplate = p.content;
  const metadataTemplate = p.metadata;

  // Resolve output directory
  const dir = String(render(dirTemplate, { args, data }));
  fs.mkdirSync(dir, { recursive: true });

  // Normalize data to array. Items are row records (string-keyed) produced by
  // upstream steps; we treat them as Record<string, unknown> and narrow per-use.
  const items: Array<Record<string, unknown>> =
    Array.isArray(data) ? (data as Array<Record<string, unknown>>)
    : data ? [data as Record<string, unknown>]
    : [];
  if (items.length === 0) {
    return [];
  }

  const renderUrl = (item: Record<string, unknown>, index: number): string =>
    String(render(urlTemplate, { args, data, item, index }));

  // Create progress tracker
  const tracker = new DownloadProgressTracker(items.length, showProgress);

  // Cache cookie lookups per domain so mixed-domain batches stay isolated without repeated browser calls.
  const cookieHeaderCache = new Map<string, Promise<string>>();
  let cookiesFile: string | undefined;

  try {
    if (page) {
      // For yt-dlp, we need to export cookies to Netscape format
      const needsCookieFile =
        useYtdlp || items.some((item, index) => requiresYtdlp(renderUrl(item, index)));

      if (needsCookieFile) {
        try {
          const ytdlpDomains = [...new Set(items.flatMap((item, index) => {
            const url = renderUrl(item, index);
            if (!useYtdlp && !requiresYtdlp(url)) return [];
            try {
              return [new URL(url).hostname];
            } catch {
              return [];
            }
          }))];
          const cookiesArray = dedupeCookies(
            (await Promise.all(ytdlpDomains.map((domain) => extractCookiesArray(page, domain)))).flat(),
          );

          if (cookiesArray.length > 0) {
            const tempDir = getTempDir();
            fs.mkdirSync(tempDir, { recursive: true });
            // The random suffix keeps runs that start in the same millisecond
            // from sharing (and deleting) each other's cookie file.
            const uniqueSuffix = Math.random().toString(36).slice(2, 10);
            const candidate = path.join(tempDir, `cookies_${Date.now()}_${uniqueSuffix}.txt`);
            try {
              exportCookiesToNetscape(cookiesArray, candidate);
              // Publish the path only once the export succeeded, so yt-dlp is
              // never pointed at a missing or half-written file.
              cookiesFile = candidate;
            } catch {
              removeFileQuietly(candidate);
            }
          }
        } catch {
          // Ignore cookie extraction errors
        }
      }
    }

    // Process downloads with concurrency
    type DownloadedItem = Record<string, unknown> & { _download: DownloadResult };
    const results = await mapConcurrent(items, concurrency, async (item, index): Promise<DownloadedItem> => {
      const startTime = Date.now();

      // Render URL
      const url = renderUrl(item, index);
      if (!url) {
        tracker.onFileComplete(false);
        return {
          ...item,
          _download: { status: 'failed', error: 'Empty URL' } as DownloadResult,
        };
      }

      // Render filename
      const rawFilename = filenameTemplate
        ? String(render(filenameTemplate, { args, data, item, index }))
        : generateFilename(url, index);
      const filename = sanitizeFilename(rawFilename);

      const destPath = path.join(dir, filename);

      // Check if file exists and skip_existing is true
      if (skipExisting && fs.existsSync(destPath)) {
        tracker.onFileComplete(true, true);
        return {
          ...item,
          _download: {
            status: 'skipped',
            path: destPath,
            size: fs.statSync(destPath).size,
          } as DownloadResult,
        };
      }

      // Create progress bar for this file
      const progressBar = tracker.onFileStart(filename, index);

      // Determine download method
      const detectedType = contentType === 'auto' ? detectContentType(url) : contentType;
      const shouldUseYtdlp = useYtdlp || (detectedType === 'video' && requiresYtdlp(url));

      let result: { success: boolean; size: number; error?: string };

      try {
        if (detectedType === 'document' && contentTemplate) {
          // Save extracted content as document
          const content = String(render(contentTemplate, { args, data, item, index }));
          const metadata = metadataTemplate
            ? Object.fromEntries(
                Object.entries(metadataTemplate).map(([k, v]) => [k, render(v, { args, data, item, index })]),
              )
            : undefined;

          const ext = path.extname(filename).toLowerCase();
          const format = ext === '.json' ? 'json' : ext === '.html' ? 'html' : 'markdown';
          result = await saveDocument(content, destPath, format, metadata);
        } else if (shouldUseYtdlp) {
          // Use yt-dlp for video downloads
          result = await ytdlpDownload(url, destPath, {
            cookiesFile,
            extraArgs: ytdlpArgs,
            onProgress: (percent) => {
              if (progressBar) {
                progressBar.update(percent, 100);
              }
            },
          });
        } else {
          // Direct HTTP download
          let cookies = '';
          if (page) {
            try {
              const targetDomain = new URL(url).hostname;
              let cookiePromise = cookieHeaderCache.get(targetDomain);
              if (!cookiePromise) {
                cookiePromise = extractBrowserCookies(page, targetDomain);
                cookieHeaderCache.set(targetDomain, cookiePromise);
              }
              cookies = await cookiePromise;
            } catch {
              cookies = '';
            }
          }

          result = await httpDownload(url, destPath, {
            cookies,
            timeout,
            onProgress: (received, total) => {
              if (progressBar) {
                progressBar.update(received, total);
              }
            },
          });
        }

        // Shared by all three branches; kept inside the try so a throwing
        // progress bar is still reported as a failed item.
        if (progressBar) {
          progressBar.complete(result.success, result.success ? formatBytes(result.size) : undefined);
        }
      } catch (err) {
        const msg = getErrorMessage(err);
        result = { success: false, size: 0, error: msg };
        if (progressBar) {
          progressBar.fail(msg);
        }
      }

      tracker.onFileComplete(result.success);

      const duration = Date.now() - startTime;

      return {
        ...item,
        _download: {
          status: result.success ? 'success' : 'failed',
          path: result.success ? destPath : undefined,
          size: result.size,
          error: result.error,
          duration,
        } as DownloadResult,
      };
    });

    // Show summary
    tracker.finish();

    return results;
  } finally {
    // Cleanup temp cookie file. It holds session cookies, so it must be
    // removed on the failure path too, not only after a successful batch.
    if (cookiesFile && fs.existsSync(cookiesFile)) {
      removeFileQuietly(cookiesFile);
    }
  }
}