/ src / pipeline / steps / retrieve_step.py
retrieve_step.py
 1  from __future__ import annotations
 2  
 3  """Step that retrieves relevant documents from the vector store."""
 4  
 5  import logging
 6  
 7  from ...config import Config
 8  from ...retrievers.filters import FilterBuilderFactory
 9  from ...retrievers.protocol import Retriever
10  from ..contexts.query_context import QueryContext
11  
12  
class RetrieveStep:
    """Pipeline step that retrieves documents for the current query.

    If the context carries a user role and a role mapping, and
    ``notify_on_denied_access`` is disabled in the configuration, a metadata
    filter is built and set on the retriever before retrieval runs. The
    retrieval results are stored on ``context.retrieved_docs``.
    """

    def __init__(self, retriever: Retriever):
        """Initialize the retrieve step.

        Parameters
        ----------
        retriever
            Retriever instance created by RetrieverFactory.
        """
        self.retriever = retriever
        self.config = Config.get_config()
        self._logger = logging.getLogger(__name__)

    def run(self, context: QueryContext) -> None:
        """Retrieve relevant documents for the query.

        Optionally installs a role-based metadata filter on the retriever
        first, then performs the retrieval and writes the results to
        ``context.retrieved_docs``.

        Parameters
        ----------
        context
            Query context with user_query set.
        """
        if self._should_apply_db_filter(context):
            # Log arguments are passed separately (not via an f-string) so the
            # message is only formatted when the record is actually emitted.
            self._logger.info(
                "Applying tag-based access control (silent mode): role='%s'",
                context.user_role,
            )
            self._build_and_set_filter(context)

        self._retrieve_documents(context)

    def _should_apply_db_filter(self, context: QueryContext) -> bool:
        """Determine if DB-level filtering should be applied.

        Returns False if:
        - the context has no user role or no role mapping (both falsy values
          such as ``None`` or an empty mapping count as missing)
        - notify_on_denied_access is enabled (filtering happens in
          AccessControlStep)

        Returns True otherwise.
        """
        if not context.user_role or not context.role_mapping:
            return False

        if self.config.access_control.notify_on_denied_access:
            self._logger.info(
                "Skipping DB filter - notify_on_denied_access enabled, "
                "AccessControlStep will handle filtering"
            )
            return False

        return True

    def _build_and_set_filter(self, context: QueryContext) -> None:
        """Build and set the metadata filter on the retriever.

        The filter builder is selected from the configured vector store name
        and fed the user's role and role mapping from the context. A
        ``ValueError`` raised while creating the builder, building the filter,
        or setting it on the retriever is logged as a warning and not
        re-raised.
        """
        vector_store_type = self.config.vector_store.store_name

        try:
            builder = FilterBuilderFactory.create(vector_store_type)
            metadata_filter = builder.build(
                user_role=context.user_role,
                role_mapping=context.role_mapping,
            )
            # NOTE(review): the filter is stored on the retriever and is never
            # cleared in this class, so it presumably stays in effect for
            # later calls that skip filtering - verify against the Retriever
            # implementation.
            self.retriever.set_filter(metadata_filter)
            self._logger.info("Set metadata filter on retriever: %s", metadata_filter)
        except ValueError as e:
            # NOTE(review): this fails open - the caller still runs retrieval
            # after this warning, without a filter built for this request.
            # Whether a later step enforces access control in that case is not
            # visible from this module; confirm, and consider failing closed.
            self._logger.warning("Could not create filter builder: %s", e)

    def _retrieve_documents(self, context: QueryContext) -> None:
        """Execute the retrieval and store results in context.

        Calls ``retriever.retrieve_with_scores`` with ``context.user_query``
        and assigns the returned value to ``context.retrieved_docs``. The
        result must support ``len()`` because its size is logged.
        """
        self._logger.info("Retrieving documents for query: %s", context.user_query)

        results_with_scores = self.retriever.retrieve_with_scores(context.user_query)
        context.retrieved_docs = results_with_scores

        self._logger.info("Retrieved %d documents", len(results_with_scores))