/ tests / test_security.py
test_security.py
   1  """
   2  Security tests for RESTai authorization and access control.
   3  
   4  Tests cover: project isolation, team isolation, user isolation,
   5  privilege escalation, team resource validation, authentication edge cases,
   6  tools/settings/proxy authorization, LLM/embedding enumeration,
   7  team deletion, users listing isolation, project team transfer,
   8  input validation, and statistics isolation.
   9  """
  10  
  11  import base64
  12  import random
  13  from datetime import datetime, timedelta, timezone
  14  import pytest
  15  from fastapi.testclient import TestClient
  16  import jwt
  17  
  18  from restai.config import RESTAI_DEFAULT_PASSWORD, RESTAI_AUTH_SECRET
  19  from restai.main import app
  20  
  21  
  22  @pytest.fixture(scope="module")
  23  def client():
  24      with TestClient(app) as c:
  25          yield c
  26  
  27  
  28  # Shared state
  29  suffix = str(random.randint(0, 10000000))
  30  userA_name = f"sec_usera_{suffix}"
  31  userB_name = f"sec_userb_{suffix}"
  32  userC_name = f"sec_userc_{suffix}"
  33  teamA_name = f"sec_teama_{suffix}"
  34  teamB_name = f"sec_teamb_{suffix}"
  35  llmA_name = f"sec_llma_{suffix}"
  36  llmB_name = f"sec_llmb_{suffix}"
  37  embA_name = f"sec_emba_{suffix}"
  38  embB_name = f"sec_embb_{suffix}"
  39  
  40  teamA_id = None
  41  teamB_id = None
  42  projectA_id = None
  43  projectB_id = None
  44  llmA_id = None
  45  llmB_id = None
  46  embA_id = None
  47  embB_id = None
  48  projectA_name = f"sec_proja_{suffix}"
  49  projectB_name = f"sec_projb_{suffix}"
  50  
  51  ADMIN = ("admin", RESTAI_DEFAULT_PASSWORD)
  52  USER_A = (userA_name, "passA123")
  53  USER_B = (userB_name, "passB123")
  54  USER_C = (userC_name, "passC123")
  55  
  56  
  57  # ── Setup ──────────────────────────────────────────────────────────────────
  58  
  59  
  60  def test_security_setup(client):
  61      """Create users, teams, LLMs, embeddings, and projects for security tests."""
  62      global teamA_id, teamB_id, projectA_id, projectB_id, llmA_id, llmB_id, embA_id, embB_id
  63  
  64      # Create users (including userC with no team)
  65      for uname, pwd in [
  66          (userA_name, "passA123"),
  67          (userB_name, "passB123"),
  68          (userC_name, "passC123"),
  69      ]:
  70          r = client.post(
  71              "/users",
  72              json={"username": uname, "password": pwd, "is_admin": False, "is_private": False},
  73              auth=ADMIN,
  74          )
  75          assert r.status_code == 201, f"Failed to create {uname}: {r.text}"
  76  
  77      # Create LLMs
  78      for name in [llmA_name, llmB_name]:
  79          r = client.post(
  80              "/llms",
  81              json={
  82                  "name": name,
  83                  "class_name": "OpenAI",
  84                  "options": {"model": "gpt-test", "api_key": "sk-fake"},
  85                  "privacy": "private",
  86              },
  87              auth=ADMIN,
  88          )
  89          assert r.status_code == 201, f"Failed to create LLM {name}: {r.text}"
  90          if name == llmA_name:
  91              llmA_id = r.json()["id"]
  92          else:
  93              llmB_id = r.json()["id"]
  94  
  95      # Create embeddings
  96      for name in [embA_name, embB_name]:
  97          r = client.post(
  98              "/embeddings",
  99              json={
 100                  "name": name,
 101                  "class_name": "Ollama",
 102                  "options": "{}",
 103                  "privacy": "private",
 104                  "dimension": 768,
 105              },
 106              auth=ADMIN,
 107          )
 108          assert r.status_code == 201, f"Failed to create embedding {name}: {r.text}"
 109          if name == embA_name:
 110              embA_id = r.json()["id"]
 111          else:
 112              embB_id = r.json()["id"]
 113  
 114      # Create teamA with userA as member, llmA, embA
 115      r = client.post(
 116          "/teams",
 117          json={
 118              "name": teamA_name,
 119              "users": [userA_name],
 120              "llms": [llmA_name],
 121              "embeddings": [embA_name],
 122          },
 123          auth=ADMIN,
 124      )
 125      assert r.status_code == 201
 126      teamA_id = r.json()["id"]
 127  
 128      # Create teamB with userB as member, llmB, embB
 129      r = client.post(
 130          "/teams",
 131          json={
 132              "name": teamB_name,
 133              "users": [userB_name],
 134              "llms": [llmB_name],
 135              "embeddings": [embB_name],
 136          },
 137          auth=ADMIN,
 138      )
 139      assert r.status_code == 201
 140      teamB_id = r.json()["id"]
 141  
 142      # Create projectA owned by userA in teamA
 143      r = client.post(
 144          "/projects",
 145          json={
 146              "name": projectA_name,
 147              "llm": llmA_name,
 148              "type": "agent",
 149              "team_id": teamA_id,
 150          },
 151          auth=USER_A,
 152      )
 153      assert r.status_code == 201, f"ProjectA creation failed: {r.status_code} {r.text}"
 154      projectA_id = r.json()["project"]
 155  
 156      # Create projectB owned by userB in teamB
 157      r = client.post(
 158          "/projects",
 159          json={
 160              "name": projectB_name,
 161              "llm": llmB_name,
 162              "type": "agent",
 163              "team_id": teamB_id,
 164          },
 165          auth=USER_B,
 166      )
 167      assert r.status_code == 201
 168      projectB_id = r.json()["project"]
 169  
 170  
 171  # ── Authentication Edge Cases ─────────────────────────────────────────────
 172  
 173  
 174  def test_unauthenticated_access_returns_401(client):
 175      """All protected endpoints must return 401 without credentials."""
 176      for path in ["/projects", "/users", "/teams", "/llms", "/embeddings"]:
 177          r = client.get(path)
 178          assert r.status_code == 401, f"{path} returned {r.status_code} without auth"
 179  
 180  
 181  def test_invalid_password_returns_401(client):
 182      r = client.get("/auth/whoami", auth=(userA_name, "wrong_password"))
 183      assert r.status_code == 401
 184  
 185  
 186  def test_nonexistent_user_returns_401(client):
 187      r = client.get("/auth/whoami", auth=("nonexistent_user_xyz", "somepass"))
 188      assert r.status_code == 401
 189  
 190  
 191  def test_invalid_jwt_cookie_returns_401():
 192      """Use a separate client to avoid polluting shared client cookies."""
 193      with TestClient(app) as c:
 194          c.cookies.set("restai_token", "garbage.jwt.token")
 195          r = c.get("/projects")
 196          assert r.status_code == 401
 197  
 198  
 199  def test_expired_jwt_token_returns_401():
 200      """Craft an expired JWT and verify it's rejected. Uses separate client."""
 201      with TestClient(app) as c:
 202          expired_payload = {
 203              "username": "admin",
 204              "exp": (datetime.now(timezone.utc) - timedelta(hours=1)).timestamp(),
 205          }
 206          expired_token = jwt.encode(expired_payload, RESTAI_AUTH_SECRET, algorithm="HS512")
 207          c.cookies.set("restai_token", expired_token)
 208          r = c.get("/projects")
 209          assert r.status_code == 401
 210  
 211  
 212  def test_malformed_basic_auth_returns_401(client):
 213      """Invalid base64 in Authorization: Basic header must return 401 (validates auth.py fix)."""
 214      r = client.get(
 215          "/projects",
 216          headers={"Authorization": "Basic !!!not-base64!!!"},
 217      )
 218      assert r.status_code == 401
 219  
 220  
 221  def test_bearer_with_invalid_apikey_returns_401(client):
 222      r = client.get(
 223          "/projects",
 224          headers={"Authorization": "Bearer fake-api-key-12345"},
 225      )
 226      assert r.status_code == 401
 227  
 228  
 229  # ── Project Isolation ──────────────────────────────────────────────────────
 230  
 231  
 232  def test_user_cannot_get_other_users_project(client):
 233      r = client.get(f"/projects/{projectB_id}", auth=USER_A)
 234      assert r.status_code == 404
 235  
 236  
 237  def test_user_cannot_edit_other_users_project(client):
 238      r = client.patch(
 239          f"/projects/{projectB_id}",
 240          json={"human_name": "hacked"},
 241          auth=USER_A,
 242      )
 243      assert r.status_code == 404
 244  
 245  
 246  def test_user_cannot_delete_other_users_project(client):
 247      r = client.delete(f"/projects/{projectB_id}", auth=USER_A)
 248      assert r.status_code == 404
 249  
 250  
 251  def test_user_cannot_chat_other_users_private_project(client):
 252      r = client.post(
 253          f"/projects/{projectB_id}/chat",
 254          json={"question": "hello"},
 255          auth=USER_A,
 256      )
 257      assert r.status_code == 404
 258  
 259  
 260  def test_user_cannot_question_other_users_private_project(client):
 261      r = client.post(
 262          f"/projects/{projectB_id}/question",
 263          json={"question": "hello"},
 264          auth=USER_A,
 265      )
 266      assert r.status_code == 404
 267  
 268  
 269  def test_user_can_access_public_project(client):
 270      """Make projectB public, verify userA can access it for chat/question."""
 271      # Make projectB public (as userB)
 272      r = client.patch(
 273          f"/projects/{projectB_id}",
 274          json={"public": True},
 275          auth=USER_B,
 276      )
 277      assert r.status_code == 200
 278  
 279      # userA can now GET the public project
 280      r = client.get(f"/projects/{projectB_id}", auth=USER_A)
 281      assert r.status_code == 200
 282  
 283      # Revert to private
 284      r = client.patch(
 285          f"/projects/{projectB_id}",
 286          json={"public": False},
 287          auth=USER_B,
 288      )
 289      assert r.status_code == 200
 290  
 291  
 292  def test_user_cannot_edit_public_project_they_dont_own(client):
 293      """Even if a project is public, non-owners cannot edit or delete it."""
 294      # Make projectB public
 295      client.patch(f"/projects/{projectB_id}", json={"public": True}, auth=USER_B)
 296  
 297      # userA cannot PATCH it
 298      r = client.patch(
 299          f"/projects/{projectB_id}",
 300          json={"human_name": "hacked"},
 301          auth=USER_A,
 302      )
 303      assert r.status_code == 404
 304  
 305      # userA cannot DELETE it
 306      r = client.delete(f"/projects/{projectB_id}", auth=USER_A)
 307      assert r.status_code == 404
 308  
 309      # Revert
 310      client.patch(f"/projects/{projectB_id}", json={"public": False}, auth=USER_B)
 311  
 312  
 313  def test_user_cannot_create_project_in_other_team(client):
 314      r = client.post(
 315          "/projects",
 316          json={
 317              "name": f"sneaky_proj_{suffix}",
 318              "llm": llmB_name,
 319              "type": "agent",
 320              "team_id": teamB_id,
 321          },
 322          auth=USER_A,
 323      )
 324      assert r.status_code == 403
 325  
 326  
 327  # ── Team Isolation ─────────────────────────────────────────────────────────
 328  
 329  
 330  def test_user_cannot_view_other_team(client):
 331      r = client.get(f"/teams/{teamB_id}", auth=USER_A)
 332      assert r.status_code == 403
 333  
 334  
 335  def test_user_cannot_edit_other_team(client):
 336      r = client.patch(
 337          f"/teams/{teamB_id}",
 338          json={"description": "hacked"},
 339          auth=USER_A,
 340      )
 341      assert r.status_code == 403
 342  
 343  
 344  def test_member_cannot_admin_own_team(client):
 345      """A regular team member (not team admin) cannot edit the team."""
 346      r = client.patch(
 347          f"/teams/{teamA_id}",
 348          json={"description": "modified by member"},
 349          auth=USER_A,
 350      )
 351      assert r.status_code == 403
 352  
 353  
 354  def test_team_admin_can_manage_team(client):
 355      """Make userA a team admin of teamA, verify they can edit it."""
 356      # Promote userA to team admin
 357      r = client.post(f"/teams/{teamA_id}/admins/{userA_name}", auth=ADMIN)
 358      assert r.status_code == 200
 359  
 360      # Now userA can edit teamA
 361      r = client.patch(
 362          f"/teams/{teamA_id}",
 363          json={"description": "edited by team admin"},
 364          auth=USER_A,
 365      )
 366      assert r.status_code == 200
 367  
 368      # Demote userA back to regular member
 369      r = client.delete(f"/teams/{teamA_id}/admins/{userA_name}", auth=ADMIN)
 370      assert r.status_code == 200
 371  
 372  
 373  def test_user_cannot_add_user_to_other_team(client):
 374      r = client.post(
 375          f"/teams/{teamB_id}/users/{userA_name}",
 376          auth=USER_A,
 377      )
 378      assert r.status_code == 403
 379  
 380  
 381  def test_non_admin_cannot_create_team(client):
 382      r = client.post(
 383          "/teams",
 384          json={"name": f"sneaky_team_{suffix}"},
 385          auth=USER_A,
 386      )
 387      assert r.status_code == 403
 388  
 389  
 390  # ── Team Deletion Authorization ───────────────────────────────────────────
 391  
 392  
 393  def test_user_cannot_delete_own_team(client):
 394      """Regular member cannot delete the team they belong to."""
 395      r = client.delete(f"/teams/{teamA_id}", auth=USER_A)
 396      assert r.status_code == 403
 397  
 398  
 399  def test_user_cannot_delete_team_they_dont_belong_to(client):
 400      r = client.delete(f"/teams/{teamB_id}", auth=USER_A)
 401      assert r.status_code == 403
 402  
 403  
 404  def test_team_admin_cannot_delete_team(client):
 405      """Even a team admin cannot delete the team (only platform admins can)."""
 406      # Promote userA to team admin
 407      r = client.post(f"/teams/{teamA_id}/admins/{userA_name}", auth=ADMIN)
 408      assert r.status_code == 200
 409  
 410      # Team admin still cannot delete
 411      r = client.delete(f"/teams/{teamA_id}", auth=USER_A)
 412      assert r.status_code == 403
 413  
 414      # Demote back
 415      r = client.delete(f"/teams/{teamA_id}/admins/{userA_name}", auth=ADMIN)
 416      assert r.status_code == 200
 417  
 418  
 419  # ── User Isolation ─────────────────────────────────────────────────────────
 420  
 421  
 422  def test_user_cannot_view_other_user_profile(client):
 423      r = client.get(f"/users/{userB_name}", auth=USER_A)
 424      assert r.status_code == 404
 425  
 426  
 427  def test_user_cannot_edit_other_user(client):
 428      r = client.patch(
 429          f"/users/{userB_name}",
 430          json={"password": "hacked"},
 431          auth=USER_A,
 432      )
 433      assert r.status_code == 404
 434  
 435  
 436  def test_user_cannot_delete_other_user(client):
 437      r = client.delete(f"/users/{userB_name}", auth=USER_A)
 438      assert r.status_code in [401, 403]
 439  
 440  
 441  def test_non_admin_cannot_create_user(client):
 442      r = client.post(
 443          "/users",
 444          json={"username": "sneaky_user", "password": "pass", "admin": False, "private": False},
 445          auth=USER_A,
 446      )
 447      assert r.status_code == 403
 448  
 449  
 450  def test_user_cannot_view_other_users_apikeys(client):
 451      r = client.get(f"/users/{userB_name}/apikeys", auth=USER_A)
 452      assert r.status_code == 404
 453  
 454  
 455  # ── Privilege Escalation ───────────────────────────────────────────────────
 456  
 457  
 458  def test_user_cannot_self_grant_admin(client):
 459      r = client.patch(
 460          f"/users/{userA_name}",
 461          json={"is_admin": True},
 462          auth=USER_A,
 463      )
 464      assert r.status_code == 403
 465  
 466  
 467  def test_user_cannot_self_remove_private_flag(client):
 468      """Non-admin users cannot remove the is_private flag once set by admin."""
 469      # First set userA as private via admin
 470      r = client.patch(
 471          f"/users/{userA_name}",
 472          json={"is_private": True},
 473          auth=ADMIN,
 474      )
 475      assert r.status_code == 200
 476  
 477      # userA tries to remove the private flag — should be denied
 478      r = client.patch(
 479          f"/users/{userA_name}",
 480          json={"is_private": False},
 481          auth=USER_A,
 482      )
 483      assert r.status_code == 403
 484  
 485      # userA CAN set themselves as private (more restrictive is OK)
 486      r = client.patch(
 487          f"/users/{userA_name}",
 488          json={"is_private": True},
 489          auth=USER_A,
 490      )
 491      assert r.status_code == 200
 492  
 493      # Revert via admin
 494      client.patch(f"/users/{userA_name}", json={"is_private": False}, auth=ADMIN)
 495  
 496  
 497  def test_user_cannot_self_assign_projects(client):
 498      """Non-admin users cannot modify their own project assignments."""
 499      r = client.patch(
 500          f"/users/{userA_name}",
 501          json={"projects": [projectB_name]},
 502          auth=USER_A,
 503      )
 504      assert r.status_code == 403
 505  
 506  
 507  def test_non_admin_cannot_create_llm(client):
 508      r = client.post(
 509          "/llms",
 510          json={
 511              "name": "sneaky_llm",
 512              "class_name": "OpenAI",
 513              "options": {"model": "gpt-test", "api_key": "sk-fake"},
 514              "privacy": "public",
 515          },
 516          auth=USER_A,
 517      )
 518      assert r.status_code == 403
 519  
 520  
 521  def test_non_admin_cannot_create_embedding(client):
 522      r = client.post(
 523          "/embeddings",
 524          json={
 525              "name": "sneaky_emb",
 526              "class_name": "Ollama",
 527              "options": "{}",
 528              "privacy": "public",
 529              "dimension": 768,
 530          },
 531          auth=USER_A,
 532      )
 533      assert r.status_code == 403
 534  
 535  
 536  # ── Team Resource Validation ──────────────────────────────────────────────
 537  
 538  
 539  def test_cannot_create_project_with_unauthorized_llm(client):
 540      """userA cannot create a project using an LLM only assigned to teamB."""
 541      r = client.post(
 542          "/projects",
 543          json={
 544              "name": f"bad_llm_proj_{suffix}",
 545              "llm": llmB_name,
 546              "type": "agent",
 547              "team_id": teamA_id,
 548          },
 549          auth=USER_A,
 550      )
 551      assert r.status_code == 403
 552  
 553  
 554  def test_cannot_change_project_llm_to_unauthorized(client):
 555      """userA cannot change their project's LLM to one not in their team."""
 556      r = client.patch(
 557          f"/projects/{projectA_id}",
 558          json={"llm": llmB_name},
 559          auth=USER_A,
 560      )
 561      assert r.status_code == 403
 562  
 563  
 564  def test_cannot_create_rag_project_with_unauthorized_embedding():
 565      """userA cannot create a RAG project using an embedding only in teamB.
 566  
 567      Note: May return 403 (team check) or raise an exception (embedding
 568      instantiation fails in test env with fake options). Either way, denied.
 569      """
 570      with TestClient(app, raise_server_exceptions=False) as c:
 571          r = c.post(
 572              "/projects",
 573              json={
 574                  "name": f"bad_emb_proj_{suffix}",
 575                  "llm": llmA_name,
 576                  "embeddings": embB_name,
 577                  "vectorstore": "chroma",
 578                  "type": "rag",
 579                  "team_id": teamA_id,
 580              },
 581              auth=USER_A,
 582          )
 583          assert r.status_code in [403, 500], f"Expected denial, got {r.status_code}"
 584  
 585  
 586  # ── Tools Router Authorization ────────────────────────────────────────────
 587  
 588  
 589  def test_tools_mcp_probe_accessible_by_regular_user():
 590      """POST /tools/mcp/probe as regular user should not return 403 (documenting permissive access)."""
 591      with TestClient(app, raise_server_exceptions=False) as c:
 592          r = c.post(
 593              "/tools/mcp/probe",
 594              json={"host": "http://localhost:9999"},
 595              auth=USER_A,
 596          )
 597          # Connection will fail, but auth should pass (not 401/403)
 598          assert r.status_code not in [401, 403], f"Expected auth to pass, got {r.status_code}"
 599  
 600  
 601  def test_tools_ollama_models_accessible_by_regular_user():
 602      """POST /tools/ollama/models as regular user should not return 403."""
 603      with TestClient(app, raise_server_exceptions=False) as c:
 604          r = c.post(
 605              "/tools/ollama/models",
 606              json={"host": "http://localhost:11434"},
 607              auth=USER_A,
 608          )
 609          assert r.status_code not in [401, 403], f"Expected auth to pass, got {r.status_code}"
 610  
 611  
 612  def test_tools_ollama_pull_accessible_by_regular_user():
 613      """POST /tools/ollama/pull as regular user should not return 403."""
 614      with TestClient(app, raise_server_exceptions=False) as c:
 615          r = c.post(
 616              "/tools/ollama/pull",
 617              json={"host": "http://localhost:11434", "model": "test"},
 618              auth=USER_A,
 619          )
 620          assert r.status_code not in [401, 403], f"Expected auth to pass, got {r.status_code}"
 621  
 622  
 623  def test_tools_classifier_requires_auth(client):
 624      r = client.post("/tools/classifier", json={"question": "test", "llm": "test"})
 625      assert r.status_code == 401
 626  
 627  
 628  def test_tools_agent_requires_auth(client):
 629      r = client.get("/tools/agent")
 630      assert r.status_code == 401
 631  
 632  
 633  # ── LLM/Embedding Enumeration ────────────────────────────────────────────
 634  
 635  
 636  def test_any_user_can_list_all_llms(client):
 637      """Users can only see LLMs accessible via their teams."""
 638      r = client.get("/llms", auth=USER_A)
 639      assert r.status_code == 200
 640      llm_names = [llm["name"] for llm in r.json()]
 641      # User A belongs to team A which has llmA — should see it
 642      assert llmA_name in llm_names
 643      # User A does NOT belong to team B — should NOT see llmB
 644      assert llmB_name not in llm_names
 645  
 646  
 647  def test_any_user_can_get_specific_llm(client):
 648      r = client.get(f"/llms/{llmB_id}", auth=USER_A)
 649      assert r.status_code == 200
 650  
 651  
 652  def test_any_user_can_list_all_embeddings(client):
 653      """Users can only see embeddings accessible via their teams."""
 654      r = client.get("/embeddings", auth=USER_A)
 655      assert r.status_code == 200
 656      emb_names = [e["name"] for e in r.json()]
 657      # User A belongs to team A which has embA — should see it
 658      assert embA_name in emb_names
 659      # User A does NOT belong to team B — should NOT see embB
 660      assert embB_name not in emb_names
 661  
 662  
 663  def test_any_user_can_get_specific_embedding(client):
 664      r = client.get(f"/embeddings/{embB_id}", auth=USER_A)
 665      assert r.status_code == 200
 666  
 667  
 668  def test_llm_api_keys_are_masked(client):
 669      """API keys in LLM options should be masked when returned."""
 670      r = client.get(f"/llms/{llmA_id}", auth=ADMIN)
 671      assert r.status_code == 200
 672      data = r.json()
 673      if isinstance(data.get("options"), dict):
 674          api_key = data["options"].get("api_key", "")
 675          assert api_key == "********" or api_key.startswith("****"), \
 676              f"API key not masked: {api_key}"
 677  
 678  
 679  # ── Settings & Proxy Authorization ────────────────────────────────────────
 680  
 681  
 682  def test_non_admin_cannot_get_settings(client):
 683      r = client.get("/settings", auth=USER_A)
 684      assert r.status_code == 403
 685  
 686  
 687  def test_non_admin_cannot_patch_settings(client):
 688      r = client.patch("/settings", json={}, auth=USER_A)
 689      assert r.status_code == 403
 690  
 691  
 692  def test_non_admin_cannot_get_proxy_keys(client):
 693      r = client.get("/proxy/keys", auth=USER_A)
 694      assert r.status_code == 403
 695  
 696  
 697  def test_non_admin_cannot_create_proxy_key(client):
 698      r = client.post(
 699          "/proxy/keys",
 700          json={"name": "sneaky_key"},
 701          auth=USER_A,
 702      )
 703      assert r.status_code == 403
 704  
 705  
 706  # ── Users Listing Isolation ───────────────────────────────────────────────
 707  
 708  
 709  def test_non_admin_users_listing_only_shows_teammates(client):
 710      """Non-admin user should only see users from their own teams."""
 711      r = client.get("/users", auth=USER_A)
 712      assert r.status_code == 200
 713      usernames = [u["username"] for u in r.json()["users"]]
 714      assert userA_name in usernames
 715      assert userB_name not in usernames
 716  
 717  
 718  def test_admin_users_listing_shows_all(client):
 719      r = client.get("/users", auth=ADMIN)
 720      assert r.status_code == 200
 721      usernames = [u["username"] for u in r.json()["users"]]
 722      assert userA_name in usernames
 723      assert userB_name in usernames
 724      assert userC_name in usernames
 725  
 726  
 727  def test_non_admin_listing_returns_limited_schema(client):
 728      """Non-admin users listing should return limited user objects without sensitive fields."""
 729      r = client.get("/users", auth=USER_A)
 730      assert r.status_code == 200
 731      users = r.json()["users"]
 732      for u in users:
 733          assert "is_admin" not in u, f"is_admin leaked for {u.get('username')}"
 734          assert "api_keys" not in u, f"api_keys leaked for {u.get('username')}"
 735  
 736  
 737  # ── Project Team Transfer ─────────────────────────────────────────────────
 738  
 739  
 740  def test_cannot_change_project_team_to_unauthorized_team(client):
 741      """userA cannot move their project to a team they don't belong to."""
 742      r = client.patch(
 743          f"/projects/{projectA_id}",
 744          json={"team_id": teamB_id},
 745          auth=USER_A,
 746      )
 747      assert r.status_code == 403
 748  
 749  
 750  def test_cannot_change_project_llm_to_one_not_in_current_team(client):
 751      """userA cannot change project LLM to one not available in the project's current team."""
 752      r = client.patch(
 753          f"/projects/{projectA_id}",
 754          json={"llm": llmB_name},
 755          auth=USER_A,
 756      )
 757      assert r.status_code == 403
 758  
 759  
 760  def test_admin_can_change_project_team(client):
 761      """Admin can transfer a project to any team (sanity check)."""
 762      # Add llmA to teamB so the project's LLM is valid in the new team
 763      r = client.patch(
 764          f"/teams/{teamB_id}",
 765          json={"llms": [llmB_name, llmA_name]},
 766          auth=ADMIN,
 767      )
 768      assert r.status_code == 200
 769  
 770      # Move projectA to teamB as admin (keep same LLM)
 771      r = client.patch(
 772          f"/projects/{projectA_id}",
 773          json={"team_id": teamB_id},
 774          auth=ADMIN,
 775      )
 776      assert r.status_code == 200
 777  
 778      # Revert: move back to teamA
 779      r = client.patch(
 780          f"/projects/{projectA_id}",
 781          json={"team_id": teamA_id},
 782          auth=ADMIN,
 783      )
 784      assert r.status_code == 200
 785  
 786      # Remove llmA from teamB
 787      r = client.patch(
 788          f"/teams/{teamB_id}",
 789          json={"llms": [llmB_name]},
 790          auth=ADMIN,
 791      )
 792      assert r.status_code == 200
 793  
 794  
 795  # ── Admin Override ─────────────────────────────────────────────────────────
 796  
 797  
 798  def test_admin_can_access_any_project(client):
 799      # Admin can GET any project
 800      r = client.get(f"/projects/{projectB_id}", auth=ADMIN)
 801      assert r.status_code == 200
 802  
 803      # Admin can PATCH any project
 804      r = client.patch(
 805          f"/projects/{projectB_id}",
 806          json={"human_name": "Admin edited"},
 807          auth=ADMIN,
 808      )
 809      assert r.status_code == 200
 810  
 811      # Revert
 812      client.patch(
 813          f"/projects/{projectB_id}",
 814          json={"human_name": None},
 815          auth=ADMIN,
 816      )
 817  
 818  
 819  def test_admin_can_access_any_team(client):
 820      r = client.get(f"/teams/{teamB_id}", auth=ADMIN)
 821      assert r.status_code == 200
 822  
 823      r = client.patch(
 824          f"/teams/{teamB_id}",
 825          json={"description": "Admin edited"},
 826          auth=ADMIN,
 827      )
 828      assert r.status_code == 200
 829  
 830  
 831  def test_admin_can_manage_any_user(client):
 832      # Admin can GET any user
 833      r = client.get(f"/users/{userB_name}", auth=ADMIN)
 834      assert r.status_code == 200
 835  
 836      # Admin can PATCH any user
 837      r = client.patch(
 838          f"/users/{userB_name}",
 839          json={"is_private": True},
 840          auth=ADMIN,
 841      )
 842      assert r.status_code == 200
 843  
 844      # Revert
 845      client.patch(f"/users/{userB_name}", json={"is_private": False}, auth=ADMIN)
 846  
 847  
 848  # ── Projects listing isolation ─────────────────────────────────────────────
 849  
 850  
 851  def test_user_only_sees_own_projects(client):
 852      """userA should only see their own projects in the listing."""
 853      r = client.get("/projects", auth=USER_A)
 854      assert r.status_code == 200
 855      projects = r.json()["projects"]
 856      project_names = [p["name"] for p in projects]
 857      assert projectA_name in project_names
 858      assert projectB_name not in project_names
 859  
 860  
 861  def test_user_sees_public_projects_in_listing(client):
 862      """Public projects should appear when filtering by public."""
 863      # Make projectB public
 864      client.patch(f"/projects/{projectB_id}", json={"public": True}, auth=USER_B)
 865  
 866      # Default listing only shows own projects
 867      r = client.get("/projects", auth=USER_A)
 868      assert r.status_code == 200
 869      own_ids = [p["id"] for p in r.json()["projects"]]
 870      assert projectB_id not in own_ids
 871  
 872      # Public filter shows public projects
 873      r = client.get("/projects?filter=public", auth=USER_A)
 874      assert r.status_code == 200
 875      public_ids = [p["id"] for p in r.json()["projects"]]
 876      assert projectB_id in public_ids
 877  
 878      # Revert
 879      client.patch(f"/projects/{projectB_id}", json={"public": False}, auth=USER_B)
 880  
 881  
 882  # ── Input Validation ──────────────────────────────────────────────────────
 883  
 884  
 885  def test_create_project_empty_name_fails(client):
 886      r = client.post(
 887          "/projects",
 888          json={"name": "", "llm": llmA_name, "type": "agent", "team_id": teamA_id},
 889          auth=USER_A,
 890      )
 891      assert r.status_code in (400, 422), f"Expected 400 or 422, got {r.status_code}"
 892  
 893  
 894  def test_create_project_whitespace_name_fails(client):
 895      r = client.post(
 896          "/projects",
 897          json={"name": "   ", "llm": llmA_name, "type": "agent", "team_id": teamA_id},
 898          auth=USER_A,
 899      )
 900      assert r.status_code in (400, 422), f"Expected 400 or 422, got {r.status_code}"
 901  
 902  
 903  def test_project_name_sanitization():
 904      """Special characters in project name should be sanitized, not cause errors."""
 905      test_name = f"test<>proj&{suffix}"
 906      with TestClient(app, raise_server_exceptions=False) as c:
 907          r = c.post(
 908              "/projects",
 909              json={"name": test_name, "llm": llmA_name, "type": "agent", "team_id": teamA_id},
 910              auth=USER_A,
 911          )
 912          # Should succeed with sanitized name or fail gracefully (not 500)
 913          assert r.status_code != 500, f"Server error on special chars: {r.text}"
 914          if r.status_code == 201:
 915              # Clean up
 916              pid = r.json()["project"]
 917              c.delete(f"/projects/{pid}", auth=ADMIN)
 918  
 919  
 920  def test_very_long_project_name():
 921      """A very long project name should not cause a 500 error."""
 922      long_name = "a" * 1000 + f"_{suffix}"
 923      with TestClient(app, raise_server_exceptions=False) as c:
 924          r = c.post(
 925              "/projects",
 926              json={"name": long_name, "llm": llmA_name, "type": "agent", "team_id": teamA_id},
 927              auth=USER_A,
 928          )
 929          assert r.status_code != 500, f"Server error on long name: {r.text}"
 930          if r.status_code == 201:
 931              pid = r.json()["project"]
 932              c.delete(f"/projects/{pid}", auth=ADMIN)
 933  
 934  
 935  def test_sql_injection_in_project_name():
 936      """SQL injection attempt in project name should be sanitized."""
 937      with TestClient(app, raise_server_exceptions=False) as c:
 938          r = c.post(
 939              "/projects",
 940              json={
 941                  "name": f"'; DROP TABLE projects; --{suffix}",
 942                  "llm": llmA_name,
 943                  "type": "agent",
 944                  "team_id": teamA_id,
 945              },
 946              auth=USER_A,
 947          )
 948          assert r.status_code != 500, f"Server error on SQL injection attempt: {r.text}"
 949          if r.status_code == 201:
 950              pid = r.json()["project"]
 951              c.delete(f"/projects/{pid}", auth=ADMIN)
 952  
 953  
 954  def test_xss_in_username():
 955      """XSS attempt in username should be sanitized."""
 956      with TestClient(app, raise_server_exceptions=False) as c:
 957          r = c.post(
 958              "/users",
 959              json={
 960                  "username": "<script>alert(1)</script>",
 961                  "password": "testpass",
 962                  "is_admin": False,
 963                  "is_private": False,
 964              },
 965              auth=ADMIN,
 966          )
 967          assert r.status_code != 500, f"Server error on XSS attempt: {r.text}"
 968          if r.status_code == 201:
 969              created_name = r.json()["username"]
 970              # Verify the name was sanitized (no angle brackets)
 971              assert "<" not in created_name
 972              assert ">" not in created_name
 973              # Clean up
 974              c.delete(f"/users/{created_name}", auth=ADMIN)
 975  
 976  
 977  # ── Statistics Isolation ──────────────────────────────────────────────────
 978  
 979  
 980  def test_statistics_non_admin_only_sees_own_projects():
 981      """Non-admin user should not see other users' projects in statistics."""
 982      with TestClient(app, raise_server_exceptions=False) as c:
 983          r = c.get("/statistics/top-projects", auth=USER_A)
 984          if r.status_code == 200:
 985              projects = r.json().get("projects", [])
 986              project_names = [p["name"] for p in projects]
 987              assert projectB_name not in project_names, \
 988                  f"userA can see projectB in statistics"
 989  
 990  
 991  # ── Cleanup ────────────────────────────────────────────────────────────────
 992  
 993  
 994  def test_security_cleanup(client):
 995      """Clean up all resources created by security tests."""
 996      # Delete projects
 997      for pid in [projectA_id, projectB_id]:
 998          if pid:
 999              client.delete(f"/projects/{pid}", auth=ADMIN)
1000  
1001      # Delete teams
1002      for tid in [teamA_id, teamB_id]:
1003          if tid:
1004              client.delete(f"/teams/{tid}", auth=ADMIN)
1005  
1006      # Delete LLMs
1007      for name in [llmA_name, llmB_name]:
1008          client.delete(f"/llms/{name}", auth=ADMIN)
1009  
1010      # Delete embeddings
1011      for name in [embA_name, embB_name]:
1012          client.delete(f"/embeddings/{name}", auth=ADMIN)
1013  
1014      # Delete users
1015      for uname in [userA_name, userB_name, userC_name]:
1016          client.delete(f"/users/{uname}", auth=ADMIN)