mlflow/langchain/autolog.py
import logging

from mlflow.langchain.constant import FLAVOR_NAME
from mlflow.telemetry.events import AutologgingEvent
from mlflow.telemetry.track import _record_event
from mlflow.utils.autologging_utils import autologging_integration
from mlflow.utils.autologging_utils.config import AutoLoggingConfig
from mlflow.utils.autologging_utils.safety import safe_patch

logger = logging.getLogger(__name__)


@autologging_integration(FLAVOR_NAME)
def autolog(
    disable=False,
    exclusive=False,
    disable_for_unsupported_versions=False,
    silent=False,
    log_traces=True,
    run_tracer_inline=False,
):
    """
    Enables (or disables) and configures autologging from LangChain to MLflow.

    Args:
        disable: If ``True``, disables the LangChain autologging integration. If ``False``,
            enables the LangChain autologging integration.
        exclusive: If ``True``, autologged content is not logged to user-created fluent runs.
            If ``False``, autologged content is logged to the active fluent run,
            which may be user-created.
        disable_for_unsupported_versions: If ``True``, disable autologging for versions of
            LangChain that have not been tested against this version of the MLflow
            client or are incompatible.
        silent: If ``True``, suppress all event logs and warnings from MLflow during LangChain
            autologging. If ``False``, show all events and warnings during LangChain
            autologging.
        log_traces: If ``True``, traces are logged for LangChain models by using
            MlflowLangchainTracer as a callback during inference. If ``False``, no traces are
            collected during inference. Defaults to ``True``.
        run_tracer_inline: If ``True``, the MLflow tracer callback runs in the main async task
            rather than being offloaded to a thread pool. This ensures proper context propagation
            when combining autolog traces with manual ``@mlflow.trace`` decorators in async
            scenarios (e.g., LangGraph's ``ainvoke``). Defaults to ``False`` for backward
            compatibility. Set it to ``True`` if you use manual ``@mlflow.trace`` decorators within
            LangGraph nodes or tools and need them properly nested in the autolog trace.
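
    Example (an illustrative sketch; any chain or graph with callback support works):

    .. code-block:: python

        import mlflow

        # Enable tracing; run the tracer inline so manual ``@mlflow.trace``
        # spans nest correctly in async code paths such as ``ainvoke``.
        mlflow.langchain.autolog(log_traces=True, run_tracer_inline=True)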
    """
    try:
        from langchain_core.callbacks import BaseCallbackManager

        safe_patch(
            FLAVOR_NAME,
            BaseCallbackManager,
            "__init__",
            _patched_callback_manager_init,
        )
    except Exception as e:
        logger.warning(f"Failed to enable tracing for LangChain. Error: {e}")

    # Special handling for edge cases.
    try:
        from langchain_core.callbacks import BaseCallbackManager
        from langchain_core.runnables import RunnableSequence

        safe_patch(
            FLAVOR_NAME,
            RunnableSequence,
            "batch",
            _patched_runnable_sequence_batch,
        )

        safe_patch(
            FLAVOR_NAME,
            BaseCallbackManager,
            "merge",
            _patched_callback_manager_merge,
        )
    except Exception:
        logger.debug("Failed to patch RunnableSequence or BaseCallbackManager.", exc_info=True)

    _record_event(
        AutologgingEvent, {"flavor": FLAVOR_NAME, "log_traces": log_traces, "disable": disable}
    )


def _patched_callback_manager_init(original, self, *args, **kwargs):
    from mlflow.langchain.langchain_tracer import MlflowLangchainTracer
    from mlflow.utils.autologging_utils import get_autologging_config

    original(self, *args, **kwargs)

    if not AutoLoggingConfig.init(FLAVOR_NAME).log_traces:
        return

    # Avoid duplicate injection: skip if an MLflow tracer is already inherited.
    for handler in self.inheritable_handlers:
        if isinstance(handler, MlflowLangchainTracer):
            return

    # Fall back to the documented default (``False``) when the option is unset.
    run_tracer_inline = get_autologging_config(FLAVOR_NAME, "run_tracer_inline", False)
    _handler = MlflowLangchainTracer(run_inline=run_tracer_inline)
    self.add_handler(_handler, inherit=True)
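

# Illustrative sketch (not part of the module logic): once ``autolog()`` is
# enabled, constructing a callback manager injects the MLflow tracer exactly
# once, even when ``__init__`` runs repeatedly through copy or merge paths.
#
#     from langchain_core.callbacks import CallbackManager
#
#     manager = CallbackManager(handlers=[])
#     # manager.inheritable_handlers now holds a single MlflowLangchainTracer;
#     # constructing further managers does not add a second instance.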


def _patched_callback_manager_merge(original, self, *args, **kwargs):
    """
    Patch BaseCallbackManager.merge to avoid a duplicated-callback issue.

    The patched __init__ above checks `inheritable_handlers` to see whether the MLflow tracer
    has already been propagated. That works when `inheritable_handlers` is passed as a
    constructor argument. In the `merge` method, however, LangChain does not use the
    constructor but sets callbacks via setter methods, which would inject the tracer twice.
    https://github.com/langchain-ai/langchain/blob/d9a069c414a321e7a3f3638a32ecf8a37ec2d188/libs/core/langchain_core/callbacks/base.py#L962-L982
    """
    from mlflow.langchain.langchain_tracer import MlflowLangchainTracer

    # Find the MLflow callback inherited from either of the two managers.
    inherited = self.inheritable_handlers + args[0].inheritable_handlers
    inherited_mlflow_cb = next(
        (cb for cb in inherited if isinstance(cb, MlflowLangchainTracer)), None
    )

    if not inherited_mlflow_cb:
        return original(self, *args, **kwargs)

    merged = original(self, *args, **kwargs)
    # If new MLflow callbacks were generated inside __init__, remove them.
    duplicate_mlflow_cbs = [
        cb
        for cb in merged.inheritable_handlers
        if isinstance(cb, MlflowLangchainTracer) and cb != inherited_mlflow_cb
    ]
    for cb in duplicate_mlflow_cbs:
        merged.remove_handler(cb)

    return merged
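

# Illustrative sketch (hypothetical manager objects, for exposition only):
# without the dedup step above, the manager returned by ``merge`` would carry
# two tracers, because its ``__init__`` also runs the patched injection.
#
#     merged = parent_manager.merge(other_manager)
#     # Exactly one MlflowLangchainTracer remains in
#     # merged.inheritable_handlers: the instance inherited from the parents.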


def _patched_runnable_sequence_batch(original, self, *args, **kwargs):
    """
    Patch to disable span-context attachment during batch execution.

    RunnableSequence's batch() method is implemented in a peculiar way: it iterates
    over steps, then items, sequentially within the same thread. For example, if a
    sequence has 2 steps and the batch size is 3, the execution flow is:
      - Step 1 for item 1
      - Step 1 for item 2
      - Step 1 for item 3
      - Step 2 for item 1
      - Step 2 for item 2
      - Step 2 for item 3
    Because of this interleaving, we cannot attach spans to the context for this
    particular API; otherwise, spans for different inputs would be mixed up.
    """
    from mlflow.langchain.langchain_tracer import _should_attach_span_to_context

    # Save the current flag, disable attachment for the duration of the call,
    # and restore the original value afterwards (even if batch() raises).
    original_state = _should_attach_span_to_context.get()
    _should_attach_span_to_context.set(False)
    try:
        return original(self, *args, **kwargs)
    finally:
        _should_attach_span_to_context.set(original_state)
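

# Illustrative sketch (standalone, hypothetical names): the save/override/restore
# pattern used above, generalized to any ContextVar-guarded behavior. Using the
# token returned by ``set`` makes the restore robust to nesting.
#
#     from contextvars import ContextVar
#
#     _flag = ContextVar("flag", default=True)
#
#     def call_with_flag_disabled(fn, *args, **kwargs):
#         token = _flag.set(False)
#         try:
#             return fn(*args, **kwargs)
#         finally:
#             _flag.reset(token)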