# src/cli/ingest.py
  1  #!/usr/bin/env python3
  2  """Main script to ingest media files into the vector database."""
  3  
  4  import logging
  5  import sys
  6  import time
  7  from pathlib import Path
  8  
  9  from src import (
 10      ChunkerFactory,
 11      Config,
 12      EmbeddingModelFactory,
 13      LoaderFactory,
 14      PreprocessorFactory,
 15      VectorStoreFactory,
 16  )
 17  from src.cli.constants import (
 18      EXIT_CODE_ERROR,
 19      SEPARATOR_CHAR,
 20      SEPARATOR_LENGTH,
 21  )
 22  from src.cli.models import IngestionConfig, IngestionResult
 23  from src.cli.parallel_ingestor import ParallelIngestor
 24  from src.input_sources import InputSource, InputSourceFactory, InputSourceType
 25  from src.loaders.constants import SUPPORTED_FILE_EXTENSIONS
 26  from src.loaders.helpers import LoaderHelper
 27  from src.logger import Logger
 28  from src.pipeline import IngestionContext, PipelineExecutor, PipelineStatus
 29  from src.pipeline.steps import (
 30      ChunkStep,
 31      EmbeddingGenerationStep,
 32      LoadStep,
 33      PreprocessStep,
 34      SaveStep,
 35  )
 36  
 37  
 38  def _build_ingestion_steps(
 39      file_path: Path,
 40      ingestion_config: IngestionConfig,
 41  ) -> list:
 42      """Build the list of pipeline steps based on configuration.
 43  
 44      Parameters
 45      ----------
 46      file_path
 47          Path to the media file to ingest.
 48      ingestion_config
 49          Ingestion configuration object.
 50  
 51      Returns
 52      -------
 53      list
 54          List of pipeline steps to execute.
 55      """
 56      logger = logging.getLogger(__name__)
 57      steps = []
 58      config = Config.get_config()
 59      pipeline_config = config.pipeline.ingestion
 60  
 61      logger.info(f"Load step - enabled: {pipeline_config.load_enabled}")
 62      if pipeline_config.load_enabled:
 63          loader_name, loader_config_from_mapping = (
 64              LoaderHelper.get_loader_config_for_file(file_path)
 65          )
 66          file_extension = file_path.suffix.lower()
 67          logger.info(
 68              f"Converting {file_extension} file to Markdown "
 69              f"(loader: {loader_name})..."
 70          )
 71          loader_config = LoaderHelper.create_loader_config(
 72              file_path,
 73              loader_name,
 74              loader_config_from_mapping,
 75          )
 76          loader = LoaderFactory.create(loader_name, **loader_config)
 77          steps.append(LoadStep(loader))
 78  
 79      logger.info(f"Preprocess step - enabled: {pipeline_config.preprocess_enabled}")
 80      if pipeline_config.preprocess_enabled:
 81          preprocessing_config = config.preprocessing.preprocessing_config or {}
 82          preprocessor = PreprocessorFactory.create(
 83              config.preprocessing.preprocessing_name,
 84              **preprocessing_config,
 85          )
 86          steps.append(PreprocessStep(preprocessor))
 87  
 88      logger.info(f"Chunk step - enabled: {pipeline_config.chunk_enabled}")
 89      if pipeline_config.chunk_enabled:
 90          chunker_config = config.chunking.chunker_config or {}
 91          embedding_config = config.embedding.embed_config or {}
 92          chunker = ChunkerFactory.create(
 93              config.chunking.chunker_name,
 94              **chunker_config,
 95              **embedding_config,
 96          )
 97          steps.append(ChunkStep(chunker))
 98  
 99      logger.info(f"Save step - enabled: {pipeline_config.save_enabled}")
100      if ingestion_config.embedding_model and ingestion_config.vector_store:
101          logger.info(f"Embedding chunks (model={config.embedding.embed_name})...")
102          steps.append(EmbeddingGenerationStep(ingestion_config.embedding_model, config.embedding.embed_name))
103          logger.info("Storing in vector database...")
104          steps.append(SaveStep(ingestion_config.vector_store))
105  
106      return steps
107  
108  
def ingest_file(
    source_id: str,
    ingestion_config: IngestionConfig,
) -> IngestionResult:
    """Ingest a media file into the vector database using the pipeline pattern.

    Parameters
    ----------
    source_id
        Source identifier (S3 URI or local file path).
    ingestion_config
        Ingestion configuration object.

    Returns
    -------
    IngestionResult
        Result object containing success status, chunks count, and any errors.
    """
    logger = logging.getLogger(__name__)
    separator = SEPARATOR_CHAR * SEPARATOR_LENGTH
    logger.info(separator)
    logger.info(f"Ingesting: {source_id}")
    logger.info(separator)

    config = Config.get_config()

    # Each worker builds its own InputSource so temp files stay isolated.
    input_source = _create_input_source_from_config()

    try:
        resolved_path = input_source.get_file(source_id)

        # Derive access tags from the original source_id rather than the
        # resolved (temporary) path so the folder-based mapping still applies.
        access_tags = config.access_control.get_tags_from_file(source_id)
        logger.info(f"Access tags for file: {access_tags}")

        context = IngestionContext(
            source_id=source_id,
            file_path=resolved_path,
            access_tags=access_tags,
        )

        # Assemble and run the step list for the current configuration.
        pipeline_steps = _build_ingestion_steps(resolved_path, ingestion_config)
        context = PipelineExecutor(pipeline_steps).execute(context)

        if context.status == PipelineStatus.FAILED:
            return IngestionResult(
                file_path=source_id,
                success=False,
                error=context.error,
            )

        logger.info("\n" + separator)
        logger.info("Ingestion Complete!")
        logger.info(separator)
        logger.info(f"✓ File processed: {source_id}")
        if context.markdown_path:
            logger.info(f"✓ Markdown file: {context.markdown_path}")
        logger.info(f"✓ Chunks created: {len(context.chunks)}")
        logger.info(separator + "\n")

        return IngestionResult(
            file_path=source_id,
            success=True,
            chunks_count=len(context.chunks),
            markdown_path=context.markdown_path,
        )
    finally:
        # Always remove this worker's temporary files, even on failure.
        input_source.cleanup()
182  
def _process_files_parallel(
    ingestion_config: IngestionConfig,
) -> bool:
    """Process files in parallel.

    Parameters
    ----------
    ingestion_config
        Ingestion configuration object.

    Returns
    -------
    bool
        True if all files processed successfully, False otherwise.
    """
    logger = logging.getLogger(__name__)
    logger.info("Parallel processing enabled")

    # Worker count comes from the ingestion pipeline configuration.
    ingestion_cfg = Config.get_config().pipeline.ingestion
    ingestor = ParallelIngestor(max_workers=ingestion_cfg.max_workers)

    outcome = ingestor.execute(
        files=ingestion_config.source_ids,
        task_fn=ingest_file,
        task_args={"ingestion_config": ingestion_config},
    )
    ingestor.log_summary(outcome)

    failed = outcome["failed"]
    if failed:
        logger.error(f"✗ {len(failed)} file(s) failed to process")
        return False

    logger.info("✓ All files processed successfully.")
    return True
224  
def _process_files_sequential(
    ingestion_config: IngestionConfig,
) -> bool:
    """Process files sequentially.

    Parameters
    ----------
    ingestion_config
        Ingestion configuration object.

    Returns
    -------
    bool
        True if all files processed successfully, False otherwise.
    """
    logger = logging.getLogger(__name__)
    logger.info("Sequential processing (parallel disabled)")

    # Collect (source_id, error) pairs for every file that did not succeed.
    failures = []
    for sid in ingestion_config.source_ids:
        outcome = ingest_file(sid, ingestion_config)
        if not outcome.success:
            failures.append((sid, outcome.error))

    if not failures:
        logger.info("✓ All files processed successfully.")
        return True

    logger.error("\nFailed files:")
    for sid, err in failures:
        logger.error(f"  - {sid}: {err}")
    logger.error(f"\n✗ {len(failures)} file(s) failed to process")
    return False
263  
def _create_input_source_from_config() -> InputSource:
    """Create an InputSource instance from configuration.

    Returns
    -------
    InputSource
        Configured input source instance.
    """
    cfg = Config.get_config()
    source_kind = InputSourceType(cfg.input_source.source_type)

    # Copy the configured source options and inject the input path as
    # base_path (overriding any pre-existing base_path entry).
    merged_config = dict(cfg.input_source.source_config or {})
    merged_config["base_path"] = str(cfg.paths.input_path)

    return InputSourceFactory.create(source_kind, merged_config)
279  
def main():
    """Run the ingestion pipeline for one or more media files.

    Exits with ``EXIT_CODE_ERROR`` when any file fails to process or when an
    unexpected error occurs during setup/ingestion.
    """
    config = Config.get_config()

    Logger.setup(config)
    logger = logging.getLogger(__name__)

    # Create a throwaway input source just to enumerate files; each worker
    # creates its own instance during ingestion.
    input_source = _create_input_source_from_config()
    source_ids = input_source.list_files("", list(SUPPORTED_FILE_EXTENSIONS))
    input_source.cleanup()

    separator = SEPARATOR_CHAR * SEPARATOR_LENGTH
    logger.info(separator)
    logger.info("RAG Media Ingestion Pipeline")
    logger.info(separator)
    logger.info(f"Using input source: {config.input_source.source_type}")
    logger.info(f"Source config: {config.input_source.source_config}")
    logger.info(f"Inputs: {len(source_ids)} file(s)")
    logger.info(f"Chunker: {config.chunking.chunker_name}")
    logger.info(f"Embedding model: {config.embedding.embed_name}")
    logger.info(
        "Database location: "
        f"{config.vector_store.store_config.get('persist_directory')}"
    )
    logger.info(
        f"Collection: {config.vector_store.store_config.get('collection_name')}"
    )
    logger.info(f"Access tags: {config.access_control.default_access_tags}")

    try:
        pipeline_config = config.pipeline.ingestion
        embedding_model = None
        vector_store = None

        # Only create embedding_model and vector_store if save step is enabled
        if pipeline_config.save_enabled:
            embedding_model = EmbeddingModelFactory.create(
                config.embedding.embed_name,
                **(config.embedding.embed_config or {}),
            )

            store_config = {
                "embedding_function": embedding_model,
                **(config.vector_store.store_config or {}),
            }
            vector_store = VectorStoreFactory.create(
                config.vector_store.store_name,
                **store_config,
            )

        # Create ingestion configuration shared by all workers.
        ingestion_config = IngestionConfig(
            source_ids=source_ids,
            embedding_model=embedding_model,
            vector_store=vector_store,
        )

        # Process files in parallel if enabled, otherwise sequentially.
        start_time = time.time()
        if pipeline_config.parallel_enabled:
            all_succeeded = _process_files_parallel(ingestion_config)
        else:
            all_succeeded = _process_files_sequential(ingestion_config)
        elapsed_time = time.time() - start_time

        logger.info("\n" + separator)
        logger.info("Ingestion complete!")
        logger.info(
            f"Total time: {elapsed_time:.2f} seconds "
            f"({elapsed_time / 60:.2f} minutes)"
        )
        if all_succeeded:
            logger.info(f"{len(source_ids)} file(s) processed successfully")
        else:
            # BUG FIX: the processing result was previously discarded, so the
            # script exited 0 (success) even when files failed. Propagate the
            # failure to the shell. (SystemExit is not caught below because it
            # does not derive from Exception.)
            sys.exit(EXIT_CODE_ERROR)

    except Exception as exc:
        logger.error(f"\n✗ Error during ingestion: {exc}", exc_info=True)
        sys.exit(EXIT_CODE_ERROR)


if __name__ == "__main__":
    main()
350