/ haystack / core / pipeline / component_checks.py
component_checks.py
  1  # SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
  2  #
  3  # SPDX-License-Identifier: Apache-2.0
  4  
  5  from typing import Any
  6  
  7  from haystack.core.component.types import InputSocket, _empty
  8  
# Sentinel (alias of `_empty`) stored as the "value" of a socket input when no output was produced for it.
# The checks in this module compare against it by identity (`is` / `is not`).
_NO_OUTPUT_PRODUCED = _empty
 10  
 11  
 12  def can_component_run(component: dict, inputs: dict) -> bool:
 13      """
 14      Checks if the component can run, given the current state of its inputs.
 15  
 16      A component needs to pass two gates so that it is ready to run:
 17      1. It has received all mandatory inputs.
 18      2. It has received a trigger.
 19      :param component: Component metadata and the component instance.
 20      :param inputs: Inputs for the component.
 21      """
 22      received_all_mandatory_inputs = are_all_sockets_ready(component, inputs, only_check_mandatory=True)
 23      received_trigger = has_any_trigger(component, inputs)
 24  
 25      return received_all_mandatory_inputs and received_trigger
 26  
 27  
 28  def has_any_trigger(component: dict, inputs: dict) -> bool:
 29      """
 30      Checks if a component was triggered to execute.
 31  
 32      There are 3 triggers:
 33      1. A predecessor provided input to the component.
 34      2. Input to the component was provided from outside the pipeline (e.g. user input).
 35      3. The component does not receive input from any other components in the pipeline and `Pipeline.run` was called.
 36  
 37      A trigger can only cause a component to execute ONCE because:
 38      1. Components consume inputs from predecessors before execution (they are deleted).
 39      2. Inputs from outside the pipeline can only trigger a component when it is executed for the first time.
 40      3. `Pipeline.run` can only trigger a component when it is executed for the first time.
 41  
 42      :param component: Component metadata and the component instance.
 43      :param inputs: Inputs for the component.
 44      """
 45      trigger_from_predecessor = any_predecessors_provided_input(component, inputs)
 46      trigger_from_user = has_user_input(inputs) and component["visits"] == 0
 47      trigger_without_inputs = can_not_receive_inputs_from_pipeline(component) and component["visits"] == 0
 48  
 49      return trigger_from_predecessor or trigger_from_user or trigger_without_inputs
 50  
 51  
 52  def are_all_sockets_ready(component: dict, inputs: dict, only_check_mandatory: bool = False) -> bool:
 53      """
 54      Checks if all sockets of a component have enough inputs for the component to execute.
 55  
 56      :param component: Component metadata and the component instance.
 57      :param inputs: Inputs for the component.
 58      :param only_check_mandatory: If only mandatory sockets should be checked.
 59      """
 60      filled_sockets = set()
 61      expected_sockets = set()
 62      if only_check_mandatory:
 63          sockets_to_check = {
 64              socket_name: socket for socket_name, socket in component["input_sockets"].items() if socket.is_mandatory
 65          }
 66      else:
 67          sockets_to_check = {
 68              socket_name: socket
 69              for socket_name, socket in component["input_sockets"].items()
 70              if socket.is_mandatory or len(socket.senders)
 71          }
 72  
 73      for socket_name, socket in sockets_to_check.items():
 74          socket_inputs = inputs.get(socket_name, [])
 75          expected_sockets.add(socket_name)
 76  
 77          # Check if socket has all required inputs or is a lazy variadic socket with any input
 78          if has_socket_received_all_inputs(socket, socket_inputs) or (
 79              socket.is_lazy_variadic and any_socket_input_received(socket_inputs)
 80          ):
 81              filled_sockets.add(socket_name)
 82  
 83      return filled_sockets == expected_sockets
 84  
 85  
 86  def any_predecessors_provided_input(component: dict, inputs: dict) -> bool:
 87      """
 88      Checks if a component received inputs from any predecessors.
 89  
 90      :param component: Component metadata and the component instance.
 91      :param inputs: Inputs for the component.
 92      """
 93      return any(
 94          any_socket_value_from_predecessor_received(inputs.get(socket_name, []))
 95          for socket_name in component["input_sockets"].keys()
 96      )
 97  
 98  
 99  def any_socket_value_from_predecessor_received(socket_inputs: list[dict[str, Any]]) -> bool:
100      """
101      Checks if a component socket received input from any predecessors.
102  
103      :param socket_inputs: Inputs for the component's socket.
104      """
105      # When sender is None, the input was provided from outside the pipeline.
106      return any(inp["value"] is not _NO_OUTPUT_PRODUCED and inp["sender"] is not None for inp in socket_inputs)
107  
108  
def has_user_input(inputs: dict) -> bool:
    """
    Tells whether any socket of a component holds input from outside the pipeline (e.g. user input).

    :param inputs: Inputs for the component.
    :returns: True if at least one socket input has a sender of None, False otherwise.
    """
    for socket_inputs in inputs.values():
        for entry in socket_inputs:
            # Inputs provided from outside the pipeline carry no sender.
            if entry["sender"] is None:
                return True
    return False
116  
117  
def can_not_receive_inputs_from_pipeline(component: dict) -> bool:
    """
    Tells whether a component has no way of receiving inputs from other components in the pipeline.

    :param component: Component metadata and the component instance.
    :returns: True if none of the component's input sockets has a sender, False otherwise.
    """
    for sock in component["input_sockets"].values():
        if len(sock.senders) != 0:
            # One connected socket is enough for the pipeline to be able to feed this component.
            return False
    return True
125  
126  
def all_socket_predecessors_executed(socket: InputSocket, socket_inputs: list[dict[str, Any]]) -> bool:
    """
    Tells whether every component connected to an InputSocket has executed.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    :returns: True if the senders found in `socket_inputs` are exactly the senders of the socket.
    """
    required_senders = set(socket.senders)

    seen_senders = set()
    for entry in socket_inputs:
        sender = entry["sender"]
        # Entries whose sender is None are not counted as executed senders.
        if sender is not None:
            seen_senders.add(sender)

    return required_senders == seen_senders
137  
138  
def any_socket_input_received(socket_inputs: list[dict]) -> bool:
    """
    Tells whether a socket holds at least one produced value, no matter who sent it.

    The value may come from another component in the pipeline or from outside the pipeline.

    :param socket_inputs: Inputs for the socket.
    :returns: True if any input carries a value other than the "no output produced" sentinel.
    """
    for entry in socket_inputs:
        if entry["value"] is not _NO_OUTPUT_PRODUCED:
            return True
    return False
146  
147  
def has_lazy_variadic_socket_received_all_inputs(socket: InputSocket, socket_inputs: list[dict]) -> bool:
    """
    Tells whether a lazy variadic socket got a produced value from each of its expected senders.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    :returns: True if the senders that delivered a produced value are exactly the senders of the socket.
    """
    required_senders = set(socket.senders)

    delivering_senders = set()
    for entry in socket_inputs:
        # Only inputs that carry a produced value and name a sender are counted.
        if entry["value"] is not _NO_OUTPUT_PRODUCED and entry["sender"] is not None:
            delivering_senders.add(entry["sender"])

    return required_senders == delivering_senders
162  
163  
def has_socket_received_all_inputs(socket: InputSocket, socket_inputs: list[dict]) -> bool:
    """
    Tells whether a socket has received all the inputs it expects.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    :returns: True if the socket is complete according to its kind (greedy variadic, lazy variadic or
        non-variadic), False otherwise.
    """
    # Nothing arrived at all: the socket cannot be filled.
    if not socket_inputs:
        return False

    # Greedy variadic: a single produced input is enough.
    if socket.is_variadic and socket.is_greedy:
        for entry in socket_inputs:
            if entry["value"] is not _NO_OUTPUT_PRODUCED:
                return True

    # Lazy variadic: every expected sender must have produced an input.
    if socket.is_lazy_variadic and has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs):
        return True

    # A variadic socket that matched none of the cases above is incomplete.
    if socket.is_variadic:
        return False

    # Non-variadic: the only expected input must have been produced.
    return socket_inputs[0]["value"] is not _NO_OUTPUT_PRODUCED
189  
190  
def all_predecessors_executed(component: dict, inputs: dict) -> bool:
    """
    Tells whether all predecessors of a component have executed.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if, for every input socket, all of its predecessors have executed; False otherwise.
    """
    for socket_name, sock in component["input_sockets"].items():
        # Sockets without an entry in `inputs` are checked against an empty list of inputs.
        if not all_socket_predecessors_executed(sock, inputs.get(socket_name, [])):
            return False
    return True
202  
203  
204  def are_all_lazy_variadic_sockets_resolved(component: dict, inputs: dict) -> bool:
205      """
206      Checks if the final state for all lazy variadic sockets of a component is resolved.
207  
208      :param component: Component metadata and the component instance.
209      :param inputs: Inputs for the component.
210      """
211      for socket_name, socket in component["input_sockets"].items():
212          if socket.is_lazy_variadic:
213              socket_inputs = inputs.get(socket_name, [])
214  
215              # Checks if a lazy variadic socket is ready to run.
216              # A socket is ready if either:
217              # - it has received all expected inputs, or
218              # - all its predecessors have executed
219              # If none of the conditions are met, the socket is not ready to run and we defer the component.
220              if not (
221                  has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs)
222                  or all_socket_predecessors_executed(socket, socket_inputs)
223              ):
224                  return False
225  
226      return True
227  
228  
def is_any_greedy_socket_ready(component: dict, inputs: dict) -> bool:
    """
    Tells whether at least one greedy socket of the component is ready to run.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if any greedy socket has received all its inputs, False otherwise.
    """
    # Non-greedy sockets are skipped without looking at their inputs.
    return any(
        sock.is_greedy and has_socket_received_all_inputs(sock, inputs.get(socket_name, []))
        for socket_name, sock in component["input_sockets"].items()
    )
239  
240      return False