# dynamic_model_provider.py
"""
Dynamic Model Provider for enterprise model configuration.
"""

import asyncio
import logging
import threading
from typing import Any, Dict, List, Optional, Union

from solace_agent_mesh.agent.adk.models.lite_llm import LiteLlm
from solace_ai_connector.components.component_base import ComponentBase
from solace_ai_connector.common.message import Message as SolaceMessage

from .dynamic_model_provider_topics import (
    extract_model_id_from_topic,
    get_bootstrap_request_topic,
    get_bootstrap_response_topic,
    get_model_config_update_topic,
)

log = logging.getLogger(__name__)

# Bootstrap: number of initial-config requests, and the wait after each one
# before checking whether a config has arrived.
_MAX_BOOTSTRAP_RETRIES = 3
_BOOTSTRAP_RETRY_INTERVAL_SECONDS = 5
# Default timeout for a one-shot alias resolution (DynamicModelProvider.resolve).
_RESOLVE_TIMEOUT_SECONDS = 10.0


# SAC Component Info for ModelConfigReceiverComponent
_receiver_info = {
    "class_name": "ModelConfigReceiverComponent",
    "description": (
        "Receives model configuration messages from a BrokerInput and applies them to the "
        "DynamicModelProvider's LiteLlm instance."
    ),
    "config_parameters": [
        {
            "name": "model_provider_ref",
            "required": True,
            "type": "object",
            "description": "A direct reference to the DynamicModelProvider instance.",
        }
    ],
    "input_schema": {
        "type": "object",
        "description": "Output from a BrokerInput component.",
        "properties": {
            "payload": {"type": "any", "description": "The message payload."},
            "topic": {"type": "string", "description": "The message topic."},
            "user_properties": {
                "type": "object",
                "description": "User properties of the message.",
            },
        },
        "required": ["payload", "topic"],
    },
    "output_schema": None,
}


class ModelConfigReceiverComponent(ComponentBase):
    """
    A SAC component that receives model configuration messages and updates
    the DynamicModelProvider accordingly.
    """

    info = _receiver_info

    def __init__(self, **kwargs: Any):
        """Validate that ``model_provider_ref`` is a DynamicModelProvider.

        Raises:
            ValueError: if the configured ``model_provider_ref`` is not a
                DynamicModelProvider instance.
        """
        super().__init__(_receiver_info, **kwargs)
        self.model_provider = self.get_config("model_provider_ref")
        if not isinstance(self.model_provider, DynamicModelProvider):
            log.error(
                "%s Configuration 'model_provider_ref' is not a valid DynamicModelProvider instance. Type: %s",
                self.log_identifier,
                type(self.model_provider),
            )
            raise ValueError(
                f"{self.log_identifier} 'model_provider_ref' must be a DynamicModelProvider instance."
            )
        log.info("%s ModelConfigReceiverComponent initialized.", self.log_identifier)

    def invoke(self, message: SolaceMessage, data: Dict[str, Any]) -> None:
        """Process an incoming model configuration message.

        Routes by model_id extracted from the topic:
          - If model_id matches this provider's configured model, updates
            the LiteLlm instance (bootstrap response or config change).
          - Otherwise, completes any pending one-shot resolve futures for
            that model_id (per-request alias resolution).

        The message is acknowledged on success. On any error it is
        negatively acknowledged and the exception is re-raised.
        """
        log_id_prefix = f"{self.log_identifier}[Invoke]"
        try:
            topic = data.get("topic", "")
            payload = data.get("payload", {})

            log.info(
                "%s Received model config message on topic: %s",
                log_id_prefix,
                topic,
            )

            model_config = payload.get("model_config")

            topic_model_id, _ = extract_model_id_from_topic(topic)

            if topic_model_id == self.model_provider.model_id:
                if model_config:
                    log.info(
                        "%s Model config found, updating LiteLlm: %s",
                        log_id_prefix,
                        model_config.get('model', 'N/A'),
                    )
                    self.model_provider.mark_initialized()
                    self.model_provider.update_litellm_model(model_config)
                else:
                    log.info(
                        "%s No model config or empty config, removing LiteLlm model",
                        log_id_prefix,
                    )
                    self.model_provider.remove_litellm_model()

            # Complete any pending per-request resolve futures for this alias.
            # This runs for ALL aliases, including the agent's own model_id,
            # so that resolve() works when overriding with the agent's default.
            # When model_config is None (model removal), this resolves any
            # in-flight awaiter with None as an explicit "config unavailable" signal.
            self.model_provider.complete_pending_resolve(
                topic_model_id, model_config
            )

            message.call_acknowledgements()

        except Exception as e:
            log.exception(
                "%s Error in ModelConfigReceiverComponent invoke: %s",
                log_id_prefix,
                e,
            )
            if message:
                message.call_negative_acknowledgements()
            raise
        return None


class DynamicModelProvider:
    """Keeps a LiteLlm instance in sync with broker-delivered model configuration.

    Construction schedules :meth:`initialize` as a task on the running event
    loop, so the provider must be created from within a running loop. That
    task starts an internal SAC flow listening for model-config messages and
    then requests the initial configuration. :meth:`resolve` additionally
    supports one-shot lookups of other model aliases.
    """

    def __init__(self, component: ComponentBase, litellm_instance: LiteLlm, model_id: str):
        self._component = component
        self._litellm_instance = litellm_instance
        self._model_id = model_id

        # Internal SAC flow for subscribing to model config updates
        self._internal_app = None
        self._broker_input = None

        # Set once a non-empty config for our own model_id has been received.
        self._initialized = False

        # Event loop reference, captured during initialize() for cross-thread use
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        # One-shot alias resolution state, guarded by _resolve_lock because it
        # is touched from both the event loop and the SAC receiver thread.
        self._pending_resolves: Dict[str, List[asyncio.Future]] = {}
        self._resolve_lock = threading.Lock()

        # Keep a strong reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected mid-run.
        self._init_task: asyncio.Task = asyncio.create_task(self.initialize())

    @property
    def model_id(self) -> str:
        """The model identifier this provider is configured for."""
        return self._model_id

    async def initialize(self) -> None:
        """
        Initialize the DynamicModelProvider by starting to listen for model config changes.

        After the listener is running, requests the initial configuration up to
        ``_MAX_BOOTSTRAP_RETRIES`` times, waiting
        ``_BOOTSTRAP_RETRY_INTERVAL_SECONDS`` after each request, and stops as
        soon as a config has been received. Logs a warning if none arrives.
        """
        self._loop = asyncio.get_running_loop()
        await self.listen_for_model_config_change()

        for _ in range(_MAX_BOOTSTRAP_RETRIES):
            await self.request_model_config()
            await asyncio.sleep(_BOOTSTRAP_RETRY_INTERVAL_SECONDS)
            if self._initialized:
                break

        if not self._initialized:
            log.warning(
                "%s Model configuration not received after multiple attempts. LiteLlm instance may not be configured.",
                self._component.log_identifier,
            )

    def mark_initialized(self) -> None:
        """
        Mark the provider as initialized. This can be called by the receiver component
        when a valid model config is received.
        """
        self._initialized = True

    def update_litellm_model(self, model_config: Union[str, Dict[str, Any]]) -> None:
        """
        Update the LiteLlm instance with the new model configuration.

        Args:
            model_config: The new model configuration (model name or config dict).
        """
        log.info("Updating LiteLlm instance with new model: %s", model_config.get('model', 'N/A') if isinstance(model_config, dict) else model_config)
        self._litellm_instance.configure_model(model_config)

    def remove_litellm_model(self) -> None:
        """
        Remove the model configuration from the LiteLlm instance.
        """
        log.info("Removing model configuration from LiteLlm instance.")
        self._litellm_instance.unconfigure_model()

    async def request_model_config(self) -> None:
        """
        Request model configuration for the component.

        Publishes a bootstrap request for this provider's model_id. The
        ``reply_to`` topic is matched by the wildcard bootstrap-response
        subscription set up in ``_ensure_config_listener_flow_is_running``.
        """
        component_id = self._component.get_component_id()
        log.info("Requesting model configuration for LiteLlm instance for component %s", component_id)
        topic = get_bootstrap_request_topic(self._component.namespace, self._model_id)
        payload = {
            "component_id": component_id,
            "component_type": self._component._get_component_type(),
            "reply_to": get_bootstrap_response_topic(self._component.namespace, self._model_id, component_id),
            "model_id": self._model_id,
        }
        self._component.publish_a2a_message(
            payload=payload,
            topic=topic
        )
        log.debug("Published model config request to topic %s with payload: %s", topic, payload)

    def _ensure_config_listener_flow_is_running(self) -> None:
        """
        Ensures the internal SAC flow for model config updates is created and running.
        This method is designed to be called once during component startup.

        The flow is BrokerInput -> ModelConfigReceiverComponent. It subscribes
        to this model's config-update topic and to bootstrap responses for any
        model_id addressed to this component.

        Raises:
            RuntimeError / ValueError: if the app, connector, broker config or
                internal flow is unavailable. Any partially created internal
                app is cleaned up before re-raising.
        """
        log_id_prefix = "[DynamicModelProvider][EnsureConfigFlow]"
        if self._internal_app is not None:
            log.debug("%s Config listener flow already running.", log_id_prefix)
            return

        log.info("%s Initializing internal model config listener flow...", log_id_prefix)
        try:
            main_app = self._component.get_app()
            if not main_app or not main_app.connector:
                log.error(
                    "%s Cannot get main app or connector instance. Config listener flow NOT started.",
                    log_id_prefix,
                )
                raise RuntimeError(
                    "Main app or connector not available for internal flow creation."
                )

            main_broker_config = main_app.app_info.get("broker", {})
            if not main_broker_config:
                log.error(
                    "%s Main app broker configuration not found. Config listener flow NOT started.",
                    log_id_prefix,
                )
                raise ValueError("Main app broker configuration is missing.")

            component_id = self._component.get_component_id()
            config_update_topic = get_model_config_update_topic(self._component.namespace, self._model_id)
            # Wildcard subscription catches both own-model bootstrap responses
            # and one-shot alias resolution responses for any model_id
            config_bootstrap_topic = get_bootstrap_response_topic(self._component.namespace, "*", component_id)

            broker_input_cfg = {
                "component_module": "broker_input",
                "component_name": f"{component_id}_model_config_broker_input",
                "broker_queue_name": f"{self._component.namespace}q/model_config/{component_id}",
                "create_queue_on_start": True,
                "component_config": {
                    **main_broker_config,
                    "broker_subscriptions": [
                        {"topic": config_update_topic},
                        {"topic": config_bootstrap_topic}
                    ],
                },
            }

            receiver_cfg = {
                "component_class": ModelConfigReceiverComponent,
                "component_name": f"{component_id}_model_config_receiver",
                "component_config": {
                    "model_provider_ref": self
                },
            }

            flow_config = {
                "name": f"{component_id}_model_config_flow",
                "components": [broker_input_cfg, receiver_cfg],
            }

            # NOTE(review): internal_app_broker_config is only stored in
            # app_config_for_internal_flow["broker"], which is never passed to
            # create_internal_app() below (only name and flows are). Confirm
            # whether these broker settings are meant to take effect.
            internal_app_broker_config = main_broker_config.copy()
            internal_app_broker_config["input_enabled"] = True
            internal_app_broker_config["output_enabled"] = False

            app_config_for_internal_flow = {
                "name": f"{component_id}_model_config_internal_app",
                "flows": [flow_config],
                "broker": internal_app_broker_config,
                "app_config": {},
            }

            self._internal_app = main_app.connector.create_internal_app(
                app_name=app_config_for_internal_flow["name"],
                flows=app_config_for_internal_flow["flows"],
            )

            if not self._internal_app or not self._internal_app.flows:
                log.error(
                    "%s Failed to create internal model config app/flow.",
                    log_id_prefix,
                )
                self._internal_app = None
                raise RuntimeError("Internal model config app/flow creation failed.")

            self._internal_app.run()
            log.info("%s Internal model config app started.", log_id_prefix)

            flow_instance = self._internal_app.flows[0]
            if flow_instance.component_groups and flow_instance.component_groups[0]:
                from solace_ai_connector.components.inputs_outputs.broker_input import BrokerInput
                self._broker_input = flow_instance.component_groups[0][0]
                if not isinstance(self._broker_input, BrokerInput):
                    log.error(
                        "%s First component in config flow is not BrokerInput. Type: %s",
                        log_id_prefix,
                        type(self._broker_input).__name__,
                    )
                    self._broker_input = None
                    raise RuntimeError(
                        "Config listener flow setup error: BrokerInput not found."
                    )
                log.debug(
                    "%s Obtained reference to internal BrokerInput component.",
                    log_id_prefix,
                )
            else:
                log.error(
                    "%s Could not get BrokerInput instance from internal flow.",
                    log_id_prefix,
                )
                raise RuntimeError(
                    "Config listener flow setup error: BrokerInput instance not accessible."
                )

        except Exception as e:
            log.exception(
                "%s Failed to ensure config listener flow is running: %s",
                log_id_prefix,
                e,
            )
            if self._internal_app:
                try:
                    self._internal_app.cleanup()
                except Exception as cleanup_err:
                    log.error(
                        "%s Error during cleanup after config flow init failure: %s",
                        log_id_prefix,
                        cleanup_err,
                    )
            self._internal_app = None
            self._broker_input = None
            raise

    async def listen_for_model_config_change(self) -> None:
        """
        Listen for changes in the model configuration.
        Sets up the internal SAC flow to subscribe to model config updates.
        """
        log.info("Setting up model configuration listener...")
        self._ensure_config_listener_flow_is_running()

    async def resolve(
        self, alias: str, timeout: float = _RESOLVE_TIMEOUT_SECONDS
    ) -> Optional[Dict[str, Any]]:
        """Resolve a model config alias to a raw LiteLLM config dict.

        Sends a bootstrap request to the platform service and awaits the
        response. Concurrent requests for the same alias are de-duplicated:
        only the first publishes a broker message; subsequent callers
        piggyback on the pending response.

        Returns None on timeout, if the listener is not running, or if the
        platform returns no config. If publishing the request raises, the
        exception propagates and this caller's pending entry is removed, so
        later calls for the same alias publish again instead of waiting on a
        request that was never sent.
        """
        if not self._internal_app:
            log.warning(
                "%s Cannot resolve alias '%s' — listener not running",
                self._component.log_identifier,
                alias,
            )
            return None

        loop = asyncio.get_running_loop()
        future: asyncio.Future = loop.create_future()

        with self._resolve_lock:
            if alias in self._pending_resolves:
                self._pending_resolves[alias].append(future)
                should_publish = False
            else:
                self._pending_resolves[alias] = [future]
                should_publish = True

        # The publish is inside the try so the finally below always removes
        # this future, even when publishing fails.
        try:
            if should_publish:
                component_id = self._component.get_component_id()
                response_topic = get_bootstrap_response_topic(
                    self._component.namespace, alias, component_id
                )
                self._component.publish_a2a_message(
                    payload={
                        "model_id": alias,
                        "reply_to": response_topic,
                        "component_id": component_id,
                        "component_type": self._component._get_component_type(),
                    },
                    topic=get_bootstrap_request_topic(
                        self._component.namespace, alias
                    ),
                )

            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            log.warning(
                "%s Model alias resolution timed out for '%s' after %.1fs",
                self._component.log_identifier,
                alias,
                timeout,
            )
            return None
        finally:
            with self._resolve_lock:
                futures = self._pending_resolves.get(alias, [])
                if future in futures:
                    futures.remove(future)
                if not futures:
                    self._pending_resolves.pop(alias, None)

    @staticmethod
    def _set_future_result_if_pending(
        future: asyncio.Future, value: Optional[Dict[str, Any]]
    ) -> None:
        """Set *value* on *future* unless it has already finished.

        Scheduled onto the future's own loop. The ``done()`` check therefore
        runs on the same thread as a timeout cancellation and cannot race it
        into an ``InvalidStateError``.
        """
        if not future.done():
            future.set_result(value)

    def complete_pending_resolve(
        self, alias: str, model_config: Optional[Dict[str, Any]]
    ) -> None:
        """Complete all pending resolve futures for a given alias.

        Called from the SAC receiver thread. Uses call_soon_threadsafe on
        each future's own event loop to set results safely from a
        non-event-loop thread.
        """
        with self._resolve_lock:
            futures = self._pending_resolves.pop(alias, [])

        for future in futures:
            if future.done():
                continue
            try:
                future.get_loop().call_soon_threadsafe(
                    self._set_future_result_if_pending, future, model_config
                )
            except RuntimeError:
                # The future's loop is already closed; nobody can await it.
                log.debug(
                    "Event loop closed; dropping resolve result for alias '%s'",
                    alias,
                )

    def cleanup(self) -> None:
        """Cleanup resources when the provider is no longer needed.

        Cancels any still-pending resolve futures and cleans up the internal
        SAC app. Errors from the app cleanup are logged, not raised.
        """
        log.info("Cleaning up DynamicModelProvider...")

        with self._resolve_lock:
            for futures in self._pending_resolves.values():
                for future in futures:
                    if not future.done():
                        future.cancel()
            self._pending_resolves.clear()

        if self._internal_app:
            log.info("Cleaning up internal model config app...")
            try:
                self._internal_app.cleanup()
            except Exception as e:
                log.error(
                    "Error cleaning up internal model config app: %s",
                    e,
                )
            self._internal_app = None
            self._broker_input = None


async def start_model_listener(litellm_instance: LiteLlm, component: ComponentBase, model_provider_id: str):
    """
    Start a model configuration listener for the given LiteLlm instance.

    Subscribes to A2A topic for model config events and calls
    litellm_instance.configure_model(config) when received.

    Must be awaited from a running event loop: the provider schedules its
    initialization as a task on that loop.

    Args:
        litellm_instance: The LiteLlm instance to configure when model arrives.
        component: The SamAgentComponent for context (namespace, agent_name, etc.)
        model_provider_id: The identifier for the model provider

    Returns:
        The DynamicModelProvider managing ``litellm_instance``.
    """
    log.info("Starting model '%s' listener for component %s", model_provider_id, component.get_component_id())
    model_config_provider = DynamicModelProvider(component, litellm_instance, model_provider_id)
    return model_config_provider