test_users.py
1 import random 2 import pytest 3 import jwt 4 from fastapi.testclient import TestClient 5 from unittest.mock import patch, MagicMock 6 7 from restai.config import RESTAI_DEFAULT_PASSWORD 8 from restai.main import app 9 from restai.models.models import UserCreate, UserUpdate 10 11 test_username = "test_user_" + str(random.randint(0, 1000000)) 12 test_admin_username = "admin" 13 14 15 @pytest.fixture(scope="module") 16 def client(): 17 with TestClient(app) as c: 18 yield c 19 20 21 def test_get_users(client): 22 response = client.get("/users", auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)) 23 assert response.status_code == 200 24 assert len(response.json()["users"]) >= 1 25 26 def test_create_user(client): 27 response = client.post( 28 "/users", 29 json={ 30 "username": test_username, 31 "password": "test_password", 32 "admin": False, 33 "private": False 34 }, 35 auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD) 36 ) 37 assert response.status_code == 201 38 data = response.json() 39 assert data["username"] == test_username 40 41 def test_get_user(client): 42 # Test getting user details 43 response = client.get(f"/users/{test_username}", auth=(test_username, "test_password")) 44 assert response.status_code == 200 45 data = response.json() 46 assert data["username"] == test_username 47 assert data["is_admin"] == False 48 assert data["is_private"] == False 49 50 # Test getting admin user from non-admin user (should fail) 51 response = client.get(f"/users/{test_admin_username}", auth=(test_username, "test_password")) 52 assert response.status_code == 404 53 54 def test_update_user(client): 55 # Test user updating their own password 56 response = client.patch( 57 f"/users/{test_username}", 58 json={ 59 "password": "new_password" 60 }, 61 auth=(test_username, "test_password") 62 ) 63 assert response.status_code == 200 64 65 # Verify changes by logging in with new password 66 response = client.get(f"/users/{test_username}", auth=(test_username, "new_password")) 67 assert response.status_code == 200 68 69 # Test admin updating is_private (only admins/team admins can change this) 70 response = client.patch( 71 f"/users/{test_username}", 72 json={ 73 "is_private": True, 74 }, 75 auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD) 76 ) 77 assert response.status_code == 200 78 data = response.json() 79 assert data["username"] == test_username 80 assert data["is_private"] == True 81 82 def test_user_apikeys(client): 83 # 1. Create key with description 84 response = client.post( 85 f"/users/{test_username}/apikeys", 86 json={"description": "test key 1"}, 87 auth=(test_username, "new_password") 88 ) 89 assert response.status_code == 201 90 data = response.json() 91 assert "api_key" in data 92 assert "id" in data 93 assert data["key_prefix"] == data["api_key"][:8] 94 assert data["description"] == "test key 1" 95 key1 = data["api_key"] 96 key1_id = data["id"] 97 98 # 2. Auth with Bearer token 99 response = client.get( 100 f"/users/{test_username}", 101 headers={"Authorization": f"Bearer {key1}"} 102 ) 103 assert response.status_code == 200 104 105 # 3. Create second key 106 response = client.post( 107 f"/users/{test_username}/apikeys", 108 json={"description": "test key 2"}, 109 auth=(test_username, "new_password") 110 ) 111 assert response.status_code == 201 112 key2 = response.json()["api_key"] 113 key2_id = response.json()["id"] 114 115 # 4. List keys - should have 2, no full key exposed 116 response = client.get( 117 f"/users/{test_username}/apikeys", 118 auth=(test_username, "new_password") 119 ) 120 assert response.status_code == 200 121 keys = response.json() 122 assert len(keys) == 2 123 descriptions = {k["description"] for k in keys} 124 assert descriptions == {"test key 1", "test key 2"} 125 for k in keys: 126 assert "api_key" not in k 127 128 # 5. Auth with second key 129 response = client.get( 130 f"/users/{test_username}", 131 headers={"Authorization": f"Bearer {key2}"} 132 ) 133 assert response.status_code == 200 134 135 # 6. Delete first key 136 response = client.delete( 137 f"/users/{test_username}/apikeys/{key1_id}", 138 auth=(test_username, "new_password") 139 ) 140 assert response.status_code == 200 141 142 # 7. Auth with deleted key -> 401 143 response = client.get( 144 f"/users/{test_username}", 145 headers={"Authorization": f"Bearer {key1}"} 146 ) 147 assert response.status_code == 401 148 149 # 8. Auth with second key still works 150 response = client.get( 151 f"/users/{test_username}", 152 headers={"Authorization": f"Bearer {key2}"} 153 ) 154 assert response.status_code == 200 155 156 # 9. List keys - should have 1 remaining 157 response = client.get( 158 f"/users/{test_username}/apikeys", 159 auth=(test_username, "new_password") 160 ) 161 assert response.status_code == 200 162 keys = response.json() 163 assert len(keys) == 1 164 assert keys[0]["description"] == "test key 2" 165 166 def test_user_permissions_on_projects(client): 167 # Create a test LLM for this test 168 test_llm = f"test_perm_llm_{random.randint(0, 1000000)}" 169 resp = client.post( 170 "/llms", 171 json={ 172 "name": test_llm, 173 "class_name": "OpenAI", 174 "options": {"model": "gpt-test", "api_key": "sk-fake"}, 175 "privacy": "private", 176 }, 177 auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD), 178 ) 179 assert resp.status_code == 201 180 181 # Create a team and add the test user 182 team_name = f"perm_team_{random.randint(0, 1000000)}" 183 team_resp = client.post( 184 "/teams", 185 json={"name": team_name, "users": [test_username], "llms": [test_llm]}, 186 auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD) 187 ) 188 assert team_resp.status_code == 201 189 team_id = team_resp.json()["id"] 190 191 # Create a project as the test user 192 user_project_name = f"user_project_{random.randint(0, 1000000)}" 193 response = client.post( 194 "/projects", 195 json={ 196 "name": user_project_name, 197 "llm": test_llm, 198 "type": "agent", 199 "team_id": team_id 200 }, 201 auth=(test_username, "new_password") 202 ) 203 assert response.status_code == 201 204 user_project_id = response.json()["project"] 205 206 # Create a project as admin 207 admin_project_name = f"admin_project_{random.randint(0, 1000000)}" 208 response = client.post( 209 "/projects", 210 json={ 211 "name": admin_project_name, 212 "llm": test_llm, 213 "type": "agent", 214 "team_id": team_id 215 }, 216 auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD) 217 ) 218 assert response.status_code == 201 219 admin_project_id = response.json()["project"] 220 221 # Test user can see own projects 222 response = client.get("/projects", auth=(test_username, "new_password")) 223 assert response.status_code == 200 224 projects = response.json()["projects"] 225 user_projects = [p for p in projects if p["name"] == user_project_name] 226 assert len(user_projects) == 1 227 228 # Test admin can see all projects 229 response = client.get("/projects", auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)) 230 assert response.status_code == 200 231 projects = response.json()["projects"] 232 assert len(projects) >= 2 233 234 # Test user can't delete admin's project 235 response = client.delete(f"/projects/{admin_project_id}", auth=(test_username, "new_password")) 236 assert response.status_code in [401, 403, 404] # Depending on how the API is designed 237 238 # Cleanup: Delete the projects 239 response = client.delete(f"/projects/{user_project_id}", auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)) 240 assert response.status_code == 200 241 242 response = client.delete(f"/projects/{admin_project_id}", auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)) 243 assert response.status_code == 200 244 245 # Cleanup: delete the team and test LLM 246 client.delete(f"/teams/{team_id}", auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)) 247 client.delete(f"/llms/{test_llm}", auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)) 248 249 250 def test_delete_user(client): 251 # Test deleting user 252 response = client.delete(f"/users/{test_username}", auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)) 253 assert response.status_code == 200 254 255 # Verify user is deleted 256 response = client.get(f"/users/{test_username}", auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)) 257 assert response.status_code == 404