document_store.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
import json
import math
import re
import uuid
from collections import Counter
from collections.abc import Callable, Iterable
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Any, Literal

import numpy as np

from haystack import default_from_dict, default_to_dict, logging
from haystack.dataclasses import Document
from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import expit
from haystack.utils.filters import document_matches_filter

logger = logging.getLogger(__name__)

# Document scores are essentially unbounded and will be scaled to values between 0 and 1 if scale_score is set to
# True (the retrieval methods default to scale_score=False). Scaling uses the expit function (inverse of the logit
# function) after applying a scaling factor (e.g., BM25_SCALING_FACTOR for the bm25_retrieval method).
# A larger scaling factor decreases scaled scores. For example, an input of 10 is scaled to 0.99 with
# BM25_SCALING_FACTOR=2 but to 0.78 with BM25_SCALING_FACTOR=8 (default). The defaults were chosen empirically.
# Increase the default if most unscaled scores are larger than expected (>30) and otherwise would incorrectly all be
# mapped to scores ~1.
BM25_SCALING_FACTOR = 8
DOT_PRODUCT_SCALING_FACTOR = 100
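
# A quick numeric check of the scaling described above (expit(x) = 1 / (1 + exp(-x))):
#
#     from haystack.utils import expit
#     expit(10 / 2)  # BM25_SCALING_FACTOR = 2 -> ~0.99
#     expit(10 / 8)  # BM25_SCALING_FACTOR = 8 -> ~0.78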


@dataclass
class BM25DocumentStats:
    """
    A dataclass for managing document statistics for BM25 retrieval.

    :param freq_token: A Counter of token frequencies in the document.
    :param doc_len: Number of tokens in the document.
    """

    freq_token: dict[str, int]
    doc_len: int


# Global storage for all InMemoryDocumentStore instances, indexed by the index name.
_STORAGES: dict[str, dict[str, Document]] = {}
_BM25_STATS_STORAGES: dict[str, dict[str, BM25DocumentStats]] = {}
_AVERAGE_DOC_LEN_STORAGES: dict[str, float] = {}
_FREQ_VOCAB_FOR_IDF_STORAGES: dict[str, Counter] = {}


class InMemoryDocumentStore:
    """
    Stores data in-memory. The store is ephemeral by default, but its contents can be persisted
    with `save_to_disk` and restored with `load_from_disk`.
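
    A minimal usage sketch (the import path follows the package layout and the
    document contents are illustrative):

    ```python
    from haystack.dataclasses import Document
    from haystack.document_stores.in_memory import InMemoryDocumentStore

    store = InMemoryDocumentStore()
    store.write_documents([Document(content="BM25 is a ranking function.")])
    results = store.bm25_retrieval(query="ranking", top_k=1)
    ```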
    """

    def __init__(
        self,
        bm25_tokenization_regex: str = r"(?u)\b\w+\b",
        bm25_algorithm: Literal["BM25Okapi", "BM25L", "BM25Plus"] = "BM25L",
        bm25_parameters: dict | None = None,
        embedding_similarity_function: Literal["dot_product", "cosine"] = "dot_product",
        index: str | None = None,
        async_executor: ThreadPoolExecutor | None = None,
        return_embedding: bool = True,
    ) -> None:
        """
        Initializes the DocumentStore.

        :param bm25_tokenization_regex: The regular expression used to tokenize the text for BM25 retrieval.
        :param bm25_algorithm: The BM25 algorithm to use. One of "BM25Okapi", "BM25L", or "BM25Plus".
        :param bm25_parameters: Parameters for the BM25 implementation in a dictionary format.
            For example: `{'k1': 1.5, 'b': 0.75, 'epsilon': 0.25}`.
            You can learn more about these parameters by visiting https://github.com/dorianbrown/rank_bm25.
        :param embedding_similarity_function: The similarity function used to compare Document embeddings.
            One of "dot_product" (default) or "cosine". To choose the most appropriate function, look for information
            about your embedding model.
        :param index: A specific index to store the documents. If not specified, a random UUID is used.
            Using the same index allows you to store documents across multiple InMemoryDocumentStore instances.
        :param async_executor: Optional ThreadPoolExecutor to use for async calls. If not provided, a
            single-threaded executor will be initialized and used.
        :param return_embedding: Whether to return the embedding of the retrieved Documents. Default is True.
        """
        self.bm25_tokenization_regex = bm25_tokenization_regex
        self.tokenizer = re.compile(bm25_tokenization_regex).findall

        if index is None:
            index = str(uuid.uuid4())

        self.index = index
        if self.index not in _STORAGES:
            _STORAGES[self.index] = {}

        self.bm25_algorithm = bm25_algorithm
        self.bm25_algorithm_inst = self._dispatch_bm25()
        self.bm25_parameters = bm25_parameters or {}
        self.embedding_similarity_function = embedding_similarity_function

        # Per-document statistics
        if self.index not in _BM25_STATS_STORAGES:
            _BM25_STATS_STORAGES[self.index] = {}

        if self.index not in _AVERAGE_DOC_LEN_STORAGES:
            _AVERAGE_DOC_LEN_STORAGES[self.index] = 0.0

        if self.index not in _FREQ_VOCAB_FOR_IDF_STORAGES:
            _FREQ_VOCAB_FOR_IDF_STORAGES[self.index] = Counter()

        # Keep track of whether we own the executor; if we created it, we must also clean it up.
        self._owns_executor = async_executor is None
        self.executor = (
            ThreadPoolExecutor(thread_name_prefix=f"async-inmemory-docstore-executor-{id(self)}", max_workers=1)
            if async_executor is None
            else async_executor
        )
        self.return_embedding = return_embedding

    def __del__(self) -> None:
        """
        Cleanup when the instance is being destroyed.
        """
        if hasattr(self, "_owns_executor") and self._owns_executor and hasattr(self, "executor"):
            self.executor.shutdown(wait=True)

    def shutdown(self) -> None:
        """
        Explicitly shut down the executor if we own it.
        """
        if self._owns_executor:
            self.executor.shutdown(wait=True)

    @property
    def storage(self) -> dict[str, Document]:
        """
        Utility property that returns the storage used by this instance of InMemoryDocumentStore.
        """
        return _STORAGES.get(self.index, {})

    @property
    def _bm25_attr(self) -> dict[str, BM25DocumentStats]:
        return _BM25_STATS_STORAGES.get(self.index, {})

    @property
    def _avg_doc_len(self) -> float:
        return _AVERAGE_DOC_LEN_STORAGES.get(self.index, 0.0)

    @_avg_doc_len.setter
    def _avg_doc_len(self, value: float) -> None:
        _AVERAGE_DOC_LEN_STORAGES[self.index] = value

    @property
    def _freq_vocab_for_idf(self) -> Counter:
        return _FREQ_VOCAB_FOR_IDF_STORAGES.get(self.index, Counter())

    def _dispatch_bm25(self) -> "Callable[[str, list[Document]], list[tuple[Document, float]]]":
        """
        Select the correct BM25 algorithm based on user specification.

        :returns:
            The BM25 algorithm method.
        """
        table = {"BM25Okapi": self._score_bm25okapi, "BM25L": self._score_bm25l, "BM25Plus": self._score_bm25plus}

        if self.bm25_algorithm not in table:
            raise ValueError(f"BM25 algorithm '{self.bm25_algorithm}' is not supported.")
        return table[self.bm25_algorithm]

    def _tokenize_bm25(self, text: str) -> list[str]:
        """
        Tokenize text using the BM25 tokenization regex.

        Here we explicitly create a tokenization method to encapsulate
        all pre-processing logic used to create BM25 tokens, such as
        lowercasing. This helps track the exact tokenization process
        used for BM25 scoring at any given time.

        :param text:
            The text to tokenize.
        :returns:
            A list of tokens.
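
        For example, with the default `bm25_tokenization_regex` (a sketch):

        ```python
        store._tokenize_bm25("Hello, World!")  # -> ["hello", "world"]
        ```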
        """
        text = text.lower()
        return self.tokenizer(text)

    def _score_bm25l(self, query: str, documents: list[Document]) -> list[tuple[Document, float]]:
        """
        Calculate BM25L scores for the given query and filtered documents.

        :param query:
            The query string.
        :param documents:
            The list of documents to score; it should be produced by
            the filter_documents method and may be an empty list.
        :returns:
            A list of tuples, each containing a Document and its BM25L score.
        """
        k = self.bm25_parameters.get("k1", 1.5)
        b = self.bm25_parameters.get("b", 0.75)
        delta = self.bm25_parameters.get("delta", 0.5)

        def _compute_idf(tokens: list[str]) -> dict[str, float]:
            """Per-token IDF computation for all tokens."""
            idf = {}
            n_corpus = len(self._bm25_attr)
            for tok in tokens:
                n = self._freq_vocab_for_idf.get(tok, 0)
                idf[tok] = math.log((n_corpus + 1.0) / (n + 0.5)) * int(n != 0)
            return idf

        def _compute_tf(token: str, freq: dict[str, int], doc_len: int) -> float:
            """Per-token BM25L computation."""
            freq_term = freq.get(token, 0.0)
            ctd = freq_term / (1 - b + b * doc_len / self._avg_doc_len)
            return (1.0 + k) * (ctd + delta) / (k + ctd + delta)

        idf = _compute_idf(self._tokenize_bm25(query))
        bm25_attr = {doc.id: self._bm25_attr[doc.id] for doc in documents}

        ret = []
        for doc in documents:
            doc_stats = bm25_attr[doc.id]
            freq = doc_stats.freq_token
            doc_len = doc_stats.doc_len

            score = 0.0
            for tok in idf.keys():
                score += idf[tok] * _compute_tf(tok, freq, doc_len)
            ret.append((doc, score))

        return ret
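
    # Reference for the BM25L scoring implemented above (restating the code, with N = number of
    # documents, n_t = number of documents containing token t, tf = frequency of t in document d,
    # dl = length of d, avgdl = average document length):
    #   idf(t) = ln((N + 1) / (n_t + 0.5))  if n_t > 0, else 0
    #   ctd    = tf / (1 - b + b * dl / avgdl)
    #   score(q, d) = sum_t idf(t) * (k1 + 1) * (ctd + delta) / (k1 + ctd + delta)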

    def _score_bm25okapi(self, query: str, documents: list[Document]) -> list[tuple[Document, float]]:
        """
        Calculate BM25Okapi scores for the given query and filtered documents.

        :param query:
            The query string.
        :param documents:
            The list of documents to score; it should be produced by
            the filter_documents method and may be an empty list.
        :returns:
            A list of tuples, each containing a Document and its BM25Okapi score.
        """
        k = self.bm25_parameters.get("k1", 1.5)
        b = self.bm25_parameters.get("b", 0.75)
        epsilon = self.bm25_parameters.get("epsilon", 0.25)

        def _compute_idf(tokens: list[str]) -> dict[str, float]:
            """Per-token IDF computation for all tokens."""
            sum_idf = 0.0
            neg_idf_tokens = []

            # Although this is a global statistic, we compute it here
            # to make the computation more self-contained. The
            # complexity is O(vocab_size), which is acceptable.
            idf = {}
            for tok, n in self._freq_vocab_for_idf.items():
                idf[tok] = math.log((len(self._bm25_attr) - n + 0.5) / (n + 0.5))
                sum_idf += idf[tok]
                if idf[tok] < 0:
                    neg_idf_tokens.append(tok)

            eps = epsilon * sum_idf / len(self._freq_vocab_for_idf)
            for tok in neg_idf_tokens:
                idf[tok] = eps
            return {tok: idf.get(tok, 0.0) for tok in tokens}

        def _compute_tf(token: str, freq: dict[str, int], doc_len: int) -> float:
            """Per-token BM25Okapi computation."""
            freq_term = freq.get(token, 0.0)
            freq_norm = freq_term + k * (1 - b + b * doc_len / self._avg_doc_len)
            return freq_term * (1.0 + k) / freq_norm

        idf = _compute_idf(self._tokenize_bm25(query))
        bm25_attr = {doc.id: self._bm25_attr[doc.id] for doc in documents}

        ret = []
        for doc in documents:
            doc_stats = bm25_attr[doc.id]
            freq = doc_stats.freq_token
            doc_len = doc_stats.doc_len

            score = 0.0
            for tok in idf.keys():
                score += idf[tok] * _compute_tf(tok, freq, doc_len)
            ret.append((doc, score))

        return ret

    def _score_bm25plus(self, query: str, documents: list[Document]) -> list[tuple[Document, float]]:
        """
        Calculate BM25+ scores for the given query and filtered documents.

        This implementation follows the description on the BM25 Wikipedia page,
        which adds 1 (smoothing factor) to the document frequency when computing IDF.

        :param query:
            The query string.
        :param documents:
            The list of documents to score; it should be produced by
            the filter_documents method and may be an empty list.
        :returns:
            A list of tuples, each containing a Document and its BM25+ score.
        """
        k = self.bm25_parameters.get("k1", 1.5)
        b = self.bm25_parameters.get("b", 0.75)
        delta = self.bm25_parameters.get("delta", 1.0)

        def _compute_idf(tokens: list[str]) -> dict[str, float]:
            """Per-token IDF computation."""
            idf = {}
            n_corpus = len(self._bm25_attr)
            for tok in tokens:
                n = self._freq_vocab_for_idf.get(tok, 0)
                idf[tok] = math.log(1 + (n_corpus - n + 0.5) / (n + 0.5)) * int(n != 0)
            return idf

        def _compute_tf(token: str, freq: dict[str, int], doc_len: float) -> float:
            """Per-token normalized term frequency."""
            freq_term = freq.get(token, 0.0)
            freq_damp = k * (1 - b + b * doc_len / self._avg_doc_len)
            return freq_term * (1.0 + k) / (freq_term + freq_damp) + delta

        idf = _compute_idf(self._tokenize_bm25(query))
        bm25_attr = {doc.id: self._bm25_attr[doc.id] for doc in documents}

        ret = []
        for doc in documents:
            doc_stats = bm25_attr[doc.id]
            freq = doc_stats.freq_token
            doc_len = doc_stats.doc_len

            score = 0.0
            for tok in idf.keys():
                score += idf[tok] * _compute_tf(tok, freq, doc_len)
            ret.append((doc, score))

        return ret
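
    # How the three variants above differ (a summary of the code, not an external reference):
    #   - BM25Okapi replaces negative IDF values with eps = epsilon * mean(IDF); since that mean
    #     can itself be negative, BM25Okapi is the one variant that can yield negative scores.
    #   - BM25L shifts the length-normalized term frequency ctd by delta, softening the penalty
    #     applied to long documents.
    #   - BM25Plus adds delta to each term's normalized frequency, so every query term found in
    #     the corpus vocabulary contributes at least idf(t) * delta to the score.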

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            bm25_tokenization_regex=self.bm25_tokenization_regex,
            bm25_algorithm=self.bm25_algorithm,
            bm25_parameters=self.bm25_parameters,
            embedding_similarity_function=self.embedding_similarity_function,
            index=self.index,
            return_embedding=self.return_embedding,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "InMemoryDocumentStore":
        """
        Deserializes the component from a dictionary.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.
        """
        return default_from_dict(cls, data)

    def save_to_disk(self, path: str) -> None:
        """
        Write the database and its data to disk as a JSON file.

        :param path: The path to the JSON file.
        """
        data: dict[str, Any] = self.to_dict()
        data["documents"] = [doc.to_dict(flatten=False) for doc in self.storage.values()]
        with open(path, "w") as f:
            json.dump(data, f)

    @classmethod
    def load_from_disk(cls, path: str) -> "InMemoryDocumentStore":
        """
        Load the database and its data from disk as a JSON file.

        :param path: The path to the JSON file.
        :returns: The loaded InMemoryDocumentStore.
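
        A round-trip sketch (the path is illustrative):

        ```python
        store = InMemoryDocumentStore()
        store.write_documents([Document(content="persist me")])
        store.save_to_disk("/tmp/docstore.json")
        restored = InMemoryDocumentStore.load_from_disk("/tmp/docstore.json")
        ```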
        """
        if Path(path).exists():
            try:
                with open(path) as f:
                    data = json.load(f)
            except Exception as e:
                raise DocumentStoreError(f"Error loading InMemoryDocumentStore from disk. error: {e}") from e

            documents = data.pop("documents")
            cls_object = default_from_dict(cls, data)
            cls_object.write_documents(
                documents=[Document(**doc) for doc in documents], policy=DuplicatePolicy.OVERWRITE
            )
            return cls_object

        raise FileNotFoundError(f"File {path} not found.")

    def count_documents(self) -> int:
        """
        Returns the number of documents present in the DocumentStore.
        """
        return len(self.storage.keys())

    def filter_documents(self, filters: dict[str, Any] | None = None) -> list[Document]:
        """
        Returns the documents that match the filters provided.

        :param filters: The filters to apply. For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :returns: A list of Documents that match the given filters.
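
        A filter sketch in the syntax this store accepts (field names and values are illustrative):

        ```python
        store.filter_documents(
            filters={
                "operator": "AND",
                "conditions": [
                    {"field": "meta.category", "operator": "==", "value": "news"},
                    {"field": "meta.year", "operator": ">=", "value": 2020},
                ],
            }
        )
        ```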
        """
        if filters:
            InMemoryDocumentStore._validate_filters(filters)
            docs = [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)]
        else:
            docs = list(self.storage.values())

        if not self.return_embedding:
            docs = [replace(doc, embedding=None) for doc in docs]

        return docs

    def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
        """
        Refer to the DocumentStore.write_documents() protocol documentation.

        If `policy` is set to `DuplicatePolicy.NONE`, it defaults to `DuplicatePolicy.FAIL`.
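
        For example (a sketch of the duplicate handling described above):

        ```python
        doc = Document(content="hello")
        store.write_documents([doc])                                    # returns 1
        store.write_documents([doc], policy=DuplicatePolicy.SKIP)       # returns 0 and logs a warning
        store.write_documents([doc], policy=DuplicatePolicy.OVERWRITE)  # returns 1, replacing the document
        ```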
        """
        if (
            not isinstance(documents, Iterable)
            or isinstance(documents, str)
            or any(not isinstance(doc, Document) for doc in documents)
        ):
            raise ValueError("Please provide a list of Documents.")

        if policy == DuplicatePolicy.NONE:
            policy = DuplicatePolicy.FAIL

        written_documents = len(documents)
        for document in documents:
            if policy != DuplicatePolicy.OVERWRITE and document.id in self.storage.keys():
                if policy == DuplicatePolicy.FAIL:
                    raise DuplicateDocumentError(f"ID '{document.id}' already exists.")
                if policy == DuplicatePolicy.SKIP:
                    logger.warning("ID '{document_id}' already exists", document_id=document.id)
                    written_documents -= 1
                    continue

            # Since the statistics are updated incrementally, we need to explicitly remove the
            # existing document to revert the statistics before updating them with the new document.
            if document.id in self.storage.keys():
                self.delete_documents([document.id])

            tokens = []
            if document.content is not None:
                tokens = self._tokenize_bm25(document.content)

            self.storage[document.id] = document

            self._bm25_attr[document.id] = BM25DocumentStats(Counter(tokens), len(tokens))
            self._freq_vocab_for_idf.update(set(tokens))
            # Update the average doc length based on the new document and the previous average
            n_docs = len(self._bm25_attr)
            self._avg_doc_len = (len(tokens) + self._avg_doc_len * (n_docs - 1)) / n_docs
        return written_documents

    def delete_documents(self, document_ids: list[str]) -> None:
        """
        Deletes all documents with matching document_ids from the DocumentStore.

        :param document_ids: The document_ids to delete.
        """
        for doc_id in document_ids:
            if doc_id not in self.storage.keys():
                continue
            del self.storage[doc_id]

            # Update statistics accordingly
            doc_stats = self._bm25_attr.pop(doc_id)
            freq = doc_stats.freq_token
            doc_len = doc_stats.doc_len

            self._freq_vocab_for_idf.subtract(Counter(freq.keys()))
            try:
                self._avg_doc_len = (self._avg_doc_len * (len(self._bm25_attr) + 1) - doc_len) / len(self._bm25_attr)
            except ZeroDivisionError:
                self._avg_doc_len = 0.0

    @staticmethod
    def _validate_filters(filters: dict[str, Any] | None) -> None:
        if filters and "operator" not in filters and "conditions" not in filters:
            raise ValueError(
                "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
            )

    def delete_all_documents(self) -> None:
        """
        Deletes all documents in the document store.
        """
        _STORAGES[self.index] = {}
        _BM25_STATS_STORAGES[self.index] = {}
        _AVERAGE_DOC_LEN_STORAGES[self.index] = 0.0
        _FREQ_VOCAB_FOR_IDF_STORAGES[self.index] = Counter()

    def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
        """
        Updates the metadata of all documents that match the provided filters.

        :param filters: The filters to apply to select documents for updating.
            For filter syntax, see filter_documents.
        :param meta: The metadata fields to update. These will be merged with existing metadata.
        :returns: The number of documents updated.
        :raises ValueError: if filters have invalid syntax.
        """
        InMemoryDocumentStore._validate_filters(filters)
        matching = [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)]
        for doc in matching:
            doc.meta.update(meta)
            self.storage[doc.id] = doc
        return len(matching)

    def delete_by_filter(self, filters: dict[str, Any]) -> int:
        """
        Deletes all documents that match the provided filters.

        :param filters: The filters to apply to select documents for deletion.
            For filter syntax, see filter_documents.
        :returns: The number of documents deleted.
        :raises ValueError: if filters have invalid syntax.
        """
        InMemoryDocumentStore._validate_filters(filters)
        matching = [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)]
        doc_ids = [doc.id for doc in matching]
        self.delete_documents(doc_ids)
        return len(doc_ids)

    def count_documents_by_filter(self, filters: dict[str, Any]) -> int:
        """
        Returns the number of documents that match the provided filters.

        :param filters: The filters to apply.
            For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :returns: The number of documents that match the filters.
        """
        if filters:
            InMemoryDocumentStore._validate_filters(filters)
            return sum(1 for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc))
        return len(self.storage)

    def count_unique_metadata_by_filter(self, filters: dict[str, Any], metadata_fields: list[str]) -> dict[str, int]:
        """
        Returns the number of unique values for each specified metadata field from documents matching the filters.

        :param filters: The filters to apply.
            For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :param metadata_fields: List of field names to count unique values for.
            Field names can include or omit the "meta." prefix.
        :returns: A dictionary mapping each metadata field name (without the "meta." prefix)
            to the count of its unique values among the filtered documents.
        """
        if filters:
            InMemoryDocumentStore._validate_filters(filters)
            docs = [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)]
        else:
            docs = list(self.storage.values())

        result: dict[str, int] = {}
        for field in metadata_fields:
            key = field.removeprefix("meta.") if field.startswith("meta.") else field
            values = {doc.meta.get(key) for doc in docs if key in doc.meta and doc.meta[key] is not None}
            result[key] = len(values)
        return result

    def get_metadata_fields_info(self) -> dict[str, dict[str, str]]:
        """
        Returns information about the metadata fields present in the stored documents.

        Types are inferred from the stored values (keyword, int, float, boolean).

        :returns: A dictionary mapping each metadata field name to a dict with a "type" key.
        """
        type_map: dict[str, str] = {}
        for doc in self.storage.values():
            for key, value in doc.meta.items():
                if value is None:
                    continue
                if isinstance(value, bool):
                    type_map[key] = "boolean"
                elif isinstance(value, int):
                    type_map[key] = "int"
                elif isinstance(value, float):
                    type_map[key] = "float"
                else:
                    type_map[key] = "keyword"
        return {k: {"type": v} for k, v in type_map.items()}

    def get_metadata_field_min_max(self, metadata_field: str) -> dict[str, Any]:
        """
        Returns the minimum and maximum values for the given metadata field across all documents.

        :param metadata_field: The metadata field name. Can include or omit the "meta." prefix.
        :returns: A dictionary with "min" and "max" keys. Returns `{"min": None, "max": None}`
            if the field is missing or has no values.
        """
        key = metadata_field.removeprefix("meta.") if metadata_field.startswith("meta.") else metadata_field
        values = [
            doc.meta[key]
            for doc in self.storage.values()
            if key in doc.meta and doc.meta[key] is not None and isinstance(doc.meta[key], (int, float, str))
        ]
        if not values:
            return {"min": None, "max": None}
        try:
            return {"min": min(values), "max": max(values)}
        except TypeError:
            return {"min": None, "max": None}

    def get_metadata_field_unique_values(
        self, metadata_field: str, search_term: str | None = None
    ) -> tuple[list[str], int]:
        """
        Returns unique values for a metadata field, optionally filtered by a search term in content.

        :param metadata_field: The metadata field name. Can include or omit the "meta." prefix.
        :param search_term: If set, only documents whose content contains this term (case-insensitive)
            are considered.
        :returns: A tuple of (list of unique values, total count of unique values).
        """
        key = metadata_field.removeprefix("meta.") if metadata_field.startswith("meta.") else metadata_field
        if search_term:
            docs = [doc for doc in self.storage.values() if doc.content and search_term.lower() in doc.content.lower()]
        else:
            docs = list(self.storage.values())
        values = sorted({str(doc.meta[key]) for doc in docs if key in doc.meta and doc.meta[key] is not None}, key=str)
        return values, len(values)

    def bm25_retrieval(
        self, query: str, filters: dict[str, Any] | None = None, top_k: int = 10, scale_score: bool = False
    ) -> list[Document]:
        """
        Retrieves documents that are most relevant to the query using the BM25 algorithm.

        :param query: The query string.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
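
        For example (a sketch; the score is attached to each returned document):

        ```python
        store.write_documents([Document(content="BM25 is a bag-of-words ranking function.")])
        docs = store.bm25_retrieval(query="ranking function", top_k=3)
        print(docs[0].score)
        ```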
        """
        if not query:
            raise ValueError("Query should be a non-empty string")

        content_type_filter = {"field": "content", "operator": "!=", "value": None}
        if filters:
            if "operator" not in filters:
                raise ValueError(
                    "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
                )
            filters = {"operator": "AND", "conditions": [content_type_filter, filters]}
        else:
            filters = content_type_filter

        all_documents = self.filter_documents(filters=filters)
        if len(all_documents) == 0:
            logger.info("No documents found for BM25 retrieval. Returning empty list.")
            return []

        results = sorted(self.bm25_algorithm_inst(query, all_documents), key=lambda x: x[1], reverse=True)[:top_k]

        # BM25Okapi can return meaningful negative values, so they should not be filtered out when scale_score
        # is False. It's the only algorithm supported by rank_bm25 at the time of writing (2024) that can return
        # negative scores. See https://github.com/deepset-ai/haystack/pull/6889 for more context.
        negatives_are_valid = self.bm25_algorithm == "BM25Okapi" and not scale_score

        # Create documents with the BM25 score to return them
        return_documents = []
        for doc, score in results:
            if scale_score:
                score = expit(score / BM25_SCALING_FACTOR)

            if not negatives_are_valid and score <= 0.0:
                continue

            doc_fields = doc.to_dict()
            doc_fields["score"] = score

            if not self.return_embedding and "embedding" in doc_fields:
                doc_fields.pop("embedding")

            return_document = Document.from_dict(doc_fields)
            return_documents.append(return_document)

        return return_documents

    def embedding_retrieval(
        self,
        query_embedding: list[float],
        filters: dict[str, Any] | None = None,
        top_k: int = 10,
        scale_score: bool = False,
        return_embedding: bool | None = False,
    ) -> list[Document]:
        """
        Retrieves documents that are most similar to the query embedding using a vector similarity metric.

        :param query_embedding: Embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
        :param return_embedding: Whether to return the embedding of the retrieved Documents.
            If set to `None`, the value of the `return_embedding` parameter set at component
            initialization is used. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        :raises ValueError: if filters have invalid syntax.
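
        For example (a sketch with a toy three-dimensional embedding):

        ```python
        store.write_documents([Document(content="hi", embedding=[0.1, 0.2, 0.3])])
        docs = store.embedding_retrieval(query_embedding=[0.1, 0.2, 0.3], top_k=1)
        ```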
        """
        if len(query_embedding) == 0 or not isinstance(query_embedding[0], float):
            raise ValueError("query_embedding should be a non-empty list of floats.")

        if filters:
            InMemoryDocumentStore._validate_filters(filters)
            all_documents = [
                doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)
            ]
        else:
            all_documents = list(self.storage.values())

        documents_with_embeddings = [doc for doc in all_documents if doc.embedding is not None]
        if len(documents_with_embeddings) == 0:
            logger.warning(
                "No Documents found with embeddings. Returning empty list. "
                "To generate embeddings, use a DocumentEmbedder."
            )
            return []
        if len(documents_with_embeddings) < len(all_documents):
            logger.info(
                "Skipping some Documents that don't have an embedding. To generate embeddings, use a DocumentEmbedder."
            )

        scores = self._compute_query_embedding_similarity_scores(
            embedding=query_embedding, documents=documents_with_embeddings, scale_score=scale_score
        )

        resolved_return_embedding = self.return_embedding if return_embedding is None else return_embedding

        # Create Documents with the similarity score for the top k results
        top_documents = []
        for doc, score in sorted(zip(documents_with_embeddings, scores, strict=True), key=lambda x: x[1], reverse=True)[
            :top_k
        ]:
            doc_fields = doc.to_dict()
            doc_fields["score"] = score
            if resolved_return_embedding is False:
                doc_fields["embedding"] = None
            top_documents.append(Document.from_dict(doc_fields))

        return top_documents

    def _compute_query_embedding_similarity_scores(
        self, embedding: list[float], documents: list[Document], scale_score: bool = False
    ) -> list[float]:
        """
        Computes the similarity scores between the query embedding and the embeddings of the documents.

        :param embedding: Embedding of the query.
        :param documents: A list of Documents.
        :param scale_score: Whether to scale the scores of the Documents. Default is False.
        :returns: A list of scores.
        """
        query_embedding = np.array(embedding)
        if query_embedding.ndim == 1:
            query_embedding = np.expand_dims(a=query_embedding, axis=0)

        try:
            document_embeddings = np.array([doc.embedding for doc in documents])
        except ValueError as e:
            if "inhomogeneous shape" in str(e):
                raise DocumentStoreError(
                    "The embedding size of all Documents should be the same. "
                    "Please make sure that the Documents have been embedded with the same model."
                ) from e
            raise e
        if document_embeddings.ndim == 1:
            document_embeddings = np.expand_dims(a=document_embeddings, axis=0)

        if self.embedding_similarity_function == "cosine":
            # Cosine similarity is a normed dot product
            query_embedding /= np.linalg.norm(x=query_embedding, axis=1, keepdims=True)
            document_embeddings /= np.linalg.norm(x=document_embeddings, axis=1, keepdims=True)

        try:
            scores = np.dot(a=query_embedding, b=document_embeddings.T)[0].tolist()
        except ValueError as e:
            if "shapes" in str(e) and "not aligned" in str(e):
                raise DocumentStoreError(
                    "The embedding size of the query should be the same as the embedding size of the Documents. "
                    "Please make sure that the query has been embedded with the same model as the Documents."
                ) from e
            raise e

        if scale_score:
            if self.embedding_similarity_function == "dot_product":
                scores = [expit(float(score / DOT_PRODUCT_SCALING_FACTOR)) for score in scores]
            elif self.embedding_similarity_function == "cosine":
                scores = [(score + 1) / 2 for score in scores]

        return scores
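
    # Score scaling used above, restated from the code:
    #   dot_product: score -> expit(score / DOT_PRODUCT_SCALING_FACTOR), squashing the unbounded
    #                dot product into (0, 1)
    #   cosine:      score -> (score + 1) / 2, mapping the natural range [-1, 1] linearly onto [0, 1]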

    async def count_documents_async(self) -> int:
        """
        Returns the number of documents present in the DocumentStore.
        """
        return len(self.storage.keys())

    async def filter_documents_async(self, filters: dict[str, Any] | None = None) -> list[Document]:
        """
        Returns the documents that match the filters provided.

        :param filters: The filters to apply. For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :returns: A list of Documents that match the given filters.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.filter_documents(filters=filters)
        )

    async def write_documents_async(
        self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """
        Refer to the DocumentStore.write_documents() protocol documentation.

        If `policy` is set to `DuplicatePolicy.NONE`, it defaults to `DuplicatePolicy.FAIL`.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.write_documents(documents=documents, policy=policy)
        )

    async def delete_documents_async(self, document_ids: list[str]) -> None:
        """
        Deletes all documents with matching document_ids from the DocumentStore.

        :param document_ids: The document_ids to delete.
        """
        await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.delete_documents(document_ids=document_ids)
        )

    async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
        """
        Updates the metadata of all documents that match the provided filters.

        :param filters: The filters to apply to select documents for updating.
            For filter syntax, see filter_documents.
        :param meta: The metadata fields to update. These will be merged with existing metadata.
        :returns: The number of documents updated.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.update_by_filter(filters=filters, meta=meta)
        )

    async def count_documents_by_filter_async(self, filters: dict[str, Any]) -> int:
        """
        Returns the number of documents that match the provided filters.

        :param filters: The filters to apply.
            For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :returns: The number of documents that match the filters.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.count_documents_by_filter(filters=filters)
        )

    async def count_unique_metadata_by_filter_async(
        self, filters: dict[str, Any], metadata_fields: list[str]
    ) -> dict[str, int]:
        """
        Returns the number of unique values for each specified metadata field from documents matching the filters.

        :param filters: The filters to apply.
            For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :param metadata_fields: List of field names to count unique values for.
            Field names can include or omit the "meta." prefix.
        :returns: A dictionary mapping each metadata field name (without the "meta." prefix)
            to the count of its unique values among the filtered documents.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor,
            lambda: self.count_unique_metadata_by_filter(filters=filters, metadata_fields=metadata_fields),
        )

    async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]:
        """
        Returns information about the metadata fields present in the stored documents.

        Types are inferred from the stored values (keyword, int, float, boolean).

        :returns: A dictionary mapping each metadata field name to a dict with a "type" key.
        """
        return await asyncio.get_running_loop().run_in_executor(self.executor, self.get_metadata_fields_info)

    async def get_metadata_field_min_max_async(self, metadata_field: str) -> dict[str, Any]:
        """
        Returns the minimum and maximum values for the given metadata field across all documents.

        :param metadata_field: The metadata field name. Can include or omit the "meta." prefix.
        :returns: A dictionary with "min" and "max" keys. Returns `{"min": None, "max": None}`
            if the field is missing or has no values.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.get_metadata_field_min_max(metadata_field=metadata_field)
        )

    async def get_metadata_field_unique_values_async(
        self, metadata_field: str, search_term: str | None = None
    ) -> tuple[list[str], int]:
        """
        Returns unique values for a metadata field, optionally filtered by a search term in content.

        :param metadata_field: The metadata field name. Can include or omit the "meta." prefix.
        :param search_term: If set, only documents whose content contains this term (case-insensitive)
            are considered.
        :returns: A tuple of (list of unique values, total count of unique values).
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor,
            lambda: self.get_metadata_field_unique_values(metadata_field=metadata_field, search_term=search_term),
        )

    async def delete_all_documents_async(self) -> None:
        """
        Deletes all documents in the document store.
        """
        await asyncio.get_running_loop().run_in_executor(self.executor, self.delete_all_documents)

    async def bm25_retrieval_async(
        self, query: str, filters: dict[str, Any] | None = None, top_k: int = 10, scale_score: bool = False
    ) -> list[Document]:
        """
        Retrieves documents that are most relevant to the query using the BM25 algorithm.

        :param query: The query string.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor,
            lambda: self.bm25_retrieval(query=query, filters=filters, top_k=top_k, scale_score=scale_score),
        )

    async def embedding_retrieval_async(
        self,
        query_embedding: list[float],
        filters: dict[str, Any] | None = None,
        top_k: int = 10,
        scale_score: bool = False,
        return_embedding: bool = False,
    ) -> list[Document]:
        """
        Retrieves documents that are most similar to the query embedding using a vector similarity metric.

        :param query_embedding: Embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
        :param return_embedding: Whether to return the embedding of the retrieved Documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
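
        For example (a sketch, assuming it runs inside a coroutine, e.g. via `asyncio.run`):

        ```python
        docs = await store.embedding_retrieval_async(query_embedding=[0.1, 0.2, 0.3], top_k=1)
        ```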
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor,
            lambda: self.embedding_retrieval(
                query_embedding=query_embedding,
                filters=filters,
                top_k=top_k,
                scale_score=scale_score,
                return_embedding=return_embedding,
            ),
        )