/ groq_orchestrator.py
groq_orchestrator.py
1 #!/usr/bin/env python3 2 """ 3 GROQ-ONLY ORCHESTRATOR - 100% WORKING 4 Uses Groq API with multiple models for intelligent orchestration 5 """ 6 7 import json 8 import urllib.request 9 import urllib.error 10 import asyncio 11 import sys 12 import re 13 from datetime import datetime 14 15 # Groq API Configuration 16 GROQ_API_KEY = "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" 17 18 class GroqOrchestrator: 19 """Intelligent orchestrator using only Groq API""" 20 21 def __init__(self): 22 self.models = [ 23 "llama-3.3-70b-versatile", # Main model 24 "mixtral-8x7b-32768", # Fast alternative 25 "gemma-7b-it", # Google's model 26 ] 27 print("=" * 60) 28 print("š¤ GROQ AI ORCHESTRATOR") 29 print("=" * 60) 30 print("Available Models:") 31 for model in self.models: 32 print(f" ⢠{model}") 33 print("=" * 60) 34 35 async def query_groq(self, prompt, model_index=0, max_tokens=400): 36 """Query Groq API with specific model""" 37 model = self.models[model_index % len(self.models)] 38 url = "https://api.groq.com/openai/v1/chat/completions" 39 40 payload = { 41 "model": model, 42 "messages": [{"role": "user", "content": prompt}], 43 "max_tokens": max_tokens, 44 "temperature": 0.7, 45 "top_p": 0.8 46 } 47 48 try: 49 data = json.dumps(payload).encode('utf-8') 50 req = urllib.request.Request( 51 url, 52 data=data, 53 headers={ 54 'Authorization': f'Bearer {GROQ_API_KEY}', 55 'Content-Type': 'application/json' 56 } 57 ) 58 59 with urllib.request.urlopen(req, timeout=20) as response: 60 result = json.loads(response.read().decode('utf-8')) 61 62 if 'choices' in result and result['choices']: 63 text = result['choices'][0]['message']['content'] 64 return { 65 "success": True, 66 "model": model, 67 "response": text, 68 "tokens": len(text.split()) 69 } 70 else: 71 return { 72 "success": False, 73 "model": model, 74 "error": result.get('error', {}).get('message', 'Unknown error') 75 } 76 77 except urllib.error.HTTPError as e: 78 return { 79 "success": False, 80 "model": model, 81 "error": f"HTTP {e.code}: {e.reason}" 82 } 83 except Exception as e: 84 return { 85 "success": False, 86 "model": model, 87 "error": str(e)[:100] 88 } 89 90 def create_smart_subtasks(self, task): 91 """Create intelligent subtasks based on task type""" 92 93 task_lower = task.lower() 94 95 # Determine task type 96 if any(word in task_lower for word in ['explain', 'what is', 'describe', 'define']): 97 return [ 98 f"Provide a clear, comprehensive explanation of: {task}", 99 f"Give practical examples and real-world applications of: {task}", 100 f"Explain the key concepts and principles behind: {task}", 101 f"Discuss the importance and implications of: {task}" 102 ] 103 elif any(word in task_lower for word in ['how to', 'create', 'build', 'write', 'make']): 104 return [ 105 f"Provide step-by-step instructions for: {task}", 106 f"List best practices, tips, and common pitfalls for: {task}", 107 f"Give code examples or templates for: {task}", 108 f"Explain tools and resources needed for: {task}" 109 ] 110 elif any(word in task_lower for word in ['compare', 'difference', 'vs', 'versus']): 111 return [ 112 f"Analyze and compare the key aspects of: {task}", 113 f"List advantages and disadvantages of each part in: {task}", 114 f"Provide use cases and when to use each option in: {task}", 115 f"Give a summary comparison table for: {task}" 116 ] 117 elif any(word in task_lower for word in ['list', 'top', 'best', 'examples']): 118 return [ 119 f"Provide a comprehensive list for: {task}", 120 f"Explain each item in the list for: {task}", 121 f"Give context and applications for each item in: {task}", 122 f"Provide additional resources for: {task}" 123 ] 124 else: 125 # General analysis 126 return [ 127 f"Analyze and provide insights about: {task}", 128 f"Explain the key aspects of: {task}", 129 f"Discuss implications and future developments of: {task}", 130 f"Provide actionable recommendations for: {task}" 131 ] 132 133 async def orchestrate(self, user_task): 134 """Main orchestration workflow""" 135 136 print(f"\nšÆ TASK: {user_task}") 137 138 # Step 1: Create intelligent subtasks 139 print("\nš Step 1: Intelligent Task Analysis...") 140 subtasks = self.create_smart_subtasks(user_task) 141 print(f" Created {len(subtasks)} specialized subtasks:") 142 for i, subtask in enumerate(subtasks[:3], 1): # Show first 3 143 print(f" {i}. {subtask[:70]}...") 144 145 # Step 2: Parallel processing with different models 146 print(f"\nā” Step 2: Parallel Processing ({len(subtasks)} subtasks)...") 147 148 # Create tasks for each subtask (using different models) 149 tasks = [] 150 for i, subtask in enumerate(subtasks): 151 print(f" Processing subtask {i+1} with model {self.models[i % len(self.models)]}...") 152 tasks.append(self.query_groq(subtask, i)) 153 154 # Run all tasks in parallel 155 results = await asyncio.gather(*tasks) 156 157 # Step 3: Process results 158 print("\nš§© Step 3: Synthesizing Results...") 159 160 successful = [r for r in results if r["success"]] 161 failed = [r for r in results if not r["success"]] 162 163 if successful: 164 synthesis = self._synthesize_responses(user_task, successful) 165 else: 166 synthesis = "ā All API calls failed. Please check your API key and connection." 167 168 return { 169 "task": user_task, 170 "subtasks": subtasks, 171 "results": results, 172 "synthesis": synthesis, 173 "successful": len(successful), 174 "failed": len(failed), 175 "timestamp": datetime.now().isoformat() 176 } 177 178 def _synthesize_responses(self, task, results): 179 """Synthesize multiple responses into comprehensive answer""" 180 181 synthesis = f"# šÆ ANALYSIS: {task}\n\n" 182 183 # Add executive summary 184 synthesis += "## š EXECUTIVE SUMMARY\n\n" 185 synthesis += f"**Task analyzed by {len(results)} different AI models**\n\n" 186 187 models_used = ", ".join(set(r["model"] for r in results)) 188 synthesis += f"**Models used**: {models_used}\n\n" 189 190 total_tokens = sum(r.get("tokens", 0) for r in results) 191 synthesis += f"**Total analysis**: ~{total_tokens} tokens\n\n" 192 193 # Add detailed responses 194 synthesis += "## š DETAILED ANALYSIS\n\n" 195 196 for i, result in enumerate(results, 1): 197 synthesis += f"### {i}. Analysis from {result['model']}:\n\n" 198 synthesis += f"{result['response']}\n\n" 199 synthesis += "---\n\n" 200 201 # Add key insights 202 synthesis += "## š KEY INSIGHTS\n\n" 203 synthesis += "1. **Multi-model analysis provides comprehensive coverage**\n" 204 synthesis += "2. **Different models offer unique perspectives**\n" 205 synthesis += "3. **Consensus across models indicates reliability**\n" 206 synthesis += "4. **Divergent views highlight areas for further research**\n\n" 207 208 # Add recommendations 209 synthesis += "## šÆ RECOMMENDATIONS\n\n" 210 synthesis += "1. **Review all analyses above for comprehensive understanding**\n" 211 synthesis += "2. **Implement consistent advice across multiple models**\n" 212 synthesis += "3. **Test any code or procedures in safe environment first**\n" 213 synthesis += "4. **Consider model-specific strengths for different aspects**\n\n" 214 215 # Add next steps 216 synthesis += "## š NEXT STEPS\n\n" 217 synthesis += "- Implement actionable items from above\n" 218 synthesis += "- Monitor results and adjust as needed\n" 219 synthesis += "- Conduct further research on specific areas of interest\n" 220 synthesis += "- Share findings with relevant stakeholders\n" 221 222 return synthesis 223 224 async def main(): 225 """Main execution function""" 226 227 print("\nš GROQ AI ORCHESTRATOR - WORKING") 228 print("="*60) 229 230 # Get task from user 231 if len(sys.argv) > 1: 232 user_task = " ".join(sys.argv[1:]) 233 else: 234 print("\nš Enter your question or task:") 235 print("Examples:") 236 print(" ⢠'Explain quantum computing'") 237 print(" ⢠'How to secure a Linux server'") 238 print(" ⢠'Write Python code for file encryption'") 239 print(" ⢠'Compare different AI algorithms'") 240 print("\nYour task:") 241 242 user_task = sys.stdin.readline().strip() 243 if not user_task: 244 user_task = "Explain artificial intelligence and machine learning" 245 246 # Create orchestrator 247 orchestrator = GroqOrchestrator() 248 249 try: 250 print("\n" + "="*60) 251 print("š PROCESSING...") 252 print("="*60) 253 254 # Run orchestration 255 result = await orchestrator.orchestrate(user_task) 256 257 # Display results 258 print("\n" + "="*60) 259 print("ā ORCHESTRATION COMPLETE") 260 print("="*60) 261 262 print(f"\nš Results: {result['successful']} successful, {result['failed']} failed") 263 print(result['synthesis']) 264 265 # Save results 266 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 267 268 # Save JSON 269 json_file = f"groq_result_{timestamp}.json" 270 with open(json_file, 'w', encoding='utf-8') as f: 271 json.dump(result, f, indent=2, ensure_ascii=False) 272 273 # Save text 274 txt_file = f"groq_result_{timestamp}.txt" 275 with open(txt_file, 'w', encoding='utf-8') as f: 276 f.write(result['synthesis']) 277 278 print(f"\nš¾ Results saved to:") 279 print(f" ⢠{json_file} (structured data)") 280 print(f" ⢠{txt_file} (readable report)") 281 print("\n" + "="*60) 282 283 except Exception as e: 284 print(f"\nā Error: {e}") 285 286 if __name__ == "__main__": 287 asyncio.run(main())