test_security.py
"""
Security tests for RESTai authorization and access control.

Tests cover: project isolation, team isolation, user isolation,
privilege escalation, team resource validation, authentication edge cases,
tools/settings/proxy authorization, LLM/embedding enumeration,
team deletion, users listing isolation, project team transfer,
input validation, and statistics isolation.

The tests share module-level state and are order-dependent:
``test_security_setup`` populates the ids used by every other test and
``test_security_cleanup`` removes the created resources.
"""

import base64
import random
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone

import pytest
from fastapi.testclient import TestClient
import jwt

from restai.config import RESTAI_DEFAULT_PASSWORD, RESTAI_AUTH_SECRET
from restai.main import app


@pytest.fixture(scope="module")
def client():
    """Yield one ``TestClient`` shared by all tests in this module."""
    with TestClient(app) as c:
        yield c


# Shared state
# A random suffix keeps resource names unique across runs against the same DB.
suffix = str(random.randint(0, 10000000))
userA_name = f"sec_usera_{suffix}"
userB_name = f"sec_userb_{suffix}"
userC_name = f"sec_userc_{suffix}"
teamA_name = f"sec_teama_{suffix}"
teamB_name = f"sec_teamb_{suffix}"
llmA_name = f"sec_llma_{suffix}"
llmB_name = f"sec_llmb_{suffix}"
embA_name = f"sec_emba_{suffix}"
embB_name = f"sec_embb_{suffix}"

# Ids are filled in by test_security_setup.
teamA_id = None
teamB_id = None
projectA_id = None
projectB_id = None
llmA_id = None
llmB_id = None
embA_id = None
embB_id = None
projectA_name = f"sec_proja_{suffix}"
projectB_name = f"sec_projb_{suffix}"

# (username, password) tuples passed as HTTP Basic credentials.
ADMIN = ("admin", RESTAI_DEFAULT_PASSWORD)
USER_A = (userA_name, "passA123")
USER_B = (userB_name, "passB123")
USER_C = (userC_name, "passC123")


# ── Helpers ────────────────────────────────────────────────────────────────


@contextmanager
def _project_public(client, project_id, owner_auth):
    """Make a project public for the duration of the ``with`` body.

    The precondition (PATCH public=True returns 200) is asserted so that a
    test relying on the project being public cannot pass vacuously. The
    project is always reverted to private, even if the body fails, so a
    failure does not leak state into later order-dependent tests. The revert
    status is only asserted when the body itself succeeded, so it never masks
    the original failure.
    """
    r = client.patch(f"/projects/{project_id}", json={"public": True}, auth=owner_auth)
    assert r.status_code == 200, f"Could not make project public: {r.status_code} {r.text}"
    try:
        yield
    finally:
        r = client.patch(f"/projects/{project_id}", json={"public": False}, auth=owner_auth)
    assert r.status_code == 200, f"Could not revert project to private: {r.status_code} {r.text}"


@contextmanager
def _promoted_team_admin(client, team_id, username):
    """Promote ``username`` to admin of ``team_id`` for the ``with`` body.

    The user is always demoted again on exit; the demotion status is only
    asserted when the body itself succeeded.
    """
    r = client.post(f"/teams/{team_id}/admins/{username}", auth=ADMIN)
    assert r.status_code == 200, f"Could not promote {username}: {r.status_code} {r.text}"
    try:
        yield
    finally:
        r = client.delete(f"/teams/{team_id}/admins/{username}", auth=ADMIN)
    assert r.status_code == 200, f"Could not demote {username}: {r.status_code} {r.text}"


# ── Setup ──────────────────────────────────────────────────────────────────


def test_security_setup(client):
    """Create users, teams, LLMs, embeddings, and projects for security tests."""
    global teamA_id, teamB_id, projectA_id, projectB_id, llmA_id, llmB_id, embA_id, embB_id

    # Create users (including userC with no team)
    for uname, pwd in [
        (userA_name, "passA123"),
        (userB_name, "passB123"),
        (userC_name, "passC123"),
    ]:
        r = client.post(
            "/users",
            json={"username": uname, "password": pwd, "is_admin": False, "is_private": False},
            auth=ADMIN,
        )
        assert r.status_code == 201, f"Failed to create {uname}: {r.text}"

    # Create LLMs
    for name in [llmA_name, llmB_name]:
        r = client.post(
            "/llms",
            json={
                "name": name,
                "class_name": "OpenAI",
                "options": {"model": "gpt-test", "api_key": "sk-fake"},
                "privacy": "private",
            },
            auth=ADMIN,
        )
        assert r.status_code == 201, f"Failed to create LLM {name}: {r.text}"
        if name == llmA_name:
            llmA_id = r.json()["id"]
        else:
            llmB_id = r.json()["id"]

    # Create embeddings
    for name in [embA_name, embB_name]:
        r = client.post(
            "/embeddings",
            json={
                "name": name,
                "class_name": "Ollama",
                "options": "{}",
                "privacy": "private",
                "dimension": 768,
            },
            auth=ADMIN,
        )
        assert r.status_code == 201, f"Failed to create embedding {name}: {r.text}"
        if name == embA_name:
            embA_id = r.json()["id"]
        else:
            embB_id = r.json()["id"]

    # Create teamA with userA as member, llmA, embA
    r = client.post(
        "/teams",
        json={
            "name": teamA_name,
            "users": [userA_name],
            "llms": [llmA_name],
            "embeddings": [embA_name],
        },
        auth=ADMIN,
    )
    assert r.status_code == 201, f"TeamA creation failed: {r.status_code} {r.text}"
    teamA_id = r.json()["id"]

    # Create teamB with userB as member, llmB, embB
    r = client.post(
        "/teams",
        json={
            "name": teamB_name,
            "users": [userB_name],
            "llms": [llmB_name],
            "embeddings": [embB_name],
        },
        auth=ADMIN,
    )
    assert r.status_code == 201, f"TeamB creation failed: {r.status_code} {r.text}"
    teamB_id = r.json()["id"]

    # Create projectA owned by userA in teamA
    r = client.post(
        "/projects",
        json={
            "name": projectA_name,
            "llm": llmA_name,
            "type": "agent",
            "team_id": teamA_id,
        },
        auth=USER_A,
    )
    assert r.status_code == 201, f"ProjectA creation failed: {r.status_code} {r.text}"
    projectA_id = r.json()["project"]

    # Create projectB owned by userB in teamB
    r = client.post(
        "/projects",
        json={
            "name": projectB_name,
            "llm": llmB_name,
            "type": "agent",
            "team_id": teamB_id,
        },
        auth=USER_B,
    )
    assert r.status_code == 201, f"ProjectB creation failed: {r.status_code} {r.text}"
    projectB_id = r.json()["project"]


# ── Authentication Edge Cases ─────────────────────────────────────────────


def test_unauthenticated_access_returns_401(client):
    """All protected endpoints must return 401 without credentials."""
    for path in ["/projects", "/users", "/teams", "/llms", "/embeddings"]:
        r = client.get(path)
        assert r.status_code == 401, f"{path} returned {r.status_code} without auth"


def test_invalid_password_returns_401(client):
    """A valid username with a wrong password must return 401."""
    r = client.get("/auth/whoami", auth=(userA_name, "wrong_password"))
    assert r.status_code == 401


def test_nonexistent_user_returns_401(client):
    """Credentials for a user that does not exist must return 401."""
    r = client.get("/auth/whoami", auth=("nonexistent_user_xyz", "somepass"))
    assert r.status_code == 401


def test_invalid_jwt_cookie_returns_401():
    """Use a separate client to avoid polluting shared client cookies."""
    with TestClient(app) as c:
        c.cookies.set("restai_token", "garbage.jwt.token")
        r = c.get("/projects")
        assert r.status_code == 401


def test_expired_jwt_token_returns_401():
    """Craft an expired JWT and verify it's rejected. Uses separate client."""
    with TestClient(app) as c:
        expired_payload = {
            "username": "admin",
            "exp": (datetime.now(timezone.utc) - timedelta(hours=1)).timestamp(),
        }
        expired_token = jwt.encode(expired_payload, RESTAI_AUTH_SECRET, algorithm="HS512")
        c.cookies.set("restai_token", expired_token)
        r = c.get("/projects")
        assert r.status_code == 401


def test_malformed_basic_auth_returns_401(client):
    """Invalid base64 in Authorization: Basic header must return 401 (validates auth.py fix)."""
    r = client.get(
        "/projects",
        headers={"Authorization": "Basic !!!not-base64!!!"},
    )
    assert r.status_code == 401


def test_bearer_with_invalid_apikey_returns_401(client):
    """An unknown API key sent as a Bearer token must return 401."""
    r = client.get(
        "/projects",
        headers={"Authorization": "Bearer fake-api-key-12345"},
    )
    assert r.status_code == 401


# ── Project Isolation ──────────────────────────────────────────────────────


def test_user_cannot_get_other_users_project(client):
    """userA GETting userB's private project must return 404."""
    r = client.get(f"/projects/{projectB_id}", auth=USER_A)
    assert r.status_code == 404


def test_user_cannot_edit_other_users_project(client):
    """userA PATCHing userB's private project must return 404."""
    r = client.patch(
        f"/projects/{projectB_id}",
        json={"human_name": "hacked"},
        auth=USER_A,
    )
    assert r.status_code == 404


def test_user_cannot_delete_other_users_project(client):
    """userA DELETEing userB's private project must return 404."""
    r = client.delete(f"/projects/{projectB_id}", auth=USER_A)
    assert r.status_code == 404


def test_user_cannot_chat_other_users_private_project(client):
    """userA posting to /chat on userB's private project must return 404."""
    r = client.post(
        f"/projects/{projectB_id}/chat",
        json={"question": "hello"},
        auth=USER_A,
    )
    assert r.status_code == 404


def test_user_cannot_question_other_users_private_project(client):
    """userA posting to /question on userB's private project must return 404."""
    r = client.post(
        f"/projects/{projectB_id}/question",
        json={"question": "hello"},
        auth=USER_A,
    )
    assert r.status_code == 404


def test_user_can_access_public_project(client):
    """Make projectB public and verify userA can GET it; revert afterwards."""
    with _project_public(client, projectB_id, USER_B):
        # userA can now GET the public project
        r = client.get(f"/projects/{projectB_id}", auth=USER_A)
        assert r.status_code == 200


def test_user_cannot_edit_public_project_they_dont_own(client):
    """Even if a project is public, non-owners cannot edit or delete it."""
    # The helper asserts the project really became public; otherwise the 404s
    # below would also be returned for a private project and prove nothing.
    with _project_public(client, projectB_id, USER_B):
        # userA cannot PATCH it
        r = client.patch(
            f"/projects/{projectB_id}",
            json={"human_name": "hacked"},
            auth=USER_A,
        )
        assert r.status_code == 404

        # userA cannot DELETE it
        r = client.delete(f"/projects/{projectB_id}", auth=USER_A)
        assert r.status_code == 404


def test_user_cannot_create_project_in_other_team(client):
    """userA creating a project in teamB (not a member) must return 403."""
    r = client.post(
        "/projects",
        json={
            "name": f"sneaky_proj_{suffix}",
            "llm": llmB_name,
            "type": "agent",
            "team_id": teamB_id,
        },
        auth=USER_A,
    )
    assert r.status_code == 403


# ── Team Isolation ─────────────────────────────────────────────────────────


def test_user_cannot_view_other_team(client):
    """userA GETting teamB must return 403."""
    r = client.get(f"/teams/{teamB_id}", auth=USER_A)
    assert r.status_code == 403


def test_user_cannot_edit_other_team(client):
    """userA PATCHing teamB must return 403."""
    r = client.patch(
        f"/teams/{teamB_id}",
        json={"description": "hacked"},
        auth=USER_A,
    )
    assert r.status_code == 403


def test_member_cannot_admin_own_team(client):
    """A regular team member (not team admin) cannot edit the team."""
    r = client.patch(
        f"/teams/{teamA_id}",
        json={"description": "modified by member"},
        auth=USER_A,
    )
    assert r.status_code == 403


def test_team_admin_can_manage_team(client):
    """Make userA a team admin of teamA, verify they can edit it."""
    with _promoted_team_admin(client, teamA_id, userA_name):
        # Now userA can edit teamA
        r = client.patch(
            f"/teams/{teamA_id}",
            json={"description": "edited by team admin"},
            auth=USER_A,
        )
        assert r.status_code == 200


def test_user_cannot_add_user_to_other_team(client):
    """userA adding themselves to teamB must return 403."""
    r = client.post(
        f"/teams/{teamB_id}/users/{userA_name}",
        auth=USER_A,
    )
    assert r.status_code == 403


def test_non_admin_cannot_create_team(client):
    """A non-admin user POSTing to /teams must return 403."""
    r = client.post(
        "/teams",
        json={"name": f"sneaky_team_{suffix}"},
        auth=USER_A,
    )
    assert r.status_code == 403


# ── Team Deletion Authorization ───────────────────────────────────────────


def test_user_cannot_delete_own_team(client):
    """Regular member cannot delete the team they belong to."""
    r = client.delete(f"/teams/{teamA_id}", auth=USER_A)
    assert r.status_code == 403


def test_user_cannot_delete_team_they_dont_belong_to(client):
    """userA DELETEing teamB must return 403."""
    r = client.delete(f"/teams/{teamB_id}", auth=USER_A)
    assert r.status_code == 403


def test_team_admin_cannot_delete_team(client):
    """Even a team admin cannot delete the team (only platform admins can)."""
    with _promoted_team_admin(client, teamA_id, userA_name):
        # Team admin still cannot delete
        r = client.delete(f"/teams/{teamA_id}", auth=USER_A)
        assert r.status_code == 403


# ── User Isolation ─────────────────────────────────────────────────────────


def test_user_cannot_view_other_user_profile(client):
    """userA GETting userB's profile must return 404."""
    r = client.get(f"/users/{userB_name}", auth=USER_A)
    assert r.status_code == 404


def test_user_cannot_edit_other_user(client):
    """userA PATCHing userB must return 404."""
    r = client.patch(
        f"/users/{userB_name}",
        json={"password": "hacked"},
        auth=USER_A,
    )
    assert r.status_code == 404


def test_user_cannot_delete_other_user(client):
    """userA DELETEing userB must be denied with 401 or 403."""
    r = client.delete(f"/users/{userB_name}", auth=USER_A)
    assert r.status_code in [401, 403]


def test_non_admin_cannot_create_user(client):
    """A non-admin user POSTing to /users must return 403."""
    # Field names match the payload used by the other /users POSTs in this
    # file (is_admin / is_private) so the request is denied for authorization
    # reasons rather than depending on how unknown fields are handled.
    r = client.post(
        "/users",
        json={"username": "sneaky_user", "password": "pass", "is_admin": False, "is_private": False},
        auth=USER_A,
    )
    assert r.status_code == 403


def test_user_cannot_view_other_users_apikeys(client):
    """userA listing userB's API keys must return 404."""
    r = client.get(f"/users/{userB_name}/apikeys", auth=USER_A)
    assert r.status_code == 404


# ── Privilege Escalation ───────────────────────────────────────────────────


def test_user_cannot_self_grant_admin(client):
    """userA setting is_admin=True on themselves must return 403."""
    r = client.patch(
        f"/users/{userA_name}",
        json={"is_admin": True},
        auth=USER_A,
    )
    assert r.status_code == 403


def test_user_cannot_self_remove_private_flag(client):
    """Non-admin users cannot remove the is_private flag once set by admin."""
    # First set userA as private via admin
    r = client.patch(
        f"/users/{userA_name}",
        json={"is_private": True},
        auth=ADMIN,
    )
    assert r.status_code == 200

    try:
        # userA tries to remove the private flag — should be denied
        r = client.patch(
            f"/users/{userA_name}",
            json={"is_private": False},
            auth=USER_A,
        )
        assert r.status_code == 403

        # userA CAN set themselves as private (more restrictive is OK)
        r = client.patch(
            f"/users/{userA_name}",
            json={"is_private": True},
            auth=USER_A,
        )
        assert r.status_code == 200
    finally:
        # Revert via admin, even on failure, so later tests see userA unchanged.
        client.patch(f"/users/{userA_name}", json={"is_private": False}, auth=ADMIN)


def test_user_cannot_self_assign_projects(client):
    """Non-admin users cannot modify their own project assignments."""
    r = client.patch(
        f"/users/{userA_name}",
        json={"projects": [projectB_name]},
        auth=USER_A,
    )
    assert r.status_code == 403


def test_non_admin_cannot_create_llm(client):
    """A non-admin user POSTing to /llms must return 403."""
    r = client.post(
        "/llms",
        json={
            "name": "sneaky_llm",
            "class_name": "OpenAI",
            "options": {"model": "gpt-test", "api_key": "sk-fake"},
            "privacy": "public",
        },
        auth=USER_A,
    )
    assert r.status_code == 403


def test_non_admin_cannot_create_embedding(client):
    """A non-admin user POSTing to /embeddings must return 403."""
    r = client.post(
        "/embeddings",
        json={
            "name": "sneaky_emb",
            "class_name": "Ollama",
            "options": "{}",
            "privacy": "public",
            "dimension": 768,
        },
        auth=USER_A,
    )
    assert r.status_code == 403


# ── Team Resource Validation ──────────────────────────────────────────────


def test_cannot_create_project_with_unauthorized_llm(client):
    """userA cannot create a project using an LLM only assigned to teamB."""
    r = client.post(
        "/projects",
        json={
            "name": f"bad_llm_proj_{suffix}",
            "llm": llmB_name,
            "type": "agent",
            "team_id": teamA_id,
        },
        auth=USER_A,
    )
    assert r.status_code == 403


def test_cannot_change_project_llm_to_unauthorized(client):
    """userA cannot change their project's LLM to one not in their team."""
    r = client.patch(
        f"/projects/{projectA_id}",
        json={"llm": llmB_name},
        auth=USER_A,
    )
    assert r.status_code == 403


def test_cannot_create_rag_project_with_unauthorized_embedding():
    """userA cannot create a RAG project using an embedding only in teamB.

    Note: May return 403 (team check) or raise an exception (embedding
    instantiation fails in test env with fake options). Either way, denied.
    """
    with TestClient(app, raise_server_exceptions=False) as c:
        r = c.post(
            "/projects",
            json={
                "name": f"bad_emb_proj_{suffix}",
                "llm": llmA_name,
                "embeddings": embB_name,
                "vectorstore": "chroma",
                "type": "rag",
                "team_id": teamA_id,
            },
            auth=USER_A,
        )
        assert r.status_code in [403, 500], f"Expected denial, got {r.status_code}"


# ── Tools Router Authorization ────────────────────────────────────────────


def test_tools_mcp_probe_accessible_by_regular_user():
    """POST /tools/mcp/probe as regular user should not return 403 (documenting permissive access)."""
    with TestClient(app, raise_server_exceptions=False) as c:
        r = c.post(
            "/tools/mcp/probe",
            json={"host": "http://localhost:9999"},
            auth=USER_A,
        )
        # Connection will fail, but auth should pass (not 401/403)
        assert r.status_code not in [401, 403], f"Expected auth to pass, got {r.status_code}"


def test_tools_ollama_models_accessible_by_regular_user():
    """POST /tools/ollama/models as regular user should not return 403."""
    with TestClient(app, raise_server_exceptions=False) as c:
        r = c.post(
            "/tools/ollama/models",
            json={"host": "http://localhost:11434"},
            auth=USER_A,
        )
        assert r.status_code not in [401, 403], f"Expected auth to pass, got {r.status_code}"


def test_tools_ollama_pull_accessible_by_regular_user():
    """POST /tools/ollama/pull as regular user should not return 403."""
    with TestClient(app, raise_server_exceptions=False) as c:
        r = c.post(
            "/tools/ollama/pull",
            json={"host": "http://localhost:11434", "model": "test"},
            auth=USER_A,
        )
        assert r.status_code not in [401, 403], f"Expected auth to pass, got {r.status_code}"


def test_tools_classifier_requires_auth(client):
    """POST /tools/classifier without credentials must return 401."""
    r = client.post("/tools/classifier", json={"question": "test", "llm": "test"})
    assert r.status_code == 401


def test_tools_agent_requires_auth(client):
    """GET /tools/agent without credentials must return 401."""
    r = client.get("/tools/agent")
    assert r.status_code == 401


# ── LLM/Embedding Enumeration ────────────────────────────────────────────


def test_any_user_can_list_all_llms(client):
    """Users can only see LLMs accessible via their teams.

    Despite the historical test name, the listing is expected to be filtered
    by team membership.
    """
    r = client.get("/llms", auth=USER_A)
    assert r.status_code == 200
    llm_names = [llm["name"] for llm in r.json()]
    # User A belongs to team A which has llmA — should see it
    assert llmA_name in llm_names
    # User A does NOT belong to team B — should NOT see llmB
    assert llmB_name not in llm_names


def test_any_user_can_get_specific_llm(client):
    """userA GETting llmB by id is expected to return 200."""
    # NOTE(review): the listing test above expects llmB to be hidden from
    # userA, yet a direct GET by id is expected to succeed — confirm this is
    # intended and not an enumeration leak.
    r = client.get(f"/llms/{llmB_id}", auth=USER_A)
    assert r.status_code == 200


def test_any_user_can_list_all_embeddings(client):
    """Users can only see embeddings accessible via their teams.

    Despite the historical test name, the listing is expected to be filtered
    by team membership.
    """
    r = client.get("/embeddings", auth=USER_A)
    assert r.status_code == 200
    emb_names = [e["name"] for e in r.json()]
    # User A belongs to team A which has embA — should see it
    assert embA_name in emb_names
    # User A does NOT belong to team B — should NOT see embB
    assert embB_name not in emb_names


def test_any_user_can_get_specific_embedding(client):
    """userA GETting embB by id is expected to return 200."""
    # NOTE(review): same asymmetry as for LLMs — hidden from the listing but
    # readable by id; confirm this is intended.
    r = client.get(f"/embeddings/{embB_id}", auth=USER_A)
    assert r.status_code == 200


def test_llm_api_keys_are_masked(client):
    """API keys in LLM options should be masked when returned."""
    r = client.get(f"/llms/{llmA_id}", auth=ADMIN)
    assert r.status_code == 200
    # Shape-independent check: the secret supplied at creation time must not
    # appear anywhere in the response, so the test cannot pass vacuously when
    # "options" is not returned as a dict.
    assert "sk-fake" not in r.text, "Raw API key leaked in LLM response"
    data = r.json()
    if isinstance(data.get("options"), dict):
        api_key = data["options"].get("api_key", "")
        assert api_key == "********" or api_key.startswith("****"), (
            f"API key not masked: {api_key}"
        )


# ── Settings & Proxy Authorization ────────────────────────────────────────


def test_non_admin_cannot_get_settings(client):
    """A non-admin user GETting /settings must return 403."""
    r = client.get("/settings", auth=USER_A)
    assert r.status_code == 403


def test_non_admin_cannot_patch_settings(client):
    """A non-admin user PATCHing /settings must return 403."""
    r = client.patch("/settings", json={}, auth=USER_A)
    assert r.status_code == 403


def test_non_admin_cannot_get_proxy_keys(client):
    """A non-admin user GETting /proxy/keys must return 403."""
    r = client.get("/proxy/keys", auth=USER_A)
    assert r.status_code == 403


def test_non_admin_cannot_create_proxy_key(client):
    """A non-admin user POSTing to /proxy/keys must return 403."""
    r = client.post(
        "/proxy/keys",
        json={"name": "sneaky_key"},
        auth=USER_A,
    )
    assert r.status_code == 403


# ── Users Listing Isolation ───────────────────────────────────────────────


def test_non_admin_users_listing_only_shows_teammates(client):
    """Non-admin user should only see users from their own teams."""
    r = client.get("/users", auth=USER_A)
    assert r.status_code == 200
    usernames = [u["username"] for u in r.json()["users"]]
    assert userA_name in usernames
    assert userB_name not in usernames


def test_admin_users_listing_shows_all(client):
    """The admin's /users listing must include userA, userB and userC."""
    r = client.get("/users", auth=ADMIN)
    assert r.status_code == 200
    usernames = [u["username"] for u in r.json()["users"]]
    assert userA_name in usernames
    assert userB_name in usernames
    assert userC_name in usernames


def test_non_admin_listing_returns_limited_schema(client):
    """Non-admin users listing should return limited user objects without sensitive fields."""
    r = client.get("/users", auth=USER_A)
    assert r.status_code == 200
    users = r.json()["users"]
    for u in users:
        assert "is_admin" not in u, f"is_admin leaked for {u.get('username')}"
        assert "api_keys" not in u, f"api_keys leaked for {u.get('username')}"


# ── Project Team Transfer ─────────────────────────────────────────────────


def test_cannot_change_project_team_to_unauthorized_team(client):
    """userA cannot move their project to a team they don't belong to."""
    r = client.patch(
        f"/projects/{projectA_id}",
        json={"team_id": teamB_id},
        auth=USER_A,
    )
    assert r.status_code == 403


def test_cannot_change_project_llm_to_one_not_in_current_team(client):
    """userA cannot change project LLM to one not available in the project's current team."""
    # NOTE(review): this sends the same request as
    # test_cannot_change_project_llm_to_unauthorized; kept for coverage naming.
    r = client.patch(
        f"/projects/{projectA_id}",
        json={"llm": llmB_name},
        auth=USER_A,
    )
    assert r.status_code == 403


def test_admin_can_change_project_team(client):
    """Admin can transfer a project to any team (sanity check)."""
    # Add llmA to teamB so the project's LLM is valid in the new team
    r = client.patch(
        f"/teams/{teamB_id}",
        json={"llms": [llmB_name, llmA_name]},
        auth=ADMIN,
    )
    assert r.status_code == 200

    # Move projectA to teamB as admin (keep same LLM)
    r = client.patch(
        f"/projects/{projectA_id}",
        json={"team_id": teamB_id},
        auth=ADMIN,
    )
    assert r.status_code == 200

    # Revert: move back to teamA
    r = client.patch(
        f"/projects/{projectA_id}",
        json={"team_id": teamA_id},
        auth=ADMIN,
    )
    assert r.status_code == 200

    # Remove llmA from teamB
    r = client.patch(
        f"/teams/{teamB_id}",
        json={"llms": [llmB_name]},
        auth=ADMIN,
    )
    assert r.status_code == 200


# ── Admin Override ─────────────────────────────────────────────────────────


def test_admin_can_access_any_project(client):
    """Admin can GET and PATCH a project owned by another user."""
    # Admin can GET any project
    r = client.get(f"/projects/{projectB_id}", auth=ADMIN)
    assert r.status_code == 200

    # Admin can PATCH any project
    r = client.patch(
        f"/projects/{projectB_id}",
        json={"human_name": "Admin edited"},
        auth=ADMIN,
    )
    assert r.status_code == 200

    # Revert
    client.patch(
        f"/projects/{projectB_id}",
        json={"human_name": None},
        auth=ADMIN,
    )


def test_admin_can_access_any_team(client):
    """Admin can GET and PATCH a team they were not added to."""
    r = client.get(f"/teams/{teamB_id}", auth=ADMIN)
    assert r.status_code == 200

    r = client.patch(
        f"/teams/{teamB_id}",
        json={"description": "Admin edited"},
        auth=ADMIN,
    )
    assert r.status_code == 200


def test_admin_can_manage_any_user(client):
    """Admin can GET and PATCH another user."""
    # Admin can GET any user
    r = client.get(f"/users/{userB_name}", auth=ADMIN)
    assert r.status_code == 200

    # Admin can PATCH any user
    r = client.patch(
        f"/users/{userB_name}",
        json={"is_private": True},
        auth=ADMIN,
    )
    assert r.status_code == 200

    # Revert
    client.patch(f"/users/{userB_name}", json={"is_private": False}, auth=ADMIN)


# ── Projects listing isolation ─────────────────────────────────────────────


def test_user_only_sees_own_projects(client):
    """userA should only see their own projects in the listing."""
    r = client.get("/projects", auth=USER_A)
    assert r.status_code == 200
    projects = r.json()["projects"]
    project_names = [p["name"] for p in projects]
    assert projectA_name in project_names
    assert projectB_name not in project_names


def test_user_sees_public_projects_in_listing(client):
    """Public projects should appear when filtering by public."""
    with _project_public(client, projectB_id, USER_B):
        # Default listing only shows own projects
        r = client.get("/projects", auth=USER_A)
        assert r.status_code == 200
        own_ids = [p["id"] for p in r.json()["projects"]]
        assert projectB_id not in own_ids

        # Public filter shows public projects
        r = client.get("/projects?filter=public", auth=USER_A)
        assert r.status_code == 200
        public_ids = [p["id"] for p in r.json()["projects"]]
        assert projectB_id in public_ids


# ── Input Validation ──────────────────────────────────────────────────────


def test_create_project_empty_name_fails(client):
    """Creating a project with an empty name must return 400 or 422."""
    r = client.post(
        "/projects",
        json={"name": "", "llm": llmA_name, "type": "agent", "team_id": teamA_id},
        auth=USER_A,
    )
    assert r.status_code in (400, 422), f"Expected 400 or 422, got {r.status_code}"


def test_create_project_whitespace_name_fails(client):
    """Creating a project with a whitespace-only name must return 400 or 422."""
    r = client.post(
        "/projects",
        json={"name": " ", "llm": llmA_name, "type": "agent", "team_id": teamA_id},
        auth=USER_A,
    )
    assert r.status_code in (400, 422), f"Expected 400 or 422, got {r.status_code}"


def test_project_name_sanitization():
    """Special characters in project name should be sanitized, not cause errors."""
    test_name = f"test<>proj&{suffix}"
    with TestClient(app, raise_server_exceptions=False) as c:
        r = c.post(
            "/projects",
            json={"name": test_name, "llm": llmA_name, "type": "agent", "team_id": teamA_id},
            auth=USER_A,
        )
        # Should succeed with sanitized name or fail gracefully (not 500)
        assert r.status_code != 500, f"Server error on special chars: {r.text}"
        if r.status_code == 201:
            # Clean up
            pid = r.json()["project"]
            c.delete(f"/projects/{pid}", auth=ADMIN)


def test_very_long_project_name():
    """A very long project name should not cause a 500 error."""
    long_name = "a" * 1000 + f"_{suffix}"
    with TestClient(app, raise_server_exceptions=False) as c:
        r = c.post(
            "/projects",
            json={"name": long_name, "llm": llmA_name, "type": "agent", "team_id": teamA_id},
            auth=USER_A,
        )
        assert r.status_code != 500, f"Server error on long name: {r.text}"
        if r.status_code == 201:
            pid = r.json()["project"]
            c.delete(f"/projects/{pid}", auth=ADMIN)


def test_sql_injection_in_project_name():
    """SQL injection attempt in project name should be sanitized."""
    with TestClient(app, raise_server_exceptions=False) as c:
        r = c.post(
            "/projects",
            json={
                "name": f"'; DROP TABLE projects; --{suffix}",
                "llm": llmA_name,
                "type": "agent",
                "team_id": teamA_id,
            },
            auth=USER_A,
        )
        assert r.status_code != 500, f"Server error on SQL injection attempt: {r.text}"
        if r.status_code == 201:
            pid = r.json()["project"]
            c.delete(f"/projects/{pid}", auth=ADMIN)


def test_xss_in_username():
    """XSS attempt in username should be sanitized."""
    with TestClient(app, raise_server_exceptions=False) as c:
        r = c.post(
            "/users",
            json={
                "username": "<script>alert(1)</script>",
                "password": "testpass",
                "is_admin": False,
                "is_private": False,
            },
            auth=ADMIN,
        )
        assert r.status_code != 500, f"Server error on XSS attempt: {r.text}"
        if r.status_code == 201:
            created_name = r.json()["username"]
            try:
                # Verify the name was sanitized (no angle brackets)
                assert "<" not in created_name
                assert ">" not in created_name
            finally:
                # Clean up even when the sanitization check fails, so the
                # created user does not linger in the database.
                c.delete(f"/users/{created_name}", auth=ADMIN)


# ── Statistics Isolation ──────────────────────────────────────────────────


def test_statistics_non_admin_only_sees_own_projects():
    """Non-admin user should not see other users' projects in statistics."""
    with TestClient(app, raise_server_exceptions=False) as c:
        r = c.get("/statistics/top-projects", auth=USER_A)
        if r.status_code != 200:
            # Report the unverified case as a skip instead of a silent pass.
            pytest.skip(
                f"/statistics/top-projects returned {r.status_code}; isolation not verified"
            )
        projects = r.json().get("projects", [])
        project_names = [p["name"] for p in projects]
        assert projectB_name not in project_names, (
            "userA can see projectB in statistics"
        )


# ── Cleanup ────────────────────────────────────────────────────────────────


def test_security_cleanup(client):
    """Clean up all resources created by security tests.

    Deletions are best-effort: response codes are intentionally not asserted.
    """
    # Delete projects
    for pid in [projectA_id, projectB_id]:
        if pid:
            client.delete(f"/projects/{pid}", auth=ADMIN)

    # Delete teams
    for tid in [teamA_id, teamB_id]:
        if tid:
            client.delete(f"/teams/{tid}", auth=ADMIN)

    # NOTE(review): LLMs and embeddings are fetched by id elsewhere in this
    # file but deleted by name here — confirm the DELETE routes accept names.
    # Delete LLMs
    for name in [llmA_name, llmB_name]:
        client.delete(f"/llms/{name}", auth=ADMIN)

    # Delete embeddings
    for name in [embA_name, embB_name]:
        client.delete(f"/embeddings/{name}", auth=ADMIN)

    # Delete users
    for uname in [userA_name, userB_name, userC_name]:
        client.delete(f"/users/{uname}", auth=ADMIN)