# multithreading.py
"""
Trace a multi-threaded application with MLflow.

MLflow Tracing keeps the active trace/span in a ``contextvars`` context, which
worker threads do not inherit automatically. To keep each worker's span
attached to the parent trace, copy the context in the main thread with
``contextvars.copy_context()`` and execute the worker inside that copy via
``ctx.run()``.
"""

import contextvars
from concurrent.futures import ThreadPoolExecutor, as_completed

import openai

import mlflow

exp = mlflow.set_experiment("mlflow-tracing-example")
exp_id = exp.experiment_id

client = openai.OpenAI()

# Enable MLflow Tracing for OpenAI
mlflow.openai.autolog()


@mlflow.trace
def worker(question: str) -> str:
    """Answer a single question via the OpenAI chat API (traced as a child span).

    Args:
        question: The user question to send to the model.

    Returns:
        The model's answer text.
    """
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": question},
    ]
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0.1,
        max_tokens=100,
    )
    return response.choices[0].message.content


@mlflow.trace
def main(questions: list[str]) -> list[str]:
    """Fan the questions out to a thread pool and return the answers.

    Almost the same as plain ThreadPoolExecutor usage, with two extra steps:
      1. Copy the context in the main thread using copy_context().
      2. Use ctx.run() to run the worker in the copied context.

    Args:
        questions: Questions to answer concurrently.

    Returns:
        Answers aligned index-for-index with ``questions``.
    """
    # Map each future back to its submission index so the returned answers
    # line up with `questions`. Collecting results directly in as_completed()
    # order (as a naive implementation would) silently shuffles the answers,
    # because as_completed() yields futures in *completion* order.
    future_to_index: dict = {}
    with ThreadPoolExecutor(max_workers=2) as executor:
        for index, question in enumerate(questions):
            ctx = contextvars.copy_context()
            future_to_index[executor.submit(ctx.run, worker, question)] = index

    results: list[str] = [""] * len(questions)
    for future in as_completed(future_to_index):
        results[future_to_index[future]] = future.result()
    return results


questions = [
    "What is the capital of France?",
    "What is the capital of Germany?",
]

main(questions)

print(
    "\033[92m"
    + "🤖Now run `mlflow server` and open MLflow UI to see the trace visualization!"
    + "\033[0m"
)