component_checks.py
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

from typing import Any

from haystack.core.component.types import InputSocket, _empty

# Sentinel stored as an input's "value" when the sender produced no output for that socket.
_NO_OUTPUT_PRODUCED = _empty


def can_component_run(component: dict, inputs: dict) -> bool:
    """
    Checks if the component can run, given the current state of its inputs.

    A component needs to pass two gates so that it is ready to run:
    1. It has received all mandatory inputs.
    2. It has received a trigger.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if both gates are passed, False otherwise.
    """
    received_all_mandatory_inputs = are_all_sockets_ready(component, inputs, only_check_mandatory=True)
    received_trigger = has_any_trigger(component, inputs)

    return received_all_mandatory_inputs and received_trigger


def has_any_trigger(component: dict, inputs: dict) -> bool:
    """
    Checks if a component was triggered to execute.

    There are 3 triggers:
    1. A predecessor provided input to the component.
    2. Input to the component was provided from outside the pipeline (e.g. user input).
    3. The component does not receive input from any other components in the pipeline and `Pipeline.run` was called.

    A trigger can only cause a component to execute ONCE because:
    1. Components consume inputs from predecessors before execution (they are deleted).
    2. Inputs from outside the pipeline can only trigger a component when it is executed for the first time.
    3. `Pipeline.run` can only trigger a component when it is executed for the first time.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if at least one of the triggers fired, False otherwise.
    """
    # Triggers 2 and 3 are only valid on the first execution, hence the `visits == 0` guard.
    first_visit = component["visits"] == 0

    trigger_from_predecessor = any_predecessors_provided_input(component, inputs)
    trigger_from_user = has_user_input(inputs) and first_visit
    trigger_without_inputs = can_not_receive_inputs_from_pipeline(component) and first_visit

    return trigger_from_predecessor or trigger_from_user or trigger_without_inputs


def are_all_sockets_ready(component: dict, inputs: dict, only_check_mandatory: bool = False) -> bool:
    """
    Checks if all sockets of a component have enough inputs for the component to execute.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :param only_check_mandatory: If only mandatory sockets should be checked. When False, sockets that are
        mandatory or have at least one sender are checked.
    :returns: True if every checked socket is filled (also True when there is no socket to check).
    """
    if only_check_mandatory:
        sockets_to_check = {
            socket_name: socket for socket_name, socket in component["input_sockets"].items() if socket.is_mandatory
        }
    else:
        sockets_to_check = {
            socket_name: socket
            for socket_name, socket in component["input_sockets"].items()
            if socket.is_mandatory or socket.senders
        }

    # A socket counts as filled if it has all required inputs or is a lazy variadic socket with any input.
    return all(
        has_socket_received_all_inputs(socket, inputs.get(socket_name, []))
        or (socket.is_lazy_variadic and any_socket_input_received(inputs.get(socket_name, [])))
        for socket_name, socket in sockets_to_check.items()
    )


def any_predecessors_provided_input(component: dict, inputs: dict) -> bool:
    """
    Checks if a component received inputs from any predecessors.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if at least one input socket holds a produced value sent by another component.
    """
    return any(
        any_socket_value_from_predecessor_received(inputs.get(socket_name, []))
        for socket_name in component["input_sockets"]
    )


def any_socket_value_from_predecessor_received(socket_inputs: list[dict[str, Any]]) -> bool:
    """
    Checks if a component socket received input from any predecessors.

    :param socket_inputs: Inputs for the component's socket.
    :returns: True if any input has a produced value and a sender that is not None.
    """
    # When sender is None, the input was provided from outside the pipeline.
    return any(inp["value"] is not _NO_OUTPUT_PRODUCED and inp["sender"] is not None for inp in socket_inputs)


def has_user_input(inputs: dict) -> bool:
    """
    Checks if a component has received input from outside the pipeline (e.g. user input).

    :param inputs: Inputs for the component.
    :returns: True if any input of any socket has no sender (sender is None).
    """
    return any(inp["sender"] is None for socket_inputs in inputs.values() for inp in socket_inputs)


def can_not_receive_inputs_from_pipeline(component: dict) -> bool:
    """
    Checks if a component can not receive inputs from any other components in the pipeline.

    :param component: Component metadata and the component instance.
    :returns: True if none of the component's input sockets has a sender.
    """
    return all(not sock.senders for sock in component["input_sockets"].values())


def all_socket_predecessors_executed(socket: InputSocket, socket_inputs: list[dict[str, Any]]) -> bool:
    """
    Checks if all components connecting to an InputSocket have executed.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    :returns: True if the senders found in `socket_inputs` are exactly the senders expected by the socket.
    """
    expected_senders = set(socket.senders)
    # The value is deliberately not inspected here: an entry from a sender counts as "executed"
    # even if it carries `_NO_OUTPUT_PRODUCED`.
    executed_senders = {inp["sender"] for inp in socket_inputs if inp["sender"] is not None}
    return expected_senders == executed_senders


def any_socket_input_received(socket_inputs: list[dict]) -> bool:
    """
    Checks if a socket has received any input from any other components in the pipeline or from outside the pipeline.

    :param socket_inputs: Inputs for the socket.
    :returns: True if at least one input carries a produced value.
    """
    return any(inp["value"] is not _NO_OUTPUT_PRODUCED for inp in socket_inputs)


def has_lazy_variadic_socket_received_all_inputs(socket: InputSocket, socket_inputs: list[dict]) -> bool:
    """
    Checks if a lazy variadic socket has received all expected inputs from other components in the pipeline.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    :returns: True if the senders that produced a value are exactly the senders expected by the socket.
    """
    expected_senders = set(socket.senders)
    actual_senders = {
        inp["sender"]
        for inp in socket_inputs
        if inp["value"] is not _NO_OUTPUT_PRODUCED and inp["sender"] is not None
    }
    return expected_senders == actual_senders


def has_socket_received_all_inputs(socket: InputSocket, socket_inputs: list[dict]) -> bool:
    """
    Checks if a socket has received all expected inputs.

    :param socket: The InputSocket of a component.
    :param socket_inputs: Inputs for the socket.
    :returns: True if the socket is complete according to its kind (greedy variadic, lazy variadic or
        non-variadic), False otherwise.
    """
    # No inputs received for the socket, it is not filled.
    if not socket_inputs:
        return False

    # The socket is greedy variadic and at least one input was produced, it is complete.
    if socket.is_variadic and socket.is_greedy and any_socket_input_received(socket_inputs):
        return True

    # The socket is lazy variadic and all expected inputs were produced.
    if socket.is_lazy_variadic and has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs):
        return True

    # The socket is not variadic and the only expected input is complete.
    return not socket.is_variadic and socket_inputs[0]["value"] is not _NO_OUTPUT_PRODUCED


def all_predecessors_executed(component: dict, inputs: dict) -> bool:
    """
    Checks if all predecessors of a component have executed.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if `all_socket_predecessors_executed` holds for every input socket of the component.
    """
    return all(
        all_socket_predecessors_executed(socket, inputs.get(socket_name, []))
        for socket_name, socket in component["input_sockets"].items()
    )


def are_all_lazy_variadic_sockets_resolved(component: dict, inputs: dict) -> bool:
    """
    Checks if the final state for all lazy variadic sockets of a component is resolved.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if every lazy variadic socket is resolved (also True when the component has none).
    """
    for socket_name, socket in component["input_sockets"].items():
        if not socket.is_lazy_variadic:
            continue

        socket_inputs = inputs.get(socket_name, [])

        # Checks if a lazy variadic socket is ready to run.
        # A socket is ready if either:
        # - it has received all expected inputs, or
        # - all its predecessors have executed
        # If none of the conditions are met, the socket is not ready to run and we defer the component.
        if not (
            has_lazy_variadic_socket_received_all_inputs(socket, socket_inputs)
            or all_socket_predecessors_executed(socket, socket_inputs)
        ):
            return False

    return True


def is_any_greedy_socket_ready(component: dict, inputs: dict) -> bool:
    """
    Checks if the component has any greedy socket that is ready to run.

    :param component: Component metadata and the component instance.
    :param inputs: Inputs for the component.
    :returns: True if at least one greedy socket has received all its expected inputs.
    """
    return any(
        socket.is_greedy and has_socket_received_all_inputs(socket, inputs.get(socket_name, []))
        for socket_name, socket in component["input_sockets"].items()
    )