# cloud_integration_example.py
  1  #!/usr/bin/env python3
  2  """
  3  EXAMPLE: Cloud LLM Integration for Orchestrator
  4  Requires: pip install openai anthropic google-generativeai
  5  """
  6  
  7  import os
  8  import asyncio
  9  import aiohttp
 10  
 11  # ========== CONFIGURATION ==========
 12  # Set these environment variables with your API keys
 13  # export OPENAI_API_KEY="your-key"
 14  # export ANTHROPIC_API_KEY="your-key"  
 15  # export GOOGLE_API_KEY="your-key"
 16  
 17  class CloudLLMOrchestrator:
 18      async def delegate_to_openai(self, prompt: str, model: str = "gpt-3.5-turbo"):
 19          """Delegate to OpenAI"""
 20          api_key = os.getenv("OPENAI_API_KEY")
 21          if not api_key:
 22              return "[OpenAI API key not set]"
 23          
 24          async with aiohttp.ClientSession() as session:
 25              async with session.post(
 26                  "https://api.openai.com/v1/chat/completions",
 27                  headers={
 28                      "Authorization": f"Bearer {api_key}",
 29                      "Content-Type": "application/json"
 30                  },
 31                  json={
 32                      "model": model,
 33                      "messages": [{"role": "user", "content": prompt}],
 34                      "max_tokens": 500,
 35                      "temperature": 0.7
 36                  }
 37              ) as response:
 38                  data = await response.json()
 39                  return data.get("choices", [{}])[0].get("message", {}).get("content", "No response")
 40      
 41      async def delegate_to_anthropic(self, prompt: str, model: str = "claude-3-haiku-20240307"):
 42          """Delegate to Anthropic Claude"""
 43          api_key = os.getenv("ANTHROPIC_API_KEY")
 44          if not api_key:
 45              return "[Anthropic API key not set]"
 46          
 47          async with aiohttp.ClientSession() as session:
 48              async with session.post(
 49                  "https://api.anthropic.com/v1/messages",
 50                  headers={
 51                      "x-api-key": api_key,
 52                      "anthropic-version": "2023-06-01",
 53                      "content-type": "application/json"
 54                  },
 55                  json={
 56                      "model": model,
 57                      "max_tokens": 500,
 58                      "messages": [{"role": "user", "content": prompt}]
 59                  }
 60              ) as response:
 61                  data = await response.json()
 62                  return data.get("content", [{}])[0].get("text", "No response")
 63      
 64      async def delegate_to_google(self, prompt: str, model: str = "gemini-pro"):
 65          """Delegate to Google Gemini"""
 66          api_key = os.getenv("GOOGLE_API_KEY")
 67          if not api_key:
 68              return "[Google API key not set]"
 69          
 70          async with aiohttp.ClientSession() as session:
 71              async with session.post(
 72                  f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}",
 73                  json={
 74                      "contents": [{"parts": [{"text": prompt}]}],
 75                      "generationConfig": {
 76                          "maxOutputTokens": 500,
 77                          "temperature": 0.7
 78                      }
 79                  }
 80              ) as response:
 81                  data = await response.json()
 82                  return data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "No response")
 83      
 84      async def process_in_parallel(self, subtasks: list):
 85          """Process multiple subtasks in parallel using different cloud LLMs"""
 86          tasks = []
 87          
 88          # Assign each subtask to a different cloud LLM
 89          for i, subtask in enumerate(subtasks):
 90              if i % 3 == 0:
 91                  tasks.append(self.delegate_to_openai(f"Explain: {subtask}"))
 92              elif i % 3 == 1:
 93                  tasks.append(self.delegate_to_anthropic(f"Explain: {subtask}"))
 94              else:
 95                  tasks.append(self.delegate_to_google(f"Explain: {subtask}"))
 96          
 97          # Run all tasks in parallel
 98          results = await asyncio.gather(*tasks, return_exceptions=True)
 99          
100          # Handle any exceptions
101          clean_results = []
102          for result in results:
103              if isinstance(result, Exception):
104                  clean_results.append(f"Error: {str(result)}")
105              else:
106                  clean_results.append(result)
107          
108          return clean_results
109  
110  # Example usage
# Example usage
async def main():
    """Demonstrate parallel delegation of subtasks across cloud providers."""
    orchestrator = CloudLLMOrchestrator()

    # A sample decomposition of one research topic into four subtasks.
    subtasks = [
        "Historical development of relativity theory",
        "Core principles of special relativity",
        "Core principles of general relativity",
        "Real-world applications of relativity",
    ]

    print("Starting parallel cloud processing...")
    results = await orchestrator.process_in_parallel(subtasks)

    print("\nResults from cloud LLMs:")
    for index, answer in enumerate(results, 1):
        print(f"\n--- Result {index} (First 200 chars) ---")
        print(answer[:200])


if __name__ == "__main__":
    asyncio.run(main())