haystack/document_stores/in_memory/document_store.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import asyncio
import json
import math
import re
import uuid
from collections import Counter
from collections.abc import Callable, Iterable
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Any, Literal

import numpy as np

from haystack import default_from_dict, default_to_dict, logging
from haystack.dataclasses import Document
from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import expit
from haystack.utils.filters import document_matches_filter

logger = logging.getLogger(__name__)

# Document scores are essentially unbounded and are scaled to values between 0 and 1 if scale_score is set to
# True (it defaults to False). Scaling uses the expit function (inverse of the logit function) after applying
# a scaling factor (e.g., BM25_SCALING_FACTOR for the bm25_retrieval method).
# A larger scaling factor decreases the scaled scores. For example, an input of 10 is scaled to 0.99 with
# BM25_SCALING_FACTOR=2 but to 0.78 with BM25_SCALING_FACTOR=8 (default). The defaults were chosen empirically.
# Increase the default if most unscaled scores are larger than expected (>30) and would otherwise all be
# incorrectly mapped to scores ~1.
BM25_SCALING_FACTOR = 8
DOT_PRODUCT_SCALING_FACTOR = 100
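
# A quick sanity check of the scaling behavior described above (doctest-style, illustrative only):
#
#   >>> from haystack.utils import expit
#   >>> round(expit(10 / 2), 2)   # BM25_SCALING_FACTOR=2
#   0.99
#   >>> round(expit(10 / 8), 2)   # BM25_SCALING_FACTOR=8 (default)
#   0.78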


@dataclass
class BM25DocumentStats:
    """
    A dataclass for managing document statistics for BM25 retrieval.

    :param freq_token: A Counter of token frequencies in the document.
    :param doc_len: Number of tokens in the document.
    """

    freq_token: dict[str, int]
    doc_len: int


# Global storage for all InMemoryDocumentStore instances, indexed by the index name.
_STORAGES: dict[str, dict[str, Document]] = {}
_BM25_STATS_STORAGES: dict[str, dict[str, BM25DocumentStats]] = {}
_AVERAGE_DOC_LEN_STORAGES: dict[str, float] = {}
_FREQ_VOCAB_FOR_IDF_STORAGES: dict[str, Counter] = {}


class InMemoryDocumentStore:
    """
    Stores data in memory. The store is ephemeral by default, but its contents can be written to disk as JSON
    with `save_to_disk` and restored with `load_from_disk`.
    """

    def __init__(
        self,
        bm25_tokenization_regex: str = r"(?u)\b\w+\b",
        bm25_algorithm: Literal["BM25Okapi", "BM25L", "BM25Plus"] = "BM25L",
        bm25_parameters: dict | None = None,
        embedding_similarity_function: Literal["dot_product", "cosine"] = "dot_product",
        index: str | None = None,
        async_executor: ThreadPoolExecutor | None = None,
        return_embedding: bool = True,
    ) -> None:
        """
        Initializes the DocumentStore.

        :param bm25_tokenization_regex: The regular expression used to tokenize the text for BM25 retrieval.
        :param bm25_algorithm: The BM25 algorithm to use. One of "BM25Okapi", "BM25L" (default), or "BM25Plus".
        :param bm25_parameters: Parameters for the BM25 implementation in a dictionary format.
            For example: `{'k1': 1.5, 'b': 0.75, 'epsilon': 0.25}`
            You can learn more about these parameters by visiting https://github.com/dorianbrown/rank_bm25.
        :param embedding_similarity_function: The similarity function used to compare Document embeddings.
            One of "dot_product" (default) or "cosine". To choose the most appropriate function, look for information
            about your embedding model.
        :param index: A specific index to store the documents. If not specified, a random UUID is used.
            Using the same index allows you to store documents across multiple InMemoryDocumentStore instances.
        :param async_executor:
            Optional ThreadPoolExecutor to use for async calls. If not provided, a single-threaded
            executor will be initialized and used.
        :param return_embedding: Whether to return the embedding of the retrieved Documents. Default is True.
        """
        self.bm25_tokenization_regex = bm25_tokenization_regex
        self.tokenizer = re.compile(bm25_tokenization_regex).findall

        if index is None:
            index = str(uuid.uuid4())

        self.index = index
        if self.index not in _STORAGES:
            _STORAGES[self.index] = {}

        self.bm25_algorithm = bm25_algorithm
        self.bm25_algorithm_inst = self._dispatch_bm25()
        self.bm25_parameters = bm25_parameters or {}
        self.embedding_similarity_function = embedding_similarity_function

        # Per-document statistics
        if self.index not in _BM25_STATS_STORAGES:
            _BM25_STATS_STORAGES[self.index] = {}

        if self.index not in _AVERAGE_DOC_LEN_STORAGES:
            _AVERAGE_DOC_LEN_STORAGES[self.index] = 0.0

        if self.index not in _FREQ_VOCAB_FOR_IDF_STORAGES:
            _FREQ_VOCAB_FOR_IDF_STORAGES[self.index] = Counter()

        # Keep track of whether we own the executor: if we created it, we must also clean it up.
        self._owns_executor = async_executor is None
        self.executor = (
            ThreadPoolExecutor(thread_name_prefix=f"async-inmemory-docstore-executor-{id(self)}", max_workers=1)
            if async_executor is None
            else async_executor
        )
        self.return_embedding = return_embedding

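    # A minimal usage sketch (illustrative, not part of the class API). Because storage is keyed by the
    # index name in module-level globals, two stores created with the same `index` share the same documents:
    #
    #   >>> store_a = InMemoryDocumentStore(index="shared-index")
    #   >>> store_b = InMemoryDocumentStore(index="shared-index")
    #   >>> store_a.write_documents([Document(content="Hello world")])
    #   1
    #   >>> store_b.count_documents()
    #   1
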
    def __del__(self) -> None:
        """
        Cleanup when the instance is being destroyed.
        """
        if hasattr(self, "_owns_executor") and self._owns_executor and hasattr(self, "executor"):
            self.executor.shutdown(wait=True)

    def shutdown(self) -> None:
        """
        Explicitly shut down the executor if this instance owns it.
        """
        if self._owns_executor:
            self.executor.shutdown(wait=True)

    @property
    def storage(self) -> dict[str, Document]:
        """
        Utility property that returns the storage used by this instance of InMemoryDocumentStore.
        """
        return _STORAGES.get(self.index, {})

    @property
    def _bm25_attr(self) -> dict[str, BM25DocumentStats]:
        return _BM25_STATS_STORAGES.get(self.index, {})

    @property
    def _avg_doc_len(self) -> float:
        return _AVERAGE_DOC_LEN_STORAGES.get(self.index, 0.0)

    @_avg_doc_len.setter
    def _avg_doc_len(self, value: float) -> None:
        _AVERAGE_DOC_LEN_STORAGES[self.index] = value

    @property
    def _freq_vocab_for_idf(self) -> Counter:
        return _FREQ_VOCAB_FOR_IDF_STORAGES.get(self.index, Counter())

    def _dispatch_bm25(self) -> "Callable[[str, list[Document]], list[tuple[Document, float]]]":
        """
        Select the correct BM25 algorithm based on user specification.

        :returns:
            The BM25 algorithm method.
        """
        table = {"BM25Okapi": self._score_bm25okapi, "BM25L": self._score_bm25l, "BM25Plus": self._score_bm25plus}

        if self.bm25_algorithm not in table:
            raise ValueError(f"BM25 algorithm '{self.bm25_algorithm}' is not supported.")
        return table[self.bm25_algorithm]

    def _tokenize_bm25(self, text: str) -> list[str]:
        """
        Tokenize text using the BM25 tokenization regex.

        Here we explicitly create a tokenization method to encapsulate
        all pre-processing logic used to create BM25 tokens, such as
        lowercasing. This helps track the exact tokenization process
        used for BM25 scoring at any given time.

        :param text:
            The text to tokenize.
        :returns:
            A list of tokens.
        """
        text = text.lower()
        return self.tokenizer(text)
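
    # For illustration: with the default regex r"(?u)\b\w+\b", the tokenizer lowercases the text and splits
    # on word boundaries (a sketch, assuming default settings):
    #
    #   >>> InMemoryDocumentStore()._tokenize_bm25("Hello, World!")
    #   ['hello', 'world']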

    def _score_bm25l(self, query: str, documents: list[Document]) -> list[tuple[Document, float]]:
        """
        Calculate BM25L scores for the given query and filtered documents.

        :param query:
            The query string.
        :param documents:
            The list of documents to score; it should be produced by
            the filter_documents method and may be an empty list.
        :returns:
            A list of tuples, each containing a Document and its BM25L score.
        """
        k = self.bm25_parameters.get("k1", 1.5)
        b = self.bm25_parameters.get("b", 0.75)
        delta = self.bm25_parameters.get("delta", 0.5)

        def _compute_idf(tokens: list[str]) -> dict[str, float]:
            """Per-token IDF computation for all tokens."""
            idf = {}
            n_corpus = len(self._bm25_attr)
            for tok in tokens:
                n = self._freq_vocab_for_idf.get(tok, 0)
                idf[tok] = math.log((n_corpus + 1.0) / (n + 0.5)) * int(n != 0)
            return idf

        def _compute_tf(token: str, freq: dict[str, int], doc_len: int) -> float:
            """Per-token BM25L computation."""
            freq_term = freq.get(token, 0.0)
            ctd = freq_term / (1 - b + b * doc_len / self._avg_doc_len)
            return (1.0 + k) * (ctd + delta) / (k + ctd + delta)

        idf = _compute_idf(self._tokenize_bm25(query))
        bm25_attr = {doc.id: self._bm25_attr[doc.id] for doc in documents}

        ret = []
        for doc in documents:
            doc_stats = bm25_attr[doc.id]
            freq = doc_stats.freq_token
            doc_len = doc_stats.doc_len

            score = 0.0
            for tok in idf.keys():
                score += idf[tok] * _compute_tf(tok, freq, doc_len)
            ret.append((doc, score))

        return ret
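
    # In formula form, the BM25L score implemented above is, per query token t with idf(t) > 0:
    #
    #     score(D, Q) = sum_t idf(t) * (k1 + 1) * (c(t, D) + delta) / (k1 + c(t, D) + delta)
    #     c(t, D)     = tf(t, D) / (1 - b + b * |D| / avgdl)
    #     idf(t)      = ln((N + 1) / (n(t) + 0.5))
    #
    # where N is the corpus size, n(t) the document frequency of t, |D| the document length,
    # and avgdl the average document length.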

    def _score_bm25okapi(self, query: str, documents: list[Document]) -> list[tuple[Document, float]]:
        """
        Calculate BM25Okapi scores for the given query and filtered documents.

        :param query:
            The query string.
        :param documents:
            The list of documents to score; it should be produced by
            the filter_documents method and may be an empty list.
        :returns:
            A list of tuples, each containing a Document and its BM25Okapi score.
        """
        k = self.bm25_parameters.get("k1", 1.5)
        b = self.bm25_parameters.get("b", 0.75)
        epsilon = self.bm25_parameters.get("epsilon", 0.25)

        def _compute_idf(tokens: list[str]) -> dict[str, float]:
            """Per-token IDF computation for all tokens."""
            sum_idf = 0.0
            neg_idf_tokens = []

            # Although this is a global statistic, we compute it here to make the computation
            # more self-contained. The complexity is O(vocab_size), which is acceptable.
            idf = {}
            for tok, n in self._freq_vocab_for_idf.items():
                idf[tok] = math.log((len(self._bm25_attr) - n + 0.5) / (n + 0.5))
                sum_idf += idf[tok]
                if idf[tok] < 0:
                    neg_idf_tokens.append(tok)

            eps = epsilon * sum_idf / len(self._freq_vocab_for_idf)
            for tok in neg_idf_tokens:
                idf[tok] = eps
            return {tok: idf.get(tok, 0.0) for tok in tokens}

        def _compute_tf(token: str, freq: dict[str, int], doc_len: int) -> float:
            """Per-token BM25Okapi computation."""
            freq_term = freq.get(token, 0.0)
            freq_norm = freq_term + k * (1 - b + b * doc_len / self._avg_doc_len)
            return freq_term * (1.0 + k) / freq_norm

        idf = _compute_idf(self._tokenize_bm25(query))
        bm25_attr = {doc.id: self._bm25_attr[doc.id] for doc in documents}

        ret = []
        for doc in documents:
            doc_stats = bm25_attr[doc.id]
            freq = doc_stats.freq_token
            doc_len = doc_stats.doc_len

            score = 0.0
            for tok in idf.keys():
                score += idf[tok] * _compute_tf(tok, freq, doc_len)
            ret.append((doc, score))

        return ret
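
    # A small worked example of the Okapi IDF above: with N = 3 documents and a token appearing in n = 2
    # of them, idf = ln((3 - 2 + 0.5) / (2 + 0.5)) = ln(0.6) ≈ -0.51. Such negative IDF values are replaced
    # by eps = epsilon * (average IDF over the vocabulary); since that average can itself be negative,
    # BM25Okapi can still return negative scores (see the `negatives_are_valid` handling in bm25_retrieval).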

    def _score_bm25plus(self, query: str, documents: list[Document]) -> list[tuple[Document, float]]:
        """
        Calculate BM25+ scores for the given query and filtered documents.

        This implementation follows the description on the BM25 Wikipedia page,
        which adds 1 (a smoothing factor) to the document frequency when computing IDF.

        :param query:
            The query string.
        :param documents:
            The list of documents to score; it should be produced by
            the filter_documents method and may be an empty list.
        :returns:
            A list of tuples, each containing a Document and its BM25+ score.
        """
        k = self.bm25_parameters.get("k1", 1.5)
        b = self.bm25_parameters.get("b", 0.75)
        delta = self.bm25_parameters.get("delta", 1.0)

        def _compute_idf(tokens: list[str]) -> dict[str, float]:
            """Per-token IDF computation."""
            idf = {}
            n_corpus = len(self._bm25_attr)
            for tok in tokens:
                n = self._freq_vocab_for_idf.get(tok, 0)
                idf[tok] = math.log(1 + (n_corpus - n + 0.5) / (n + 0.5)) * int(n != 0)
            return idf

        def _compute_tf(token: str, freq: dict[str, int], doc_len: float) -> float:
            """Per-token normalized term frequency."""
            freq_term = freq.get(token, 0.0)
            freq_damp = k * (1 - b + b * doc_len / self._avg_doc_len)
            return freq_term * (1.0 + k) / (freq_term + freq_damp) + delta

        idf = _compute_idf(self._tokenize_bm25(query))
        bm25_attr = {doc.id: self._bm25_attr[doc.id] for doc in documents}

        ret = []
        for doc in documents:
            doc_stats = bm25_attr[doc.id]
            freq = doc_stats.freq_token
            doc_len = doc_stats.doc_len

            score = 0.0
            for tok in idf.keys():
                score += idf[tok] * _compute_tf(tok, freq, doc_len)
            ret.append((doc, score))

        return ret

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            bm25_tokenization_regex=self.bm25_tokenization_regex,
            bm25_algorithm=self.bm25_algorithm,
            bm25_parameters=self.bm25_parameters,
            embedding_similarity_function=self.embedding_similarity_function,
            index=self.index,
            return_embedding=self.return_embedding,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "InMemoryDocumentStore":
        """
        Deserializes the component from a dictionary.

        :param data:
            The dictionary to deserialize from.
        :returns:
            The deserialized component.
        """
        return default_from_dict(cls, data)
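
    # A round-trip sketch: `to_dict` captures only the configuration, not the documents, so a deserialized
    # store re-attaches to the same shared index (assuming default_to_dict's usual "init_parameters" layout):
    #
    #   >>> store = InMemoryDocumentStore(index="my-index")
    #   >>> config = store.to_dict()
    #   >>> config["init_parameters"]["index"]
    #   'my-index'
    #   >>> restored = InMemoryDocumentStore.from_dict(config)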

    def save_to_disk(self, path: str) -> None:
        """
        Write the database and its data to disk as a JSON file.

        :param path: The path to the JSON file.
        """
        data: dict[str, Any] = self.to_dict()
        data["documents"] = [doc.to_dict(flatten=False) for doc in self.storage.values()]
        with open(path, "w") as f:
            json.dump(data, f)

    @classmethod
    def load_from_disk(cls, path: str) -> "InMemoryDocumentStore":
        """
        Load the database and its data from disk as a JSON file.

        :param path: The path to the JSON file.
        :returns: The loaded InMemoryDocumentStore.
        """
        if Path(path).exists():
            try:
                with open(path) as f:
                    data = json.load(f)
            except Exception as e:
                raise DocumentStoreError(f"Error loading InMemoryDocumentStore from disk. error: {e}") from e

            documents = data.pop("documents")
            cls_object = default_from_dict(cls, data)
            cls_object.write_documents(
                documents=[Document(**doc) for doc in documents], policy=DuplicatePolicy.OVERWRITE
            )
            return cls_object

        raise FileNotFoundError(f"File {path} not found.")
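
    # Persistence sketch (the path is illustrative):
    #
    #   >>> store = InMemoryDocumentStore()
    #   >>> store.write_documents([Document(content="Hello world")])
    #   1
    #   >>> store.save_to_disk("/tmp/docstore.json")
    #   >>> restored = InMemoryDocumentStore.load_from_disk("/tmp/docstore.json")
    #   >>> restored.count_documents()
    #   1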

    def count_documents(self) -> int:
        """
        Returns the number of documents present in the DocumentStore.
        """
        return len(self.storage)

    def filter_documents(self, filters: dict[str, Any] | None = None) -> list[Document]:
        """
        Returns the documents that match the filters provided.

        :param filters: The filters to apply. For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :returns: A list of Documents that match the given filters.
        """
        if filters:
            InMemoryDocumentStore._validate_filters(filters)
            docs = [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)]
        else:
            docs = list(self.storage.values())

        if not self.return_embedding:
            docs = [replace(doc, embedding=None) for doc in docs]

        return docs
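
    # Filter sketch: filters need a top-level "operator"; comparison conditions use "field"/"operator"/"value",
    # as described in the linked documentation. The field names below are illustrative:
    #
    #   >>> store = InMemoryDocumentStore()
    #   >>> filters = {
    #   ...     "operator": "AND",
    #   ...     "conditions": [
    #   ...         {"field": "meta.category", "operator": "==", "value": "news"},
    #   ...         {"field": "meta.year", "operator": ">=", "value": 2023},
    #   ...     ],
    #   ... }
    #   >>> docs = store.filter_documents(filters=filters)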

    def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
        """
        Refer to the DocumentStore.write_documents() protocol documentation.

        If `policy` is set to `DuplicatePolicy.NONE`, it defaults to `DuplicatePolicy.FAIL`.
        """
        if (
            not isinstance(documents, Iterable)
            or isinstance(documents, str)
            or any(not isinstance(doc, Document) for doc in documents)
        ):
            raise ValueError("Please provide a list of Documents.")

        if policy == DuplicatePolicy.NONE:
            policy = DuplicatePolicy.FAIL

        written_documents = len(documents)
        for document in documents:
            if policy != DuplicatePolicy.OVERWRITE and document.id in self.storage.keys():
                if policy == DuplicatePolicy.FAIL:
                    raise DuplicateDocumentError(f"ID '{document.id}' already exists.")
                if policy == DuplicatePolicy.SKIP:
                    logger.warning("ID '{document_id}' already exists", document_id=document.id)
                    written_documents -= 1
                    continue

            # Since the statistics are updated incrementally, we need to explicitly remove the existing
            # document to revert its contribution to the statistics before adding the new document.
            if document.id in self.storage.keys():
                self.delete_documents([document.id])

            tokens = []
            if document.content is not None:
                tokens = self._tokenize_bm25(document.content)

            self.storage[document.id] = document

            self._bm25_attr[document.id] = BM25DocumentStats(Counter(tokens), len(tokens))
            self._freq_vocab_for_idf.update(set(tokens))
            # Update the average doc length based on the new document and the previous average.
            n_docs = len(self._bm25_attr)
            self._avg_doc_len = (len(tokens) + self._avg_doc_len * (n_docs - 1)) / n_docs
        return written_documents
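
    # Duplicate-policy sketch: with the default policy (NONE, treated as FAIL) a second write of the same
    # Document raises DuplicateDocumentError; OVERWRITE replaces the stored copy and SKIP leaves it untouched:
    #
    #   >>> store = InMemoryDocumentStore()
    #   >>> doc = Document(content="Hello world")
    #   >>> store.write_documents([doc])
    #   1
    #   >>> store.write_documents([doc], policy=DuplicatePolicy.OVERWRITE)
    #   1
    #   >>> store.write_documents([doc], policy=DuplicatePolicy.SKIP)
    #   0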

    def delete_documents(self, document_ids: list[str]) -> None:
        """
        Deletes all documents with matching document_ids from the DocumentStore.

        :param document_ids: The document_ids to delete.
        """
        for doc_id in document_ids:
            if doc_id not in self.storage.keys():
                continue
            del self.storage[doc_id]

            # Update statistics accordingly
            doc_stats = self._bm25_attr.pop(doc_id)
            freq = doc_stats.freq_token
            doc_len = doc_stats.doc_len

            self._freq_vocab_for_idf.subtract(Counter(freq.keys()))
            try:
                self._avg_doc_len = (self._avg_doc_len * (len(self._bm25_attr) + 1) - doc_len) / len(self._bm25_attr)
            except ZeroDivisionError:
                self._avg_doc_len = 0.0

    @staticmethod
    def _validate_filters(filters: dict[str, Any] | None) -> None:
        if filters and "operator" not in filters and "conditions" not in filters:
            raise ValueError(
                "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
            )

    def delete_all_documents(self) -> None:
        """
        Deletes all documents in the document store.
        """
        _STORAGES[self.index] = {}
        _BM25_STATS_STORAGES[self.index] = {}
        _AVERAGE_DOC_LEN_STORAGES[self.index] = 0.0
        _FREQ_VOCAB_FOR_IDF_STORAGES[self.index] = Counter()

    def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
        """
        Updates the metadata of all documents that match the provided filters.

        :param filters: The filters to apply to select documents for updating.
            For filter syntax, see filter_documents.
        :param meta: The metadata fields to update. These will be merged with existing metadata.
        :returns: The number of documents updated.
        :raises ValueError: if filters have invalid syntax.
        """
        InMemoryDocumentStore._validate_filters(filters)
        matching = [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)]
        for doc in matching:
            doc.meta.update(meta)
            self.storage[doc.id] = doc
        return len(matching)

    def delete_by_filter(self, filters: dict[str, Any]) -> int:
        """
        Deletes all documents that match the provided filters.

        :param filters: The filters to apply to select documents for deletion.
            For filter syntax, see filter_documents.
        :returns: The number of documents deleted.
        :raises ValueError: if filters have invalid syntax.
        """
        InMemoryDocumentStore._validate_filters(filters)
        matching = [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)]
        doc_ids = [doc.id for doc in matching]
        self.delete_documents(doc_ids)
        return len(doc_ids)

    def count_documents_by_filter(self, filters: dict[str, Any]) -> int:
        """
        Returns the number of documents that match the provided filters.

        :param filters: The filters to apply.
            For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :returns: The number of documents that match the filters.
        """
        if filters:
            InMemoryDocumentStore._validate_filters(filters)
            return sum(1 for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc))
        return len(self.storage)

    def count_unique_metadata_by_filter(self, filters: dict[str, Any], metadata_fields: list[str]) -> dict[str, int]:
        """
        Returns the number of unique values for each specified metadata field from documents matching the filters.

        :param filters: The filters to apply.
            For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :param metadata_fields: List of field names to count unique values for.
            Field names can include or omit the "meta." prefix.
        :returns: A dictionary mapping each metadata field name (without "meta." prefix)
            to the count of its unique values among the filtered documents.
        """
        if filters:
            InMemoryDocumentStore._validate_filters(filters)
            docs = [doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)]
        else:
            docs = list(self.storage.values())

        result: dict[str, int] = {}
        for field in metadata_fields:
            key = field.removeprefix("meta.")
            values = {doc.meta.get(key) for doc in docs if key in doc.meta and doc.meta[key] is not None}
            result[key] = len(values)
        return result

    def get_metadata_fields_info(self) -> dict[str, dict[str, str]]:
        """
        Returns information about the metadata fields present in the stored documents.

        Types are inferred from the stored values (keyword, int, float, boolean).

        :returns: A dictionary mapping each metadata field name to a dict with a "type" key.
        """
        type_map: dict[str, str] = {}
        for doc in self.storage.values():
            for key, value in doc.meta.items():
                if value is None:
                    continue
                if isinstance(value, bool):
                    type_map[key] = "boolean"
                elif isinstance(value, int):
                    type_map[key] = "int"
                elif isinstance(value, float):
                    type_map[key] = "float"
                else:
                    type_map[key] = "keyword"
        return {k: {"type": v} for k, v in type_map.items()}

    def get_metadata_field_min_max(self, metadata_field: str) -> dict[str, Any]:
        """
        Returns the minimum and maximum values for the given metadata field across all documents.

        :param metadata_field: The metadata field name. Can include or omit the "meta." prefix.
        :returns: A dictionary with "min" and "max" keys. Returns `{"min": None, "max": None}`
            if the field is missing or has no values.
        """
        key = metadata_field.removeprefix("meta.")
        values = [
            doc.meta[key]
            for doc in self.storage.values()
            if key in doc.meta and doc.meta[key] is not None and isinstance(doc.meta[key], (int, float, str))
        ]
        if not values:
            return {"min": None, "max": None}
        try:
            return {"min": min(values), "max": max(values)}
        except TypeError:
            return {"min": None, "max": None}

    def get_metadata_field_unique_values(
        self, metadata_field: str, search_term: str | None = None
    ) -> tuple[list[str], int]:
        """
        Returns unique values for a metadata field, optionally filtered by a search term in content.

        :param metadata_field: The metadata field name. Can include or omit the "meta." prefix.
        :param search_term: If set, only documents whose content contains this term (case-insensitive)
            are considered.
        :returns: A tuple of (list of unique values, total count of unique values).
        """
        key = metadata_field.removeprefix("meta.")
        if search_term:
            docs = [doc for doc in self.storage.values() if doc.content and search_term.lower() in doc.content.lower()]
        else:
            docs = list(self.storage.values())
        values = sorted({str(doc.meta[key]) for doc in docs if key in doc.meta and doc.meta[key] is not None}, key=str)
        return values, len(values)

    def bm25_retrieval(
        self, query: str, filters: dict[str, Any] | None = None, top_k: int = 10, scale_score: bool = False
    ) -> list[Document]:
        """
        Retrieves the documents that are most relevant to the query using the BM25 algorithm.

        :param query: The query string.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        """
        if not query:
            raise ValueError("Query should be a non-empty string")

        content_type_filter = {"field": "content", "operator": "!=", "value": None}
        if filters:
            if "operator" not in filters:
                raise ValueError(
                    "Invalid filter syntax. See https://docs.haystack.deepset.ai/docs/metadata-filtering for details."
                )
            filters = {"operator": "AND", "conditions": [content_type_filter, filters]}
        else:
            filters = content_type_filter

        all_documents = self.filter_documents(filters=filters)
        if len(all_documents) == 0:
            logger.info("No documents found for BM25 retrieval. Returning empty list.")
            return []

        results = sorted(self.bm25_algorithm_inst(query, all_documents), key=lambda x: x[1], reverse=True)[:top_k]

        # BM25Okapi can return meaningful negative values, so they should not be filtered out when scale_score is
        # False. It's the only algorithm supported by rank_bm25 at the time of writing (2024) that can return
        # negative scores. See https://github.com/deepset-ai/haystack/pull/6889 for more context.
        negatives_are_valid = self.bm25_algorithm == "BM25Okapi" and not scale_score

        # Create documents with the BM25 score to return them
        return_documents = []
        for doc, score in results:
            if scale_score:
                score = expit(score / BM25_SCALING_FACTOR)

            if not negatives_are_valid and score <= 0.0:
                continue

            doc_fields = doc.to_dict()
            doc_fields["score"] = score

            if not self.return_embedding and "embedding" in doc_fields:
                doc_fields.pop("embedding")

            return_document = Document.from_dict(doc_fields)

            return_documents.append(return_document)

        return return_documents
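
    # Retrieval sketch (document contents are illustrative):
    #
    #   >>> store = InMemoryDocumentStore()
    #   >>> store.write_documents([Document(content="BM25 is a ranking function"),
    #   ...                        Document(content="Vector search uses embeddings")])
    #   2
    #   >>> results = store.bm25_retrieval(query="ranking function", top_k=1)
    #   >>> results[0].content
    #   'BM25 is a ranking function'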

    def embedding_retrieval(
        self,
        query_embedding: list[float],
        filters: dict[str, Any] | None = None,
        top_k: int = 10,
        scale_score: bool = False,
        return_embedding: bool | None = None,
    ) -> list[Document]:
        """
        Retrieves the documents that are most similar to the query embedding using a vector similarity metric.

        :param query_embedding: Embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
        :param return_embedding: Whether to return the embedding of the retrieved Documents.
            If not provided, the value of the `return_embedding` parameter set at component
            initialization will be used.
        :returns: A list of the top_k documents most relevant to the query.
        :raises ValueError: if filters have invalid syntax.
        """
        if len(query_embedding) == 0 or not isinstance(query_embedding[0], float):
            raise ValueError("query_embedding should be a non-empty list of floats.")

        if filters:
            InMemoryDocumentStore._validate_filters(filters)
            all_documents = [
                doc for doc in self.storage.values() if document_matches_filter(filters=filters, document=doc)
            ]
        else:
            all_documents = list(self.storage.values())

        documents_with_embeddings = [doc for doc in all_documents if doc.embedding is not None]
        if len(documents_with_embeddings) == 0:
            logger.warning(
                "No Documents found with embeddings. Returning empty list. "
                "To generate embeddings, use a DocumentEmbedder."
            )
            return []
        if len(documents_with_embeddings) < len(all_documents):
            logger.info(
                "Skipping some Documents that don't have an embedding. To generate embeddings, use a DocumentEmbedder."
            )

        scores = self._compute_query_embedding_similarity_scores(
            embedding=query_embedding, documents=documents_with_embeddings, scale_score=scale_score
        )

        resolved_return_embedding = self.return_embedding if return_embedding is None else return_embedding

        # create Documents with the similarity score for the top k results
        top_documents = []
        for doc, score in sorted(zip(documents_with_embeddings, scores, strict=True), key=lambda x: x[1], reverse=True)[
            :top_k
        ]:
            doc_fields = doc.to_dict()
            doc_fields["score"] = score
            if resolved_return_embedding is False:
                doc_fields["embedding"] = None
            top_documents.append(Document.from_dict(doc_fields))

        return top_documents
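
    # Embedding-retrieval sketch with 3-dimensional toy embeddings, assuming the default dot_product similarity:
    #
    #   >>> store = InMemoryDocumentStore()
    #   >>> store.write_documents([Document(content="doc A", embedding=[1.0, 0.0, 0.0]),
    #   ...                        Document(content="doc B", embedding=[0.0, 1.0, 0.0])])
    #   2
    #   >>> results = store.embedding_retrieval(query_embedding=[0.9, 0.1, 0.0], top_k=1)
    #   >>> results[0].content
    #   'doc A'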

    def _compute_query_embedding_similarity_scores(
        self, embedding: list[float], documents: list[Document], scale_score: bool = False
    ) -> list[float]:
        """
        Computes the similarity scores between the query embedding and the embeddings of the documents.

        :param embedding: Embedding of the query.
        :param documents: A list of Documents.
        :param scale_score: Whether to scale the scores of the Documents. Default is False.
        :returns: A list of scores.
        """
        query_embedding = np.array(embedding)
        if query_embedding.ndim == 1:
            query_embedding = np.expand_dims(a=query_embedding, axis=0)

        try:
            document_embeddings = np.array([doc.embedding for doc in documents])
        except ValueError as e:
            if "inhomogeneous shape" in str(e):
                raise DocumentStoreError(
                    "The embedding size of all Documents should be the same. "
                    "Please make sure that the Documents have been embedded with the same model."
                ) from e
            raise e
        if document_embeddings.ndim == 1:
            document_embeddings = np.expand_dims(a=document_embeddings, axis=0)

        if self.embedding_similarity_function == "cosine":
            # cosine similarity is a normed dot product
            query_embedding /= np.linalg.norm(x=query_embedding, axis=1, keepdims=True)
            document_embeddings /= np.linalg.norm(x=document_embeddings, axis=1, keepdims=True)

        try:
            scores = np.dot(a=query_embedding, b=document_embeddings.T)[0].tolist()
        except ValueError as e:
            if "shapes" in str(e) and "not aligned" in str(e):
                raise DocumentStoreError(
                    "The embedding size of the query should be the same as the embedding size of the Documents. "
                    "Please make sure that the query has been embedded with the same model as the Documents."
                ) from e
            raise e

        if scale_score:
            if self.embedding_similarity_function == "dot_product":
                scores = [expit(float(score / DOT_PRODUCT_SCALING_FACTOR)) for score in scores]
            elif self.embedding_similarity_function == "cosine":
                scores = [(score + 1) / 2 for score in scores]

        return scores
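
    # Scaling note: dot-product scores are squashed with expit(score / 100), so a raw score of 100 maps to
    # expit(1.0) ≈ 0.73, while cosine scores are shifted linearly from [-1, 1] to [0, 1] via (score + 1) / 2.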

    async def count_documents_async(self) -> int:
        """
        Returns the number of documents present in the DocumentStore.
        """
        return len(self.storage)

    async def filter_documents_async(self, filters: dict[str, Any] | None = None) -> list[Document]:
        """
        Returns the documents that match the filters provided.

        :param filters: The filters to apply. For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :returns: A list of Documents that match the given filters.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.filter_documents(filters=filters)
        )

    async def write_documents_async(
        self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE
    ) -> int:
        """
        Refer to the DocumentStore.write_documents() protocol documentation.

        If `policy` is set to `DuplicatePolicy.NONE`, it defaults to `DuplicatePolicy.FAIL`.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.write_documents(documents=documents, policy=policy)
        )
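
    # Async usage sketch: the *_async methods run their synchronous counterparts on the store's executor,
    # so they can be awaited from a running event loop:
    #
    #   >>> async def main():
    #   ...     store = InMemoryDocumentStore()
    #   ...     await store.write_documents_async([Document(content="Hello world")])
    #   ...     return await store.count_documents_async()
    #   >>> asyncio.run(main())
    #   1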

    async def delete_documents_async(self, document_ids: list[str]) -> None:
        """
        Deletes all documents with matching document_ids from the DocumentStore.

        :param document_ids: The document_ids to delete.
        """
        await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.delete_documents(document_ids=document_ids)
        )

    async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, Any]) -> int:
        """
        Updates the metadata of all documents that match the provided filters.

        :param filters: The filters to apply to select documents for updating.
            For filter syntax, see filter_documents.
        :param meta: The metadata fields to update. These will be merged with existing metadata.
        :returns: The number of documents updated.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.update_by_filter(filters=filters, meta=meta)
        )

    async def count_documents_by_filter_async(self, filters: dict[str, Any]) -> int:
        """
        Returns the number of documents that match the provided filters.

        :param filters: The filters to apply.
            For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :returns: The number of documents that match the filters.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.count_documents_by_filter(filters=filters)
        )

    async def count_unique_metadata_by_filter_async(
        self, filters: dict[str, Any], metadata_fields: list[str]
    ) -> dict[str, int]:
        """
        Returns the number of unique values for each specified metadata field from documents matching the filters.

        :param filters: The filters to apply.
            For a detailed specification of the filters, refer to the
            [documentation](https://docs.haystack.deepset.ai/docs/metadata-filtering).
        :param metadata_fields: List of field names to count unique values for.
            Field names can include or omit the "meta." prefix.
        :returns: A dictionary mapping each metadata field name (without "meta." prefix)
            to the count of its unique values among the filtered documents.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor,
            lambda: self.count_unique_metadata_by_filter(filters=filters, metadata_fields=metadata_fields),
        )

    async def get_metadata_fields_info_async(self) -> dict[str, dict[str, str]]:
        """
        Returns information about the metadata fields present in the stored documents.

        Types are inferred from the stored values (keyword, int, float, boolean).

        :returns: A dictionary mapping each metadata field name to a dict with a "type" key.
        """
        return await asyncio.get_running_loop().run_in_executor(self.executor, self.get_metadata_fields_info)

    async def get_metadata_field_min_max_async(self, metadata_field: str) -> dict[str, Any]:
        """
        Returns the minimum and maximum values for the given metadata field across all documents.

        :param metadata_field: The metadata field name. Can include or omit the "meta." prefix.
        :returns: A dictionary with "min" and "max" keys. Returns `{"min": None, "max": None}`
            if the field is missing or has no values.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, lambda: self.get_metadata_field_min_max(metadata_field=metadata_field)
        )

    async def get_metadata_field_unique_values_async(
        self, metadata_field: str, search_term: str | None = None
    ) -> tuple[list[str], int]:
        """
        Returns unique values for a metadata field, optionally filtered by a search term in content.

        :param metadata_field: The metadata field name. Can include or omit the "meta." prefix.
        :param search_term: If set, only documents whose content contains this term (case-insensitive)
            are considered.
        :returns: A tuple of (list of unique values, total count of unique values).
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor,
            lambda: self.get_metadata_field_unique_values(metadata_field=metadata_field, search_term=search_term),
        )

    async def delete_all_documents_async(self) -> None:
        """
        Deletes all documents in the document store.
        """
        await asyncio.get_running_loop().run_in_executor(self.executor, self.delete_all_documents)

    async def bm25_retrieval_async(
        self, query: str, filters: dict[str, Any] | None = None, top_k: int = 10, scale_score: bool = False
    ) -> list[Document]:
        """
        Retrieves the documents that are most relevant to the query using the BM25 algorithm.

        :param query: The query string.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved documents. Default is False.
        :returns: A list of the top_k documents most relevant to the query.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor,
            lambda: self.bm25_retrieval(query=query, filters=filters, top_k=top_k, scale_score=scale_score),
        )

    async def embedding_retrieval_async(
        self,
        query_embedding: list[float],
        filters: dict[str, Any] | None = None,
        top_k: int = 10,
        scale_score: bool = False,
        return_embedding: bool | None = None,
    ) -> list[Document]:
        """
        Retrieves the documents that are most similar to the query embedding using a vector similarity metric.

        :param query_embedding: Embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The number of top documents to retrieve. Default is 10.
        :param scale_score: Whether to scale the scores of the retrieved Documents. Default is False.
        :param return_embedding: Whether to return the embedding of the retrieved Documents.
            If not provided, the value of the `return_embedding` parameter set at component
            initialization will be used.
        :returns: A list of the top_k documents most relevant to the query.
        """
        return await asyncio.get_running_loop().run_in_executor(
            self.executor,
            lambda: self.embedding_retrieval(
                query_embedding=query_embedding,
                filters=filters,
                top_k=top_k,
                scale_score=scale_score,
                return_embedding=return_embedding,
            ),
        )