telemetry.py
"""Anonymized open-source telemetry for RESTai.

Collects aggregate, non-identifying usage statistics and sends them
to a central endpoint periodically. No PII, no content, no API keys.

Controlled by ANONYMIZED_TELEMETRY env var (default: true).
Opt out by setting ANONYMIZED_TELEMETRY=false.

NOTE(review): nothing in this module reads ANONYMIZED_TELEMETRY; presumably
the code that schedules ``telemetry_loop()`` performs the opt-out check —
confirm.
"""

import asyncio
import logging
import platform
import sys
import time
import uuid
from collections import Counter

import httpx

from restai import config

logger = logging.getLogger(__name__)

TELEMETRY_ENDPOINT = "https://telemetry.restai.cloud/v1/report"
REPORT_INTERVAL_SECONDS = 86400  # 24 hours
STARTUP_DELAY_SECONDS = 60  # Wait for app to stabilize


def _get_or_create_instance_id(db_wrapper) -> str:
    """Return the anonymous instance ID, generating and storing it if absent.

    The ID is a random UUID4 hex string kept in the ``telemetry_instance_id``
    setting via ``upsert_setting``, so later calls return the stored value
    instead of generating a new one.
    """
    setting = db_wrapper.get_setting("telemetry_instance_id")
    if setting and setting.value:
        return setting.value
    instance_id = uuid.uuid4().hex
    db_wrapper.upsert_setting("telemetry_instance_id", instance_id)
    return instance_id


def _get_database_type() -> str:
    """Infer the database backend from config.

    Returns ``"postgresql"`` if ``POSTGRES_HOST`` is set, else ``"mysql"`` if
    ``MYSQL_HOST`` is set, otherwise ``"sqlite"``.
    """
    if getattr(config, "POSTGRES_HOST", None):
        return "postgresql"
    if getattr(config, "MYSQL_HOST", None):
        return "mysql"
    return "sqlite"


def collect_telemetry(db_wrapper) -> dict:
    """Collect the anonymized telemetry payload from the database.

    Only aggregate counts, LLM/embedding class names (never model names or
    API keys), vectorstore types, boolean feature flags and runtime/platform
    versions are included.

    Args:
        db_wrapper: Database wrapper exposing ``db`` (presumably a SQLAlchemy
            session — it is used with ``.query()``) plus the ``get_setting``,
            ``upsert_setting``, ``get_users``, ``get_teams``, ``get_llms`` and
            ``get_embeddings`` accessors.

    Returns:
        The payload dict that ``send_telemetry`` POSTs as JSON.
    """
    from datetime import datetime, timedelta, timezone

    from sqlalchemy import func

    from restai.models.databasemodels import OutputDatabase, ProjectDatabase
    from restai.utils.version import get_version_from_pyproject

    instance_id = _get_or_create_instance_id(db_wrapper)

    # Project counts by type, tallied in a single pass over the projects.
    projects = db_wrapper.db.query(ProjectDatabase).all()
    type_counts = Counter(p.type for p in projects)
    project_counts = {"projects": len(projects)}
    for ptype in ("rag", "agent", "block"):
        project_counts[f"projects_{ptype}"] = type_counts[ptype]

    # Basic counts
    user_count = len(db_wrapper.get_users())
    team_count = len(db_wrapper.get_teams())
    llms = db_wrapper.get_llms()
    embeddings = db_wrapper.get_embeddings()

    # LLM/embedding class names (not model names or API keys)
    llm_classes = sorted({llm.class_name for llm in llms if llm.class_name})
    embedding_classes = sorted(
        {emb.class_name for emb in embeddings if emb.class_name}
    )

    # Vectorstore types in use
    vectorstores = sorted({p.vectorstore for p in projects if p.vectorstore})

    # Inference count in last 24h.
    # NOTE(review): cutoff is timezone-aware UTC; confirm OutputDatabase.date
    # is stored/compared consistently (naive vs aware) on every backend.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
    inference_count = db_wrapper.db.query(func.count(OutputDatabase.id)).filter(
        OutputDatabase.date >= cutoff
    ).scalar() or 0

    # Daily active users (distinct users with inferences in last 24h)
    dau = db_wrapper.db.query(
        func.count(func.distinct(OutputDatabase.user_id))
    ).filter(
        OutputDatabase.date >= cutoff
    ).scalar() or 0

    # Feature flags (booleans only; never the configured values themselves)
    features = {
        "gpu": bool(getattr(config, "RESTAI_GPU", False)),
        "docker": bool(getattr(config, "DOCKER_ENABLED", False)),
        "mcp_server": bool(getattr(config, "MCP_SERVER", False)),
        "redis": bool(getattr(config, "REDIS_HOST", "")),
        "sso_google": bool(getattr(config, "GOOGLE_CLIENT_ID", "")),
        "sso_microsoft": bool(getattr(config, "MICROSOFT_CLIENT_ID", "")),
        "sso_github": bool(getattr(config, "GITHUB_CLIENT_ID", "")),
        "ldap": bool(getattr(config, "LDAP_HOST", "")),
        # str() so that both a bool True and the string "true" are accepted.
        "enforce_2fa": str(getattr(config, "ENFORCE_2FA", "false")).lower() == "true",
    }

    return {
        "instance_id": instance_id,
        "version": get_version_from_pyproject(),
        "python_version": platform.python_version(),
        "os": sys.platform,
        "counts": {
            **project_counts,
            "users": user_count,
            "teams": team_count,
            "llms": len(llms),
            "embeddings": len(embeddings),
        },
        "features": features,
        "llm_classes": llm_classes,
        "embedding_classes": embedding_classes,
        "vectorstores": vectorstores,
        "database": _get_database_type(),
        "inference_count_24h": inference_count,
        "daily_active_users_24h": dau,
    }


async def send_telemetry(payload: dict) -> bool:
    """POST the telemetry payload to TELEMETRY_ENDPOINT. Fire-and-forget.

    Args:
        payload: Dict produced by ``collect_telemetry``; sent as JSON.

    Returns:
        True if the endpoint answered with any 2xx status. False on a
        non-2xx response or on any error raised while sending, which is
        logged at debug level instead of propagated.
    """
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(TELEMETRY_ENDPOINT, json=payload)
    except Exception:
        logger.debug("Telemetry send failed", exc_info=True)
        return False
    # Accept every 2xx (e.g. 202 Accepted / 204 No Content), not only 200.
    return resp.is_success


async def telemetry_loop():
    """Background loop: collect and send telemetry every 24 hours.

    Sleeps STARTUP_DELAY_SECONDS before the first report, then reports once
    per REPORT_INTERVAL_SECONDS. Errors are logged at debug level and never
    stop the loop. Task cancellation still propagates, because
    ``asyncio.CancelledError`` is not an ``Exception`` subclass (Python 3.8+).
    """
    from restai.database import get_db_wrapper

    await asyncio.sleep(STARTUP_DELAY_SECONDS)

    while True:
        db_wrapper = None
        try:
            db_wrapper = get_db_wrapper()
            # NOTE(review): collect_telemetry() is synchronous, so its DB
            # queries run on the event-loop thread; consider offloading it if
            # it ever becomes slow.
            payload = collect_telemetry(db_wrapper)
            if not await send_telemetry(payload):
                logger.debug("Telemetry report was not accepted")
        except Exception:
            logger.debug("Telemetry collection failed", exc_info=True)
        finally:
            # Always release the session, even when collection failed, so a
            # persistent error does not leak one session per interval.
            if db_wrapper is not None:
                try:
                    db_wrapper.db.close()
                except Exception:
                    logger.debug(
                        "Failed to close telemetry DB session", exc_info=True
                    )

        await asyncio.sleep(REPORT_INTERVAL_SECONDS)