/ docs / docs / documentation / developing / create-gateways.md
create-gateways.md
  1  ---
  2  title: Creating Custom Gateways
  3  sidebar_position: 430
  4  ---
  5  
  6  import NetworkAccessMayBeRequired from '@site/docs/partials/network-access-may-be-required.mdx';
  7  
  8  # Creating Custom Gateways
  9  
 10  Gateway adapters connect external systems to the Agent Mesh through custom interfaces. They translate between platform-specific formats (Slack messages, HTTP requests, webhook payloads) and the standardized A2A protocol that agents understand. Gateway adapters handle platform events, extract user identity, and deliver agent responses back to users in the appropriate format.
 11  
 12  This guide walks you through creating custom gateways using the gateway adapter pattern.
 13  
 14  :::tip[In one sentence]
 15  Gateway adapters are custom interfaces that connect external platforms to the agent mesh by translating events and responses between platform formats and the A2A protocol.
 16  :::
 17  
 18  <NetworkAccessMayBeRequired />
 19  
 20  ## Key Functions
 21  
 22  Gateway adapters provide the following capabilities:
 23  
 24  1. **Platform Integration**: Connect external systems such as Slack, webhooks, REST APIs, or custom protocols to the agent mesh. Handle incoming events, commands, and messages from these platforms.
 25  
 26  2. **Authentication**: Extract user identity from platform events (OAuth tokens, API keys, platform user IDs) and integrate with identity providers for user enrichment.
 27  
 28  3. **Message Translation**: Convert platform-specific message formats into standardized content that agents can process. Transform agent responses back into platform-native formats.
 29  
 30  4. **Streaming Responses**: Deliver real-time updates from agents to users as they are generated. Handle text streaming, status updates, and progress indicators.
 31  
 32  5. **File Management**: Process file uploads from users and deliver agent-generated artifacts (reports, images, data files) back to the platform in the appropriate format.
 33  
 34  6. **Built-in Services**: Access state management for maintaining conversation context, timer scheduling for delayed operations, and feedback collection for gathering user ratings.
 35  
 36  ## Architecture Overview
 37  
 38  Gateway adapters work alongside a generic gateway component to connect platforms to the agent mesh:
 39  
 40  - **Gateway Adapter**: Your platform-specific code that receives events from external systems and formats responses for delivery back to users
 41  - **Generic Gateway Component**: Handles A2A protocol communication, authentication flow, user enrichment, message routing, and artifact management
 42  
 43  The following diagram illustrates the complete flow from external platform to agent mesh and back:
 44  
 45  ```mermaid
 46  sequenceDiagram
 47      participant Platform as External Platform
 48      participant Adapter as Gateway Adapter
 49      participant Generic as Generic Gateway
 50      participant Mesh as Agent Mesh
 51  
 52      rect rgba(234, 234, 234, 1)
 53          Note over Platform,Adapter: 1. External Event Arrives
 54          Platform->>Adapter: Platform Event
 55      end
 56  
 57      rect rgba(234, 234, 234, 1)
 58          Note over Adapter,Generic: 2. Adapter Processes Event
 59          Adapter->>Adapter: extract_auth_claims()
 60          Adapter->>Adapter: prepare_task()
 61          Adapter->>Generic: handle_external_input(SamTask)
 62      end
 63  
 64      rect rgba(234, 234, 234, 1)
 65          Note over Generic,Mesh: 3. Generic Gateway Handles A2A
 66          Generic->>Generic: Authenticate & Enrich User
 67          Generic->>Generic: Convert to A2A Format
 68          Generic->>Mesh: Submit A2A Task
 69      end
 70  
 71      rect rgba(234, 234, 234, 1)
 72          Note over Mesh,Adapter: 4. Response Flow
 73          Mesh-->>Generic: A2A Updates
 74          Generic->>Generic: Convert to SAM Types
 75          Generic->>Adapter: handle_update(SamUpdate)
 76          Adapter->>Platform: Platform Response
 77      end
 78  
 79      %%{init: {
 80          'theme': 'base',
 81          'themeVariables': {
 82              'actorBkg': '#00C895',
 83              'actorBorder': '#00C895',
 84              'actorTextColor': '#000000',
 85              'noteBkgColor': '#FFF7C2',
 86              'noteTextColor': '#000000',
 87              'noteBorderColor': '#FFF7C2'
 88          }
 89      }}%%
 90  ```
 91  
 92  ## Core Concepts
 93  
 94  ### The Gateway Adapter Contract
 95  
 96  Gateway adapters extend the `GatewayAdapter` base class and implement methods for handling platform-specific events:
 97  
 98  - Extract authentication information from platform events
 99  - Convert platform events into standardized task format
100  - Format and send responses back to the platform
101  
102  ### The Gateway Context
103  
104  When your adapter initializes, it receives a `GatewayContext` object that provides access to framework services:
105  
106  - Task submission and cancellation
107  - Artifact loading and management
108  - User feedback collection
109  - State management (task-level and session-level)
110  - Timer scheduling
111  
112  ### The Type System
113  
114  Gateway adapters use a standardized type system for messages:
115  
116  - **SamTask**: An inbound request with content parts and metadata
117  - **SamUpdate**: An outbound update containing one or more content parts
118  - **SamTextPart**: Text content in tasks or updates
119  - **SamFilePart**: File content with bytes or URI
120  - **SamDataPart**: Structured data with metadata
121  - **AuthClaims**: User authentication information
122  - **ResponseContext**: Context provided with each outbound callback
123  
124  ## Adapter Lifecycle
125  
126  Gateway adapters follow a simple lifecycle:
127  
128  ### Initialization
129  
130  The `init()` method is called when the gateway starts:
131  
132  ```python
133  async def init(self, context: GatewayContext) -> None:
134      """
135      Initialize the gateway adapter.
136  
137      This is where you should:
138      - Store the context for later use
139      - Start platform listeners (WebSocket, HTTP server, etc.)
140      - Connect to external services
141      """
142      self.context = context
143      # Initialize your platform connection
144      await self.start_platform_listener()
145  ```
146  
147  ### Active Processing
148  
149  During normal operation, the generic gateway calls your adapter methods:
150  
151  - `extract_auth_claims()` - Extract user identity from platform events
152  - `prepare_task()` - Convert platform events to SamTask format
153  - `handle_update()` - Process updates from agents
154  - `handle_task_complete()` - Handle task completion
155  - `handle_error()` - Handle errors
156  
157  ### Cleanup
158  
159  The `cleanup()` method is called during shutdown:
160  
161  ```python
162  async def cleanup(self) -> None:
163      """
164      Clean up resources on shutdown.
165  
166      This is where you should:
167      - Stop platform listeners
168      - Close connections
169      - Release resources
170      """
171      await self.stop_platform_listener()
172  ```
173  
174  ## Implementing an Adapter
175  
176  ### Required Configuration
177  
178  Define a configuration model for your adapter using Pydantic:
179  
180  ```python
181  from pydantic import BaseModel, Field
182  
183  class MyAdapterConfig(BaseModel):
184      """Configuration model for MyAdapter."""
185  
186      api_token: str = Field(..., description="API token for the platform.")
187      webhook_url: str = Field(..., description="Webhook URL to listen on.")
188      timeout_seconds: int = Field(default=30, description="Request timeout.")
189  ```
190  
191  Set the `ConfigModel` class attribute:
192  
193  ```python
194  from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
195  
196  class MyAdapter(GatewayAdapter):
197      ConfigModel = MyAdapterConfig
198  ```
199  
200  ### Authentication: extract_auth_claims()
201  
202  Extract user identity from platform events:
203  
204  ```python
205  async def extract_auth_claims(
206      self,
207      external_input: Dict,
208      endpoint_context: Optional[Dict[str, Any]] = None,
209  ) -> Optional[AuthClaims]:
210      """
211      Extract authentication claims from platform input.
212  
213      Return AuthClaims with user info, or None to use config-based auth.
214      """
215      user_id = external_input.get("user_id")
216      user_email = external_input.get("user_email")
217  
218      if user_id and user_email:
219          return AuthClaims(
220              id=user_email,
221              email=user_email,
222              source="platform_api",
223              raw_context={"platform_user_id": user_id}
224          )
225  
226      return None
227  ```
228  
229  ### Inbound: prepare_task()
230  
231  Convert platform events into standardized task format:
232  
233  ```python
234  async def prepare_task(
235      self,
236      external_input: Dict,
237      endpoint_context: Optional[Dict[str, Any]] = None,
238  ) -> SamTask:
239      """
240      Prepare a task from platform input.
241  
242      This method is called after authentication succeeds. Convert your
243      platform's event format into a SamTask with parts.
244      """
245      message_text = external_input.get("message", "")
246      conversation_id = external_input.get("conversation_id")
247  
248      # Create content parts
249      parts = [self.context.create_text_part(message_text)]
250  
251      # Handle file attachments if present
252      if "attachments" in external_input:
253          for attachment in external_input["attachments"]:
254              file_bytes = await self._download_attachment(attachment)
255              parts.append(
256                  self.context.create_file_part_from_bytes(
257                      name=attachment["filename"],
258                      content_bytes=file_bytes,
259                      mime_type=attachment["mime_type"]
260                  )
261              )
262  
263      return SamTask(
264          parts=parts,
265          session_id=conversation_id,
266          target_agent=self.context.config.get("default_agent_name", "default"),
267          platform_context={
268              "conversation_id": conversation_id,
269              # Store any platform-specific data needed for responses
270          }
271      )
272  ```
273  
274  ### Outbound: handle_update()
275  
276  Process updates from agents and send them to your platform:
277  
278  ```python
279  async def handle_update(self, update: SamUpdate, context: ResponseContext) -> None:
280      """
281      Handle an update from the agent.
282  
283      By default, this dispatches to individual part handlers.
284      Override for custom batch processing.
285      """
286      # Default implementation handles each part type
287      for part in update.parts:
288          if isinstance(part, SamTextPart):
289              await self.handle_text_chunk(part.text, context)
290          elif isinstance(part, SamFilePart):
291              await self.handle_file(part, context)
292          elif isinstance(part, SamDataPart):
293              await self.handle_data_part(part, context)
294  ```
295  
296  Implement individual part handlers:
297  
298  ```python
299  async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
300      """Handle streaming text chunk from the agent."""
301      conversation_id = context.platform_context["conversation_id"]
302      await self.platform_api.send_message(conversation_id, text)
303  
304  async def handle_file(self, file_part: SamFilePart, context: ResponseContext) -> None:
305      """Handle file/artifact from the agent."""
306      conversation_id = context.platform_context["conversation_id"]
307      await self.platform_api.upload_file(
308          conversation_id,
309          filename=file_part.name,
310          content=file_part.content_bytes
311      )
312  
313  async def handle_data_part(self, data_part: SamDataPart, context: ResponseContext) -> None:
314      """Handle structured data part from the agent."""
315      # Check for special data part types
316      if data_part.data.get("type") == "agent_progress_update":
317          status_text = data_part.data.get("status_text")
318          if status_text:
319              await self.handle_status_update(status_text, context)
320  
321  async def handle_status_update(self, status_text: str, context: ResponseContext) -> None:
322      """Handle agent status update (progress indicator)."""
323      conversation_id = context.platform_context["conversation_id"]
324      await self.platform_api.update_status(conversation_id, status_text)
325  ```
326  
327  ### Completion: handle_task_complete()
328  
329  Handle task completion notification:
330  
331  ```python
332  async def handle_task_complete(self, context: ResponseContext) -> None:
333      """Handle task completion notification."""
334      conversation_id = context.platform_context["conversation_id"]
335      await self.platform_api.send_message(
336          conversation_id,
337          "✅ Task complete."
338      )
339  ```
340  
341  ### Error Handling: handle_error()
342  
343  Handle errors from the agent or gateway:
344  
345  ```python
346  async def handle_error(self, error: SamError, context: ResponseContext) -> None:
347      """Handle error from the agent or gateway."""
348      conversation_id = context.platform_context.get("conversation_id")
349  
350      if error.category == "CANCELED":
351          error_message = "🛑 Task canceled."
352      else:
353          error_message = f"❌ Error: {error.message}"
354  
355      if conversation_id:
356          await self.platform_api.send_message(conversation_id, error_message)
357  ```
358  
359  ## Gateway Context Services
360  
361  The `GatewayContext` provides access to framework services:
362  
363  ### Task Management
364  
365  ```python
366  # Submit a new task to the agent mesh
367  task_id = await self.context.handle_external_input(
368      external_input=platform_event,
369      endpoint_context={"source": "webhook"}
370  )
371  
372  # Cancel an in-flight task
373  await self.context.cancel_task(task_id)
374  ```
375  
376  ### Artifact Management
377  
378  ```python
379  # Load artifact content
380  content_bytes = await self.context.load_artifact_content(
381      context=response_context,
382      filename="report.pdf",
383      version="latest"
384  )
385  
386  # List available artifacts
387  artifacts = await self.context.list_artifacts(response_context)
388  for artifact in artifacts:
389      print(f"{artifact.filename}: {artifact.version}")
390  ```
391  
392  ### Feedback Collection
393  
394  ```python
395  # Submit user feedback
396  feedback = SamFeedback(
397      task_id=task_id,
398      session_id=session_id,
399      rating="up",  # or "down"
400      comment="Great response!",
401      user_id=user_id
402  )
403  await self.context.submit_feedback(feedback)
404  ```
405  
406  ### State Management
407  
408  ```python
409  # Task-level state (expires after 1 hour)
410  self.context.set_task_state(task_id, "status_message_id", message_id)
411  message_id = self.context.get_task_state(task_id, "status_message_id")
412  
413  # Session-level state (expires after 24 hours)
414  self.context.set_session_state(session_id, "user_preferences", preferences)
415  preferences = self.context.get_session_state(session_id, "user_preferences")
416  ```
417  
418  ### Timer Management
419  
420  ```python
421  # Schedule a one-time callback
422  timer_id = self.context.add_timer(
423      delay_ms=5000,
424      callback=self.my_async_callback
425  )
426  
427  # Schedule a recurring callback
428  timer_id = self.context.add_timer(
429      delay_ms=1000,
430      callback=self.my_async_callback,
431      interval_ms=1000
432  )
433  
434  # Cancel a timer
435  self.context.cancel_timer(timer_id)
436  ```
437  
438  ## Configuration
439  
440  Configure a gateway adapter in your YAML file:
441  
442  ```yaml
443  apps:
444    - name: my_gateway_app
445      app_base_path: .
446      app_module: solace_agent_mesh.gateway.generic.app
447  
448      broker:
449        # Broker connection configuration
450        <<: *broker_connection
451  
452      app_config:
453        # Required: namespace for A2A topics
454        namespace: ${NAMESPACE}
455        gateway_id: my_gateway
456  
457        # Required: path to your adapter class
458        gateway_adapter: my_package.adapters.MyAdapter
459  
460        # Adapter-specific configuration
461        adapter_config:
462          api_token: ${MY_PLATFORM_API_TOKEN}
463          webhook_url: ${WEBHOOK_URL}
464          timeout_seconds: 30
465  
466        # Standard gateway configuration
467        default_agent_name: OrchestratorAgent
468  
469        # Artifact service configuration
470        artifact_service:
471          type: "filesystem"
472          base_path: "/tmp/artifacts"
473          artifact_scope: "namespace"
474  
475        # System purpose and response format
476        system_purpose: >
477          The system is an AI assistant that helps users
478          accomplish tasks through natural language interaction.
479  
480        response_format: >
481          Responses should be clear, concise, and formatted
482          appropriately for the platform.
483  ```
484  
485  :::info
486  If `gateway_id` is not provided, it defaults to the app name. Ensure `apps.name` is unique across all gateways or specify a unique `gateway_id` to prevent conflicts.
487  :::
488  
489  ## Example: Slack Adapter
490  
491  The Slack gateway adapter demonstrates a complete implementation of the adapter pattern. It handles:
492  
493  - **Socket Mode Connection**: Maintains WebSocket connection to Slack
494  - **Event Handling**: Processes messages, mentions, slash commands, and button actions
495  - **Message Queuing**: Manages streaming updates with proper ordering
496  - **File Uploads**: Handles artifact uploads to Slack
497  - **Markdown Conversion**: Converts standard Markdown to Slack format
498  - **Feedback Collection**: Provides thumbs up/down buttons for user feedback
499  
500  Key highlights from the Slack adapter implementation:
501  
502  ### Configuration Model
503  
504  ```python
505  class SlackAdapterConfig(BaseModel):
506      slack_bot_token: str = Field(..., description="Slack Bot Token (xoxb-...).")
507      slack_app_token: str = Field(..., description="Slack App Token (xapp-...).")
508      slack_initial_status_message: str = Field(
509          "Got it, thinking...",
510          description="Message posted to Slack upon receiving a user request."
511      )
512      correct_markdown_formatting: bool = Field(
513          True,
514          description="Attempt to convert common Markdown to Slack's format."
515      )
516      feedback_enabled: bool = Field(
517          False,
518          description="Enable thumbs up/down feedback buttons."
519      )
520  ```
521  
522  ### Authentication with Caching
523  
524  The Slack adapter extracts user email from Slack's API and caches the results:
525  
526  ```python
527  async def extract_auth_claims(
528      self,
529      external_input: Dict,
530      endpoint_context: Optional[Dict[str, Any]] = None,
531  ) -> Optional[AuthClaims]:
532      slack_user_id = external_input.get("user")
533  
534      # Check cache first
535      if cached_email := self.get_cached_email(slack_user_id):
536          return AuthClaims(id=cached_email, email=cached_email, source="slack_api")
537  
538      # Fetch from Slack API
539      profile = await self.slack_app.client.users_profile_get(user=slack_user_id)
540      user_email = profile.get("profile", {}).get("email")
541  
542      if user_email:
543          self.cache_email(slack_user_id, user_email)
544          return AuthClaims(id=user_email, email=user_email, source="slack_api")
545  ```
546  
547  ### Streaming Updates with Message Queue
548  
549  The Slack adapter uses a message queue to handle streaming updates efficiently:
550  
551  ```python
552  async def handle_update(self, update: SamUpdate, context: ResponseContext) -> None:
553      task_id = context.task_id
554      queue = await self._get_or_create_queue(task_id, channel_id, thread_ts)
555  
556      for part in update.parts:
557          if isinstance(part, SamTextPart):
558              await queue.queue_text_update(part.text)
559          elif isinstance(part, SamFilePart):
560              await queue.queue_file_upload(part.name, part.content_bytes)
561  ```
562  
563  For the complete Slack adapter implementation, see `src/solace_agent_mesh/gateway/slack/adapter.py`.
564  
565  ## Creating Your Own Adapter
566  
567  ### Option 1: Create as a Plugin (Recommended)
568  
569  For reusable adapters that you plan to share or use across multiple projects:
570  
571  ```bash
572  sam plugin create my-gateway-plugin
573  ```
574  
575  Select "Gateway Plugin" when prompted. This creates a complete plugin structure with:
576  
577  - Python package structure
578  - Sample adapter implementation
579  - Configuration template
580  - Build tooling
581  
582  After implementing your adapter, build and distribute:
583  
584  ```bash
585  cd my-gateway-plugin
586  sam plugin build
587  ```
588  
589  For more information on plugins, see [Plugins](../components/plugins.md).
590  
591  ### Option 2: Add to Existing Project
592  
593  For project-specific adapters, create your adapter class in your project:
594  
595  1. Create your adapter module:
596  
597  ```python
598  # my_project/gateways/my_adapter.py
599  from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
600  from solace_agent_mesh.gateway.adapter.types import (
601      AuthClaims,
602      GatewayContext,
603      ResponseContext,
604      SamTask,
605      SamUpdate
606  )
607  
608  class MyAdapter(GatewayAdapter):
609      async def init(self, context: GatewayContext) -> None:
610          self.context = context
611          # Initialize your platform connection
612  
613      async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
614          # Convert platform event to SamTask
615          pass
616  
617      async def handle_update(self, update: SamUpdate, context: ResponseContext) -> None:
618          # Send update to platform
619          pass
620  ```
621  
622  2. Reference it in your configuration:
623  
624  ```yaml
625  app_config:
626    gateway_adapter: my_project.gateways.my_adapter.MyAdapter
627    adapter_config:
628      # Your adapter configuration
629  ```
630  
631  ## Advanced: Full Custom Gateways
632  
633  For most use cases, gateway adapters provide all the functionality you need to connect external platforms to the agent mesh. However, if you have highly specialized requirements, you can create a full custom gateway that implements the complete gateway lifecycle and A2A protocol handling from scratch.
634  
635  Consider a full custom gateway only if you need:
636  
637  - Highly specialized authentication flows not supported by the standard flow
638  - Custom A2A protocol behavior or extensions
639  - Complex multi-stage processing pipelines with custom state management
640  - Fine-grained control over every aspect of the gateway lifecycle
641  
642  Full custom gateways extend `BaseGatewayComponent` directly and implement all protocol handling manually. This approach requires significantly more code and expertise but provides complete control over the gateway behavior.
643  
644  ## Related Documentation
645  
646  - [Gateways](../components/gateways.md) - Overview of gateway concepts and types
647  - [Plugins](../components/plugins.md) - Creating and distributing gateway plugins