telemetry.py
# Copyright (c) 2024-2026 Tencent Zhuque Lab. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requirement: Any integration or derivative work must explicitly attribute
# Tencent Zhuque Lab (https://github.com/Tencent/AI-Infra-Guard) in its
# documentation or user interface, as detailed in the NOTICE file.

"""Telemetry helpers for red-teaming runs.

Tracing, analytics and error reporting are only set up when
``telemetry_opt_out()`` returns False at import time. A small key=value file
(``TELEMETRY_DATA_FILE``) stores an anonymous ID and per-feature status flags.
"""

from contextlib import contextmanager
import logging
import os
import socket
import sys
import uuid
import sentry_sdk
from enum import Enum
import requests
from posthog import Posthog
from typing import List


class Feature(Enum):
    """Features whose usage is recorded in the telemetry data file."""

    REDTEAMING = "redteaming"
    UNKNOWN = "unknown"


TELEMETRY_DATA_FILE = ".deepteam/telemetry.txt"


#########################################################
### Telemetry Config ####################################
#########################################################


def telemetry_opt_out():
    """Return True when telemetry is opted out.

    ``DEEPTEAM_TELEMETRY_OPT_OUT`` defaults to ``"YES"``, so telemetry stays
    off unless the variable is explicitly set to a different value.
    """
    return os.getenv("DEEPTEAM_TELEMETRY_OPT_OUT", "YES") == "YES"


def blocked_by_firewall():
    """Return True when a TCP connection to www.google.com:80 cannot be opened."""
    try:
        # The context manager closes the probe socket (the connection itself
        # is all we need); the timeout keeps an unreachable network from
        # stalling the caller indefinitely.
        with socket.create_connection(("www.google.com", 80), timeout=5):
            return False
    except OSError:
        return True


def get_anonymous_public_ip():
    """Return the public IP reported by api.ipify.org, or None on any failure."""
    try:
        response = requests.get("https://api.ipify.org", timeout=5)
        if response.status_code == 200:
            return response.text
    except requests.RequestException:
        pass
    return None


anonymous_public_ip = None

# Placeholders so these names always exist, even when telemetry is disabled at
# import time; they are replaced with real clients in the block below.
tracer = None
posthog = None

if not telemetry_opt_out():
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
        OTLPSpanExporter,
    )

    anonymous_public_ip = get_anonymous_public_ip()
    sentry_sdk.init(
        dsn="",
        profiles_sample_rate=1.0,
        traces_sample_rate=1.0,  # For performance monitoring
        send_default_pii=False,  # Don't send personally identifiable information
        attach_stacktrace=False,  # Don't attach stack traces to messages
        default_integrations=False,  # Disable Sentry's default integrations
    )

    # Set up the Tracer Provider
    if trace.get_tracer_provider().__class__.__name__ != "TracerProvider":
        trace.set_tracer_provider(TracerProvider())
    tracer_provider = trace.get_tracer_provider()

    # New Relic License Key and OTLP Endpoint
    NEW_RELIC_LICENSE_KEY = "dummy_key"
    NEW_RELIC_OTLP_ENDPOINT = "http://localhost:0"
    otlp_exporter = OTLPSpanExporter(
        endpoint=NEW_RELIC_OTLP_ENDPOINT,
        headers={"api-key": NEW_RELIC_LICENSE_KEY},
    )

    # Add the OTLP exporter to the span processor
    span_processor = BatchSpanProcessor(otlp_exporter)
    tracer_provider.add_span_processor(span_processor)

    logging.getLogger("opentelemetry.exporter.otlp").setLevel(logging.CRITICAL)

    # Create a tracer for your application
    tracer = trace.get_tracer(__name__)

    # Initialize PostHog
    posthog = Posthog(
        project_api_key="dummy_key",
        host="http://localhost:0",  # Invalid address
    )


# Install the reporting hook only when telemetry is actually enabled (Sentry is
# initialised above only in that case). The legacy TELEMETRY_OPT_OUT variable
# is still honoured. The network probe is evaluated last so that the cheap
# environment checks can short-circuit it.
if (
    os.getenv("ERROR_REPORTING") == "YES"
    and not telemetry_opt_out()
    and not os.getenv("TELEMETRY_OPT_OUT")
    and not blocked_by_firewall()
):

    def handle_exception(exc_type, exc_value, exc_traceback):
        """Print the exception, forward it to Sentry, then run the default hook."""
        print({"exc_type": exc_type, "exc_value": exc_value})
        sentry_sdk.capture_exception(exc_value)
        sys.__excepthook__(exc_type, exc_value, exc_traceback)

    sys.excepthook = handle_exception


def is_running_in_jupyter_notebook():
    """Return True when the active IPython config contains "IPKernelApp"."""
    try:
        from IPython import get_ipython

        return "IPKernelApp" in get_ipython().config
    except Exception:
        # Best-effort detection: any failure (e.g. IPython cannot be imported
        # or there is no usable shell object) is treated as "not Jupyter".
        return False


IS_RUNNING_IN_JUPYTER = (
    "jupyter" if is_running_in_jupyter_notebook() else "other"
)

#########################################################
### Context Managers ####################################
#########################################################


@contextmanager
def capture_red_teamer_run(vulnerabilities: List[str], attacks: List[str]):
    """Wrap a red-teaming run in a telemetry span.

    Args:
        vulnerabilities: Names recorded as ``vulnerability.<name>`` attributes.
        attacks: Names recorded as ``attack.<name>`` attributes.

    Yields:
        The active span, or None when telemetry is opted out or the telemetry
        clients were not created at import time.
    """
    # `tracer` / `posthog` are only real clients if telemetry was enabled when
    # this module was imported; flipping the environment variable afterwards
    # must not crash the run.
    if telemetry_opt_out() or tracer is None or posthog is None:
        yield
        return

    with tracer.start_as_current_span("Invoked redteamer") as span:
        # NOTE: get_unique_id() rewrites the telemetry file on every call, so
        # the order of these calls relative to get_status() is significant.
        posthog.capture(get_unique_id(), "Invoked redteamer")
        span.set_attribute("environment", IS_RUNNING_IN_JUPYTER)
        span.set_attribute("user.status", get_status())
        span.set_attribute("user.unique_id", get_unique_id())
        span.set_attribute(
            "feature_status.redteaming",
            get_feature_status(Feature.REDTEAMING),
        )
        for vulnerability in vulnerabilities:
            span.set_attribute(f"vulnerability.{vulnerability}", 1)
        for attack in attacks:
            span.set_attribute(f"attack.{attack}", 1)
        if anonymous_public_ip:
            span.set_attribute("user.public_ip", anonymous_public_ip)
        set_last_feature(Feature.REDTEAMING)
        yield span


#########################################################
### Helper Functions ####################################
#########################################################


def read_telemetry_file() -> dict:
    """Reads the telemetry data file and returns the key-value pairs as a dictionary.

    Returns an empty dict when the file does not exist. Lines without a key
    (for example blank lines) are ignored.
    """
    data = {}
    try:
        with open(TELEMETRY_DATA_FILE, "r", encoding="utf-8") as file:
            for line in file:
                key, _, value = line.strip().partition("=")
                if key:
                    data[key] = value
    except FileNotFoundError:
        return {}
    return data


def write_telemetry_file(data: dict):
    """Writes the given key-value pairs to the telemetry data file."""
    directory = os.path.dirname(TELEMETRY_DATA_FILE)
    # os.makedirs("") raises, so only create a directory when the path has one.
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(TELEMETRY_DATA_FILE, "w", encoding="utf-8") as file:
        file.writelines(f"{key}={value}\n" for key, value in data.items())


def get_status() -> str:
    """Gets the status from the telemetry file ("new" when absent)."""
    data = read_telemetry_file()
    return data.get("DEEPTEAM_STATUS", "new")


def get_unique_id() -> str:
    """Gets or generates a unique ID and updates the telemetry file.

    A freshly generated ID marks the status as "new"; finding an existing ID
    marks it as "old". The file is rewritten on every call.
    """
    data = read_telemetry_file()
    unique_id = data.get("DEEPTEAM_ID")
    if not unique_id:
        unique_id = str(uuid.uuid4())
        data["DEEPTEAM_ID"] = unique_id
        data["DEEPTEAM_STATUS"] = "new"
    else:
        data["DEEPTEAM_STATUS"] = "old"
    write_telemetry_file(data)
    return unique_id


def get_last_feature() -> Feature:
    """Gets the last feature from the telemetry file.

    Returns Feature.UNKNOWN when no feature is stored or the stored value is
    not a valid Feature value.
    """
    last_feature = read_telemetry_file().get("DEEPTEAM_LAST_FEATURE")
    if not last_feature:
        return Feature.UNKNOWN
    try:
        return Feature(last_feature)
    except ValueError:
        return Feature.UNKNOWN


def set_last_feature(feature: Feature):
    """Sets the last feature in the telemetry file.

    Also marks that feature's ``DEEPTEAM_<FEATURE>_STATUS`` entry as "old".

    Raises:
        ValueError: If ``feature`` is not a Feature member.
    """
    # isinstance() behaves the same on every Python version; `x in Feature`
    # raises TypeError for non-members on some versions and accepts raw values
    # on others.
    if not isinstance(feature, Feature):
        raise ValueError(f"Invalid feature: {feature}")
    data = read_telemetry_file()
    data["DEEPTEAM_LAST_FEATURE"] = feature.value
    feature_status_key = f"DEEPTEAM_{feature.value.upper()}_STATUS"
    data[feature_status_key] = "old"
    write_telemetry_file(data)


def get_feature_status(feature: Feature) -> str:
    """Gets the status of a feature ('new' or 'old') from the telemetry file."""
    data = read_telemetry_file()
    feature_status_key = f"DEEPTEAM_{feature.value.upper()}_STATUS"
    return data.get(feature_status_key, "new")