"""tracing_utils.py — MLflow tracing integration for Semantic Kernel.

Bridges Semantic Kernel's OpenTelemetry-based model/agent diagnostics into
MLflow spans: records inputs/outputs that SK's own spans omit, maps SK
operation names to MLflow span types, and copies token-usage / model
attributes onto the MLflow span.
"""

import logging
from typing import Any

from opentelemetry import context as otel_context_api
from opentelemetry import trace as otel_trace
from opentelemetry.trace import get_current_span
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.contents.kernel_content import KernelContent
from semantic_kernel.contents.streaming_content_mixin import StreamingContentMixin
from semantic_kernel.functions import FunctionResult
from semantic_kernel.utils.telemetry.model_diagnostics import (
    gen_ai_attributes as model_gen_ai_attributes,
)
from semantic_kernel.utils.telemetry.model_diagnostics.decorators import (
    CHAT_COMPLETION_OPERATION,
    TEXT_COMPLETION_OPERATION,
)
from semantic_kernel.utils.telemetry.model_diagnostics.function_tracer import (
    OPERATION_NAME as FUNCTION_OPERATION_NAME,
)

import mlflow
from mlflow.entities import SpanType
from mlflow.entities.span import LiveSpan
from mlflow.tracing.constant import SpanAttributeKey, TokenUsageKey
from mlflow.tracing.utils import (
    construct_full_inputs,
    get_mlflow_span_for_otel_span,
)

# Maps Semantic Kernel gen_ai operation names to MLflow span types.
_OPERATION_TO_SPAN_TYPE = {
    CHAT_COMPLETION_OPERATION: SpanType.CHAT_MODEL,
    TEXT_COMPLETION_OPERATION: SpanType.LLM,
    FUNCTION_OPERATION_NAME: SpanType.TOOL,
    # https://github.com/microsoft/semantic-kernel/blob/d5ee6aa1c176a4b860aba72edaa961570874661b/python/semantic_kernel/utils/telemetry/agent_diagnostics/decorators.py#L22
    "invoke_agent": SpanType.AGENT,
}

# NB: Streaming operation names were removed in Semantic Kernel 1.38.0
# NOTE(review): these constants are imported from agent_diagnostics here;
# verify the path isn't meant to be model_diagnostics.decorators — the
# try/except would silently skip the mapping if the path is wrong.
try:
    from semantic_kernel.utils.telemetry.agent_diagnostics.decorators import (
        CHAT_STREAMING_COMPLETION_OPERATION,
        TEXT_STREAMING_COMPLETION_OPERATION,
    )

    _OPERATION_TO_SPAN_TYPE[CHAT_STREAMING_COMPLETION_OPERATION] = SpanType.CHAT_MODEL
    _OPERATION_TO_SPAN_TYPE[TEXT_STREAMING_COMPLETION_OPERATION] = SpanType.LLM
except ImportError:
    pass

_logger = logging.getLogger(__name__)


def semantic_kernel_diagnostics_wrapper(original, *args, **kwargs) -> None:
    """
    Wrapper for Semantic Kernel's model diagnostics decorators.

    This wrapper is used to record the inputs and outputs to the span, because
    Semantic Kernel's OTel spans do not record the inputs and outputs.
    """
    full_kwargs = construct_full_inputs(original, *args, **kwargs)
    # Prefer the span SK passed explicitly; fall back to the ambient OTel span.
    current_span = full_kwargs.get("current_span") or get_current_span()
    mlflow_span = get_mlflow_span_for_otel_span(current_span)

    if not mlflow_span:
        _logger.debug("Span is not found or recording. Skipping error handling.")
        return original(*args, **kwargs)

    if prompt := full_kwargs.get("prompt"):
        # Wrapping _set_completion_input
        # https://github.com/microsoft/semantic-kernel/blob/d5ee6aa1c176a4b860aba72edaa961570874661b/python/semantic_kernel/utils/telemetry/model_diagnostics/decorators.py#L369
        mlflow_span.set_inputs(_parse_content(prompt))

    if completions := full_kwargs.get("completions"):
        # Wrapping _set_completion_response
        # https://github.com/microsoft/semantic-kernel/blob/d5ee6aa1c176a4b860aba72edaa961570874661b/
        mlflow_span.set_outputs({"messages": [_parse_content(c) for c in completions]})

    if error := full_kwargs.get("error"):
        # Wrapping _set_completion_error
        # https://github.com/microsoft/semantic-kernel/blob/d5ee6aa1c176a4b860aba72edaa961570874661b/python/semantic_kernel/utils/telemetry/model_diagnostics/decorators.py#L452
        mlflow_span.record_exception(error)

    return original(*args, **kwargs)


async def patched_kernel_entry_point(original, self, *args, **kwargs):
    """Wrap a Kernel entry point in an MLflow AGENT span.

    Records the call's inputs/outputs on the span and attaches the span to
    the global OTel context so SK's internal spans become children of it.
    """
    with mlflow.start_span(
        name=f"{self.__class__.__name__}.{original.__name__}",
        span_type=SpanType.AGENT,
    ) as mlflow_span:
        inputs = construct_full_inputs(original, self, *args, **kwargs)
        mlflow_span.set_inputs(_parse_content(inputs))

        # Attach the MLflow span to the global OTel context so that Semantic Kernel's
        # internal OTel spans (e.g., execute_tool, chat.completions) will inherit the
        # same trace_id and be properly linked as child spans.
        global_ctx = otel_trace.set_span_in_context(mlflow_span._span)
        token = otel_context_api.attach(global_ctx)
        try:
            result = await original(self, *args, **kwargs)
        finally:
            # Always restore the previous context, even if the call raised.
            otel_context_api.detach(token)

        mlflow_span.set_outputs(_parse_content(result))

    return result


def _parse_content(value: Any) -> Any:
    """
    Parse the message content objects in Semantic Kernel into a more readable format.

    Those objects are Pydantic models, but include many noisy fields that are not
    useful for debugging and hard to read. The base KernelContent class has a to_dict()
    method that converts them into a more readable format (role, content), so we use that.
    """
    if isinstance(value, dict) and (chat_history := value.get("chat_history")):
        value = _parse_content(chat_history)
    elif isinstance(value, ChatHistory):
        # Record chat history as a list of messages for better readability
        value = {"messages": [_parse_content(m) for m in value.messages]}
    elif isinstance(value, (KernelContent, StreamingContentMixin)):
        value = value.to_dict()
    elif isinstance(value, FunctionResult):
        # Extract "value" field from the FunctionResult object
        value = _parse_content(value.value)
    elif isinstance(value, list):
        value = [_parse_content(item) for item in value]
    return value


def set_span_type(mlflow_span: LiveSpan) -> None:
    """Set the span type on the span based on its gen_ai operation attribute.

    NB: annotated -> None (the original said -> str but never returned a value).
    """
    span_type = SpanType.UNKNOWN
    if operation := mlflow_span.get_attribute(model_gen_ai_attributes.OPERATION):
        span_type = _OPERATION_TO_SPAN_TYPE.get(operation, SpanType.UNKNOWN)

    mlflow_span.set_span_type(span_type)


def set_token_usage(mlflow_span: LiveSpan) -> None:
    """Set token usage attributes on the MLflow span.

    Copies SK's input/output token counts into MLflow's CHAT_USAGE attribute;
    a total is computed when at least one count is present. Does nothing when
    neither count is recorded on the span.
    """
    input_tokens = mlflow_span.get_attribute(model_gen_ai_attributes.INPUT_TOKENS)
    output_tokens = mlflow_span.get_attribute(model_gen_ai_attributes.OUTPUT_TOKENS)

    usage_dict = {}
    if input_tokens is not None:
        usage_dict[TokenUsageKey.INPUT_TOKENS] = input_tokens
    if output_tokens is not None:
        usage_dict[TokenUsageKey.OUTPUT_TOKENS] = output_tokens

    # usage_dict is non-empty iff at least one count was present, so a single
    # truthiness check replaces the original's duplicated None tests.
    if usage_dict:
        usage_dict[TokenUsageKey.TOTAL_TOKENS] = (input_tokens or 0) + (output_tokens or 0)
        mlflow_span.set_attribute(SpanAttributeKey.CHAT_USAGE, usage_dict)


def set_model(mlflow_span: LiveSpan) -> None:
    """Set model name and provider attributes on the MLflow span."""
    if model := mlflow_span.get_attribute(model_gen_ai_attributes.MODEL):
        mlflow_span.set_attribute(SpanAttributeKey.MODEL, model)
    if provider := mlflow_span.get_attribute(model_gen_ai_attributes.SYSTEM):
        mlflow_span.set_attribute(SpanAttributeKey.MODEL_PROVIDER, provider)