# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import csv
import io
import os
from pathlib import Path
from typing import Any, Literal

from haystack import Document, component, logging
from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
from haystack.dataclasses import ByteStream

logger = logging.getLogger(__name__)

# ~5MB; warn when parsing rows might be memory-heavy
_ROW_MODE_SIZE_WARN_BYTES = 5 * 1024 * 1024


@component
class CSVToDocument:
    """
    Converts CSV files to Documents.

    By default, it uses UTF-8 encoding when converting files but
    you can also set a custom encoding.
    It can attach metadata to the resulting documents.

    ### Usage example
    ```python
    from haystack.components.converters.csv import CSVToDocument
    from datetime import datetime

    converter = CSVToDocument()
    results = converter.run(
        sources=["test/test_files/csv/sample_1.csv"], meta={"date_added": datetime.now().isoformat()}
    )
    documents = results["documents"]

    print(documents[0].content)
    # >> 'col1,col2\\nrow1,row1\\nrow2,row2\\n'
    ```
    """

    def __init__(
        self,
        encoding: str = "utf-8",
        store_full_path: bool = False,
        *,
        conversion_mode: Literal["file", "row"] = "file",
        delimiter: str = ",",
        quotechar: str = '"',
    ) -> None:
        """
        Creates a CSVToDocument component.

        :param encoding:
            The encoding of the csv files to convert.
            If the encoding is specified in the metadata of a source ByteStream,
            it overrides this value.
        :param store_full_path:
            If True, the full path of the file is stored in the metadata of the document.
            If False, only the file name is stored.
        :param conversion_mode:
            - "file" (default): one Document per CSV file whose content is the raw CSV text.
            - "row": convert each CSV row to its own Document (requires `content_column` in `run()`).
        :param delimiter:
            CSV delimiter used when parsing in row mode (passed to ``csv.DictReader``).
        :param quotechar:
            CSV quote character used when parsing in row mode (passed to ``csv.DictReader``).
        :raises ValueError:
            If ``delimiter`` or ``quotechar`` is not exactly one character.
        """
        # Validate before storing anything so a failed construction never
        # leaves a partially-initialized (invalid) instance behind.
        if len(delimiter) != 1:
            raise ValueError("CSVToDocument: delimiter must be a single character.")
        if len(quotechar) != 1:
            raise ValueError("CSVToDocument: quotechar must be a single character.")

        self.encoding = encoding
        self.store_full_path = store_full_path
        self.conversion_mode = conversion_mode
        self.delimiter = delimiter
        self.quotechar = quotechar

    @component.output_types(documents=list[Document])
    def run(
        self,
        sources: list[str | Path | ByteStream],
        *,
        content_column: str | None = None,
        meta: dict[str, Any] | list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """
        Converts CSV files to a Document (file mode) or to one Document per row (row mode).

        :param sources:
            List of file paths or ByteStream objects.
        :param content_column:
            **Required when** ``conversion_mode="row"``.
            The column name whose values become ``Document.content`` for each row.
            The column must exist in the CSV header.
        :param meta:
            Optional metadata to attach to the documents.
            This value can be either a list of dictionaries or a single dictionary.
            If it's a single dictionary, its content is added to the metadata of all produced documents.
            If it's a list, the length of the list must match the number of sources, because the two lists will
            be zipped.
            If `sources` contains ByteStream objects, their `meta` will be added to the output documents.
        :returns:
            A dictionary with the following keys:
            - `documents`: Created documents
        :raises ValueError:
            In row mode, if ``content_column`` is missing or absent from a CSV header.
        :raises RuntimeError:
            In row mode, if the CSV cannot be parsed or a row fails to convert.
        """
        documents: list[Document] = []

        meta_list = normalize_metadata(meta, sources_count=len(sources))

        for source, metadata in zip(sources, meta_list, strict=True):
            # Unreadable sources are skipped (best-effort), matching file-mode behavior.
            try:
                bytestream = get_bytestream_from_source(source)
            except Exception as e:
                logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
                continue

            try:
                # Per-source encoding in ByteStream meta overrides the component default.
                encoding = bytestream.meta.get("encoding", self.encoding)
                # bytestream.data is already bytes; no need for a BytesIO round-trip copy.
                raw = bytestream.data
                data = raw.decode(encoding=encoding)
            except Exception as e:
                logger.warning(
                    "Could not convert file {source}. Skipping it. Error message: {error}", source=source, error=e
                )
                continue

            # User-provided metadata wins over ByteStream metadata on key collision.
            merged_metadata = {**bytestream.meta, **metadata}

            if not self.store_full_path and "file_path" in bytestream.meta:
                file_path = bytestream.meta.get("file_path")
                if file_path:  # Ensure the value is not None for mypy
                    merged_metadata["file_path"] = os.path.basename(file_path)

            # Mode: file (backward-compatible default) -> one Document per file
            if self.conversion_mode == "file":
                documents.append(Document(content=data, meta=merged_metadata))
                continue

            # --- ROW MODE (strict) ---
            # Require content_column in run(); no fallback
            if not content_column:
                raise ValueError(
                    "CSVToDocument(row): 'content_column' is required in run() when conversion_mode='row'."
                )

            # Warn for large CSVs in row mode (memory consideration).
            # len() on bytes cannot raise, so no exception guard is needed here.
            size_bytes = len(raw)
            if size_bytes > _ROW_MODE_SIZE_WARN_BYTES:
                logger.warning(
                    "CSVToDocument(row): parsing a large CSV (~{mb:.1f} MB). "
                    "Consider chunking/streaming if you hit memory issues.",
                    mb=size_bytes / (1024 * 1024),
                )

            # Create DictReader; if this fails, raise (no fallback)
            try:
                reader = csv.DictReader(io.StringIO(data), delimiter=self.delimiter, quotechar=self.quotechar)
            except Exception as e:
                raise RuntimeError(f"CSVToDocument(row): could not parse CSV rows for {source}: {e}") from e

            # Validate header contains content_column; strict error if missing
            header = reader.fieldnames or []
            if content_column not in header:
                raise ValueError(
                    f"CSVToDocument(row): content_column='{content_column}' not found in header "
                    f"for {source}. Available columns: {header}"
                )

            # Build documents; if a row processing fails, raise immediately (no skip)
            for i, row in enumerate(reader):
                try:
                    doc = self._build_document_from_row(
                        row=row, base_meta=merged_metadata, row_index=i, content_column=content_column
                    )
                except Exception as e:
                    raise RuntimeError(f"CSVToDocument(row): failed to process row {i} for {source}: {e}") from e
                documents.append(doc)

        return {"documents": documents}

    # ----- helpers -----
    def _safe_value(self, value: Any) -> str:
        """Normalize CSV cell values: None -> '', everything -> str."""
        return "" if value is None else str(value)

    def _build_document_from_row(
        self, row: dict[str, Any], base_meta: dict[str, Any], row_index: int, content_column: str
    ) -> Document:
        """
        Build a ``Document`` from one parsed CSV row.

        :param row: Mapping of column name to cell value for the current row
            (as produced by ``csv.DictReader``).
        :param base_meta: File-level and user-provided metadata to start from
            (for example: ``file_path``, ``encoding``).
        :param row_index: Zero-based row index in the CSV; stored as
            ``row_number`` in the output document's metadata.
        :param content_column: Column name to use for ``Document.content``.
        :returns: A ``Document`` with chosen content and merged metadata.
            Remaining row columns are added to ``meta`` with collision-safe
            keys (prefixed with ``csv_`` if needed).
        """
        row_meta = dict(base_meta)

        # content (strict: content_column must exist; validated by caller)
        content = self._safe_value(row.get(content_column))

        # merge remaining columns into meta with collision handling
        for k, v in row.items():
            if k == content_column:
                continue
            key_to_use = k
            if key_to_use in row_meta:
                # Avoid clobbering existing meta like file_path/encoding; prefix and de-dupe
                base_key = f"csv_{key_to_use}"
                key_to_use = base_key
                suffix = 1
                while key_to_use in row_meta:
                    key_to_use = f"{base_key}_{suffix}"
                    suffix += 1
            row_meta[key_to_use] = self._safe_value(v)

        row_meta["row_number"] = row_index
        return Document(content=content, meta=row_meta)