csv.py
  1  # SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
  2  #
  3  # SPDX-License-Identifier: Apache-2.0
  4  
  5  import csv
  6  import io
  7  import os
  8  from pathlib import Path
  9  from typing import Any, Literal
 10  
 11  from haystack import Document, component, logging
 12  from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
 13  from haystack.dataclasses import ByteStream
 14  
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Raw (encoded) input size, in bytes, above which row mode logs a warning before parsing,
# because every row of the file is turned into its own Document held in memory.
_ROW_MODE_SIZE_WARN_BYTES = 5 * 1024 * 1024  # 5 MiB
 18  
 19  
 20  @component
 21  class CSVToDocument:
 22      """
 23      Converts CSV files to Documents.
 24  
 25      By default, it uses UTF-8 encoding when converting files but
 26      you can also set a custom encoding.
 27      It can attach metadata to the resulting documents.
 28  
 29      ### Usage example
 30      ```python
 31      from haystack.components.converters.csv import CSVToDocument
 32      from datetime import datetime
 33  
 34      converter = CSVToDocument()
 35      results = converter.run(
 36          sources=["test/test_files/csv/sample_1.csv"], meta={"date_added": datetime.now().isoformat()}
 37      )
 38      documents = results["documents"]
 39  
 40      print(documents[0].content)
 41      # >>  'col1,col2\\nrow1,row1\\nrow2,row2\\n'
 42      ```
 43      """
 44  
 45      def __init__(
 46          self,
 47          encoding: str = "utf-8",
 48          store_full_path: bool = False,
 49          *,
 50          conversion_mode: Literal["file", "row"] = "file",
 51          delimiter: str = ",",
 52          quotechar: str = '"',
 53      ) -> None:
 54          """
 55          Creates a CSVToDocument component.
 56  
 57          :param encoding:
 58              The encoding of the csv files to convert.
 59              If the encoding is specified in the metadata of a source ByteStream,
 60              it overrides this value.
 61          :param store_full_path:
 62              If True, the full path of the file is stored in the metadata of the document.
 63              If False, only the file name is stored.
 64          :param conversion_mode:
 65              - "file" (default): one Document per CSV file whose content is the raw CSV text.
 66              - "row": convert each CSV row to its own Document (requires `content_column` in `run()`).
 67          :param delimiter:
 68              CSV delimiter used when parsing in row mode (passed to ``csv.DictReader``).
 69          :param quotechar:
 70              CSV quote character used when parsing in row mode (passed to ``csv.DictReader``).
 71          """
 72          self.encoding = encoding
 73          self.store_full_path = store_full_path
 74          self.conversion_mode = conversion_mode
 75          self.delimiter = delimiter
 76          self.quotechar = quotechar
 77  
 78          # Basic validation
 79          if len(self.delimiter) != 1:
 80              raise ValueError("CSVToDocument: delimiter must be a single character.")
 81          if len(self.quotechar) != 1:
 82              raise ValueError("CSVToDocument: quotechar must be a single character.")
 83  
 84      @component.output_types(documents=list[Document])
 85      def run(
 86          self,
 87          sources: list[str | Path | ByteStream],
 88          *,
 89          content_column: str | None = None,
 90          meta: dict[str, Any] | list[dict[str, Any]] | None = None,
 91      ) -> dict[str, Any]:
 92          """
 93          Converts CSV files to a Document (file mode) or to one Document per row (row mode).
 94  
 95          :param sources:
 96              List of file paths or ByteStream objects.
 97          :param content_column:
 98              **Required when** ``conversion_mode="row"``.
 99              The column name whose values become ``Document.content`` for each row.
100              The column must exist in the CSV header.
101          :param meta:
102              Optional metadata to attach to the documents.
103              This value can be either a list of dictionaries or a single dictionary.
104              If it's a single dictionary, its content is added to the metadata of all produced documents.
105              If it's a list, the length of the list must match the number of sources, because the two lists will
106              be zipped.
107              If `sources` contains ByteStream objects, their `meta` will be added to the output documents.
108          :returns:
109              A dictionary with the following keys:
110              - `documents`: Created documents
111          """
112          documents: list[Document] = []
113  
114          meta_list = normalize_metadata(meta, sources_count=len(sources))
115  
116          for source, metadata in zip(sources, meta_list, strict=True):
117              try:
118                  bytestream = get_bytestream_from_source(source)
119              except Exception as e:
120                  logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
121                  continue
122  
123              try:
124                  encoding = bytestream.meta.get("encoding", self.encoding)
125                  raw = io.BytesIO(bytestream.data).getvalue()
126                  data = raw.decode(encoding=encoding)
127              except Exception as e:
128                  logger.warning(
129                      "Could not convert file {source}. Skipping it. Error message: {error}", source=source, error=e
130                  )
131                  continue
132  
133              merged_metadata = {**bytestream.meta, **metadata}
134  
135              if not self.store_full_path and "file_path" in bytestream.meta:
136                  file_path = bytestream.meta.get("file_path")
137                  if file_path:  # Ensure the value is not None for mypy
138                      merged_metadata["file_path"] = os.path.basename(file_path)
139  
140              # Mode: file (backward-compatible default) -> one Document per file
141              if self.conversion_mode == "file":
142                  documents.append(Document(content=data, meta=merged_metadata))
143                  continue
144  
145              # --- ROW MODE (strict) ---
146              # Require content_column in run(); no fallback
147              if not content_column:
148                  raise ValueError(
149                      "CSVToDocument(row): 'content_column' is required in run() when conversion_mode='row'."
150                  )
151  
152              # Warn for large CSVs in row mode (memory consideration)
153              try:
154                  size_bytes = len(raw)
155                  if size_bytes > _ROW_MODE_SIZE_WARN_BYTES:
156                      logger.warning(
157                          "CSVToDocument(row): parsing a large CSV (~{mb:.1f} MB). "
158                          "Consider chunking/streaming if you hit memory issues.",
159                          mb=size_bytes / (1024 * 1024),
160                      )
161              except Exception:
162                  pass
163  
164              # Create DictReader; if this fails, raise (no fallback)
165              try:
166                  reader = csv.DictReader(io.StringIO(data), delimiter=self.delimiter, quotechar=self.quotechar)
167              except Exception as e:
168                  raise RuntimeError(f"CSVToDocument(row): could not parse CSV rows for {source}: {e}") from e
169  
170              # Validate header contains content_column; strict error if missing
171              header = reader.fieldnames or []
172              if content_column not in header:
173                  raise ValueError(
174                      f"CSVToDocument(row): content_column='{content_column}' not found in header "
175                      f"for {source}. Available columns: {header}"
176                  )
177  
178              # Build documents; if a row processing fails, raise immediately (no skip)
179              for i, row in enumerate(reader):
180                  try:
181                      doc = self._build_document_from_row(
182                          row=row, base_meta=merged_metadata, row_index=i, content_column=content_column
183                      )
184                  except Exception as e:
185                      raise RuntimeError(f"CSVToDocument(row): failed to process row {i} for {source}: {e}") from e
186                  documents.append(doc)
187  
188          return {"documents": documents}
189  
190      # ----- helpers -----
191      def _safe_value(self, value: Any) -> str:
192          """Normalize CSV cell values: None -> '', everything -> str."""
193          return "" if value is None else str(value)
194  
195      def _build_document_from_row(
196          self, row: dict[str, Any], base_meta: dict[str, Any], row_index: int, content_column: str
197      ) -> Document:
198          """
199          Build a ``Document`` from one parsed CSV row.
200  
201          :param row: Mapping of column name to cell value for the current row
202              (as produced by ``csv.DictReader``).
203          :param base_meta: File-level and user-provided metadata to start from
204              (for example: ``file_path``, ``encoding``).
205          :param row_index: Zero-based row index in the CSV; stored as
206              ``row_number`` in the output document's metadata.
207          :param content_column: Column name to use for ``Document.content``.
208          :returns: A ``Document`` with chosen content and merged metadata.
209              Remaining row columns are added to ``meta`` with collision-safe
210              keys (prefixed with ``csv_`` if needed).
211          """
212          row_meta = dict(base_meta)
213  
214          # content (strict: content_column must exist; validated by caller)
215          content = self._safe_value(row.get(content_column))
216  
217          # merge remaining columns into meta with collision handling
218          for k, v in row.items():
219              if k == content_column:
220                  continue
221              key_to_use = k
222              if key_to_use in row_meta:
223                  # Avoid clobbering existing meta like file_path/encoding; prefix and de-dupe
224                  base_key = f"csv_{key_to_use}"
225                  key_to_use = base_key
226                  suffix = 1
227                  while key_to_use in row_meta:
228                      key_to_use = f"{base_key}_{suffix}"
229                      suffix += 1
230              row_meta[key_to_use] = self._safe_value(v)
231  
232          row_meta["row_number"] = row_index
233          return Document(content=content, meta=row_meta)