autolog.py
1 import logging 2 3 from mlflow.langchain.constant import FLAVOR_NAME 4 from mlflow.telemetry.events import AutologgingEvent 5 from mlflow.telemetry.track import _record_event 6 from mlflow.utils.autologging_utils import autologging_integration 7 from mlflow.utils.autologging_utils.config import AutoLoggingConfig 8 from mlflow.utils.autologging_utils.safety import safe_patch 9 10 logger = logging.getLogger(__name__) 11 12 13 @autologging_integration(FLAVOR_NAME) 14 def autolog( 15 disable=False, 16 exclusive=False, 17 disable_for_unsupported_versions=False, 18 silent=False, 19 log_traces=True, 20 run_tracer_inline=False, 21 ): 22 """ 23 Enables (or disables) and configures autologging from Langchain to MLflow. 24 25 Args: 26 disable: If ``True``, disables the Langchain autologging integration. If ``False``, 27 enables the Langchain autologging integration. 28 exclusive: If ``True``, autologged content is not logged to user-created fluent runs. 29 If ``False``, autologged content is logged to the active fluent run, 30 which may be user-created. 31 disable_for_unsupported_versions: If ``True``, disable autologging for versions of 32 langchain that have not been tested against this version of the MLflow 33 client or are incompatible. 34 silent: If ``True``, suppress all event logs and warnings from MLflow during Langchain 35 autologging. If ``False``, show all events and warnings during Langchain 36 autologging. 37 log_traces: If ``True``, traces are logged for Langchain models by using 38 MlflowLangchainTracer as a callback during inference. If ``False``, no traces are 39 collected during inference. Default to ``True``. 40 run_tracer_inline: If ``True``, the MLflow tracer callback runs in the main async task 41 rather than being offloaded to a thread pool. This ensures proper context propagation 42 when combining autolog traces with manual ``@mlflow.trace`` decorators in async 43 scenarios (e.g., LangGraph's ``ainvoke``). Default is ``False`` for backward 44 compatibility. Set to ``True`` if you use manual ``@mlflow.trace`` decorators within 45 LangGraph nodes or tools and need them properly nested in the autolog trace. 46 """ 47 try: 48 from langchain_core.callbacks import BaseCallbackManager 49 50 safe_patch( 51 FLAVOR_NAME, 52 BaseCallbackManager, 53 "__init__", 54 _patched_callback_manager_init, 55 ) 56 except Exception as e: 57 logger.warning(f"Failed to enable tracing for LangChain. Error: {e}") 58 59 # Special handlings for edge cases. 60 try: 61 from langchain_core.callbacks import BaseCallbackManager 62 from langchain_core.runnables import RunnableSequence 63 64 safe_patch( 65 FLAVOR_NAME, 66 RunnableSequence, 67 "batch", 68 _patched_runnable_sequence_batch, 69 ) 70 71 safe_patch( 72 FLAVOR_NAME, 73 BaseCallbackManager, 74 "merge", 75 _patched_callback_manager_merge, 76 ) 77 except Exception: 78 logger.debug("Failed to patch RunnableSequence or BaseCallbackManager.", exc_info=True) 79 80 _record_event( 81 AutologgingEvent, {"flavor": FLAVOR_NAME, "log_traces": log_traces, "disable": disable} 82 ) 83 84 85 def _patched_callback_manager_init(original, self, *args, **kwargs): 86 from mlflow.langchain.langchain_tracer import MlflowLangchainTracer 87 from mlflow.utils.autologging_utils import get_autologging_config 88 89 original(self, *args, **kwargs) 90 91 if not AutoLoggingConfig.init(FLAVOR_NAME).log_traces: 92 return 93 94 for handler in self.inheritable_handlers: 95 if isinstance(handler, MlflowLangchainTracer): 96 return 97 98 run_tracer_inline = get_autologging_config(FLAVOR_NAME, "run_tracer_inline", True) 99 _handler = MlflowLangchainTracer(run_inline=run_tracer_inline) 100 self.add_handler(_handler, inherit=True) 101 102 103 def _patched_callback_manager_merge(original, self, *args, **kwargs): 104 """ 105 Patch BaseCallbackManager.merge to avoid a duplicated callback issue. 106 107 In the above patched __init__, we check `inheritable_handlers` to see if the MLflow tracer 108 is already propagated. This works when the `inheritable_handlers` is specified as constructor 109 arguments. However, in the `merge` method, LangChain does not use constructor but set 110 callbacks via the setter method. This causes duplicated callbacks injection. 111 https://github.com/langchain-ai/langchain/blob/d9a069c414a321e7a3f3638a32ecf8a37ec2d188/libs/core/langchain_core/callbacks/base.py#L962-L982 112 """ 113 from mlflow.langchain.langchain_tracer import MlflowLangchainTracer 114 115 # Get the MLflow callback inherited from parent 116 inherited = self.inheritable_handlers + args[0].inheritable_handlers 117 inherited_mlflow_cb = next( 118 (cb for cb in inherited if isinstance(cb, MlflowLangchainTracer)), None 119 ) 120 121 if not inherited_mlflow_cb: 122 return original(self, *args, **kwargs) 123 124 merged = original(self, *args, **kwargs) 125 # If a new MLflow callback is generated inside __init__, remove it 126 duplicate_mlflow_cbs = [ 127 cb 128 for cb in merged.inheritable_handlers 129 if isinstance(cb, MlflowLangchainTracer) and cb != inherited_mlflow_cb 130 ] 131 for cb in duplicate_mlflow_cbs: 132 merged.remove_handler(cb) 133 134 return merged 135 136 137 def _patched_runnable_sequence_batch(original, self, *args, **kwargs): 138 """ 139 Patch to terminate span context attachment during batch execution. 140 141 RunnableSequence's batch() methods are implemented in a peculiar way 142 that iterates on steps->items sequentially within the same thread. For example, if a 143 sequence has 2 steps and the batch size is 3, the execution flow will be: 144 - Step 1 for item 1 145 - Step 1 for item 2 146 - Step 1 for item 3 147 - Step 2 for item 1 148 - Step 2 for item 2 149 - Step 2 for item 3 150 Due to this behavior, we cannot attach the span to the context for this particular 151 API, otherwise spans for different inputs will be mixed up. 152 """ 153 from mlflow.langchain.langchain_tracer import _should_attach_span_to_context 154 155 original_state = _should_attach_span_to_context.get() 156 _should_attach_span_to_context.set(False) 157 try: 158 return original(self, *args, **kwargs) 159 finally: 160 _should_attach_span_to_context.set(original_state)