parallel_ingestor.py
"""Parallel ingestor for processing multiple files concurrently."""

import logging
import os
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Any

from src.cli.constants import SEPARATOR_CHAR, SEPARATOR_LENGTH
from src.cli.models import IngestionResult


class ParallelIngestor:
    """Ingests multiple files in parallel using thread pools.

    This ingestor allows processing multiple files concurrently to improve
    throughput when ingesting large batches of documents. Uses threading
    which is ideal for I/O-bound workloads like file loading, API calls,
    and database operations.
    """

    def __init__(
        self,
        max_workers: int | None = None,
    ):
        """Initialize the parallel ingestor.

        Parameters
        ----------
        max_workers
            Maximum number of parallel workers. If None, uses the number of CPUs.
        """
        self.max_workers = max_workers
        self.logger = logging.getLogger(__name__)

    def execute(
        self,
        files: list[str],
        task_fn: Callable[[str, Any], IngestionResult],
        task_args: dict[str, Any],
    ) -> dict[str, list[IngestionResult]]:
        """Execute ingestion tasks in parallel.

        Parameters
        ----------
        files
            List of source identifiers (S3 URIs or local paths) to process.
        task_fn
            Function to execute for each file. Should accept (source_id, **task_args)
            and return an IngestionResult.
        task_args
            Additional arguments to pass to the task function.

        Returns
        -------
        dict with 'successful' and 'failed' lists of IngestionResult objects.
        """
        if not files:
            self.logger.warning("No files to process")
            return {"successful": [], "failed": []}

        # Determine actual number of workers that will be used.
        cpu_count = os.cpu_count() or 1
        actual_workers = self.max_workers if self.max_workers is not None else cpu_count

        self.logger.info("Processing %d files in parallel using threading", len(files))
        self.logger.info("CPU cores available: %d", cpu_count)
        self.logger.info("Workers to use: %d", actual_workers)

        results: dict[str, list[IngestionResult]] = {"successful": [], "failed": []}
        futures: dict[Future, str] = {}

        # BUG FIX: previously the pool was created with max_workers=self.max_workers,
        # so when max_workers was None the executor fell back to its own default
        # (min(32, cpu_count + 4)) instead of the cpu_count value that was logged
        # and documented above. Passing actual_workers keeps behavior, logs, and
        # the __init__ docstring in agreement.
        with ThreadPoolExecutor(max_workers=actual_workers) as executor:
            # Submit all tasks up front; map each future back to its source id.
            for source_id in files:
                future = executor.submit(task_fn, source_id, **task_args)
                futures[future] = source_id

            # Process completed tasks as they finish, in completion order.
            total = len(files)

            for completed, future in enumerate(as_completed(futures), start=1):
                source_id = futures[future]

                try:
                    result = future.result()
                    if result.success:
                        results["successful"].append(result)
                        self.logger.info(
                            "[%d/%d] ✓ Successfully processed: %s",
                            completed, total, source_id,
                        )
                    else:
                        results["failed"].append(result)
                        self.logger.error(
                            "[%d/%d] ✗ Failed to process: %s - %s",
                            completed, total, source_id, result.error,
                        )
                except Exception as e:
                    # BUG FIX: exceptions were logged at info level while ordinary
                    # failures were logged at error. An unexpected exception is at
                    # least as severe; logger.exception logs at error level and
                    # attaches the traceback for debugging.
                    self.logger.exception(
                        "[%d/%d] ✗ Exception processing %s, %s",
                        completed, total, source_id, e,
                    )
                    results["failed"].append(
                        IngestionResult(
                            file_path=source_id,
                            success=False,
                            error=str(e),
                        )
                    )

        return results

    def log_summary(self, results: dict[str, list[IngestionResult]]) -> None:
        """Log a summary of the ingestion results.

        Parameters
        ----------
        results
            Dictionary with 'successful' and 'failed' lists of results.
        """
        successful = results["successful"]
        failed = results["failed"]
        total = len(successful) + len(failed)

        self.logger.info("\n" + SEPARATOR_CHAR * SEPARATOR_LENGTH)
        self.logger.info("Ingestion Summary")
        self.logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH)
        self.logger.info("Total files: %d", total)
        self.logger.info("✓ Successful: %d", len(successful))
        self.logger.info("✗ Failed: %d", len(failed))

        if successful:
            total_chunks = sum(r.chunks_count for r in successful)
            self.logger.info("Total chunks created: %d", total_chunks)

        if failed:
            self.logger.info("\nFailed files:")
            for result in failed:
                self.logger.info("  - %s: %s", result.file_path, result.error)

        self.logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH + "\n")