/ src / solace_agent_mesh / agent / adk / models / dynamic_model_provider.py
dynamic_model_provider.py
  1  """
  2  Dynamic Model Provider for enterprise model configuration.
  3  """
  4  
  5  import asyncio
  6  import logging
  7  import threading
  8  from typing import Any, Dict, List, Optional, Union
  9  
 10  from solace_agent_mesh.agent.adk.models.lite_llm import LiteLlm
 11  from solace_ai_connector.components.component_base import ComponentBase
 12  from solace_ai_connector.common.message import Message as SolaceMessage
 13  
 14  from .dynamic_model_provider_topics import (
 15      extract_model_id_from_topic,
 16      get_bootstrap_request_topic,
 17      get_bootstrap_response_topic,
 18      get_model_config_update_topic,
 19  )
 20  
 21  log = logging.getLogger(__name__)
 22  
 23  _MAX_BOOTSTRAP_RETRIES = 3
 24  _BOOTSTRAP_RETRY_INTERVAL_SECONDS = 5
 25  _RESOLVE_TIMEOUT_SECONDS = 10.0
 26  
 27  
 28  # SAC Component Info for ModelConfigReceiverComponent
 29  _receiver_info = {
 30      "class_name": "ModelConfigReceiverComponent",
 31      "description": (
 32          "Receives model configuration messages from a BrokerInput and applies them to the "
 33          "DynamicModelProvider's LiteLlm instance."
 34      ),
 35      "config_parameters": [
 36          {
 37              "name": "model_provider_ref",
 38              "required": True,
 39              "type": "object",
 40              "description": "A direct reference to the DynamicModelProvider instance.",
 41          }
 42      ],
 43      "input_schema": {
 44          "type": "object",
 45          "description": "Output from a BrokerInput component.",
 46          "properties": {
 47              "payload": {"type": "any", "description": "The message payload."},
 48              "topic": {"type": "string", "description": "The message topic."},
 49              "user_properties": {
 50                  "type": "object",
 51                  "description": "User properties of the message.",
 52              },
 53          },
 54          "required": ["payload", "topic"],
 55      },
 56      "output_schema": None,
 57  }
 58  
 59  
 60  class ModelConfigReceiverComponent(ComponentBase):
 61      """
 62      A SAC component that receives model configuration messages and updates
 63      the DynamicModelProvider accordingly.
 64      """
 65      info = _receiver_info
 66  
 67      def __init__(self, **kwargs: Any):
 68          super().__init__(_receiver_info, **kwargs)
 69          self.model_provider = self.get_config("model_provider_ref")
 70          if not isinstance(self.model_provider, DynamicModelProvider):
 71              log.error(
 72                  "%s Configuration 'model_provider_ref' is not a valid DynamicModelProvider instance. Type: %s",
 73                  self.log_identifier,
 74                  type(self.model_provider),
 75              )
 76              raise ValueError(
 77                  f"{self.log_identifier} 'model_provider_ref' must be a DynamicModelProvider instance."
 78              )
 79          log.info("%s ModelConfigReceiverComponent initialized.", self.log_identifier)
 80  
 81      def invoke(self, message: SolaceMessage, data: Dict[str, Any]) -> None:
 82          """Process an incoming model configuration message.
 83  
 84          Routes by model_id extracted from the topic:
 85          - If model_id matches this provider's configured model, updates
 86            the LiteLlm instance (bootstrap response or config change).
 87          - Otherwise, completes any pending one-shot resolve futures for
 88            that model_id (per-request alias resolution).
 89          """
 90          log_id_prefix = f"{self.log_identifier}[Invoke]"
 91          try:
 92              topic = data.get("topic", "")
 93              payload = data.get("payload", {})
 94  
 95              log.info(
 96                  "%s Received model config message on topic: %s",
 97                  log_id_prefix,
 98                  topic,
 99              )
100  
101              model_config = payload.get("model_config")
102  
103              topic_model_id, _ = extract_model_id_from_topic(topic)
104  
105              if topic_model_id == self.model_provider.model_id:
106                  if model_config:
107                      log.info(
108                          "%s Model config found, updating LiteLlm: %s",
109                          log_id_prefix,
110                          model_config.get('model', 'N/A'),
111                      )
112                      self.model_provider.mark_initialized()
113                      self.model_provider.update_litellm_model(model_config)
114                  else:
115                      log.info(
116                          "%s No model config or empty config, removing LiteLlm model",
117                          log_id_prefix,
118                      )
119                      self.model_provider.remove_litellm_model()
120  
121              # Complete any pending per-request resolve futures for this alias.
122              # This runs for ALL aliases, including the agent's own model_id,
123              # so that resolve() works when overriding with the agent's default.
124              # When model_config is None (model removal), this resolves any
125              # in-flight awaiter with None as an explicit "config unavailable" signal.
126              self.model_provider.complete_pending_resolve(
127                  topic_model_id, model_config
128              )
129  
130              message.call_acknowledgements()
131  
132          except Exception as e:
133              log.exception(
134                  "%s Error in ModelConfigReceiverComponent invoke: %s",
135                  log_id_prefix,
136                  e,
137              )
138              if message:
139                  message.call_negative_acknowledgements()
140              raise
141          return None
142  
143  
class DynamicModelProvider:
    """Keeps a LiteLlm instance in sync with platform-managed model configuration.

    Construction schedules :meth:`initialize` as a background task on the
    running event loop. That task starts an internal SAC flow
    (BrokerInput -> ModelConfigReceiverComponent) subscribed to config updates
    for ``model_id`` and to bootstrap responses addressed to this component,
    then publishes bootstrap requests until a config arrives or the retry
    budget is used up.

    :meth:`resolve` additionally supports one-shot lookups of other model
    aliases. Responses are delivered on the SAC receiver thread, so results are
    handed back to the awaiting event loop via ``call_soon_threadsafe``.
    """

    def __init__(self, component: ComponentBase, litellm_instance: LiteLlm, model_id: str):
        self._component = component
        self._litellm_instance = litellm_instance
        self._model_id = model_id

        # Internal SAC flow for subscribing to model config updates
        self._internal_app = None
        self._broker_input = None

        # Flipped by mark_initialized() once a config for model_id is received
        self._initialized = False

        # Event loop reference, captured during initialize()
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        # One-shot alias resolution state: alias -> futures awaiting a response.
        # Guarded by a lock because it is touched both from the event loop
        # (resolve) and from the SAC receiver thread (complete_pending_resolve).
        self._pending_resolves: Dict[str, List[asyncio.Future]] = {}
        self._resolve_lock = threading.Lock()

        # Keep a strong reference to the background task: the event loop only
        # holds weak references, so an unreferenced task can vanish mid-run.
        # asyncio.create_task requires a running event loop.
        self._init_task: asyncio.Task = asyncio.create_task(self.initialize())
        self._init_task.add_done_callback(self._on_initialize_done)

    @property
    def model_id(self) -> str:
        """The model identifier this provider is configured for."""
        return self._model_id

    async def initialize(self) -> None:
        """Start the config listener, then bootstrap the initial model config.

        Publishes a bootstrap request up to ``_MAX_BOOTSTRAP_RETRIES`` times,
        waiting ``_BOOTSTRAP_RETRY_INTERVAL_SECONDS`` after each, and stops
        early once :meth:`mark_initialized` has been called. Logs a warning if
        no config arrived. Listener setup errors propagate out of the task.
        """
        self._loop = asyncio.get_running_loop()
        await self.listen_for_model_config_change()

        for _ in range(_MAX_BOOTSTRAP_RETRIES):
            await self.request_model_config()
            await asyncio.sleep(_BOOTSTRAP_RETRY_INTERVAL_SECONDS)
            if self._initialized:
                break

        if not self._initialized:
            log.warning(
                "%s Model configuration not received after multiple attempts. LiteLlm instance may not be configured.",
                self._component.log_identifier,
            )

    def _on_initialize_done(self, task: asyncio.Task) -> None:
        """Log a failed background initialize() rather than leaving it unretrieved."""
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            log.error(
                "%s DynamicModelProvider initialization failed: %s",
                self._component.log_identifier,
                exc,
            )

    def mark_initialized(self) -> None:
        """
        Mark the provider as initialized. This can be called by the receiver component
        when a valid model config is received.
        """
        self._initialized = True

    def update_litellm_model(self, model_config: Union[str, Dict[str, Any]]) -> None:
        """
        Update the LiteLlm instance with the new model configuration.

        Args:
            model_config: The new model configuration (model name or config dict).
        """
        log.info(
            "Updating LiteLlm instance with new model: %s",
            model_config.get('model', 'N/A') if isinstance(model_config, dict) else model_config,
        )
        self._litellm_instance.configure_model(model_config)

    def remove_litellm_model(self) -> None:
        """
        Remove the model configuration from the LiteLlm instance.
        """
        log.info("Removing model configuration from LiteLlm instance.")
        self._litellm_instance.unconfigure_model()

    async def request_model_config(self) -> None:
        """
        Request the configuration for this provider's own model_id.
        """
        log.info(
            "Requesting model configuration for LiteLlm instance for component %s",
            self._component.get_component_id(),
        )
        self._publish_bootstrap_request(self._model_id)

    def _publish_bootstrap_request(self, model_id: str) -> None:
        """Publish a bootstrap request for ``model_id``, replying to this component.

        Shared by the startup bootstrap and by per-request alias resolution so
        both send identical request payloads.
        """
        component_id = self._component.get_component_id()
        namespace = self._component.namespace
        topic = get_bootstrap_request_topic(namespace, model_id)
        payload = {
            "component_id": component_id,
            "component_type": self._component._get_component_type(),
            "reply_to": get_bootstrap_response_topic(namespace, model_id, component_id),
            "model_id": model_id,
        }
        self._component.publish_a2a_message(
            payload=payload,
            topic=topic
        )
        log.debug("Published model config request to topic %s with payload: %s", topic, payload)

    def _build_config_listener_flow(
        self, component_id: str, main_broker_config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build the internal flow config: BrokerInput -> ModelConfigReceiverComponent."""
        namespace = self._component.namespace
        config_update_topic = get_model_config_update_topic(namespace, self._model_id)
        # Wildcard subscription catches both own-model bootstrap responses
        # and one-shot alias resolution responses for any model_id
        config_bootstrap_topic = get_bootstrap_response_topic(namespace, "*", component_id)

        broker_input_cfg = {
            "component_module": "broker_input",
            "component_name": f"{component_id}_model_config_broker_input",
            "broker_queue_name": f"{namespace}q/model_config/{component_id}",
            "create_queue_on_start": True,
            "component_config": {
                **main_broker_config,
                "broker_subscriptions": [
                    {"topic": config_update_topic},
                    {"topic": config_bootstrap_topic},
                ],
            },
        }

        receiver_cfg = {
            "component_class": ModelConfigReceiverComponent,
            "component_name": f"{component_id}_model_config_receiver",
            "component_config": {
                "model_provider_ref": self
            },
        }

        return {
            "name": f"{component_id}_model_config_flow",
            "components": [broker_input_cfg, receiver_cfg],
        }

    def _attach_broker_input(self, log_id_prefix: str) -> None:
        """Store the BrokerInput heading the running internal flow in ``_broker_input``.

        Raises:
            RuntimeError: If the first component is missing or not a BrokerInput.
        """
        flow_instance = self._internal_app.flows[0]
        groups = flow_instance.component_groups
        if not (groups and groups[0]):
            log.error(
                "%s Could not get BrokerInput instance from internal flow.",
                log_id_prefix,
            )
            raise RuntimeError(
                "Config listener flow setup error: BrokerInput instance not accessible."
            )

        from solace_ai_connector.components.inputs_outputs.broker_input import BrokerInput

        candidate = groups[0][0]
        if not isinstance(candidate, BrokerInput):
            log.error(
                "%s First component in config flow is not BrokerInput. Type: %s",
                log_id_prefix,
                type(candidate).__name__,
            )
            self._broker_input = None
            raise RuntimeError(
                "Config listener flow setup error: BrokerInput not found."
            )
        self._broker_input = candidate
        log.debug(
            "%s Obtained reference to internal BrokerInput component.",
            log_id_prefix,
        )

    def _ensure_config_listener_flow_is_running(self) -> None:
        """
        Ensures the internal SAC flow for model config updates is created and running.

        Returns immediately if the flow already exists. On any failure, the
        partially created internal app is cleaned up and the error is re-raised
        (RuntimeError / ValueError for the checked setup problems).
        """
        log_id_prefix = "[DynamicModelProvider][EnsureConfigFlow]"
        if self._internal_app is not None:
            log.debug("%s Config listener flow already running.", log_id_prefix)
            return

        log.info("%s Initializing internal model config listener flow...", log_id_prefix)
        try:
            main_app = self._component.get_app()
            if not main_app or not main_app.connector:
                log.error(
                    "%s Cannot get main app or connector instance. Config listener flow NOT started.",
                    log_id_prefix,
                )
                raise RuntimeError(
                    "Main app or connector not available for internal flow creation."
                )

            main_broker_config = main_app.app_info.get("broker", {})
            if not main_broker_config:
                log.error(
                    "%s Main app broker configuration not found. Config listener flow NOT started.",
                    log_id_prefix,
                )
                raise ValueError("Main app broker configuration is missing.")

            component_id = self._component.get_component_id()
            flow_config = self._build_config_listener_flow(component_id, main_broker_config)

            # Only the app name and flows are handed to the connector; the
            # BrokerInput receives the broker settings via its component_config.
            self._internal_app = main_app.connector.create_internal_app(
                app_name=f"{component_id}_model_config_internal_app",
                flows=[flow_config],
            )

            if not self._internal_app or not self._internal_app.flows:
                log.error(
                    "%s Failed to create internal model config app/flow.",
                    log_id_prefix,
                )
                self._internal_app = None
                raise RuntimeError("Internal model config app/flow creation failed.")

            self._internal_app.run()
            log.info("%s Internal model config app started.", log_id_prefix)

            self._attach_broker_input(log_id_prefix)

        except Exception as e:
            log.exception(
                "%s Failed to ensure config listener flow is running: %s",
                log_id_prefix,
                e,
            )
            if self._internal_app:
                try:
                    self._internal_app.cleanup()
                except Exception as cleanup_err:
                    log.error(
                        "%s Error during cleanup after config flow init failure: %s",
                        log_id_prefix,
                        cleanup_err,
                    )
            self._internal_app = None
            self._broker_input = None
            raise

    async def listen_for_model_config_change(self) -> None:
        """
        Listen for changes in the model configuration.
        Sets up the internal SAC flow to subscribe to model config updates.
        """
        log.info("Setting up model configuration listener...")
        self._ensure_config_listener_flow_is_running()

    async def resolve(
        self, alias: str, timeout: float = _RESOLVE_TIMEOUT_SECONDS
    ) -> Optional[Dict[str, Any]]:
        """Resolve a model config alias to a raw LiteLLM config dict.

        Sends a bootstrap request to the platform service and awaits the
        response. Concurrent requests for the same alias are de-duplicated:
        only the first publishes a broker message; subsequent callers
        piggyback on the pending response.

        Returns None on timeout, if the listener is not running, or if the
        platform returns no config.

        Raises:
            Any error from publishing the request (the waiter is deregistered
            first, so later calls for the alias publish again), and
            CancelledError if :meth:`cleanup` cancels the pending lookup.
        """
        if not self._internal_app:
            log.warning(
                "%s Cannot resolve alias '%s' — listener not running",
                self._component.log_identifier,
                alias,
            )
            return None

        future: asyncio.Future = asyncio.get_running_loop().create_future()

        with self._resolve_lock:
            waiters = self._pending_resolves.setdefault(alias, [])
            should_publish = not waiters
            waiters.append(future)

        # Publishing happens inside the try so the finally clause always
        # deregisters this waiter; otherwise a failed publish would leave the
        # alias "pending" forever and every later call would wait on nothing.
        try:
            if should_publish:
                self._publish_bootstrap_request(alias)
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            log.warning(
                "%s Model alias resolution timed out for '%s' after %.1fs",
                self._component.log_identifier,
                alias,
                timeout,
            )
            return None
        finally:
            with self._resolve_lock:
                waiters = self._pending_resolves.get(alias)
                if waiters is not None:
                    if future in waiters:
                        waiters.remove(future)
                    if not waiters:
                        self._pending_resolves.pop(alias, None)

    def complete_pending_resolve(
        self, alias: str, model_config: Optional[Dict[str, Any]]
    ) -> None:
        """Complete all pending resolve futures for a given alias.

        Called from the SAC receiver thread. asyncio futures are not
        thread-safe, so the done-check and set_result both run on each
        future's own event loop; a future that already timed out (and was
        cancelled by ``wait_for``) is simply skipped there.
        """
        with self._resolve_lock:
            waiters = self._pending_resolves.pop(alias, [])

        for future in waiters:
            self._call_soon_on_owner_loop(
                future, self._set_result_if_pending, future, model_config
            )

    @staticmethod
    def _call_soon_on_owner_loop(future: asyncio.Future, callback: Any, *args: Any) -> None:
        """Schedule ``callback(*args)`` on the event loop that owns ``future``."""
        try:
            future.get_loop().call_soon_threadsafe(callback, *args)
        except RuntimeError:
            # The owning loop is closed; nothing can still be awaiting the future.
            log.debug("Event loop owning a pending future is closed; dropping callback.")

    @staticmethod
    def _set_result_if_pending(future: asyncio.Future, value: Any) -> None:
        """Set ``value`` on ``future`` unless it has already finished."""
        if not future.done():
            future.set_result(value)

    def cleanup(self) -> None:
        """Cleanup resources when the provider is no longer needed.

        Cancels the background initialize() task and any in-flight resolve()
        waiters (cancellation is scheduled onto each future's own loop, so this
        may be called off the event-loop thread), then tears down the internal
        config listener app.
        """
        log.info("Cleaning up DynamicModelProvider...")

        self._call_soon_on_owner_loop(self._init_task, self._init_task.cancel)

        with self._resolve_lock:
            pending = [
                future
                for waiters in self._pending_resolves.values()
                for future in waiters
            ]
            self._pending_resolves.clear()
        for future in pending:
            self._call_soon_on_owner_loop(future, future.cancel)

        if self._internal_app:
            log.info("Cleaning up internal model config app...")
            try:
                self._internal_app.cleanup()
            except Exception as e:
                log.error(
                    "Error cleaning up internal model config app: %s",
                    e,
                )
        self._internal_app = None
        self._broker_input = None
486  
487  
async def start_model_listener(
    litellm_instance: LiteLlm, component: ComponentBase, model_provider_id: str
) -> DynamicModelProvider:
    """
    Create a DynamicModelProvider that keeps ``litellm_instance`` configured.

    The provider schedules its own initialization as a background task on the
    running event loop. That task subscribes to the model config topics for
    ``model_provider_id`` and requests the initial config. This coroutine
    therefore returns before any config has been received; the LiteLlm
    instance is configured later, via ``configure_model``, when one arrives.

    Args:
        litellm_instance: The LiteLlm instance to configure when a model config arrives.
        component: The owning component (presumably a SamAgentComponent), used
            for namespace, component id, app/connector access and publishing.
        model_provider_id: The model identifier to request and listen for.

    Returns:
        The created DynamicModelProvider. Keep a reference to it and call its
        ``cleanup()`` when the component shuts down.
    """
    log.info(
        "Starting model '%s' listener for component %s",
        model_provider_id,
        component.get_component_id(),
    )
    return DynamicModelProvider(component, litellm_instance, model_provider_id)