/ src / pipeline / steps / fetch.ts
fetch.ts
  1  /**
  2   * Pipeline step: fetch — HTTP API requests.
  3   */
  4  
  5  import { CliError, getErrorMessage } from '../../errors.js';
  6  import { log } from '../../logger.js';
  7  import type { IPage } from '../../types.js';
  8  import { render } from '../template.js';
  9  
 10  import { isRecord, mapConcurrent } from '../../utils.js';
 11  
 12  
 13  
 14  /** Single URL fetch helper */
 15  async function fetchSingle(
 16    page: IPage | null, url: string, method: string,
 17    queryParams: Record<string, unknown>, headers: Record<string, unknown>,
 18    args: Record<string, unknown>, data: unknown,
 19  ): Promise<unknown> {
 20    const renderedParams: Record<string, string> = {};
 21    for (const [k, v] of Object.entries(queryParams)) renderedParams[k] = String(render(v, { args, data }));
 22    const renderedHeaders: Record<string, string> = {};
 23    for (const [k, v] of Object.entries(headers)) renderedHeaders[k] = String(render(v, { args, data }));
 24  
 25    let finalUrl = url;
 26    if (Object.keys(renderedParams).length > 0) {
 27      const qs = new URLSearchParams(renderedParams).toString();
 28      finalUrl = `${finalUrl}${finalUrl.includes('?') ? '&' : '?'}${qs}`;
 29    }
 30  
 31    if (page === null) {
 32      const resp = await fetch(finalUrl, { method: method.toUpperCase(), headers: renderedHeaders });
 33      if (!resp.ok) {
 34        throw new CliError('FETCH_ERROR', `HTTP ${resp.status} ${resp.statusText} from ${finalUrl}`);
 35      }
 36      return resp.json();
 37    }
 38  
 39    const headersJs = JSON.stringify(renderedHeaders);
 40    const urlJs = JSON.stringify(finalUrl);
 41    const methodJs = JSON.stringify(method.toUpperCase());
 42    // Return error status instead of throwing inside evaluate to avoid CDP wrapper
 43    // rewriting the message (CDP prepends "Evaluate error: " to thrown errors).
 44    const result = await page.evaluate(`
 45      async () => {
 46        const resp = await fetch(${urlJs}, {
 47          method: ${methodJs}, headers: ${headersJs}, credentials: "include"
 48        });
 49        if (!resp.ok) {
 50          return { __httpError: resp.status, statusText: resp.statusText };
 51        }
 52        return await resp.json();
 53      }
 54    `);
 55    if (result && typeof result === 'object' && '__httpError' in result) {
 56      const { __httpError: status, statusText } = result as { __httpError: number; statusText: string };
 57      throw new CliError('FETCH_ERROR', `HTTP ${status} ${statusText} from ${finalUrl}`);
 58    }
 59    return result;
 60  }
 61  
 62  /**
 63   * Batch fetch: send all URLs into the browser as a single evaluate() call.
 64   * This eliminates N-1 cross-process IPC round trips, performing all fetches
 65   * inside the V8 engine and returning results as one JSON array.
 66   */
 67  async function fetchBatchInBrowser(
 68    page: IPage, urls: string[], method: string,
 69    headers: Record<string, string>, concurrency: number,
 70  ): Promise<unknown[]> {
 71    const headersJs = JSON.stringify(headers);
 72    const urlsJs = JSON.stringify(urls);
 73    const methodJs = JSON.stringify(method);
 74    return (await page.evaluate(`
 75      async () => {
 76        const urls = ${urlsJs};
 77        const method = ${methodJs};
 78        const headers = ${headersJs};
 79        const concurrency = ${concurrency};
 80  
 81        const results = new Array(urls.length);
 82        let idx = 0;
 83  
 84        async function worker() {
 85          while (idx < urls.length) {
 86            const i = idx++;
 87            try {
 88              const resp = await fetch(urls[i], { method, headers, credentials: "include" });
 89              if (!resp.ok) {
 90                throw new Error('HTTP ' + resp.status + ' ' + resp.statusText + ' from ' + urls[i]);
 91              }
 92              results[i] = await resp.json();
 93            } catch (e) {
 94              results[i] = { error: e instanceof Error ? e.message : String(e) };
 95              // Note: getErrorMessage() is a Node.js utility — can't use it inside evaluate()
 96            }
 97          }
 98        }
 99  
100        const workers = Array.from({ length: Math.min(concurrency, urls.length) }, () => worker());
101        await Promise.all(workers);
102        return results;
103      }
104    `)) as unknown[];
105  }
106  
/**
 * Pipeline step `fetch`: issue one HTTP request, or one request per data item.
 *
 * `params` may be a URL template string, or a record with `url`, `method`
 * (defaults to `GET`), `params` (query parameters), `headers` and
 * `concurrency` (defaults to 5).
 *
 * Per-item mode is used when `data` is an array and the URL template contains
 * the text `item`: the URL is rendered once per element with `item` and
 * `index` available to the template. With a page, all requests are sent
 * through one `fetchBatchInBrowser` call; without a page they go through
 * `mapConcurrent` and `fetchSingle`. In both cases a failed item is logged
 * with `log.warn` and reported as an entry carrying an `error` key rather than
 * failing the whole step.
 *
 * In every other case a single request is made through `fetchSingle`, and its
 * failure propagates to the caller.
 *
 * @param page - Page to fetch from, or `null` to fetch directly.
 * @param params - Step configuration (see above).
 * @param data - Current pipeline data; exposed to templates as `data`.
 * @param args - Exposed to templates as `args`.
 * @returns The single response, or an array with one entry per item.
 */
export async function stepFetch(page: IPage | null, params: unknown, data: unknown, args: Record<string, unknown>): Promise<unknown> {
  const paramObject = isRecord(params) ? params : {};
  const urlOrObj = typeof params === 'string' ? params : (paramObject.url ?? '');
  const method = typeof paramObject.method === 'string' ? paramObject.method : 'GET';
  const queryParams = isRecord(paramObject.params) ? paramObject.params : {};
  const headers = isRecord(paramObject.headers) ? paramObject.headers : {};
  const urlTemplate = String(urlOrObj);

  // Per-item fetch when data is an array and the URL references the item.
  if (Array.isArray(data) && urlTemplate.includes('item')) {
    // Only a number >= 1 is a usable concurrency. 0, negatives and NaN would
    // start no workers at all, so they fall back to the default; fractions
    // are floored to a whole number of workers.
    const defaultConcurrency = 5;
    const requested = paramObject.concurrency;
    const concurrency = typeof requested === 'number' && requested >= 1
      ? Math.floor(requested)
      : defaultConcurrency;

    // Headers and query parameters do not depend on the item: render them once.
    const renderedHeaders: Record<string, string> = {};
    for (const [k, v] of Object.entries(headers)) renderedHeaders[k] = String(render(v, { args, data }));
    const renderedParams: Record<string, string> = {};
    for (const [k, v] of Object.entries(queryParams)) renderedParams[k] = String(render(v, { args, data }));

    // The query string is identical for every item, so build it outside the loop.
    const hasParams = Object.keys(renderedParams).length > 0;
    const qs = hasParams ? new URLSearchParams(renderedParams).toString() : '';

    const urls = data.map((item, index) => {
      const url = String(render(urlTemplate, { args, data, item, index }));
      return hasParams ? `${url}${url.includes('?') ? '&' : '?'}${qs}` : url;
    });

    // With a page available, send the whole batch through one evaluate() call.
    if (page !== null) {
      const results = await fetchBatchInBrowser(page, urls, method.toUpperCase(), renderedHeaders, concurrency);
      results.forEach((r, i) => {
        if (r && typeof r === 'object' && 'error' in r) {
          log.warn(`Batch fetch failed for ${urls[i]}: ${(r as { error: string }).error}`);
        }
      });
      return results;
    }

    // Without a page, fetch directly through a bounded concurrent pool.
    // fetchSingle appends the query parameters itself, so it receives the
    // item URL without the query string.
    return mapConcurrent(data, concurrency, async (item, index) => {
      const itemUrl = String(render(urlTemplate, { args, data, item, index }));
      try {
        return await fetchSingle(null, itemUrl, method, queryParams, headers, args, data);
      } catch (error) {
        const message = getErrorMessage(error);
        log.warn(`Batch fetch failed for ${itemUrl}: ${message}`);
        return { error: message };
      }
    });
  }

  const url = render(urlOrObj, { args, data });
  return fetchSingle(page, String(url), method, queryParams, headers, args, data);
}