# xlsx.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import io
import os
from pathlib import Path
from typing import Any, Literal

from haystack import Document, component, logging
from haystack.components.converters.utils import get_bytestream_from_source, normalize_metadata
from haystack.dataclasses import ByteStream
from haystack.lazy_imports import LazyImport

logger = logging.getLogger(__name__)

with LazyImport("Run 'pip install pandas openpyxl'") as pandas_xlsx_import:
    import openpyxl
    import pandas as pd

with LazyImport("Run 'pip install tabulate'") as tabulate_import:
    from tabulate import tabulate  # noqa: F401 # the library is used but not directly referenced


@component
class XLSXToDocument:
    """
    Converts XLSX (Excel) files into Documents.

    Supports reading data from specific sheets or all sheets in the Excel file. If all sheets are read, a Document is
    created for each sheet. The content of the Document is the table which can be saved in CSV or Markdown format.

    ### Usage example

    ```python
    from haystack.components.converters.xlsx import XLSXToDocument
    from datetime import datetime

    converter = XLSXToDocument()
    results = converter.run(
        sources=["test/test_files/xlsx/basic_tables_two_sheets.xlsx"], meta={"date_added": datetime.now().isoformat()}
    )
    documents = results["documents"]

    print(documents[0].content)
    # >> ",A,B\\n1,col_a,col_b\\n2,1.5,test\\n"
    ```
    """

    def __init__(
        self,
        table_format: Literal["csv", "markdown"] = "csv",
        sheet_name: str | int | list[str | int] | None = None,
        read_excel_kwargs: dict[str, Any] | None = None,
        table_format_kwargs: dict[str, Any] | None = None,
        *,
        link_format: Literal["markdown", "plain", "none"] = "none",
        store_full_path: bool = False,
    ) -> None:
        """
        Creates a XLSXToDocument component.

        :param table_format: The format to convert the Excel file to.
        :param sheet_name: The name of the sheet to read. If None, all sheets are read.
        :param read_excel_kwargs: Additional arguments to pass to `pandas.read_excel`.
            See https://pandas.pydata.org/docs/reference/api/pandas.read_excel.html#pandas-read-excel
        :param table_format_kwargs: Additional keyword arguments to pass to the table format function.
            - If `table_format` is "csv", these arguments are passed to `pandas.DataFrame.to_csv`.
              See https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html#pandas-dataframe-to-csv
            - If `table_format` is "markdown", these arguments are passed to `pandas.DataFrame.to_markdown`.
              See https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_markdown.html#pandas-dataframe-to-markdown
        :param link_format: The format for link output. Possible options:
            - `"markdown"`: `[text](url)`
            - `"plain"`: `text (url)`
            - `"none"`: Only the text is extracted, link addresses are ignored.
        :param store_full_path:
            If True, the full path of the file is stored in the metadata of the document.
            If False, only the file name is stored.
        """
        pandas_xlsx_import.check()
        # Validate the configuration before storing any of it, so a misconfigured
        # component fails fast and leaves no partially-initialized state behind.
        if table_format not in ["csv", "markdown"]:
            raise ValueError(f"Unsupported export format: {table_format}. Choose either 'csv' or 'markdown'.")
        if link_format not in ("markdown", "plain", "none"):
            msg = f"Unknown link format '{link_format}'. Supported formats are: 'markdown', 'plain', 'none'"
            raise ValueError(msg)
        if table_format == "markdown":
            # Markdown export goes through pandas.DataFrame.to_markdown, which needs tabulate.
            tabulate_import.check()
        self.table_format = table_format
        self.link_format = link_format
        self.sheet_name = sheet_name
        self.read_excel_kwargs = read_excel_kwargs or {}
        self.table_format_kwargs = table_format_kwargs or {}
        self.store_full_path = store_full_path

    @component.output_types(documents=list[Document])
    def run(
        self, sources: list[str | Path | ByteStream], meta: dict[str, Any] | list[dict[str, Any]] | None = None
    ) -> dict[str, list[Document]]:
        """
        Converts a XLSX file to a Document.

        :param sources:
            List of file paths or ByteStream objects.
        :param meta:
            Optional metadata to attach to the documents.
            This value can be either a list of dictionaries or a single dictionary.
            If it's a single dictionary, its content is added to the metadata of all produced documents.
            If it's a list, the length of the list must match the number of sources, because the two lists will
            be zipped.
            If `sources` contains ByteStream objects, their `meta` will be added to the output documents.
        :returns:
            A dictionary with the following keys:
            - `documents`: Created documents
        """
        documents = []

        meta_list = normalize_metadata(meta, sources_count=len(sources))

        for source, metadata in zip(sources, meta_list, strict=True):
            # Unreadable sources are skipped with a warning rather than failing the whole batch.
            try:
                bytestream = get_bytestream_from_source(source)
            except Exception as e:
                logger.warning("Could not read {source}. Skipping it. Error: {error}", source=source, error=e)
                continue

            try:
                tables, tables_metadata = self._extract_tables(bytestream)
            except Exception as e:
                logger.warning(
                    "Could not read {source} and convert it to a Document, skipping. Error: {error}",
                    source=source,
                    error=e,
                )
                continue

            # Loop over tables and create a Document for each table
            for table, excel_metadata in zip(tables, tables_metadata, strict=True):
                # Later sources take precedence: per-sheet metadata overrides user meta,
                # which overrides the ByteStream's own meta.
                merged_metadata = {**bytestream.meta, **metadata, **excel_metadata}

                if not self.store_full_path and "file_path" in bytestream.meta:
                    file_path = bytestream.meta["file_path"]
                    merged_metadata["file_path"] = os.path.basename(file_path)

                document = Document(content=table, meta=merged_metadata)
                documents.append(document)

        return {"documents": documents}

    @staticmethod
    def _generate_excel_column_names(n_cols: int) -> list[str]:
        """
        Return the first `n_cols` Excel-style column labels: "A", "B", ..., "Z", "AA", "AB", ...
        """
        result = []
        for i in range(n_cols):
            col_name = ""
            num = i
            # Repeated base-26 digit extraction; the `- 1` accounts for Excel's
            # bijective numbering (there is no zero digit: after "Z" comes "AA").
            while num >= 0:
                col_name = chr(num % 26 + 65) + col_name
                num = num // 26 - 1
            result.append(col_name)
        return result

    def _collect_hyperlinks(
        self, file_bytes: io.BytesIO, sheet_keys: list[str | int | None]
    ) -> dict[str | int | None, dict[tuple[int, int], str]]:
        """
        Read hyperlink targets from the workbook with openpyxl.

        :param file_bytes: In-memory workbook; rewound before reading.
        :param sheet_keys: Sheet identifiers as produced by `pandas.read_excel` (names, indices, or None).
        :returns:
            Mapping of sheet key to a dict of 0-based (row, column) cell positions to hyperlink targets.
        """
        hyperlinks_by_sheet: dict[str | int | None, dict[tuple[int, int], str]] = {}
        file_bytes.seek(0)
        wb = openpyxl.load_workbook(file_bytes, data_only=True)
        try:
            for sheet_key in sheet_keys:
                if isinstance(sheet_key, int):
                    ws = wb.worksheets[sheet_key]
                elif sheet_key is None:
                    ws = wb.active
                else:
                    ws = wb[sheet_key]
                cell_links: dict[tuple[int, int], str] = {}
                for row in ws.iter_rows():
                    for cell in row:
                        if cell.hyperlink and cell.hyperlink.target:
                            # Convert to 0-based indices to match DataFrame positions
                            cell_links[(cell.row - 1, cell.column - 1)] = cell.hyperlink.target
                hyperlinks_by_sheet[sheet_key] = cell_links
        finally:
            # Always release the workbook, even if a sheet lookup or iteration fails.
            wb.close()
        return hyperlinks_by_sheet

    def _extract_tables(self, bytestream: ByteStream) -> tuple[list[str], list[dict]]:
        """
        Extract tables from an Excel file.

        :param bytestream: The workbook content to convert.
        :returns:
            A tuple of (rendered tables, per-table metadata), one entry per sheet read.
        """
        file_bytes = io.BytesIO(bytestream.data)
        resolved_read_excel_kwargs = {
            **self.read_excel_kwargs,
            "sheet_name": self.sheet_name,
            "header": None,  # Don't assign any pandas column labels
            "engine": "openpyxl",  # Use openpyxl as the engine to read the Excel file
        }
        sheet_to_dataframe = pd.read_excel(io=file_bytes, **resolved_read_excel_kwargs)
        # read_excel returns a single DataFrame for a scalar sheet_name; normalize to a dict.
        if isinstance(sheet_to_dataframe, pd.DataFrame):
            sheet_to_dataframe = {self.sheet_name: sheet_to_dataframe}

        # If link extraction is enabled, load the workbook with openpyxl to read hyperlinks
        hyperlinks_by_sheet: dict[str | int | None, dict[tuple[int, int], str]] = {}
        if self.link_format != "none":
            hyperlinks_by_sheet = self._collect_hyperlinks(file_bytes, list(sheet_to_dataframe))

        updated_sheet_to_dataframe = {}
        for key in sheet_to_dataframe:
            df = sheet_to_dataframe[key]
            # Row starts at 1 in Excel
            df.index = df.index + 1
            # Excel column names are Alphabet Characters
            header = self._generate_excel_column_names(df.shape[1])
            df.columns = header

            # Apply hyperlinks to cell values
            if key in hyperlinks_by_sheet:
                for (row_idx, col_idx), url in hyperlinks_by_sheet[key].items():
                    if row_idx < len(df) and col_idx < len(df.columns):
                        cell_value = df.iat[row_idx, col_idx]
                        text = str(cell_value) if pd.notna(cell_value) else ""
                        if self.link_format == "markdown":
                            df.iat[row_idx, col_idx] = f"[{text}]({url})"
                        else:
                            df.iat[row_idx, col_idx] = f"{text} ({url})"

            updated_sheet_to_dataframe[key] = df

        tables = []
        metadata = []
        for key, value in updated_sheet_to_dataframe.items():
            if self.table_format == "csv":
                resolved_kwargs = {"index": True, "header": True, "lineterminator": "\n", **self.table_format_kwargs}
                tables.append(value.to_csv(**resolved_kwargs))
            else:
                resolved_kwargs = {
                    "index": True,
                    "headers": value.columns,
                    "tablefmt": "pipe",
                    **self.table_format_kwargs,
                }
                # to_markdown uses tabulate
                tables.append(value.to_markdown(**resolved_kwargs))
            # add sheet_name to metadata
            metadata.append({"xlsx": {"sheet_name": key}})
        return tables, metadata