/ haystack / logging.py
logging.py
  1  # SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
  2  #
  3  # SPDX-License-Identifier: Apache-2.0
  4  
  5  import builtins
  6  import functools
  7  import logging
  8  import os
  9  import sys
 10  import typing
 11  from typing import Any
 12  
 13  if typing.TYPE_CHECKING:
 14      from structlog.typing import EventDict, Processor, WrappedLogger
 15  
 16  HAYSTACK_LOGGING_USE_JSON_ENV_VAR = "HAYSTACK_LOGGING_USE_JSON"
 17  HAYSTACK_LOGGING_IGNORE_STRUCTLOG_ENV_VAR = "HAYSTACK_LOGGING_IGNORE_STRUCTLOG"
 18  
 19  
 20  class PatchedLogger(typing.Protocol):
 21      """Class which enables using type checkers to find wrong logger usage."""
 22  
 23      def debug(
 24          self,
 25          msg: str,
 26          *,
 27          _: Any = None,
 28          exc_info: Any = None,
 29          stack_info: Any = False,
 30          stacklevel: int = 1,
 31          **kwargs: Any,
 32      ) -> None:
 33          """Log a debug message."""
 34  
 35      def info(
 36          self,
 37          msg: str,
 38          *,
 39          _: Any = None,
 40          exc_info: Any = None,
 41          stack_info: Any = False,
 42          stacklevel: int = 1,
 43          **kwargs: Any,
 44      ) -> None:
 45          """Log an info message."""
 46  
 47      def warn(
 48          self,
 49          msg: str,
 50          *,
 51          _: Any = None,
 52          exc_info: Any = None,
 53          stack_info: Any = False,
 54          stacklevel: int = 1,
 55          **kwargs: Any,
 56      ) -> None:
 57          """Log a warning message."""
 58  
 59      def warning(
 60          self,
 61          msg: str,
 62          *,
 63          _: Any = None,
 64          exc_info: Any = None,
 65          stack_info: Any = False,
 66          stacklevel: int = 1,
 67          **kwargs: Any,
 68      ) -> None:
 69          """Log a warning message."""
 70  
 71      def error(
 72          self,
 73          msg: str,
 74          *,
 75          _: Any = None,
 76          exc_info: Any = None,
 77          stack_info: Any = False,
 78          stacklevel: int = 1,
 79          **kwargs: Any,
 80      ) -> None:
 81          """Log an error message."""
 82  
 83      def critical(
 84          self,
 85          msg: str,
 86          *,
 87          _: Any = None,
 88          exc_info: Any = None,
 89          stack_info: Any = False,
 90          stacklevel: int = 1,
 91          **kwargs: Any,
 92      ) -> None:
 93          """Log a critical message."""
 94  
 95      def exception(
 96          self,
 97          msg: str,
 98          *,
 99          _: Any = None,
100          exc_info: Any = None,
101          stack_info: Any = False,
102          stacklevel: int = 1,
103          **kwargs: Any,
104      ) -> None:
105          """Log an exception message."""
106  
107      def fatal(
108          self,
109          msg: str,
110          *,
111          _: Any = None,
112          exc_info: Any = None,
113          stack_info: Any = False,
114          stacklevel: int = 1,
115          **kwargs: Any,
116      ) -> None:
117          """Log a fatal message."""
118  
119      def log(
120          self,
121          level: int,
122          msg: str,
123          *,
124          _: Any = None,
125          exc_info: Any = None,
126          stack_info: Any = False,
127          stacklevel: int = 1,
128          **kwargs: Any,
129      ) -> None:
130          """Log a message."""
131  
132      def setLevel(self, level: int) -> None:
133          """Set the logging level."""
134  
135  
136  def patch_log_method_to_kwargs_only(func: typing.Callable) -> typing.Callable:
137      """A decorator to make sure that a function is only called with keyword arguments."""
138  
139      @functools.wraps(func)
140      def _log_only_with_kwargs(
141          msg: str, *, _: Any = None, exc_info: Any = None, stack_info: Any = False, stacklevel: int = 1, **kwargs: Any
142      ) -> typing.Callable:  # we need the `_` to avoid a syntax error
143          existing_extra = kwargs.pop("extra", {})
144          return func(
145              # we need to increase the stacklevel by 1 to point to the correct caller
146              # (otherwise it points to this function)
147              msg,
148              exc_info=exc_info,
149              stack_info=stack_info,
150              stacklevel=stacklevel + 1,
151              extra={**existing_extra, **kwargs},
152          )
153  
154      return _log_only_with_kwargs
155  
156  
157  def patch_log_with_level_method_to_kwargs_only(func: typing.Callable) -> typing.Callable:
158      """A decorator to make sure that a function is only called with keyword arguments."""
159  
160      @functools.wraps(func)
161      def _log_only_with_kwargs(
162          level: int | str,
163          msg: str,
164          *,
165          _: Any = None,
166          exc_info: Any = None,
167          stack_info: Any = False,
168          stacklevel: int = 1,
169          **kwargs: Any,  # we need the `_` to avoid a syntax error
170      ) -> typing.Callable:
171          existing_extra = kwargs.pop("extra", {})
172  
173          return func(
174              level,
175              msg,
176              exc_info=exc_info,
177              stack_info=stack_info,
178              # we need to increase the stacklevel by 1 to point to the correct caller
179              # (otherwise it points to this function)
180              stacklevel=stacklevel + 1,
181              extra={**existing_extra, **kwargs},
182          )
183  
184      return _log_only_with_kwargs
185  
186  
187  def patch_make_records_to_use_kwarg_string_interpolation(original_make_records: typing.Callable) -> typing.Callable:
188      """A decorator to ensure string interpolation is used."""
189  
190      @functools.wraps(original_make_records)
191      def _wrapper(
192          name: str,
193          level: int | str,
194          fn: str,
195          lno: int,
196          msg: str,
197          args: Any,  # noqa: ARG001
198          exc_info: Any,
199          func: Any = None,
200          extra: Any = None,
201          sinfo: Any = None,
202      ) -> typing.Callable:
203          safe_extra = extra or {}
204          try:
205              interpolated_msg = msg.format(**safe_extra)
206          except (KeyError, ValueError, IndexError):
207              interpolated_msg = msg
208          return original_make_records(name, level, fn, lno, interpolated_msg, (), exc_info, func, extra, sinfo)
209  
210      return _wrapper
211  
212  
213  def _patch_structlog_call_information(logger: logging.Logger) -> None:
214      # structlog patches the findCaller to hide itself from the traceback.
215      # We need to patch their patch to hide `haystack.logging` from the traceback.
216      try:
217          from structlog._frames import _find_first_app_frame_and_name, _format_stack
218          from structlog.stdlib import _FixedFindCallerLogger
219  
220          if not isinstance(logger, _FixedFindCallerLogger):
221              return
222  
223          # completely copied from structlog. We only add `haystack.logging` to the list of ignored frames
224          def findCaller(stack_info: bool = False, stacklevel: int = 1) -> tuple[str, int, str, str | None]:  # noqa: ARG001
225              try:
226                  sinfo: str | None
227                  # we need to exclude `haystack.logging` from the stack
228                  f, name = _find_first_app_frame_and_name(["logging", "haystack.logging"])
229                  sinfo = _format_stack(f) if stack_info else None
230              except Exception as error:
231                  print(f"Error in findCaller: {error}")
232  
233              return f.f_code.co_filename, f.f_lineno, f.f_code.co_name, sinfo
234  
235          logger.findCaller = findCaller  # type: ignore
236      except ImportError:
237          pass
238  
239  
240  def getLogger(name: str) -> PatchedLogger:
241      """
242      Get the Haystack logger, a patched version of the one from the standard library.
243  
244      We patch the default logger methods to make sure that they are only called with keyword arguments.
245      We enforce keyword-arguments because
246          - it brings in consistency
247          - it makes structure logging effective, not just an available feature
248      """
249      logger = logging.getLogger(name)
250      logger.debug = patch_log_method_to_kwargs_only(logger.debug)  # type: ignore
251      logger.info = patch_log_method_to_kwargs_only(logger.info)  # type: ignore
252      logger.warn = patch_log_method_to_kwargs_only(logger.warn)  # type: ignore
253      logger.warning = patch_log_method_to_kwargs_only(logger.warning)  # type: ignore
254      logger.error = patch_log_method_to_kwargs_only(logger.error)  # type: ignore
255      logger.critical = patch_log_method_to_kwargs_only(logger.critical)  # type: ignore
256      logger.exception = patch_log_method_to_kwargs_only(logger.exception)  # type: ignore
257      logger.fatal = patch_log_method_to_kwargs_only(logger.fatal)  # type: ignore
258      logger.log = patch_log_with_level_method_to_kwargs_only(logger.log)  # type: ignore
259  
260      _patch_structlog_call_information(logger)
261  
262      # We also patch the `makeRecord` method to use keyword string interpolation
263      logger.makeRecord = patch_make_records_to_use_kwarg_string_interpolation(logger.makeRecord)  # type: ignore
264  
265      return typing.cast(PatchedLogger, logger)
266  
267  
268  def add_line_and_file(_: "WrappedLogger", __: str, event_dict: "EventDict") -> "EventDict":
269      """Add line and file to log entries."""
270      stdlib_record = event_dict.get("_record")
271      if not stdlib_record:
272          return event_dict
273  
274      event_dict["lineno"] = stdlib_record.lineno
275      event_dict["module"] = stdlib_record.name
276  
277      return event_dict
278  
279  
280  def correlate_logs_with_traces(_: "WrappedLogger", __: str, event_dict: "EventDict") -> "EventDict":
281      """
282      Add correlation data for logs.
283  
284      This is useful if you want to correlate logs with traces.
285      """
286      import haystack.tracing.tracer  # to avoid circular imports
287  
288      if not haystack.tracing.is_tracing_enabled():
289          return event_dict
290  
291      current_span = haystack.tracing.tracer.current_span()
292      if current_span:
293          event_dict.update(current_span.get_correlation_data_for_logs())
294  
295      return event_dict
296  
297  
298  def configure_logging(use_json: bool | None = None) -> None:
299      """
300      Configure logging for Haystack.
301  
302      - If `structlog` is not installed, we keep everything as it is. The user is responsible for configuring logging
303        themselves.
304      - If `structlog` is installed, we configure it to format log entries including its key-value data. To disable this
305        behavior set the environment variable `HAYSTACK_LOGGING_IGNORE_STRUCTLOG` to `true`.
306      - If `structlog` is installed, you can JSON format all logs. Enable this by
307          - setting the `use_json` parameter to `True` when calling this function
308          - setting the environment variable `HAYSTACK_LOGGING_USE_JSON` to `true`
309      """
310      import haystack.utils.jupyter  # to avoid circular imports
311  
312      try:
313          import structlog
314          from structlog.processors import ExceptionRenderer
315          from structlog.tracebacks import ExceptionDictTransformer
316  
317      except ImportError:
318          # structlog is not installed - fall back to standard logging
319          return
320  
321      if os.getenv(HAYSTACK_LOGGING_IGNORE_STRUCTLOG_ENV_VAR, "false").lower() == "true":
322          # If the user wants to ignore structlog, we don't configure it and fall back to standard logging
323          return
324  
325      # We roughly follow the structlog documentation here:
326      # https://www.structlog.org/en/stable/standard-library.html#rendering-using-structlog-based-formatters-within-logging
327      # This means that we use structlog to format the log entries for entries emitted via `logging` and `structlog`.
328  
329      if use_json is None:  # explicit parameter takes precedence over everything else
330          use_json_env_var = os.getenv(HAYSTACK_LOGGING_USE_JSON_ENV_VAR)
331          if use_json_env_var is None:
332              # We try to guess if we are in an interactive terminal or not
333              interactive_terminal = (
334                  sys.stderr.isatty() or hasattr(builtins, "__IPYTHON__") or haystack.utils.jupyter.is_in_jupyter()
335              )
336              use_json = not interactive_terminal
337          else:
338              # User gave us an explicit value via environment variable
339              use_json = use_json_env_var.lower() == "true"
340  
341      shared_processors: list[Processor] = [
342          # Add the log level to the event_dict for structlog to use
343          structlog.stdlib.add_log_level,
344          # Adds the current timestamp in ISO format to logs
345          structlog.processors.TimeStamper(fmt="iso"),
346          structlog.contextvars.merge_contextvars,
347          add_line_and_file,
348      ]
349  
350      if use_json:
351          # We only need that in sophisticated production setups where we want to correlate logs with traces
352          shared_processors.append(correlate_logs_with_traces)
353  
354      structlog.configure(
355          processors=shared_processors + [structlog.stdlib.ProcessorFormatter.wrap_for_formatter],
356          logger_factory=structlog.stdlib.LoggerFactory(ignore_frame_names=["haystack.logging"]),
357          cache_logger_on_first_use=True,
358          # This is a filter that will filter out log entries that are below the log level of the root logger.
359          wrapper_class=structlog.make_filtering_bound_logger(min_level=logging.root.getEffectiveLevel()),
360      )
361  
362      renderers: list[Processor]
363      if use_json:
364          renderers = [
365              ExceptionRenderer(
366                  # don't show locals in production logs - this can be quite sensitive information
367                  ExceptionDictTransformer(show_locals=False)
368              ),
369              structlog.processors.JSONRenderer(),
370          ]
371      else:
372          renderers = [structlog.dev.ConsoleRenderer()]
373  
374      formatter = structlog.stdlib.ProcessorFormatter(
375          # These run ONLY on `logging` entries that do NOT originate within
376          # structlog.
377          foreign_pre_chain=shared_processors
378          + [
379              # Add the information from the `logging` `extras` to the event dictionary
380              structlog.stdlib.ExtraAdder()
381          ],
382          # These run on ALL entries after the pre_chain is done.
383          processors=[
384              # Remove _record & _from_structlog. to avoid that this metadata is added to the final log record
385              structlog.stdlib.ProcessorFormatter.remove_processors_meta,
386              *renderers,
387          ],
388      )
389  
390      handler = logging.StreamHandler()
391      handler.name = "HaystackLoggingHandler"
392      # Use OUR `ProcessorFormatter` to format all `logging` entries.
393      handler.setFormatter(formatter)
394  
395      root_logger = logging.getLogger()
396      # avoid adding our handler twice
397      old_handlers = [
398          h
399          for h in root_logger.handlers
400          if not (isinstance(h, logging.StreamHandler) and h.name == "HaystackLoggingHandler")
401      ]
402      new_handlers = [handler, *old_handlers]
403      root_logger.handlers = new_handlers