/ AIG-PromptSecurity / deepteam / telemetry.py
telemetry.py
  1  # Copyright (c) 2024-2026 Tencent Zhuque Lab. All rights reserved.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  #
 15  # Requirement: Any integration or derivative work must explicitly attribute
 16  # Tencent Zhuque Lab (https://github.com/Tencent/AI-Infra-Guard) in its
 17  # documentation or user interface, as detailed in the NOTICE file.
 18  
 19  from contextlib import contextmanager
 20  import logging
 21  import os
 22  import socket
 23  import sys
 24  import uuid
 25  import sentry_sdk
 26  from enum import Enum
 27  import requests
 28  from posthog import Posthog
 29  from typing import List
 30  
 31  
 32  class Feature(Enum):
 33      REDTEAMING = "redteaming"
 34      UNKNOWN = "unknown"
 35  
 36  
 37  TELEMETRY_DATA_FILE = ".deepteam/telemetry.txt"
 38  
 39  
 40  #########################################################
 41  ### Telemetry Config ####################################
 42  #########################################################
 43  
 44  
 45  def telemetry_opt_out():
 46      return os.getenv("DEEPTEAM_TELEMETRY_OPT_OUT", "YES") == "YES"
 47  
 48  
 49  def blocked_by_firewall():
 50      try:
 51          socket.create_connection(("www.google.com", 80))
 52          return False
 53      except OSError:
 54          return True
 55  
 56  
 57  def get_anonymous_public_ip():
 58      try:
 59          response = requests.get("https://api.ipify.org", timeout=5)
 60          if response.status_code == 200:
 61              return response.text
 62      except requests.RequestException:
 63          pass
 64      return None
 65  
 66  
 67  anonymous_public_ip = None
 68  
 69  if not telemetry_opt_out():
 70      from opentelemetry import trace
 71      from opentelemetry.sdk.trace import TracerProvider
 72      from opentelemetry.sdk.trace.export import BatchSpanProcessor
 73      from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
 74          OTLPSpanExporter,
 75      )
 76  
 77      anonymous_public_ip = get_anonymous_public_ip()
 78      sentry_sdk.init(
 79          dsn="",
 80          profiles_sample_rate=1.0,
 81          traces_sample_rate=1.0,  # For performance monitoring
 82          send_default_pii=False,  # Don't send personally identifiable information
 83          attach_stacktrace=False,  # Don't attach stack traces to messages
 84          default_integrations=False,  # Disable Sentry's default integrations
 85      )
 86  
 87      # Set up the Tracer Provider
 88      if not trace.get_tracer_provider().__class__.__name__ == "TracerProvider":
 89          trace.set_tracer_provider(TracerProvider())
 90      tracer_provider = trace.get_tracer_provider()
 91  
 92      # New Relic License Key and OTLP Endpoint
 93      NEW_RELIC_LICENSE_KEY = "dummy_key"
 94      NEW_RELIC_OTLP_ENDPOINT = "http://localhost:0"
 95      otlp_exporter = OTLPSpanExporter(
 96          endpoint=NEW_RELIC_OTLP_ENDPOINT,
 97          headers={"api-key": NEW_RELIC_LICENSE_KEY},
 98      )
 99  
100      # Add the OTLP exporter to the span processor
101      span_processor = BatchSpanProcessor(otlp_exporter)
102      tracer_provider.add_span_processor(span_processor)
103  
104      logging.getLogger("opentelemetry.exporter.otlp").setLevel(logging.CRITICAL)
105  
106      # Create a tracer for your application
107      tracer = trace.get_tracer(__name__)
108  
109      # Initialize PostHog
110      posthog = Posthog(
111          project_api_key="dummy_key",
112          host="http://localhost:0",  # 无效地址
113      )
114  
115  
# Install the reporting excepthook only when error reporting is requested and
# telemetry has not been opted out. Both opt-out switches are honoured: the
# legacy TELEMETRY_OPT_OUT variable and this module's telemetry_opt_out().
# The environment checks run first so the network probe in
# blocked_by_firewall() is skipped whenever they already rule reporting out.
if (
    os.getenv("ERROR_REPORTING") == "YES"
    and not os.getenv("TELEMETRY_OPT_OUT")
    and not telemetry_opt_out()
    and not blocked_by_firewall()
):

    def handle_exception(exc_type, exc_value, exc_traceback):
        """Report an uncaught exception, then defer to the default hook.

        Prints the exception type and value, passes the exception to
        ``sentry_sdk.capture_exception``, and finally calls
        ``sys.__excepthook__`` so the usual traceback is still shown.
        """
        print({"exc_type": exc_type, "exc_value": exc_value})
        sentry_sdk.capture_exception(exc_value)
        sys.__excepthook__(exc_type, exc_value, exc_traceback)

    sys.excepthook = handle_exception
128  
129  
def is_running_in_jupyter_notebook():
    """Return True when the active IPython config contains "IPKernelApp".

    Any failure along the way (IPython not importable, no active IPython
    instance, and so on) is treated as "not running in Jupyter".
    """
    try:
        from IPython import get_ipython

        return "IPKernelApp" in get_ipython().config
    except Exception:
        # Best-effort detection: never let this check raise.
        return False
139  
140  
# Despite the boolean-sounding name, this is an environment label string
# ("jupyter" or "other") that is attached to telemetry spans.
if is_running_in_jupyter_notebook():
    IS_RUNNING_IN_JUPYTER = "jupyter"
else:
    IS_RUNNING_IN_JUPYTER = "other"
144  
145  #########################################################
146  ### Context Managers ####################################
147  #########################################################
148  
149  
@contextmanager
def capture_red_teamer_run(vulnerabilities: List[str], attacks: List[str]):
    """Record a red-teamer invocation as a telemetry span.

    When telemetry is opted out nothing is recorded and the ``with`` target
    is None. Otherwise a span named "Invoked redteamer" is opened, a PostHog
    event with the same name is captured, the span is annotated, and the
    span is yielded to the caller.

    Args:
        vulnerabilities: Names recorded as ``vulnerability.<name>`` = 1.
        attacks: Names recorded as ``attack.<name>`` = 1.

    Yields:
        The active span, or None when telemetry is opted out.
    """
    if telemetry_opt_out():
        yield
        return

    event_name = "Invoked redteamer"
    with tracer.start_as_current_span(event_name) as span:
        # Resolve the ID once: every get_unique_id() call re-reads and
        # rewrites the telemetry file. It must still run before get_status()
        # so a first-time user is reported as "new".
        unique_id = get_unique_id()
        posthog.capture(unique_id, event_name)
        span.set_attribute("environment", IS_RUNNING_IN_JUPYTER)
        span.set_attribute("user.status", get_status())
        span.set_attribute("user.unique_id", unique_id)
        # Read the feature status before set_last_feature() marks it "old".
        span.set_attribute(
            "feature_status.redteaming",
            get_feature_status(Feature.REDTEAMING),
        )
        for vulnerability in vulnerabilities:
            span.set_attribute(f"vulnerability.{vulnerability}", 1)
        for attack in attacks:
            span.set_attribute(f"attack.{attack}", 1)
        if anonymous_public_ip:
            span.set_attribute("user.public_ip", anonymous_public_ip)
        set_last_feature(Feature.REDTEAMING)
        yield span
172  
173  
174  #########################################################
175  ### Helper Functions ####################################
176  #########################################################
177  
178  
def read_telemetry_file(path=None) -> dict:
    """Read the telemetry data file into a dictionary.

    Each line is split on its first ``=``; everything after it is the value.
    Blank lines and lines with an empty key are skipped so they cannot
    round-trip into the file as a stray ``=`` entry.

    Args:
        path: File to read. When None, ``TELEMETRY_DATA_FILE`` is used.

    Returns:
        The key/value pairs found, or an empty dict when the file does not
        exist.
    """
    if path is None:
        path = TELEMETRY_DATA_FILE
    data = {}
    try:
        # Open directly instead of checking existence first, so a file that
        # disappears between the check and the open cannot raise.
        with open(path, "r", encoding="utf-8") as file:
            for line in file:
                key, _, value = line.strip().partition("=")
                if key:
                    data[key] = value
    except (FileNotFoundError, NotADirectoryError):
        return {}
    return data
190  
191  
def write_telemetry_file(data: dict, path=None):
    """Write key/value pairs to the telemetry data file, one ``key=value`` per line.

    The file is overwritten. Its parent directory is created when missing.

    Args:
        data: Pairs to persist, written in the dict's iteration order.
        path: File to write. When None, ``TELEMETRY_DATA_FILE`` is used.
    """
    if path is None:
        path = TELEMETRY_DATA_FILE
    directory = os.path.dirname(path)
    # A bare filename has no directory part, and os.makedirs("") would raise.
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, "w", encoding="utf-8") as file:
        file.writelines(f"{key}={value}\n" for key, value in data.items())
198  
199  
def get_status() -> str:
    """Return the stored ``DEEPTEAM_STATUS`` value, or "new" when it is absent."""
    return read_telemetry_file().get("DEEPTEAM_STATUS", "new")
204  
205  
def get_unique_id() -> str:
    """Return the persisted ID, creating one on first use.

    Calling this always rewrites the telemetry file: a freshly generated ID
    is stored with ``DEEPTEAM_STATUS`` set to "new", while an existing ID
    flips the status to "old".
    """
    data = read_telemetry_file()
    existing_id = data.get("DEEPTEAM_ID")
    if existing_id:
        unique_id, status = existing_id, "old"
    else:
        unique_id, status = str(uuid.uuid4()), "new"
        data["DEEPTEAM_ID"] = unique_id
    data["DEEPTEAM_STATUS"] = status
    write_telemetry_file(data)
    return unique_id
218  
219  
def get_last_feature() -> Feature:
    """Return the feature recorded under ``DEEPTEAM_LAST_FEATURE``.

    Falls back to ``Feature.UNKNOWN`` when the entry is missing, empty, or
    not the value of any ``Feature`` member.
    """
    stored = read_telemetry_file().get("DEEPTEAM_LAST_FEATURE")
    try:
        # Lookup by value raises ValueError for anything that is not a
        # member value, including None and the empty string.
        return Feature(stored)
    except ValueError:
        return Feature.UNKNOWN
227  
228  
def set_last_feature(feature: Feature):
    """Record ``feature`` as the last used feature and mark its status "old".

    Args:
        feature: The ``Feature`` member to record.

    Raises:
        ValueError: If ``feature`` is not a ``Feature`` member.
    """
    # isinstance gives the same answer on every Python version. The previous
    # ``feature not in Feature`` test raised TypeError for non-members before
    # 3.12, and on 3.12+ accepted raw values such as "redteaming", which then
    # failed on ``.value`` below.
    if not isinstance(feature, Feature):
        raise ValueError(f"Invalid feature: {feature}")
    data = read_telemetry_file()
    data["DEEPTEAM_LAST_FEATURE"] = feature.value
    feature_status_key = f"DEEPTEAM_{feature.value.upper()}_STATUS"
    data[feature_status_key] = "old"
    write_telemetry_file(data)
238  
239  
def get_feature_status(feature: Feature) -> str:
    """Return the stored status for ``feature``, or "new" when none is stored.

    The status is looked up under ``DEEPTEAM_<FEATURE VALUE, UPPERCASED>_STATUS``.
    """
    status_key = f"DEEPTEAM_{feature.value.upper()}_STATUS"
    return read_telemetry_file().get(status_key, "new")
243      feature_status_key = f"DEEPTEAM_{feature.value.upper()}_STATUS"
244      return data.get(feature_status_key, "new")