/ mlflow / semantic_kernel / tracing_utils.py
tracing_utils.py
  1  import logging
  2  from typing import Any
  3  
  4  from opentelemetry import context as otel_context_api
  5  from opentelemetry import trace as otel_trace
  6  from opentelemetry.trace import get_current_span
  7  from semantic_kernel.contents.chat_history import ChatHistory
  8  from semantic_kernel.contents.kernel_content import KernelContent
  9  from semantic_kernel.contents.streaming_content_mixin import StreamingContentMixin
 10  from semantic_kernel.functions import FunctionResult
 11  from semantic_kernel.utils.telemetry.model_diagnostics import (
 12      gen_ai_attributes as model_gen_ai_attributes,
 13  )
 14  from semantic_kernel.utils.telemetry.model_diagnostics.decorators import (
 15      CHAT_COMPLETION_OPERATION,
 16      TEXT_COMPLETION_OPERATION,
 17  )
 18  from semantic_kernel.utils.telemetry.model_diagnostics.function_tracer import (
 19      OPERATION_NAME as FUNCTION_OPERATION_NAME,
 20  )
 21  
 22  import mlflow
 23  from mlflow.entities import SpanType
 24  from mlflow.entities.span import LiveSpan
 25  from mlflow.tracing.constant import SpanAttributeKey, TokenUsageKey
 26  from mlflow.tracing.utils import (
 27      construct_full_inputs,
 28      get_mlflow_span_for_otel_span,
 29  )
 30  
 31  _OPERATION_TO_SPAN_TYPE = {
 32      CHAT_COMPLETION_OPERATION: SpanType.CHAT_MODEL,
 33      TEXT_COMPLETION_OPERATION: SpanType.LLM,
 34      FUNCTION_OPERATION_NAME: SpanType.TOOL,
 35      # https://github.com/microsoft/semantic-kernel/blob/d5ee6aa1c176a4b860aba72edaa961570874661b/python/semantic_kernel/utils/telemetry/agent_diagnostics/decorators.py#L22
 36      "invoke_agent": SpanType.AGENT,
 37  }
 38  
 39  # NB: Streaming operation names were removed in Semantic Kernel 1.38.0
 40  try:
 41      from semantic_kernel.utils.telemetry.agent_diagnostics.decorators import (
 42          CHAT_STREAMING_COMPLETION_OPERATION,
 43          TEXT_STREAMING_COMPLETION_OPERATION,
 44      )
 45  
 46      _OPERATION_TO_SPAN_TYPE[CHAT_STREAMING_COMPLETION_OPERATION] = SpanType.CHAT_MODEL
 47      _OPERATION_TO_SPAN_TYPE[TEXT_STREAMING_COMPLETION_OPERATION] = SpanType.LLM
 48  except ImportError:
 49      pass
 50  
 51  _logger = logging.getLogger(__name__)
 52  
 53  
 54  def semantic_kernel_diagnostics_wrapper(original, *args, **kwargs) -> None:
 55      """
 56      Wrapper for Semantic Kernel's model diagnostics decorators.
 57  
 58      This wrapper is used to record the inputs and outputs to the span, because
 59      Semantic Kernel's Otel span do not record the inputs and outputs.
 60      """
 61      full_kwargs = construct_full_inputs(original, *args, **kwargs)
 62      current_span = full_kwargs.get("current_span") or get_current_span()
 63      mlflow_span = get_mlflow_span_for_otel_span(current_span)
 64  
 65      if not mlflow_span:
 66          _logger.debug("Span is not found or recording. Skipping error handling.")
 67          return original(*args, **kwargs)
 68  
 69      if prompt := full_kwargs.get("prompt"):
 70          # Wrapping _set_completion_input
 71          # https://github.com/microsoft/semantic-kernel/blob/d5ee6aa1c176a4b860aba72edaa961570874661b/python/semantic_kernel/utils/telemetry/model_diagnostics/decorators.py#L369
 72          mlflow_span.set_inputs(_parse_content(prompt))
 73  
 74      if completions := full_kwargs.get("completions"):
 75          # Wrapping _set_completion_response
 76          # https://github.com/microsoft/semantic-kernel/blob/d5ee6aa1c176a4b860aba72edaa961570874661b/
 77          mlflow_span.set_outputs({"messages": [_parse_content(c) for c in completions]})
 78  
 79      if error := full_kwargs.get("error"):
 80          # Wrapping _set_completion_error
 81          # https://github.com/microsoft/semantic-kernel/blob/d5ee6aa1c176a4b860aba72edaa961570874661b/python/semantic_kernel/utils/telemetry/model_diagnostics/decorators.py#L452
 82          mlflow_span.record_exception(error)
 83  
 84      return original(*args, **kwargs)
 85  
 86  
 87  async def patched_kernel_entry_point(original, self, *args, **kwargs):
 88      with mlflow.start_span(
 89          name=f"{self.__class__.__name__}.{original.__name__}",
 90          span_type=SpanType.AGENT,
 91      ) as mlflow_span:
 92          inputs = construct_full_inputs(original, self, *args, **kwargs)
 93          mlflow_span.set_inputs(_parse_content(inputs))
 94  
 95          # Attach the MLflow span to the global OTel context so that Semantic Kernel's
 96          # internal OTel spans (e.g., execute_tool, chat.completions) will inherit the
 97          # same trace_id and be properly linked as child spans.
 98          global_ctx = otel_trace.set_span_in_context(mlflow_span._span)
 99          token = otel_context_api.attach(global_ctx)
100          try:
101              result = await original(self, *args, **kwargs)
102          finally:
103              otel_context_api.detach(token)
104  
105          mlflow_span.set_outputs(_parse_content(result))
106  
107      return result
108  
109  
def _parse_content(value: Any) -> Any:
    """
    Convert Semantic Kernel content objects into a compact, readable structure.

    The content objects are Pydantic models carrying many fields that are noisy and
    unhelpful when debugging. KernelContent exposes ``to_dict()``, which yields a more
    readable form (role, content), so that representation is used wherever possible.
    Values that match none of the handled types are returned unchanged.
    """
    # A dict with a non-empty "chat_history" entry is represented by that history alone.
    if isinstance(value, dict) and (history := value.get("chat_history")):
        return _parse_content(history)

    if isinstance(value, ChatHistory):
        # Record chat history as a list of messages for better readability
        return {"messages": [_parse_content(message) for message in value.messages]}

    if isinstance(value, (KernelContent, StreamingContentMixin)):
        return value.to_dict()

    if isinstance(value, FunctionResult):
        # Unwrap the "value" field of the FunctionResult object
        return _parse_content(value.value)

    if isinstance(value, list):
        return [_parse_content(element) for element in value]

    return value
131  
132  
def set_span_type(mlflow_span: LiveSpan) -> str:
    """
    Set the MLflow span type based on the span's Semantic Kernel operation attribute.

    Args:
        mlflow_span: The live MLflow span to update.

    Returns:
        The span type that was applied. This is ``SpanType.UNKNOWN`` when the
        operation attribute is missing, empty, or not in ``_OPERATION_TO_SPAN_TYPE``.
    """
    operation = mlflow_span.get_attribute(model_gen_ai_attributes.OPERATION)
    if operation:
        span_type = _OPERATION_TO_SPAN_TYPE.get(operation, SpanType.UNKNOWN)
    else:
        span_type = SpanType.UNKNOWN

    mlflow_span.set_span_type(span_type)
    # Return the applied type so the function honours its declared ``-> str`` contract.
    return span_type
140  
141  
def set_token_usage(mlflow_span: LiveSpan) -> None:
    """
    Copy the input/output token counts on the span into MLflow's chat usage attribute.

    Only counts that are present are written. The total is the sum of the available
    counts, with a missing side counted as zero. When neither count is present the
    span is left untouched.
    """
    prompt_count = mlflow_span.get_attribute(model_gen_ai_attributes.INPUT_TOKENS)
    completion_count = mlflow_span.get_attribute(model_gen_ai_attributes.OUTPUT_TOKENS)

    # Nothing was reported: do not write an empty usage attribute.
    if prompt_count is None and completion_count is None:
        return

    reported = (
        (TokenUsageKey.INPUT_TOKENS, prompt_count),
        (TokenUsageKey.OUTPUT_TOKENS, completion_count),
    )
    usage = {key: count for key, count in reported if count is not None}
    usage[TokenUsageKey.TOTAL_TOKENS] = (prompt_count or 0) + (completion_count or 0)

    mlflow_span.set_attribute(SpanAttributeKey.CHAT_USAGE, usage)
159  
160  
def set_model(mlflow_span: LiveSpan) -> None:
    """
    Copy the model name and provider from the gen-AI attributes onto the MLflow span.

    Each value is written under its MLflow attribute key only when the source
    attribute is present and truthy.
    """
    attribute_pairs = (
        (model_gen_ai_attributes.MODEL, SpanAttributeKey.MODEL),
        (model_gen_ai_attributes.SYSTEM, SpanAttributeKey.MODEL_PROVIDER),
    )
    for source_key, target_key in attribute_pairs:
        if found := mlflow_span.get_attribute(source_key):
            mlflow_span.set_attribute(target_key, found)