---
title: Creating Custom Gateways
sidebar_position: 430
---

import NetworkAccessMayBeRequired from '@site/docs/partials/network-access-may-be-required.mdx';

# Creating Custom Gateways

Gateway adapters connect external systems to the Agent Mesh through custom interfaces. They translate between platform-specific formats (Slack messages, HTTP requests, webhook payloads) and the standardized A2A protocol that agents understand. Gateway adapters handle platform events, extract user identity, and deliver agent responses back to users in the appropriate format.

This guide walks you through creating custom gateways using the gateway adapter pattern.

:::tip[In one sentence]
Gateway adapters are custom interfaces that connect external platforms to the agent mesh by translating events and responses between platform formats and the A2A protocol.
:::

<NetworkAccessMayBeRequired />

## Key Functions

Gateway adapters provide the following capabilities:

1. **Platform Integration**: Connect external systems such as Slack, webhooks, REST APIs, or custom protocols to the agent mesh. Handle incoming events, commands, and messages from these platforms.

2. **Authentication**: Extract user identity from platform events (OAuth tokens, API keys, platform user IDs) and integrate with identity providers for user enrichment.

3. **Message Translation**: Convert platform-specific message formats into standardized content that agents can process. Transform agent responses back into platform-native formats.

4. **Streaming Responses**: Deliver real-time updates from agents to users as they are generated. Handle text streaming, status updates, and progress indicators.

5. **File Management**: Process file uploads from users and deliver agent-generated artifacts (reports, images, data files) back to the platform in the appropriate format.

6. **Built-in Services**: Access state management for maintaining conversation context, timer scheduling for delayed operations, and feedback collection for gathering user ratings.

## Architecture Overview

Gateway adapters work alongside a generic gateway component to connect platforms to the agent mesh:

- **Gateway Adapter**: Your platform-specific code that receives events from external systems and formats responses for delivery back to users
- **Generic Gateway Component**: Handles A2A protocol communication, authentication flow, user enrichment, message routing, and artifact management

The following diagram illustrates the complete flow from external platform to agent mesh and back:

```mermaid
sequenceDiagram
    participant Platform as External Platform
    participant Adapter as Gateway Adapter
    participant Generic as Generic Gateway
    participant Mesh as Agent Mesh

    rect rgba(234, 234, 234, 1)
        Note over Platform,Adapter: 1. External Event Arrives
        Platform->>Adapter: Platform Event
    end

    rect rgba(234, 234, 234, 1)
        Note over Adapter,Generic: 2. Adapter Processes Event
        Adapter->>Adapter: extract_auth_claims()
        Adapter->>Adapter: prepare_task()
        Adapter->>Generic: handle_external_input(SamTask)
    end

    rect rgba(234, 234, 234, 1)
        Note over Generic,Mesh: 3. Generic Gateway Handles A2A
        Generic->>Generic: Authenticate & Enrich User
        Generic->>Generic: Convert to A2A Format
        Generic->>Mesh: Submit A2A Task
    end

    rect rgba(234, 234, 234, 1)
        Note over Mesh,Adapter: 4. Response Flow
        Mesh-->>Generic: A2A Updates
        Generic->>Generic: Convert to SAM Types
        Generic->>Adapter: handle_update(SamUpdate)
        Adapter->>Platform: Platform Response
    end

    %%{init: {
        'theme': 'base',
        'themeVariables': {
            'actorBkg': '#00C895',
            'actorBorder': '#00C895',
            'actorTextColor': '#000000',
            'noteBkgColor': '#FFF7C2',
            'noteTextColor': '#000000',
            'noteBorderColor': '#FFF7C2'
        }
    }}%%
```

## Core Concepts

### The Gateway Adapter Contract

Gateway adapters extend the `GatewayAdapter` base class and implement methods for handling platform-specific events:

- Extract authentication information from platform events
- Convert platform events into standardized task format
- Format and send responses back to the platform

### The Gateway Context

When your adapter initializes, it receives a `GatewayContext` object that provides access to framework services:

- Task submission and cancellation
- Artifact loading and management
- User feedback collection
- State management (task-level and session-level)
- Timer scheduling

### The Type System

Gateway adapters use a standardized type system for messages:

- **SamTask**: An inbound request with content parts and metadata
- **SamUpdate**: An outbound update containing one or more content parts
- **SamTextPart**: Text content in tasks or updates
- **SamFilePart**: File content with bytes or URI
- **SamDataPart**: Structured data with metadata
- **AuthClaims**: User authentication information
- **ResponseContext**: Context provided with each outbound callback

## Adapter Lifecycle

Gateway adapters follow a simple lifecycle:

### Initialization

The `init()` method is called when the gateway starts:

```python
async def init(self, context: GatewayContext) -> None:
    """
    Initialize the gateway adapter.

    This is where you should:
    - Store the context for later use
    - Start platform listeners (WebSocket, HTTP server, etc.)
    - Connect to external services
    """
    self.context = context
    # Initialize your platform connection
    await self.start_platform_listener()
```

### Active Processing

During normal operation, the generic gateway calls your adapter methods:

- `extract_auth_claims()` - Extract user identity from platform events
- `prepare_task()` - Convert platform events to SamTask format
- `handle_update()` - Process updates from agents
- `handle_task_complete()` - Handle task completion
- `handle_error()` - Handle errors

### Cleanup

The `cleanup()` method is called during shutdown:

```python
async def cleanup(self) -> None:
    """
    Clean up resources on shutdown.

    This is where you should:
    - Stop platform listeners
    - Close connections
    - Release resources
    """
    await self.stop_platform_listener()
```

## Implementing an Adapter

### Required Configuration

Define a configuration model for your adapter using Pydantic:

```python
from pydantic import BaseModel, Field

class MyAdapterConfig(BaseModel):
    """Configuration model for MyAdapter."""

    api_token: str = Field(..., description="API token for the platform.")
    webhook_url: str = Field(..., description="Webhook URL to listen on.")
    timeout_seconds: int = Field(default=30, description="Request timeout.")
```

Set the `ConfigModel` class attribute:

```python
from solace_agent_mesh.gateway.adapter.base import GatewayAdapter

class MyAdapter(GatewayAdapter):
    ConfigModel = MyAdapterConfig
```

### Authentication: extract_auth_claims()

Extract user identity from platform events:

```python
async def extract_auth_claims(
    self,
    external_input: Dict,
    endpoint_context: Optional[Dict[str, Any]] = None,
) -> Optional[AuthClaims]:
    """
    Extract authentication claims from platform input.

    Return AuthClaims with user info, or None to use config-based auth.
    """
    user_id = external_input.get("user_id")
    user_email = external_input.get("user_email")

    if user_id and user_email:
        return AuthClaims(
            id=user_email,
            email=user_email,
            source="platform_api",
            raw_context={"platform_user_id": user_id}
        )

    return None
```

### Inbound: prepare_task()

Convert platform events into standardized task format:

```python
async def prepare_task(
    self,
    external_input: Dict,
    endpoint_context: Optional[Dict[str, Any]] = None,
) -> SamTask:
    """
    Prepare a task from platform input.

    This method is called after authentication succeeds. Convert your
    platform's event format into a SamTask with parts.
    """
    message_text = external_input.get("message", "")
    conversation_id = external_input.get("conversation_id")

    # Create content parts
    parts = [self.context.create_text_part(message_text)]

    # Handle file attachments if present
    if "attachments" in external_input:
        for attachment in external_input["attachments"]:
            file_bytes = await self._download_attachment(attachment)
            parts.append(
                self.context.create_file_part_from_bytes(
                    name=attachment["filename"],
                    content_bytes=file_bytes,
                    mime_type=attachment["mime_type"]
                )
            )

    return SamTask(
        parts=parts,
        session_id=conversation_id,
        target_agent=self.context.config.get("default_agent_name", "default"),
        platform_context={
            "conversation_id": conversation_id,
            # Store any platform-specific data needed for responses
        }
    )
```

### Outbound: handle_update()

Process updates from agents and send them to your platform:

```python
async def handle_update(self, update: SamUpdate, context: ResponseContext) -> None:
    """
    Handle an update from the agent.

    By default, this dispatches to individual part handlers.
    Override for custom batch processing.
    """
    # Default implementation handles each part type
    for part in update.parts:
        if isinstance(part, SamTextPart):
            await self.handle_text_chunk(part.text, context)
        elif isinstance(part, SamFilePart):
            await self.handle_file(part, context)
        elif isinstance(part, SamDataPart):
            await self.handle_data_part(part, context)
```

Implement individual part handlers:

```python
async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
    """Handle streaming text chunk from the agent."""
    conversation_id = context.platform_context["conversation_id"]
    await self.platform_api.send_message(conversation_id, text)

async def handle_file(self, file_part: SamFilePart, context: ResponseContext) -> None:
    """Handle file/artifact from the agent."""
    conversation_id = context.platform_context["conversation_id"]
    await self.platform_api.upload_file(
        conversation_id,
        filename=file_part.name,
        content=file_part.content_bytes
    )

async def handle_data_part(self, data_part: SamDataPart, context: ResponseContext) -> None:
    """Handle structured data part from the agent."""
    # Check for special data part types
    if data_part.data.get("type") == "agent_progress_update":
        status_text = data_part.data.get("status_text")
        if status_text:
            await self.handle_status_update(status_text, context)

async def handle_status_update(self, status_text: str, context: ResponseContext) -> None:
    """Handle agent status update (progress indicator)."""
    conversation_id = context.platform_context["conversation_id"]
    await self.platform_api.update_status(conversation_id, status_text)
```

### Completion: handle_task_complete()

Handle task
completion notification:

```python
async def handle_task_complete(self, context: ResponseContext) -> None:
    """Handle task completion notification."""
    conversation_id = context.platform_context["conversation_id"]
    await self.platform_api.send_message(
        conversation_id,
        "✅ Task complete."
    )
```

### Error Handling: handle_error()

Handle errors from the agent or gateway:

```python
async def handle_error(self, error: SamError, context: ResponseContext) -> None:
    """Handle error from the agent or gateway."""
    conversation_id = context.platform_context.get("conversation_id")

    if error.category == "CANCELED":
        error_message = "🛑 Task canceled."
    else:
        error_message = f"❌ Error: {error.message}"

    if conversation_id:
        await self.platform_api.send_message(conversation_id, error_message)
```

## Gateway Context Services

The `GatewayContext` provides access to framework services:

### Task Management

```python
# Submit a new task to the agent mesh
task_id = await self.context.handle_external_input(
    external_input=platform_event,
    endpoint_context={"source": "webhook"}
)

# Cancel an in-flight task
await self.context.cancel_task(task_id)
```

### Artifact Management

```python
# Load artifact content
content_bytes = await self.context.load_artifact_content(
    context=response_context,
    filename="report.pdf",
    version="latest"
)

# List available artifacts
artifacts = await self.context.list_artifacts(response_context)
for artifact in artifacts:
    print(f"{artifact.filename}: {artifact.version}")
```

### Feedback Collection

```python
# Submit user feedback
feedback = SamFeedback(
    task_id=task_id,
    session_id=session_id,
    rating="up",  # or "down"
    comment="Great response!",
    user_id=user_id
)
await self.context.submit_feedback(feedback)
```

### State Management

```python
# Task-level state (expires after 1 hour)
self.context.set_task_state(task_id, "status_message_id", message_id)
message_id = self.context.get_task_state(task_id, "status_message_id")

# Session-level state (expires after 24 hours)
self.context.set_session_state(session_id, "user_preferences", preferences)
preferences = self.context.get_session_state(session_id, "user_preferences")
```

### Timer Management

```python
# Schedule a one-time callback
timer_id = self.context.add_timer(
    delay_ms=5000,
    callback=self.my_async_callback
)

# Schedule a recurring callback
timer_id = self.context.add_timer(
    delay_ms=1000,
    callback=self.my_async_callback,
    interval_ms=1000
)

# Cancel a timer
self.context.cancel_timer(timer_id)
```

## Configuration

Configure a gateway adapter in your YAML file:

```yaml
apps:
  - name: my_gateway_app
    app_base_path: .
    app_module: solace_agent_mesh.gateway.generic.app

    broker:
      # Broker connection configuration
      <<: *broker_connection

    app_config:
      # Required: namespace for A2A topics
      namespace: ${NAMESPACE}
      gateway_id: my_gateway

      # Required: path to your adapter class
      gateway_adapter: my_package.adapters.MyAdapter

      # Adapter-specific configuration
      adapter_config:
        api_token: ${MY_PLATFORM_API_TOKEN}
        webhook_url: ${WEBHOOK_URL}
        timeout_seconds: 30

      # Standard gateway configuration
      default_agent_name: OrchestratorAgent

      # Artifact service configuration
      artifact_service:
        type: "filesystem"
        base_path: "/tmp/artifacts"
        artifact_scope: "namespace"

      # System purpose and response format
      system_purpose: >
        The system is an AI assistant that helps users
        accomplish tasks through natural language interaction.

      response_format: >
        Responses should be clear, concise, and formatted
        appropriately for the platform.
```

:::info
If `gateway_id` is not provided, it defaults to the app name. Ensure `apps.name` is unique across all gateways or specify a unique `gateway_id` to prevent conflicts.
:::

## Example: Slack Adapter

The Slack gateway adapter demonstrates a complete implementation of the adapter pattern. It handles:

- **Socket Mode Connection**: Maintains WebSocket connection to Slack
- **Event Handling**: Processes messages, mentions, slash commands, and button actions
- **Message Queuing**: Manages streaming updates with proper ordering
- **File Uploads**: Handles artifact uploads to Slack
- **Markdown Conversion**: Converts standard Markdown to Slack format
- **Feedback Collection**: Provides thumbs up/down buttons for user feedback

Key highlights from the Slack adapter implementation:

### Configuration Model

```python
class SlackAdapterConfig(BaseModel):
    slack_bot_token: str = Field(..., description="Slack Bot Token (xoxb-...).")
    slack_app_token: str = Field(..., description="Slack App Token (xapp-...).")
    slack_initial_status_message: str = Field(
        "Got it, thinking...",
        description="Message posted to Slack upon receiving a user request."
    )
    correct_markdown_formatting: bool = Field(
        True,
        description="Attempt to convert common Markdown to Slack's format."
    )
    feedback_enabled: bool = Field(
        False,
        description="Enable thumbs up/down feedback buttons."
    )
```

### Authentication with Caching

The Slack adapter extracts user email from Slack's API and caches the results:

```python
async def extract_auth_claims(
    self,
    external_input: Dict,
    endpoint_context: Optional[Dict[str, Any]] = None,
) -> Optional[AuthClaims]:
    slack_user_id = external_input.get("user")

    # Check cache first
    if cached_email := self.get_cached_email(slack_user_id):
        return AuthClaims(id=cached_email, email=cached_email, source="slack_api")

    # Fetch from Slack API
    profile = await self.slack_app.client.users_profile_get(user=slack_user_id)
    user_email = profile.get("profile", {}).get("email")

    if user_email:
        self.cache_email(slack_user_id, user_email)
        return AuthClaims(id=user_email, email=user_email, source="slack_api")

    # No email available: return None to use config-based auth
    return None
```

### Streaming Updates with Message Queue

The Slack adapter uses a message queue to handle streaming updates efficiently:

```python
async def handle_update(self, update: SamUpdate, context: ResponseContext) -> None:
    task_id = context.task_id
    # channel_id and thread_ts are defined elsewhere in the full adapter;
    # their lookup is omitted from this excerpt.
    queue = await self._get_or_create_queue(task_id, channel_id, thread_ts)

    for part in update.parts:
        if isinstance(part, SamTextPart):
            await queue.queue_text_update(part.text)
        elif isinstance(part, SamFilePart):
            await queue.queue_file_upload(part.name, part.content_bytes)
```

For the complete Slack adapter implementation, see `src/solace_agent_mesh/gateway/slack/adapter.py`.

## Creating Your Own Adapter

### Option 1: Create as a Plugin (Recommended)

For reusable adapters that you plan to share or use across multiple projects:

```bash
sam plugin create my-gateway-plugin
```

Select "Gateway Plugin" when prompted.
This creates a complete plugin structure with:

- Python package structure
- Sample adapter implementation
- Configuration template
- Build tooling

After implementing your adapter, build and distribute:

```bash
cd my-gateway-plugin
sam plugin build
```

For more information on plugins, see [Plugins](../components/plugins.md).

### Option 2: Add to Existing Project

For project-specific adapters, create your adapter class in your project:

1. Create your adapter module:

   ```python
   # my_project/gateways/my_adapter.py
   from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
   from solace_agent_mesh.gateway.adapter.types import (
       AuthClaims,
       GatewayContext,
       ResponseContext,
       SamTask,
       SamUpdate
   )

   class MyAdapter(GatewayAdapter):
       async def init(self, context: GatewayContext) -> None:
           self.context = context
           # Initialize your platform connection

       async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
           # Convert platform event to SamTask
           pass

       async def handle_update(self, update: SamUpdate, context: ResponseContext) -> None:
           # Send update to platform
           pass
   ```

2. Reference it in your configuration:

   ```yaml
   app_config:
     gateway_adapter: my_project.gateways.my_adapter.MyAdapter
     adapter_config:
       # Your adapter configuration
   ```

## Advanced: Full Custom Gateways

For most use cases, gateway adapters provide all the functionality you need to connect external platforms to the agent mesh. However, if you have highly specialized requirements, you can create a full custom gateway that implements the complete gateway lifecycle and A2A protocol handling from scratch.

Consider a full custom gateway only if you need:

- Highly specialized authentication flows not supported by the standard flow
- Custom A2A protocol behavior or extensions
- Complex multi-stage processing pipelines with custom state management
- Fine-grained control over every aspect of the gateway lifecycle

Full custom gateways extend `BaseGatewayComponent` directly and implement all protocol handling manually. This approach requires significantly more code and expertise but provides complete control over the gateway behavior.

## Related Documentation

- [Gateways](../components/gateways.md) - Overview of gateway concepts and types
- [Plugins](../components/plugins.md) - Creating and distributing gateway plugins
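
## Appendix: Adapter Flow Without the Framework

The round trip described in this guide (extract identity, build a task, deliver an update) can be rehearsed without installing the framework. The sketch below is illustrative only: every class in it is a simplified local stand-in that borrows the field names used in this guide, not the real `solace_agent_mesh` types. `InMemoryAdapter` and `run_round_trip` are hypothetical names, and the driver plays the roles of both the generic gateway and an echoing agent.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


# Local stand-ins for the guide's types (assumption: NOT the framework classes).
@dataclass
class AuthClaims:
    id: str
    email: str
    source: str


@dataclass
class SamTextPart:
    text: str


@dataclass
class SamTask:
    parts: List[SamTextPart]
    session_id: Optional[str]
    target_agent: str
    platform_context: Dict[str, Any] = field(default_factory=dict)


@dataclass
class SamUpdate:
    parts: List[SamTextPart]


@dataclass
class ResponseContext:
    task_id: str
    platform_context: Dict[str, Any]


class InMemoryAdapter:
    """Hypothetical adapter that records outbound messages instead of sending them."""

    def __init__(self) -> None:
        self.sent: List[Tuple[Optional[str], str]] = []

    async def extract_auth_claims(self, external_input: Dict[str, Any]) -> Optional[AuthClaims]:
        # Return None when the event carries no identity.
        email = external_input.get("user_email")
        if not email:
            return None
        return AuthClaims(id=email, email=email, source="demo")

    async def prepare_task(self, external_input: Dict[str, Any]) -> SamTask:
        # Keep the conversation ID so the response can be routed back later.
        conversation_id = external_input.get("conversation_id")
        return SamTask(
            parts=[SamTextPart(external_input.get("message", ""))],
            session_id=conversation_id,
            target_agent="default",
            platform_context={"conversation_id": conversation_id},
        )

    async def handle_update(self, update: SamUpdate, context: ResponseContext) -> None:
        conversation_id = context.platform_context["conversation_id"]
        for part in update.parts:
            self.sent.append((conversation_id, part.text))


async def run_round_trip(adapter: InMemoryAdapter, event: Dict[str, Any]) -> Optional[AuthClaims]:
    """Stand in for the generic gateway: authenticate, build the task, echo a reply."""
    claims = await adapter.extract_auth_claims(event)
    task = await adapter.prepare_task(event)
    # A real mesh would route the task to an agent; this "agent" only echoes.
    reply = SamUpdate(parts=[SamTextPart(f"echo: {p.text}") for p in task.parts])
    context = ResponseContext(task_id="task-1", platform_context=task.platform_context)
    await adapter.handle_update(reply, context)
    return claims


adapter = InMemoryAdapter()
claims = asyncio.run(
    run_round_trip(
        adapter,
        {"user_email": "ada@example.com", "conversation_id": "c-42", "message": "hello"},
    )
)
print(claims)
print(adapter.sent)
```

Recording messages in a list rather than calling a platform API keeps the sketch testable. In a real adapter, `handle_update` would call your platform client, and the framework would supply the context and types.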