# tracing.py
"""Agentic RAG with Haystack, traced via MLflow autologging.

Tries to answer a query from an in-memory document store first; if the LLM
replies 'no_answer', a ConditionalRouter falls back to a SerperDev web search
and a second LLM call. Afterwards the script reads the MLflow trace and
prints total and per-span token usage.
"""

import os
from getpass import getpass

from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.routers import ConditionalRouter
from haystack.components.websearch.serper_dev import SerperDevWebSearch
from haystack.dataclasses import ChatMessage, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore

import mlflow

# Enable MLflow tracing for every Haystack pipeline run in this script.
mlflow.set_experiment("Haystack Tracing")
mlflow.haystack.autolog()

# Prompt for credentials only when they are not already in the environment.
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")
if "SERPERDEV_API_KEY" not in os.environ:
    os.environ["SERPERDEV_API_KEY"] = getpass("Enter SerperDev API key:")


document_store = InMemoryDocumentStore()

documents = [
    Document(
        content="""Munich, the vibrant capital of Bavaria in southern Germany, exudes a perfect blend of rich cultural
heritage and modern urban sophistication. Nestled along the banks of the Isar River, Munich is renowned
for its splendid architecture, including the iconic Neues Rathaus (New Town Hall) at Marienplatz and
the grandeur of Nymphenburg Palace. The city is a haven for art enthusiasts, with world-class museums like the
Alte Pinakothek housing masterpieces by renowned artists. Munich is also famous for its lively beer gardens, where
locals and tourists gather to enjoy the city's famed beers and traditional Bavarian cuisine. The city's annual
Oktoberfest celebration, the world's largest beer festival, attracts millions of visitors from around the globe.
Beyond its cultural and culinary delights, Munich offers picturesque parks like the English Garden, providing a
serene escape within the heart of the bustling metropolis. Visitors are charmed by Munich's warm hospitality,
making it a must-visit destination for travelers seeking a taste of both old-world charm and contemporary allure."""
    )
]

document_store.write_documents(documents)

retriever = InMemoryBM25Retriever(document_store)

# First-pass prompt: answer from the stored documents, or signal 'no_answer'
# so the router can escalate to web search.
prompt_template = [
    ChatMessage.from_user(
        """
Answer the following query given the documents.
If the answer is not contained within the documents reply with 'no_answer'

Documents:
{% for document in documents %}
  {{document.content}}
{% endfor %}
Query: {{query}}
"""
    )
]

prompt_builder = ChatPromptBuilder(template=prompt_template, required_variables="*")
llm = OpenAIChatGenerator(model="gpt-4o-mini")

# Fallback prompt: answer from web-search results and say so explicitly.
prompt_for_websearch = [
    ChatMessage.from_user(
        """
Answer the following query given the documents retrieved from the web.
Your answer should indicate that your answer was generated from websearch.

Documents:
{% for document in documents %}
  {{document.content}}
{% endfor %}

Query: {{query}}
"""
    )
]

websearch = SerperDevWebSearch()
prompt_builder_for_websearch = ChatPromptBuilder(
    template=prompt_for_websearch, required_variables="*"
)
llm_for_websearch = OpenAIChatGenerator(model="gpt-4o-mini")


# Route 1: the first LLM gave up -> forward the query to websearch.
# Route 2: the first LLM answered -> emit its text as the final answer.
routes = [
    {
        "condition": "{{'no_answer' in replies[0].text}}",
        "output": "{{query}}",
        "output_name": "go_to_websearch",
        "output_type": str,
    },
    {
        "condition": "{{'no_answer' not in replies[0].text}}",
        "output": "{{replies[0].text}}",
        "output_name": "answer",
        "output_type": str,
    },
]

router = ConditionalRouter(routes)

agentic_rag_pipe = Pipeline()
agentic_rag_pipe.add_component("retriever", retriever)
agentic_rag_pipe.add_component("prompt_builder", prompt_builder)
agentic_rag_pipe.add_component("llm", llm)
agentic_rag_pipe.add_component("router", router)
agentic_rag_pipe.add_component("websearch", websearch)
agentic_rag_pipe.add_component("prompt_builder_for_websearch", prompt_builder_for_websearch)
agentic_rag_pipe.add_component("llm_for_websearch", llm_for_websearch)

agentic_rag_pipe.connect("retriever", "prompt_builder.documents")
agentic_rag_pipe.connect("prompt_builder.prompt", "llm.messages")
agentic_rag_pipe.connect("llm.replies", "router.replies")
agentic_rag_pipe.connect("router.go_to_websearch", "websearch.query")
agentic_rag_pipe.connect("router.go_to_websearch", "prompt_builder_for_websearch.query")
agentic_rag_pipe.connect("websearch.documents", "prompt_builder_for_websearch.documents")
agentic_rag_pipe.connect("prompt_builder_for_websearch", "llm_for_websearch")


query = "How many people live in Munich?"

result = agentic_rag_pipe.run({
    "retriever": {"query": query},
    "prompt_builder": {"query": query},
    "router": {"query": query},
})

# The pipeline ends on one of two branches: either the websearch sub-path ran
# (the `llm_for_websearch` output is present), or the router answered directly
# from the stored documents. Indexing `llm_for_websearch` unconditionally
# raises KeyError on the direct-answer branch, so check which output exists.
if "llm_for_websearch" in result:
    # Print the `replies` generated using the web searched Documents
    print(result["llm_for_websearch"]["replies"][0].text)
else:
    print(result["router"]["answer"])

last_trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(trace_id=last_trace_id)

# Print the token usage. `token_usage` may be None if no usage was recorded
# on the trace, so guard before subscripting.
total_usage = trace.info.token_usage
if total_usage:
    print("== Total token usage: ==")
    print(f"  Input tokens: {total_usage['input_tokens']}")
    print(f"  Output tokens: {total_usage['output_tokens']}")
    print(f"  Total tokens: {total_usage['total_tokens']}")

# Print the token usage for each LLM call
print("\n== Detailed usage for each LLM call: ==")
for span in trace.data.spans:
    if usage := span.get_attribute("mlflow.chat.tokenUsage"):
        print(f"{span.name}:")
        print(f"  Input tokens: {usage['input_tokens']}")
        print(f"  Output tokens: {usage['output_tokens']}")
        print(f"  Total tokens: {usage['total_tokens']}")