xlsx.py
  1  # SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
  2  #
  3  # SPDX-License-Identifier: Apache-2.0
  4  
  5  import io
  6  import os
  7  from pathlib import Path
  8  from typing import Any, Literal
  9  
 10  from haystack import Document, component, logging
 11  from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
 12  from haystack.dataclasses import ByteStream
 13  from haystack.lazy_imports import LazyImport
 14  
 15  logger = logging.getLogger(__name__)
 16  
 17  with LazyImport("Run 'pip install pandas openpyxl'") as pandas_xlsx_import:
 18      import openpyxl
 19      import pandas as pd
 20  
 21  with LazyImport("Run 'pip install tabulate'") as tabulate_import:
 22      from tabulate import tabulate  # noqa: F401 # the library is used but not directly referenced
 23  
 24  
 25  @component
 26  class XLSXToDocument:
 27      """
 28      Converts XLSX (Excel) files into Documents.
 29  
 30      Supports reading data from specific sheets or all sheets in the Excel file. If all sheets are read, a Document is
 31      created for each sheet. The content of the Document is the table which can be saved in CSV or Markdown format.
 32  
 33      ### Usage example
 34  
 35      ```python
 36      from haystack.components.converters.xlsx import XLSXToDocument
 37      from datetime import datetime
 38  
 39      converter = XLSXToDocument()
 40      results = converter.run(
 41          sources=["test/test_files/xlsx/basic_tables_two_sheets.xlsx"], meta={"date_added": datetime.now().isoformat()}
 42      )
 43      documents = results["documents"]
 44  
 45      print(documents[0].content)
 46      # >> ",A,B\\n1,col_a,col_b\\n2,1.5,test\\n"
 47      ```
 48      """
 49  
 50      def __init__(
 51          self,
 52          table_format: Literal["csv", "markdown"] = "csv",
 53          sheet_name: str | int | list[str | int] | None = None,
 54          read_excel_kwargs: dict[str, Any] | None = None,
 55          table_format_kwargs: dict[str, Any] | None = None,
 56          *,
 57          link_format: Literal["markdown", "plain", "none"] = "none",
 58          store_full_path: bool = False,
 59      ) -> None:
 60          """
 61          Creates a XLSXToDocument component.
 62  
 63          :param table_format: The format to convert the Excel file to.
 64          :param sheet_name: The name of the sheet to read. If None, all sheets are read.
 65          :param read_excel_kwargs: Additional arguments to pass to `pandas.read_excel`.
 66              See https://pandas.pydata.org/docs/reference/api/pandas.read_excel.html#pandas-read-excel
 67          :param table_format_kwargs: Additional keyword arguments to pass to the table format function.
 68              - If `table_format` is "csv", these arguments are passed to `pandas.DataFrame.to_csv`.
 69                See https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html#pandas-dataframe-to-csv
 70              - If `table_format` is "markdown", these arguments are passed to `pandas.DataFrame.to_markdown`.
 71                See https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_markdown.html#pandas-dataframe-to-markdown
 72          :param link_format: The format for link output. Possible options:
 73              - `"markdown"`: `[text](url)`
 74              - `"plain"`: `text (url)`
 75              - `"none"`: Only the text is extracted, link addresses are ignored.
 76          :param store_full_path:
 77              If True, the full path of the file is stored in the metadata of the document.
 78              If False, only the file name is stored.
 79          """
 80          pandas_xlsx_import.check()
 81          self.table_format = table_format
 82          if table_format not in ["csv", "markdown"]:
 83              raise ValueError(f"Unsupported export format: {table_format}. Choose either 'csv' or 'markdown'.")
 84          if link_format not in ("markdown", "plain", "none"):
 85              msg = f"Unknown link format '{link_format}'. Supported formats are: 'markdown', 'plain', 'none'"
 86              raise ValueError(msg)
 87          if table_format == "markdown":
 88              tabulate_import.check()
 89          self.link_format = link_format
 90          self.sheet_name = sheet_name
 91          self.read_excel_kwargs = read_excel_kwargs or {}
 92          self.table_format_kwargs = table_format_kwargs or {}
 93          self.store_full_path = store_full_path
 94  
 95      @component.output_types(documents=list[Document])
 96      def run(
 97          self, sources: list[str | Path | ByteStream], meta: dict[str, Any] | list[dict[str, Any]] | None = None
 98      ) -> dict[str, list[Document]]:
 99          """
100          Converts a XLSX file to a Document.
101  
102          :param sources:
103              List of file paths or ByteStream objects.
104          :param meta:
105              Optional metadata to attach to the documents.
106              This value can be either a list of dictionaries or a single dictionary.
107              If it's a single dictionary, its content is added to the metadata of all produced documents.
108              If it's a list, the length of the list must match the number of sources, because the two lists will
109              be zipped.
110              If `sources` contains ByteStream objects, their `meta` will be added to the output documents.
111          :returns:
112              A dictionary with the following keys:
113              - `documents`: Created documents
114          """
115          documents = []
116  
117          meta_list = normalize_metadata(meta, sources_count=len(sources))
118  
119          for source, metadata in zip(sources, meta_list, strict=True):
120              try:
121                  bytestream = get_bytestream_from_source(source)
122              except Exception as e:
123                  logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
124                  continue
125  
126              try:
127                  tables, tables_metadata = self._extract_tables(bytestream)
128              except Exception as e:
129                  logger.warning(
130                      "Could not read {source} and convert it to a Document, skipping. Error: {error}",
131                      source=source,
132                      error=e,
133                  )
134                  continue
135  
136              # Loop over tables and create a Document for each table
137              for table, excel_metadata in zip(tables, tables_metadata, strict=True):
138                  merged_metadata = {**bytestream.meta, **metadata, **excel_metadata}
139  
140                  if not self.store_full_path and "file_path" in bytestream.meta:
141                      file_path = bytestream.meta["file_path"]
142                      merged_metadata["file_path"] = os.path.basename(file_path)
143  
144                  document = Document(content=table, meta=merged_metadata)
145                  documents.append(document)
146  
147          return {"documents": documents}
148  
149      @staticmethod
150      def _generate_excel_column_names(n_cols: int) -> list[str]:
151          result = []
152          for i in range(n_cols):
153              col_name = ""
154              num = i
155              while num >= 0:
156                  col_name = chr(num % 26 + 65) + col_name
157                  num = num // 26 - 1
158              result.append(col_name)
159          return result
160  
161      def _extract_tables(self, bytestream: ByteStream) -> tuple[list[str], list[dict]]:
162          """
163          Extract tables from an Excel file.
164          """
165          file_bytes = io.BytesIO(bytestream.data)
166          resolved_read_excel_kwargs = {
167              **self.read_excel_kwargs,
168              "sheet_name": self.sheet_name,
169              "header": None,  # Don't assign any pandas column labels
170              "engine": "openpyxl",  # Use openpyxl as the engine to read the Excel file
171          }
172          sheet_to_dataframe = pd.read_excel(io=file_bytes, **resolved_read_excel_kwargs)
173          if isinstance(sheet_to_dataframe, pd.DataFrame):
174              sheet_to_dataframe = {self.sheet_name: sheet_to_dataframe}
175  
176          # If link extraction is enabled, load the workbook with openpyxl to read hyperlinks
177          hyperlinks_by_sheet: dict[str | int | None, dict[tuple[int, int], str]] = {}
178          if self.link_format != "none":
179              file_bytes.seek(0)
180              wb = openpyxl.load_workbook(file_bytes, data_only=True)
181              for sheet_key in sheet_to_dataframe:
182                  if isinstance(sheet_key, int):
183                      ws = wb.worksheets[sheet_key]
184                  elif sheet_key is None:
185                      ws = wb.active
186                  else:
187                      ws = wb[sheet_key]
188                  cell_links: dict[tuple[int, int], str] = {}
189                  for row in ws.iter_rows():
190                      for cell in row:
191                          if cell.hyperlink and cell.hyperlink.target:
192                              # Convert to 0-based indices to match DataFrame positions
193                              cell_links[(cell.row - 1, cell.column - 1)] = cell.hyperlink.target
194                  hyperlinks_by_sheet[sheet_key] = cell_links
195              wb.close()
196  
197          updated_sheet_to_dataframe = {}
198          for key in sheet_to_dataframe:
199              df = sheet_to_dataframe[key]
200              # Row starts at 1 in Excel
201              df.index = df.index + 1
202              # Excel column names are Alphabet Characters
203              header = self._generate_excel_column_names(df.shape[1])
204              df.columns = header
205  
206              # Apply hyperlinks to cell values
207              if key in hyperlinks_by_sheet:
208                  for (row_idx, col_idx), url in hyperlinks_by_sheet[key].items():
209                      if row_idx < len(df) and col_idx < len(df.columns):
210                          cell_value = df.iat[row_idx, col_idx]
211                          text = str(cell_value) if pd.notna(cell_value) else ""
212                          if self.link_format == "markdown":
213                              df.iat[row_idx, col_idx] = f"[{text}]({url})"
214                          else:
215                              df.iat[row_idx, col_idx] = f"{text} ({url})"
216  
217              updated_sheet_to_dataframe[key] = df
218  
219          tables = []
220          metadata = []
221          for key, value in updated_sheet_to_dataframe.items():
222              if self.table_format == "csv":
223                  resolved_kwargs = {"index": True, "header": True, "lineterminator": "\n", **self.table_format_kwargs}
224                  tables.append(value.to_csv(**resolved_kwargs))
225              else:
226                  resolved_kwargs = {
227                      "index": True,
228                      "headers": value.columns,
229                      "tablefmt": "pipe",
230                      **self.table_format_kwargs,
231                  }
232                  # to_markdown uses tabulate
233                  tables.append(value.to_markdown(**resolved_kwargs))
234              # add sheet_name to metadata
235              metadata.append({"xlsx": {"sheet_name": key}})
236          return tables, metadata