/ tests / test_users.py
test_users.py
  1  import random
  2  import pytest
  3  import jwt
  4  from fastapi.testclient import TestClient
  5  from unittest.mock import patch, MagicMock
  6  
  7  from restai.config import RESTAI_DEFAULT_PASSWORD
  8  from restai.main import app
  9  from restai.models.models import UserCreate, UserUpdate
 10  
 11  test_username = "test_user_" + str(random.randint(0, 1000000))
 12  test_admin_username = "admin"
 13  
 14  
 15  @pytest.fixture(scope="module")
 16  def client():
 17      with TestClient(app) as c:
 18          yield c
 19  
 20  
 21  def test_get_users(client):
 22      response = client.get("/users", auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD))
 23      assert response.status_code == 200
 24      assert len(response.json()["users"]) >= 1
 25  
 26  def test_create_user(client):
 27      response = client.post(
 28          "/users",
 29          json={
 30              "username": test_username,
 31              "password": "test_password",
 32              "admin": False,
 33              "private": False
 34          },
 35          auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)
 36      )
 37      assert response.status_code == 201
 38      data = response.json()
 39      assert data["username"] == test_username
 40  
 41  def test_get_user(client):
 42      # Test getting user details
 43      response = client.get(f"/users/{test_username}", auth=(test_username, "test_password"))
 44      assert response.status_code == 200
 45      data = response.json()
 46      assert data["username"] == test_username
 47      assert data["is_admin"] == False
 48      assert data["is_private"] == False
 49  
 50      # Test getting admin user from non-admin user (should fail)
 51      response = client.get(f"/users/{test_admin_username}", auth=(test_username, "test_password"))
 52      assert response.status_code == 404
 53  
 54  def test_update_user(client):
 55      # Test user updating their own password
 56      response = client.patch(
 57          f"/users/{test_username}",
 58          json={
 59              "password": "new_password"
 60          },
 61          auth=(test_username, "test_password")
 62      )
 63      assert response.status_code == 200
 64  
 65      # Verify changes by logging in with new password
 66      response = client.get(f"/users/{test_username}", auth=(test_username, "new_password"))
 67      assert response.status_code == 200
 68  
 69      # Test admin updating is_private (only admins/team admins can change this)
 70      response = client.patch(
 71          f"/users/{test_username}",
 72          json={
 73              "is_private": True,
 74          },
 75          auth=(test_admin_username, RESTAI_DEFAULT_PASSWORD)
 76      )
 77      assert response.status_code == 200
 78      data = response.json()
 79      assert data["username"] == test_username
 80      assert data["is_private"] == True
 81  
 82  def test_user_apikeys(client):
 83      # 1. Create key with description
 84      response = client.post(
 85          f"/users/{test_username}/apikeys",
 86          json={"description": "test key 1"},
 87          auth=(test_username, "new_password")
 88      )
 89      assert response.status_code == 201
 90      data = response.json()
 91      assert "api_key" in data
 92      assert "id" in data
 93      assert data["key_prefix"] == data["api_key"][:8]
 94      assert data["description"] == "test key 1"
 95      key1 = data["api_key"]
 96      key1_id = data["id"]
 97  
 98      # 2. Auth with Bearer token
 99      response = client.get(
100          f"/users/{test_username}",
101          headers={"Authorization": f"Bearer {key1}"}
102      )
103      assert response.status_code == 200
104  
105      # 3. Create second key
106      response = client.post(
107          f"/users/{test_username}/apikeys",
108          json={"description": "test key 2"},
109          auth=(test_username, "new_password")
110      )
111      assert response.status_code == 201
112      key2 = response.json()["api_key"]
113      key2_id = response.json()["id"]
114  
115      # 4. List keys - should have 2, no full key exposed
116      response = client.get(
117          f"/users/{test_username}/apikeys",
118          auth=(test_username, "new_password")
119      )
120      assert response.status_code == 200
121      keys = response.json()
122      assert len(keys) == 2
123      descriptions = {k["description"] for k in keys}
124      assert descriptions == {"test key 1", "test key 2"}
125      for k in keys:
126          assert "api_key" not in k
127  
128      # 5. Auth with second key
129      response = client.get(
130          f"/users/{test_username}",
131          headers={"Authorization": f"Bearer {key2}"}
132      )
133      assert response.status_code == 200
134  
135      # 6. Delete first key
136      response = client.delete(
137          f"/users/{test_username}/apikeys/{key1_id}",
138          auth=(test_username, "new_password")
139      )
140      assert response.status_code == 200
141  
142      # 7. Auth with deleted key -> 401
143      response = client.get(
144          f"/users/{test_username}",
145          headers={"Authorization": f"Bearer {key1}"}
146      )
147      assert response.status_code == 401
148  
149      # 8. Auth with second key still works
150      response = client.get(
151          f"/users/{test_username}",
152          headers={"Authorization": f"Bearer {key2}"}
153      )
154      assert response.status_code == 200
155  
156      # 9. List keys - should have 1 remaining
157      response = client.get(
158          f"/users/{test_username}/apikeys",
159          auth=(test_username, "new_password")
160      )
161      assert response.status_code == 200
162      keys = response.json()
163      assert len(keys) == 1
164      assert keys[0]["description"] == "test key 2"
165  
def test_user_permissions_on_projects(client):
    """Check project visibility and deletion rights for a user vs. the admin.

    Creates an LLM, a team containing the test user, and one project per
    account, then verifies that the user sees their own project, the admin
    sees at least two, and the user cannot delete the admin's project.

    Everything created here is removed in a ``finally`` block so that a
    failing assertion does not leave the LLM, team or projects behind.
    Authenticates the regular user with ``"new_password"`` (the state left by
    ``test_update_user``).
    """
    admin_auth = (test_admin_username, RESTAI_DEFAULT_PASSWORD)
    user_auth = (test_username, "new_password")

    # Create a test LLM for this test
    test_llm = f"test_perm_llm_{random.randint(0, 1000000)}"
    resp = client.post(
        "/llms",
        json={
            "name": test_llm,
            "class_name": "OpenAI",
            "options": {"model": "gpt-test", "api_key": "sk-fake"},
            "privacy": "private",
        },
        auth=admin_auth,
    )
    assert resp.status_code == 201

    team_id = None
    # Projects that exist on the server and have not been deleted yet.
    pending_projects = []
    try:
        # Create a team and add the test user
        team_name = f"perm_team_{random.randint(0, 1000000)}"
        team_resp = client.post(
            "/teams",
            json={"name": team_name, "users": [test_username], "llms": [test_llm]},
            auth=admin_auth,
        )
        assert team_resp.status_code == 201
        team_id = team_resp.json()["id"]

        # Create a project as the test user
        user_project_name = f"user_project_{random.randint(0, 1000000)}"
        response = client.post(
            "/projects",
            json={
                "name": user_project_name,
                "llm": test_llm,
                "type": "agent",
                "team_id": team_id,
            },
            auth=user_auth,
        )
        assert response.status_code == 201
        user_project_id = response.json()["project"]
        pending_projects.append(user_project_id)

        # Create a project as admin
        admin_project_name = f"admin_project_{random.randint(0, 1000000)}"
        response = client.post(
            "/projects",
            json={
                "name": admin_project_name,
                "llm": test_llm,
                "type": "agent",
                "team_id": team_id,
            },
            auth=admin_auth,
        )
        assert response.status_code == 201
        admin_project_id = response.json()["project"]
        pending_projects.append(admin_project_id)

        # Test user can see own projects
        response = client.get("/projects", auth=user_auth)
        assert response.status_code == 200
        projects = response.json()["projects"]
        user_projects = [p for p in projects if p["name"] == user_project_name]
        assert len(user_projects) == 1

        # Test admin can see all projects
        response = client.get("/projects", auth=admin_auth)
        assert response.status_code == 200
        projects = response.json()["projects"]
        assert len(projects) >= 2

        # Test user can't delete admin's project
        response = client.delete(f"/projects/{admin_project_id}", auth=user_auth)
        assert response.status_code in [401, 403, 404]  # Depending on how the API is designed

        # Cleanup: Delete the projects (admin deletion is itself asserted)
        for project_id in (user_project_id, admin_project_id):
            response = client.delete(f"/projects/{project_id}", auth=admin_auth)
            assert response.status_code == 200
            pending_projects.remove(project_id)
    finally:
        # Best-effort removal of anything still left after a failure above;
        # on the success path this list is already empty.
        for project_id in pending_projects:
            client.delete(f"/projects/{project_id}", auth=admin_auth)

        # Cleanup: delete the team and test LLM
        if team_id is not None:
            client.delete(f"/teams/{team_id}", auth=admin_auth)
        client.delete(f"/llms/{test_llm}", auth=admin_auth)
248  
249  
def test_delete_user(client):
    """The admin can delete the test user, after which the lookup returns 404."""
    admin_credentials = (test_admin_username, RESTAI_DEFAULT_PASSWORD)
    user_url = f"/users/{test_username}"

    # Remove the user
    deletion = client.delete(user_url, auth=admin_credentials)
    assert deletion.status_code == 200

    # The same URL must no longer resolve
    lookup = client.get(user_url, auth=admin_credentials)
    assert lookup.status_code == 404
257      assert response.status_code == 404