fetch.ts
/**
 * Pipeline step: fetch — HTTP API requests.
 */

import { CliError, getErrorMessage } from '../../errors.js';
import { log } from '../../logger.js';
import type { IPage } from '../../types.js';
import { render } from '../template.js';

import { isRecord, mapConcurrent } from '../../utils.js';

/** Concurrency used for per-item fetches when the step gives no usable value. */
const DEFAULT_CONCURRENCY = 5;

/**
 * Coerce a user-supplied concurrency setting into a positive integer.
 *
 * Non-numbers and NaN fall back to {@link DEFAULT_CONCURRENCY}; zero, negative
 * and fractional values are floored and clamped to at least 1. Without this,
 * a value such as 0 would start no workers at all and the batch would silently
 * return an array of empty slots.
 */
function normalizeConcurrency(value: unknown): number {
  if (typeof value !== 'number' || Number.isNaN(value)) return DEFAULT_CONCURRENCY;
  return Math.max(1, Math.floor(value));
}

/**
 * Render every value of a template map (query params or headers) with
 * `render(value, { args, data })` and stringify the result.
 */
function renderStringMap(
  templates: Record<string, unknown>,
  args: Record<string, unknown>,
  data: unknown,
): Record<string, string> {
  const rendered: Record<string, string> = {};
  for (const [key, value] of Object.entries(templates)) {
    rendered[key] = String(render(value, { args, data }));
  }
  return rendered;
}

/**
 * Append already-rendered query parameters to a URL.
 *
 * Uses `&` when the URL already contains a `?`, otherwise `?`. Returns the URL
 * unchanged when there are no parameters.
 */
function appendQueryString(url: string, params: Record<string, string>): string {
  if (Object.keys(params).length === 0) return url;
  const qs = new URLSearchParams(params).toString();
  return `${url}${url.includes('?') ? '&' : '?'}${qs}`;
}

/**
 * Single URL fetch helper.
 *
 * Renders query params and headers, then performs the request either with the
 * global `fetch` (when `page` is null) or inside the page via `page.evaluate`
 * with `credentials: "include"`.
 *
 * @param page - Browser page to run the request in, or null to fetch directly.
 * @param url - Already-rendered URL (query params are appended here).
 * @param method - HTTP method; upper-cased before use.
 * @param queryParams - Query parameter templates.
 * @param headers - Header templates.
 * @param args - Template context: step arguments.
 * @param data - Template context: current pipeline data.
 * @returns The parsed JSON body of the response.
 * @throws CliError `FETCH_ERROR` when the response status is not OK.
 */
async function fetchSingle(
  page: IPage | null, url: string, method: string,
  queryParams: Record<string, unknown>, headers: Record<string, unknown>,
  args: Record<string, unknown>, data: unknown,
): Promise<unknown> {
  const renderedParams = renderStringMap(queryParams, args, data);
  const renderedHeaders = renderStringMap(headers, args, data);
  const finalUrl = appendQueryString(url, renderedParams);
  const upperMethod = method.toUpperCase();

  if (page === null) {
    const resp = await fetch(finalUrl, { method: upperMethod, headers: renderedHeaders });
    if (!resp.ok) {
      throw new CliError('FETCH_ERROR', `HTTP ${resp.status} ${resp.statusText} from ${finalUrl}`);
    }
    return resp.json();
  }

  // JSON.stringify yields valid JS literals, so the values can be embedded
  // directly in the script source handed to evaluate().
  const headersJs = JSON.stringify(renderedHeaders);
  const urlJs = JSON.stringify(finalUrl);
  const methodJs = JSON.stringify(upperMethod);
  // Return error status instead of throwing inside evaluate to avoid CDP wrapper
  // rewriting the message (CDP prepends "Evaluate error: " to thrown errors).
  const result = await page.evaluate(`
    async () => {
      const resp = await fetch(${urlJs}, {
        method: ${methodJs}, headers: ${headersJs}, credentials: "include"
      });
      if (!resp.ok) {
        return { __httpError: resp.status, statusText: resp.statusText };
      }
      return await resp.json();
    }
  `);
  if (result && typeof result === 'object' && '__httpError' in result) {
    const { __httpError: status, statusText } = result as { __httpError: number; statusText: string };
    throw new CliError('FETCH_ERROR', `HTTP ${status} ${statusText} from ${finalUrl}`);
  }
  return result;
}

/**
 * Batch fetch: send all URLs into the browser as a single evaluate() call.
 * This eliminates N-1 cross-process IPC round trips, performing all fetches
 * inside the V8 engine and returning results as one JSON array.
 *
 * @param page - Browser page whose context performs the requests.
 * @param urls - Fully rendered URLs, one per result slot.
 * @param method - HTTP method, passed to the in-page `fetch` as given.
 * @param headers - Already-rendered request headers.
 * @param concurrency - Maximum number of in-page workers; normalized to a
 *   positive integer and capped at `urls.length`.
 * @returns One entry per URL, in input order: the parsed JSON body, or
 *   `{ error: message }` when that request failed.
 */
async function fetchBatchInBrowser(
  page: IPage, urls: string[], method: string,
  headers: Record<string, string>, concurrency: number,
): Promise<unknown[]> {
  // Nothing to fetch: skip the evaluate() round trip entirely.
  if (urls.length === 0) return [];

  // Computed on the Node side so the injected value is always a finite,
  // positive integer (never 0, NaN or Infinity).
  const workerCount = Math.min(urls.length, normalizeConcurrency(concurrency));

  const headersJs = JSON.stringify(headers);
  const urlsJs = JSON.stringify(urls);
  const methodJs = JSON.stringify(method);
  return (await page.evaluate(`
    async () => {
      const urls = ${urlsJs};
      const method = ${methodJs};
      const headers = ${headersJs};
      const workerCount = ${workerCount};

      const results = new Array(urls.length);
      let idx = 0;

      // Each worker repeatedly claims the next unclaimed index until none remain.
      async function worker() {
        while (idx < urls.length) {
          const i = idx++;
          try {
            const resp = await fetch(urls[i], { method, headers, credentials: "include" });
            if (!resp.ok) {
              throw new Error('HTTP ' + resp.status + ' ' + resp.statusText + ' from ' + urls[i]);
            }
            results[i] = await resp.json();
          } catch (e) {
            results[i] = { error: e instanceof Error ? e.message : String(e) };
            // Note: getErrorMessage() is a Node.js utility — can't use it inside evaluate()
          }
        }
      }

      const workers = Array.from({ length: workerCount }, () => worker());
      await Promise.all(workers);
      return results;
    }
  `)) as unknown[];
}

/**
 * Execute the `fetch` pipeline step.
 *
 * `params` is either a URL string or an object with `url`, and optionally
 * `method` (default `GET`), `params` (query parameters), `headers` and
 * `concurrency` (default 5, normalized to a positive integer).
 *
 * When `data` is an array and the URL template contains the substring `item`,
 * one request is made per element and an array of results is returned; a
 * failed element yields `{ error: message }` and a warning is logged instead
 * of throwing. Otherwise a single request is made.
 *
 * @param page - Browser page to fetch through, or null to fetch directly.
 * @param params - Step configuration (string URL or options object).
 * @param data - Current pipeline data, exposed to templates.
 * @param args - Step arguments, exposed to templates.
 * @returns Parsed JSON for a single fetch, or an array of per-item results.
 * @throws CliError `FETCH_ERROR` when a single (non-batch) request is not OK.
 */
export async function stepFetch(page: IPage | null, params: unknown, data: unknown, args: Record<string, unknown>): Promise<unknown> {
  const paramObject = isRecord(params) ? params : {};
  const urlOrObj = typeof params === 'string' ? params : (paramObject.url ?? '');
  const method = typeof paramObject.method === 'string' ? paramObject.method : 'GET';
  const queryParams = isRecord(paramObject.params) ? paramObject.params : {};
  const headers = isRecord(paramObject.headers) ? paramObject.headers : {};
  const urlTemplate = String(urlOrObj);

  // Per-item fetch when data is array and URL references item
  if (Array.isArray(data) && urlTemplate.includes('item')) {
    const concurrency = normalizeConcurrency(paramObject.concurrency);

    // BATCH IPC: if browser is available, batch all fetches into a single evaluate() call
    if (page !== null) {
      // Headers and query params are rendered once (without item context) and
      // shared by every URL; only the URL template sees `item` / `index`.
      const renderedHeaders = renderStringMap(headers, args, data);
      const renderedParams = renderStringMap(queryParams, args, data);
      const urls = data.map((item, index) =>
        appendQueryString(String(render(urlTemplate, { args, data, item, index })), renderedParams),
      );

      const results = await fetchBatchInBrowser(page, urls, method.toUpperCase(), renderedHeaders, concurrency);
      results.forEach((r, i) => {
        // NOTE: failures are detected by the presence of an `error` key, so a
        // successful JSON payload that itself has an `error` key is also logged here.
        if (r && typeof r === 'object' && 'error' in r) {
          log.warn(`Batch fetch failed for ${urls[i]}: ${(r as { error: string }).error}`);
        }
      });
      return results;
    }

    // Non-browser: use concurrent pool; fetchSingle renders params/headers itself.
    return mapConcurrent(data, concurrency, async (item, index) => {
      const itemUrl = String(render(urlTemplate, { args, data, item, index }));
      try {
        return await fetchSingle(null, itemUrl, method, queryParams, headers, args, data);
      } catch (error) {
        const message = getErrorMessage(error);
        log.warn(`Batch fetch failed for ${itemUrl}: ${message}`);
        return { error: message };
      }
    });
  }

  const url = render(urlOrObj, { args, data });
  return fetchSingle(page, String(url), method, queryParams, headers, args, data);
}