src/cli/parallel_ingestor.py
  1  """Parallel ingestor for processing multiple files concurrently."""
  2  
  3  import logging
  4  import os
  5  from collections.abc import Callable
  6  from concurrent.futures import Future, ThreadPoolExecutor, as_completed
  7  from typing import Any
  8  
  9  from src.cli.constants import SEPARATOR_CHAR, SEPARATOR_LENGTH
 10  from src.cli.models import IngestionResult
 11  
 12  
class ParallelIngestor:
    """Ingest multiple files in parallel using a thread pool.

    Processing multiple files concurrently improves throughput when
    ingesting large batches of documents. Threads suit I/O-bound
    workloads such as file loading, API calls, and database operations.
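
    Examples
    --------
    A minimal usage sketch; ``load_document`` and ``chunk_size`` are
    hypothetical stand-ins for a real task function and its argument.

    >>> def load_document(source_id, *, chunk_size):
    ...     # ... load, chunk, and persist the document here ...
    ...     return IngestionResult(file_path=source_id, success=True, chunks_count=0)
    >>> ingestor = ParallelIngestor(max_workers=4)
    >>> results = ingestor.execute(
    ...     ["s3://bucket/a.pdf", "docs/b.md"],
    ...     load_document,
    ...     {"chunk_size": 512},
    ... )
    >>> ingestor.log_summary(results)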
 20      """

    def __init__(
        self,
        max_workers: int | None = None,
    ):
        """Initialize the parallel ingestor.

        Parameters
        ----------
        max_workers
            Maximum number of parallel workers. If None, uses the number of CPUs.
        """
        self.max_workers = max_workers
        self.logger = logging.getLogger(__name__)

    def execute(
        self,
        files: list[str],
        task_fn: Callable[..., IngestionResult],
        task_args: dict[str, Any],
    ) -> dict[str, list[IngestionResult]]:
        """Execute ingestion tasks in parallel.

        Parameters
        ----------
        files
            List of source identifiers (S3 URIs or local paths) to process.
        task_fn
            Function to execute for each file. Must accept
            ``(source_id, **task_args)`` and return an IngestionResult.
        task_args
            Additional keyword arguments to pass to the task function.

        Returns
        -------
        dict[str, list[IngestionResult]]
            Dict with 'successful' and 'failed' lists of IngestionResult objects.
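
        Examples
        --------
        A sketch of a conforming task function; ``index_name`` is a
        hypothetical keyword argument used only for illustration.

        >>> def index_file(source_id, *, index_name):
        ...     return IngestionResult(file_path=source_id, success=True)
        >>> results = ingestor.execute(["report.pdf"], index_file, {"index_name": "docs"})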
        """
        if not files:
            self.logger.warning("No files to process")
            return {"successful": [], "failed": []}

        # Determine the actual number of workers that will be used.
        cpu_count = os.cpu_count() or 1
        actual_workers = self.max_workers if self.max_workers is not None else cpu_count

        self.logger.info(f"Processing {len(files)} files in parallel using threading")
        self.logger.info(f"CPU cores available: {cpu_count}")
        self.logger.info(f"Workers to use: {actual_workers}")

        results: dict[str, list[IngestionResult]] = {"successful": [], "failed": []}
        futures: dict[Future, str] = {}

        # Pass the resolved worker count so the pool size matches the logged value;
        # ThreadPoolExecutor's own default would be min(32, cpu_count + 4), not cpu_count.
        with ThreadPoolExecutor(max_workers=actual_workers) as executor:
            # Submit all tasks
            for source_id in files:
                future = executor.submit(task_fn, source_id, **task_args)
                futures[future] = source_id

            # Process completed tasks as they finish
            total = len(files)

            for completed, future in enumerate(as_completed(futures), start=1):
                source_id = futures[future]

                try:
                    result = future.result()
                    if result.success:
                        results["successful"].append(result)
                        self.logger.info(
                            f"[{completed}/{total}] ✓ Successfully processed: {source_id}"
                        )
                    else:
                        results["failed"].append(result)
                        self.logger.error(
                            f"[{completed}/{total}] ✗ Failed to process: {source_id} - {result.error}"
                        )
                except Exception as e:
                    self.logger.error(
                        f"[{completed}/{total}] ✗ Exception processing {source_id}: {e}"
                    )
                    results["failed"].append(
                        IngestionResult(
                            file_path=source_id,
                            success=False,
                            error=str(e),
                        )
                    )

        return results

    def log_summary(self, results: dict[str, list[IngestionResult]]) -> None:
        """Log a summary of the ingestion results.

        Parameters
        ----------
        results
            Dictionary with 'successful' and 'failed' lists of results.
        """
        successful = results["successful"]
        failed = results["failed"]
        total = len(successful) + len(failed)

        self.logger.info("\n" + SEPARATOR_CHAR * SEPARATOR_LENGTH)
        self.logger.info("Ingestion Summary")
        self.logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH)
        self.logger.info(f"Total files: {total}")
        self.logger.info(f"✓ Successful: {len(successful)}")
        self.logger.info(f"✗ Failed: {len(failed)}")

        if successful:
            total_chunks = sum(r.chunks_count for r in successful)
            self.logger.info(f"Total chunks created: {total_chunks}")

        if failed:
            self.logger.info("\nFailed files:")
            for result in failed:
                self.logger.info(f"  - {result.file_path}: {result.error}")

        self.logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH + "\n")
138