# test_webhook_integration.py
"""Integration tests for the generic webhook platform adapter.

These tests exercise end-to-end flows through the webhook adapter:
  1. GitHub PR webhook → agent MessageEvent created
  2. Skills config injects skill content into the prompt
  3. Cross-platform delivery routes to a mock Telegram adapter
  4. GitHub comment delivery invokes ``gh`` CLI (mocked subprocess)
"""

import asyncio
import hashlib
import hmac
import json
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer

from gateway.config import (
    GatewayConfig,
    HomeChannel,
    Platform,
    PlatformConfig,
)
from gateway.platforms.base import MessageEvent, MessageType, SendResult
from gateway.platforms.webhook import WebhookAdapter, _INSECURE_NO_AUTH


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _make_adapter(routes, **extra_kw) -> WebhookAdapter:
    """Create a WebhookAdapter with the given routes.

    ``routes`` is stored under the ``"routes"`` key of the platform config's
    ``extra`` dict next to default ``host``/``port`` values; any ``extra_kw``
    entries are merged in last, so they can add to or override those keys.
    """
    extra = {"host": "0.0.0.0", "port": 0, "routes": routes}
    extra.update(extra_kw)
    config = PlatformConfig(enabled=True, extra=extra)
    return WebhookAdapter(config)


def _create_app(adapter: WebhookAdapter) -> web.Application:
    """Build the aiohttp Application from the adapter.

    The adapter's own handlers are wired directly onto a fresh application:
    ``GET /health`` and ``POST /webhooks/{route_name}``.
    """
    app = web.Application()
    app.router.add_get("/health", adapter._handle_health)
    app.router.add_post("/webhooks/{route_name}", adapter._handle_webhook)
    return app


def _github_signature(body: bytes, secret: str) -> str:
    """Compute X-Hub-Signature-256 for *body* using *secret*.

    Returns ``"sha256="`` followed by the hex HMAC-SHA256 digest of *body*
    keyed with the encoded *secret*.
    """
    return "sha256=" + hmac.new(
        secret.encode(), body, hashlib.sha256
    ).hexdigest()


def _capture_events(adapter: WebhookAdapter) -> "list[MessageEvent]":
    """Replace ``adapter.handle_message`` with a recorder.

    Returns the list that every event passed to ``handle_message`` is
    appended to, so a test can inspect what the adapter dispatched.
    """
    captured_events: "list[MessageEvent]" = []

    async def _capture(event: MessageEvent):
        captured_events.append(event)

    adapter.handle_message = _capture
    return captured_events


# A realistic GitHub pull_request event payload (trimmed)
GITHUB_PR_PAYLOAD = {
    "action": "opened",
    "number": 42,
    "pull_request": {
        "title": "Add webhook adapter",
        "body": "This PR adds a generic webhook platform adapter.",
        "html_url": "https://github.com/org/repo/pull/42",
        "user": {"login": "contributor"},
        "head": {"ref": "feature/webhooks"},
        "base": {"ref": "main"},
    },
    "repository": {
        "full_name": "org/repo",
        "html_url": "https://github.com/org/repo",
    },
    "sender": {"login": "contributor"},
}


# ===================================================================
# Test 1: GitHub PR webhook triggers agent
# ===================================================================

class TestGitHubPRWebhook:
    """A signed GitHub ``pull_request`` POST is accepted and dispatched."""

    @pytest.mark.asyncio
    async def test_github_pr_webhook_triggers_agent(self):
        """POST with a realistic GitHub PR payload should:
        1. Return 202 Accepted
        2. Call handle_message with a MessageEvent
        3. The event text contains the rendered prompt
        4. The event source has chat_type 'webhook'
        """
        secret = "gh-webhook-test-secret"
        routes = {
            "github-pr": {
                "secret": secret,
                "events": ["pull_request"],
                "prompt": (
                    "Review PR #{number} by {sender.login}: "
                    "{pull_request.title}\n\n{pull_request.body}"
                ),
                "deliver": "log",
            }
        }
        adapter = _make_adapter(routes)
        captured_events = _capture_events(adapter)

        app = _create_app(adapter)
        # Sign the exact bytes that are sent, so the signature matches the body.
        body = json.dumps(GITHUB_PR_PAYLOAD).encode()
        sig = _github_signature(body, secret)

        async with TestClient(TestServer(app)) as cli:
            resp = await cli.post(
                "/webhooks/github-pr",
                data=body,
                headers={
                    "Content-Type": "application/json",
                    "X-GitHub-Event": "pull_request",
                    "X-Hub-Signature-256": sig,
                    "X-GitHub-Delivery": "gh-delivery-001",
                },
            )
            assert resp.status == 202
            data = await resp.json()
            assert data["status"] == "accepted"
            assert data["route"] == "github-pr"
            assert data["event"] == "pull_request"
            assert data["delivery_id"] == "gh-delivery-001"

            # Let the asyncio.create_task fire
            await asyncio.sleep(0.05)

        assert len(captured_events) == 1
        event = captured_events[0]
        assert "Review PR #42 by contributor" in event.text
        assert "Add webhook adapter" in event.text
        assert event.source.chat_type == "webhook"
        assert event.source.platform == Platform.WEBHOOK
        assert "github-pr" in event.source.chat_id
        assert event.message_id == "gh-delivery-001"


# ===================================================================
# Test 2: Skills injected into prompt
# ===================================================================

class TestSkillsInjection:
    """A route with ``skills`` uses the skill invocation message as prompt."""

    @pytest.mark.asyncio
    async def test_skills_injected_into_prompt(self):
        """When a route has skills: [code-review], the adapter should
        call build_skill_invocation_message() and use its output as the
        prompt instead of the raw template render."""
        routes = {
            "pr-review": {
                "secret": _INSECURE_NO_AUTH,
                "events": ["pull_request"],
                "prompt": "Review this PR: {pull_request.title}",
                "skills": ["code-review"],
            }
        }
        adapter = _make_adapter(routes)
        captured_events = _capture_events(adapter)

        skill_content = (
            "You are a code reviewer. Review the following:\n"
            "Review this PR: Add webhook adapter"
        )

        # The imports are lazy (inside the handler), so patch the source module
        with patch(
            "agent.skill_commands.build_skill_invocation_message",
            return_value=skill_content,
        ) as mock_build, patch(
            "agent.skill_commands.get_skill_commands",
            return_value={"/code-review": {"name": "code-review"}},
        ):
            app = _create_app(adapter)
            async with TestClient(TestServer(app)) as cli:
                resp = await cli.post(
                    "/webhooks/pr-review",
                    json=GITHUB_PR_PAYLOAD,
                    headers={
                        "X-GitHub-Event": "pull_request",
                        "X-GitHub-Delivery": "skill-test-001",
                    },
                )
                assert resp.status == 202

                await asyncio.sleep(0.05)

        assert len(captured_events) == 1
        event = captured_events[0]
        # The prompt should be the skill content, not the raw template
        assert "You are a code reviewer" in event.text
        mock_build.assert_called_once()


# ===================================================================
# Test 3: Cross-platform delivery (webhook → Telegram)
# ===================================================================

class TestCrossPlatformDelivery:
    """``send()`` on a ``deliver='telegram'`` route goes to the Telegram adapter."""

    @pytest.mark.asyncio
    async def test_cross_platform_delivery(self):
        """When deliver='telegram', the response is routed to the
        Telegram adapter via gateway_runner.adapters."""
        routes = {
            "alerts": {
                "secret": _INSECURE_NO_AUTH,
                "prompt": "Alert: {message}",
                "deliver": "telegram",
                "deliver_extra": {"chat_id": "12345"},
            }
        }
        adapter = _make_adapter(routes)
        adapter.handle_message = AsyncMock()

        # Set up a mock gateway runner with a mock Telegram adapter
        mock_tg_adapter = AsyncMock()
        mock_tg_adapter.send = AsyncMock(return_value=SendResult(success=True))

        mock_runner = MagicMock()
        mock_runner.adapters = {Platform.TELEGRAM: mock_tg_adapter}
        mock_runner.config = GatewayConfig(
            platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake")}
        )
        adapter.gateway_runner = mock_runner

        # First, simulate a webhook POST to set up delivery_info
        app = _create_app(adapter)
        async with TestClient(TestServer(app)) as cli:
            resp = await cli.post(
                "/webhooks/alerts",
                json={"message": "Server is on fire!"},
                headers={"X-GitHub-Delivery": "alert-001"},
            )
            assert resp.status == 202

        # The adapter should have stored delivery info
        chat_id = "webhook:alerts:alert-001"
        assert chat_id in adapter._delivery_info

        # Now call send() as if the agent has finished
        result = await adapter.send(chat_id, "I've acknowledged the alert.")

        assert result.success is True
        mock_tg_adapter.send.assert_awaited_once_with(
            "12345", "I've acknowledged the alert.", metadata=None
        )
        # Delivery info is retained after send() so interim status messages
        # don't strand the final response (TTL-based cleanup happens on POST).
        assert chat_id in adapter._delivery_info


# ===================================================================
# Test 4: GitHub comment delivery via gh CLI
# ===================================================================

class TestGitHubCommentDelivery:
    """``send()`` on a ``deliver='github_comment'`` route shells out to ``gh``."""

    @pytest.mark.asyncio
    async def test_github_comment_delivery(self):
        """When deliver='github_comment', the adapter invokes
        ``gh pr comment`` via subprocess.run (mocked)."""
        routes = {
            "pr-bot": {
                "secret": _INSECURE_NO_AUTH,
                "prompt": "Review: {pull_request.title}",
                "deliver": "github_comment",
                "deliver_extra": {
                    "repo": "{repository.full_name}",
                    "pr_number": "{number}",
                },
            }
        }
        adapter = _make_adapter(routes)
        adapter.handle_message = AsyncMock()

        # POST a webhook to set up delivery info
        app = _create_app(adapter)
        async with TestClient(TestServer(app)) as cli:
            resp = await cli.post(
                "/webhooks/pr-bot",
                json=GITHUB_PR_PAYLOAD,
                headers={
                    "X-GitHub-Event": "pull_request",
                    "X-GitHub-Delivery": "gh-comment-001",
                },
            )
            assert resp.status == 202

        chat_id = "webhook:pr-bot:gh-comment-001"
        assert chat_id in adapter._delivery_info

        # Verify deliver_extra was rendered with payload data
        delivery = adapter._delivery_info[chat_id]
        assert delivery["deliver_extra"]["repo"] == "org/repo"
        assert delivery["deliver_extra"]["pr_number"] == "42"

        # Mock subprocess.run and call send()
        mock_result = MagicMock()
        mock_result.returncode = 0
        mock_result.stdout = "Comment posted"
        mock_result.stderr = ""

        with patch(
            "gateway.platforms.webhook.subprocess.run",
            return_value=mock_result,
        ) as mock_run:
            result = await adapter.send(
                chat_id, "LGTM! The code looks great."
            )

        assert result.success is True
        mock_run.assert_called_once_with(
            [
                "gh", "pr", "comment", "42",
                "--repo", "org/repo",
                "--body", "LGTM! The code looks great.",
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
        # Delivery info is retained after send() so interim status messages
        # don't strand the final response (TTL-based cleanup happens on POST).
        assert chat_id in adapter._delivery_info