# ingest.py
#!/usr/bin/env python3
"""Main script to ingest media files into the vector database."""

import logging
import sys
import time
from pathlib import Path

from src import (
    ChunkerFactory,
    Config,
    EmbeddingModelFactory,
    LoaderFactory,
    PreprocessorFactory,
    VectorStoreFactory,
)
from src.cli.constants import (
    EXIT_CODE_ERROR,
    SEPARATOR_CHAR,
    SEPARATOR_LENGTH,
)
from src.cli.models import IngestionConfig, IngestionResult
from src.cli.parallel_ingestor import ParallelIngestor
from src.input_sources import InputSource, InputSourceFactory, InputSourceType
from src.loaders.constants import SUPPORTED_FILE_EXTENSIONS
from src.loaders.helpers import LoaderHelper
from src.logger import Logger
from src.pipeline import IngestionContext, PipelineExecutor, PipelineStatus
from src.pipeline.steps import (
    ChunkStep,
    EmbeddingGenerationStep,
    LoadStep,
    PreprocessStep,
    SaveStep,
)


def _build_ingestion_steps(
    file_path: Path,
    ingestion_config: IngestionConfig,
) -> list:
    """Build the list of pipeline steps based on configuration.

    Each step is appended only when its corresponding toggle in
    ``config.pipeline.ingestion`` is enabled, so the pipeline can run
    partially (e.g. load + chunk only) without code changes.

    Parameters
    ----------
    file_path
        Path to the media file to ingest.
    ingestion_config
        Ingestion configuration object.

    Returns
    -------
    list
        List of pipeline steps to execute.
    """
    logger = logging.getLogger(__name__)
    steps = []
    config = Config.get_config()
    pipeline_config = config.pipeline.ingestion

    logger.info(f"Load step - enabled: {pipeline_config.load_enabled}")
    if pipeline_config.load_enabled:
        # Loader selection is driven by the file extension mapping.
        loader_name, loader_config_from_mapping = (
            LoaderHelper.get_loader_config_for_file(file_path)
        )
        file_extension = file_path.suffix.lower()
        logger.info(
            f"Converting {file_extension} file to Markdown "
            f"(loader: {loader_name})..."
        )
        loader_config = LoaderHelper.create_loader_config(
            file_path,
            loader_name,
            loader_config_from_mapping,
        )
        loader = LoaderFactory.create(loader_name, **loader_config)
        steps.append(LoadStep(loader))

    logger.info(f"Preprocess step - enabled: {pipeline_config.preprocess_enabled}")
    if pipeline_config.preprocess_enabled:
        preprocessing_config = config.preprocessing.preprocessing_config or {}
        preprocessor = PreprocessorFactory.create(
            config.preprocessing.preprocessing_name,
            **preprocessing_config,
        )
        steps.append(PreprocessStep(preprocessor))

    logger.info(f"Chunk step - enabled: {pipeline_config.chunk_enabled}")
    if pipeline_config.chunk_enabled:
        chunker_config = config.chunking.chunker_config or {}
        embedding_config = config.embedding.embed_config or {}
        chunker = ChunkerFactory.create(
            config.chunking.chunker_name,
            **chunker_config,
            **embedding_config,
        )
        steps.append(ChunkStep(chunker))

    logger.info(f"Save step - enabled: {pipeline_config.save_enabled}")
    # The embedding model and vector store are only created by main() when
    # save_enabled is True, so this guard is equivalent to checking the
    # toggle — and additionally protects against a partially-built config.
    if ingestion_config.embedding_model and ingestion_config.vector_store:
        logger.info(f"Embedding chunks (model={config.embedding.embed_name})...")
        steps.append(
            EmbeddingGenerationStep(
                ingestion_config.embedding_model,
                config.embedding.embed_name,
            )
        )
        logger.info("Storing in vector database...")
        steps.append(SaveStep(ingestion_config.vector_store))

    return steps


def ingest_file(
    source_id: str,
    ingestion_config: IngestionConfig,
) -> IngestionResult:
    """Ingest a media file into the vector database using the pipeline pattern.

    Parameters
    ----------
    source_id
        Source identifier (S3 URI or local file path).
    ingestion_config
        Ingestion configuration object.

    Returns
    -------
    IngestionResult
        Result object containing success status, chunks count, and any errors.
    """
    logger = logging.getLogger(__name__)
    logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH)
    logger.info(f"Ingesting: {source_id}")
    logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH)

    config = Config.get_config()

    # Create InputSource instance for this worker; each worker gets its own
    # instance so parallel workers do not share (and clobber) temp files.
    input_source = _create_input_source_from_config()

    try:
        file_path_resolved = input_source.get_file(source_id)

        # Get access tags for this file based on folder mapping or default.
        # Use source_id (original path) instead of file_path_resolved (temp
        # path) to preserve folder structure.
        access_tags = config.access_control.get_tags_from_file(source_id)
        logger.info(f"Access tags for file: {access_tags}")

        context = IngestionContext(
            source_id=source_id,
            file_path=file_path_resolved,
            access_tags=access_tags,
        )

        # Build steps list dynamically based on pipeline configuration
        steps = _build_ingestion_steps(file_path_resolved, ingestion_config)

        executor = PipelineExecutor(steps)
        context = executor.execute(context)

        if context.status == PipelineStatus.FAILED:
            return IngestionResult(
                file_path=source_id,
                success=False,
                error=context.error,
            )

        logger.info("\n" + SEPARATOR_CHAR * SEPARATOR_LENGTH)
        logger.info("Ingestion Complete!")
        logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH)
        logger.info(f"✓ File processed: {source_id}")
        if context.markdown_path:
            logger.info(f"✓ Markdown file: {context.markdown_path}")
        logger.info(f"✓ Chunks created: {len(context.chunks)}")
        logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH + "\n")

        return IngestionResult(
            file_path=source_id,
            success=True,
            chunks_count=len(context.chunks),
            markdown_path=context.markdown_path,
        )
    finally:
        # Clean up temporary files for this worker even on failure.
        input_source.cleanup()


def _process_files_parallel(
    ingestion_config: IngestionConfig,
) -> bool:
    """Process files in parallel.

    Parameters
    ----------
    ingestion_config
        Ingestion configuration object.

    Returns
    -------
    bool
        True if all files processed successfully, False otherwise.
    """
    logger = logging.getLogger(__name__)
    logger.info("Parallel processing enabled")

    config = Config.get_config()
    pipeline_config = config.pipeline.ingestion
    parallel_ingestor = ParallelIngestor(
        max_workers=pipeline_config.max_workers,
    )

    results = parallel_ingestor.execute(
        files=ingestion_config.source_ids,
        task_fn=ingest_file,
        task_args={
            "ingestion_config": ingestion_config,
        },
    )

    parallel_ingestor.log_summary(results)

    if results["failed"]:
        logger.error(f"✗ {len(results['failed'])} file(s) failed to process")
        return False

    logger.info("✓ All files processed successfully.")
    return True


def _process_files_sequential(
    ingestion_config: IngestionConfig,
) -> bool:
    """Process files sequentially.

    Parameters
    ----------
    ingestion_config
        Ingestion configuration object.

    Returns
    -------
    bool
        True if all files processed successfully, False otherwise.
    """
    logger = logging.getLogger(__name__)
    logger.info("Sequential processing (parallel disabled)")

    failed_files = []

    for source_id in ingestion_config.source_ids:
        result = ingest_file(
            source_id,
            ingestion_config,
        )

        if not result.success:
            failed_files.append((source_id, result.error))

    if failed_files:
        logger.error("\nFailed files:")
        for source_id, error in failed_files:
            logger.error(f"  - {source_id}: {error}")
        logger.error(f"\n✗ {len(failed_files)} file(s) failed to process")
        return False

    logger.info("✓ All files processed successfully.")
    return True


def _create_input_source_from_config() -> InputSource:
    """Create an InputSource instance from configuration.

    Returns
    -------
    InputSource
        Configured input source instance.
    """
    config = Config.get_config()
    source_type = InputSourceType(config.input_source.source_type)
    source_config = config.input_source.source_config or {}
    input_path = config.paths.input_path

    # base_path always reflects the configured input path, overriding any
    # stale value carried in source_config.
    source_config = {**source_config, "base_path": str(input_path)}
    return InputSourceFactory.create(source_type, source_config)


def main() -> None:
    """Run the ingestion pipeline for one or more media files.

    Exits with ``EXIT_CODE_ERROR`` if any file fails to process or an
    unexpected exception occurs.
    """
    config = Config.get_config()

    Logger.setup(config)
    logger = logging.getLogger(__name__)

    # Create input source and list files
    input_source = _create_input_source_from_config()
    source_ids = input_source.list_files("", list(SUPPORTED_FILE_EXTENSIONS))
    input_source.cleanup()

    logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH)
    logger.info("RAG Media Ingestion Pipeline")
    logger.info(SEPARATOR_CHAR * SEPARATOR_LENGTH)
    logger.info(f"Using input source: {config.input_source.source_type}")
    logger.info(f"Source config: {config.input_source.source_config}")
    logger.info(f"Inputs: {len(source_ids)} file(s)")
    logger.info(f"Chunker: {config.chunking.chunker_name}")
    logger.info(f"Embedding model: {config.embedding.embed_name}")
    logger.info(
        f"Database location: "
        f"{config.vector_store.store_config.get('persist_directory')}"
    )
    logger.info(
        f"Collection: {config.vector_store.store_config.get('collection_name')}"
    )
    logger.info(f"Access tags: {config.access_control.default_access_tags}")

    try:
        pipeline_config = config.pipeline.ingestion
        embedding_model = None
        vector_store = None

        # Only create embedding_model and vector_store if save step is enabled
        if pipeline_config.save_enabled:
            embedding_model = EmbeddingModelFactory.create(
                config.embedding.embed_name,
                **(config.embedding.embed_config or {}),
            )

            store_config = {
                "embedding_function": embedding_model,
                **(config.vector_store.store_config or {}),
            }
            vector_store = VectorStoreFactory.create(
                config.vector_store.store_name,
                **store_config,
            )

        # Create ingestion configuration
        ingestion_config = IngestionConfig(
            source_ids=source_ids,
            embedding_model=embedding_model,
            vector_store=vector_store,
        )

        # Process files in parallel if enabled, otherwise sequentially
        start_time = time.time()
        process_fn = (
            _process_files_parallel
            if pipeline_config.parallel_enabled
            else _process_files_sequential
        )
        # BUGFIX: the success flag was previously discarded, so the script
        # exited 0 (and claimed success) even when files failed to ingest.
        success = process_fn(ingestion_config)
        elapsed_time = time.time() - start_time

        logger.info("\n" + SEPARATOR_CHAR * SEPARATOR_LENGTH)
        logger.info("Ingestion complete!")
        logger.info(
            f"Total time: {elapsed_time:.2f} seconds "
            f"({elapsed_time/60:.2f} minutes)"
        )
        if success:
            logger.info(f"{len(source_ids)} file(s) processed successfully")
        else:
            # Propagate failure to the caller (shell, CI, cron) via exit code;
            # the per-file errors were already logged by the process function.
            sys.exit(EXIT_CODE_ERROR)

    except Exception as exc:
        logger.error(f"\n✗ Error during ingestion: {exc}", exc_info=True)
        sys.exit(EXIT_CODE_ERROR)


if __name__ == "__main__":
    main()