/ tests / server / auth / test_sqlalchemy_store_workspace.py
test_sqlalchemy_store_workspace.py
  1  import pytest
  2  
  3  from mlflow.environment_variables import MLFLOW_ENABLE_WORKSPACES
  4  from mlflow.exceptions import MlflowException
  5  from mlflow.server.auth.entities import WorkspacePermission
  6  from mlflow.server.auth.permissions import EDIT, MANAGE, NO_PERMISSIONS, READ
  7  from mlflow.server.auth.sqlalchemy_store import SqlAlchemyStore
  8  from mlflow.utils.workspace_context import WorkspaceContext
  9  from mlflow.utils.workspace_utils import DEFAULT_WORKSPACE_NAME
 10  
 11  from tests.helper_functions import random_str
 12  from tests.server.auth.test_sqlalchemy_store import _rmp_maker, _user_maker
 13  
# Register the sibling test module as a pytest plugin so that the fixtures it
# defines can be requested from this module.
# NOTE(review): presumably this is where ``tmp_sqlite_uri`` comes from — confirm.
pytest_plugins = ["tests.server.auth.test_sqlalchemy_store"]


# Module-level marker: applies ``notrackingurimock`` to every test in this file.
# NOTE(review): presumably opts these tests out of a tracking-URI mock defined in a
# conftest — confirm against the marker's handler.
pytestmark = pytest.mark.notrackingurimock
 18  
 19  
 20  @pytest.fixture
 21  def store(tmp_sqlite_uri):
 22      store = SqlAlchemyStore()
 23      store.init_db(tmp_sqlite_uri)
 24      return store
 25  
 26  
 27  def test_set_workspace_permission_creates_and_updates(store):
 28      workspace = "team-alpha"
 29      username = random_str()
 30      user = store.create_user(username, random_str())
 31  
 32      perm = store.set_workspace_permission(workspace, username, READ.name)
 33      assert isinstance(perm, WorkspacePermission)
 34      assert perm.workspace == workspace
 35      assert perm.user_id == user.id
 36      assert perm.permission == READ.name
 37  
 38      updated = store.set_workspace_permission(workspace, username, MANAGE.name)
 39      assert updated.permission == MANAGE.name
 40  
 41  
 42  def test_get_workspace_permission_precedence(store):
 43      workspace = "team-beta"
 44      username = random_str()
 45      store.create_user(username, random_str())
 46  
 47      assert store.get_workspace_permission(workspace, username) is None
 48  
 49      store.set_workspace_permission(workspace, username, READ.name)
 50      perm = store.get_workspace_permission(workspace, username)
 51      assert perm == READ
 52  
 53  
 54  def test_list_workspace_permissions(store):
 55      workspace = "team-gamma"
 56      other_workspace = "team-delta"
 57      username = random_str()
 58      other_username = random_str()
 59      user = store.create_user(username, random_str())
 60      other_user = store.create_user(other_username, random_str())
 61  
 62      p1 = store.set_workspace_permission(workspace, username, READ.name)
 63      p2 = store.set_workspace_permission(workspace, other_username, EDIT.name)
 64      p3 = store.set_workspace_permission(other_workspace, username, MANAGE.name)
 65  
 66      perms = store.list_workspace_permissions(workspace)
 67      actual = {(perm.workspace, perm.user_id, perm.permission) for perm in perms}
 68      expected = {
 69          (p1.workspace, user.id, p1.permission),
 70          (p2.workspace, other_user.id, p2.permission),
 71      }
 72      assert actual == expected
 73  
 74      perms_other = store.list_workspace_permissions(other_workspace)
 75      assert {(perm.workspace, perm.user_id, perm.permission) for perm in perms_other} == {
 76          (p3.workspace, user.id, p3.permission)
 77      }
 78  
 79  
 80  def test_delete_workspace_permission(store):
 81      workspace = "workspace-delete"
 82      username = random_str()
 83      store.create_user(username, random_str())
 84  
 85      store.set_workspace_permission(workspace, username, READ.name)
 86  
 87      store.delete_workspace_permission(workspace, username)
 88      assert store.get_workspace_permission(workspace, username) is None
 89  
 90      with pytest.raises(
 91          MlflowException,
 92          match=(
 93              "Workspace permission does not exist for "
 94              f"workspace='{workspace}', username='{username}'"
 95          ),
 96      ):
 97          store.delete_workspace_permission(workspace, username)
 98  
 99  
def test_delete_workspace_permissions_for_workspace(store):
    # The bulk delete targets one workspace; the same user's row in a different
    # workspace must still be listed afterwards.
    doomed, kept = "workspace-delete-all", "workspace-keep"
    login = random_str()
    store.create_user(login, random_str())

    for ws, level in ((doomed, READ), (kept, EDIT)):
        store.set_workspace_permission(ws, login, level.name)

    store.delete_workspace_permissions_for_workspace(doomed)

    assert store.list_workspace_permissions(doomed) == []
    survivors = store.list_workspace_permissions(kept)
    assert len(survivors) == 1
    assert survivors[0].workspace == kept
115  
116  
def test_list_accessible_workspace_names(store):
    # READ and EDIT rows make a workspace accessible, a NO_PERMISSIONS row does not,
    # and one user's rows never appear for another user. Passing ``None`` as the
    # username yields an empty set.
    primary = random_str()
    secondary = random_str()
    store.create_user(primary, random_str())
    store.create_user(secondary, random_str())

    grants = (
        ("workspace-read", primary, READ),
        ("workspace-edit", primary, EDIT),
        ("workspace-no-access", primary, NO_PERMISSIONS),
        ("workspace-other", secondary, READ),
    )
    for ws, login, level in grants:
        store.set_workspace_permission(ws, login, level.name)

    primary_visible = store.list_accessible_workspace_names(primary)
    assert primary_visible == {"workspace-read", "workspace-edit"}

    assert store.list_accessible_workspace_names(secondary) == {"workspace-other"}
    assert store.list_accessible_workspace_names(None) == set()
135  
136  
def test_list_accessible_workspace_names_includes_role_based_workspaces(store):
    # No legacy workspace_permissions row is written here: the workspace has to
    # become visible purely through a role (carrying a permission) assigned to the user.
    login = random_str()
    member = store.create_user(login, random_str())

    viewer = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(viewer.id, "experiment", "*", READ.name)
    store.assign_role_to_user(member.id, viewer.id)

    visible = store.list_accessible_workspace_names(login)
    assert visible == {"ws1"}
148  
149  
def test_list_accessible_workspace_names_includes_role_with_no_permissions(store):
    # The role created below has no permission rows at all; being assigned to it is
    # expected to be enough for the workspace to be listed. This documents an
    # intentional design: a workspace admin can grant bare "membership" without any
    # capability, and the UI still surfaces the workspace to that user.
    login = random_str()
    member = store.create_user(login, random_str())

    shell_role = store.create_role(name="shell", workspace="ws1")
    store.assign_role_to_user(member.id, shell_role.id)

    visible = store.list_accessible_workspace_names(login)
    assert visible == {"ws1"}
162  
163  
def test_list_accessible_workspace_names_combines_legacy_and_role_sources(store):
    # Two independent sources of visibility: a legacy READ row on ws1 and a role
    # assignment in ws2. Both workspaces are expected in the result.
    login = random_str()
    member = store.create_user(login, random_str())

    store.set_workspace_permission("ws1", login, READ.name)
    ws2_role = store.create_role(name="viewer", workspace="ws2")
    store.assign_role_to_user(member.id, ws2_role.id)

    expected = {"ws1", "ws2"}
    assert store.list_accessible_workspace_names(login) == expected
174  
175  
def test_list_accessible_workspace_names_legacy_no_permissions_still_hides(store):
    # Regression guard: a legacy NO_PERMISSIONS row must not make the workspace
    # visible. Only role membership counts unconditionally; the legacy path is
    # expected to keep filtering on readability (``can_read``).
    login = random_str()
    store.create_user(login, random_str())
    store.set_workspace_permission("ws1", login, NO_PERMISSIONS.name)

    visible = store.list_accessible_workspace_names(login)
    assert visible == set()
186  
187  
def test_list_accessible_workspace_names_role_in_other_workspace_doesnt_leak(store):
    # Being assigned a role in ws1 must not expose workspaces where the user holds
    # no assignment.
    login = random_str()
    member = store.create_user(login, random_str())

    assigned_role = store.create_role(name="viewer", workspace="ws1")
    store.assign_role_to_user(member.id, assigned_role.id)

    # Roles are created in ws2 and ws3 as well, but the user is not assigned to them.
    for unrelated_ws in ("ws2", "ws3"):
        store.create_role(name="viewer", workspace=unrelated_ws)

    assert store.list_accessible_workspace_names(login) == {"ws1"}
200  
201  
def test_list_accessible_workspace_names_combines_legacy_and_role_same_workspace(store):
    # Overlapping sources: the user holds a legacy READ row and a role assignment in
    # the same workspace. The workspace must be reported exactly once.
    login = random_str()
    member = store.create_user(login, random_str())

    store.set_workspace_permission("ws1", login, READ.name)
    same_ws_role = store.create_role(name="viewer", workspace="ws1")
    store.assign_role_to_user(member.id, same_ws_role.id)

    visible = store.list_accessible_workspace_names(login)
    assert visible == {"ws1"}
    assert len(visible) == 1
215  
216  
def test_rename_registered_model_permissions_scoped_by_workspace(store, monkeypatch):
    # The same model name carries a permission in two workspaces. Renaming while
    # workspace-a is active must leave the workspace-b row under its original name.
    monkeypatch.setenv(MLFLOW_ENABLE_WORKSPACES.name, "true")
    username = random_str()
    secret = random_str()
    _user_maker(store, username, secret)

    for ws in ("workspace-a", "workspace-b"):
        with WorkspaceContext(ws):
            _rmp_maker(store, "model", username, READ.name)

    with WorkspaceContext("workspace-a"):
        store.rename_registered_model_permissions("model", "model-renamed")
        moved = store.get_registered_model_permission("model-renamed", username)
        assert moved.name == "model-renamed"
        assert moved.workspace == "workspace-a"

        # The old name no longer resolves inside workspace-a.
        old_name_missing = (
            "Registered model permission with workspace=workspace-a, name=model and username="
        )
        with pytest.raises(MlflowException, match=old_name_missing):
            store.get_registered_model_permission("model", username)

    with WorkspaceContext("workspace-b"):
        untouched = store.get_registered_model_permission("model", username)
        assert untouched.name == "model"
        assert untouched.workspace == "workspace-b"
245  
246  
def test_registered_model_permissions_are_workspace_scoped(store, monkeypatch):
    # One user holds a permission on the same model name in the default workspace
    # (READ) and in an alternate workspace (EDIT). Get, list and update must each act
    # only on the row belonging to the active workspace.
    monkeypatch.setenv(MLFLOW_ENABLE_WORKSPACES.name, "true")
    username = random_str()
    secret = random_str()
    _user_maker(store, username, secret)

    model_name = random_str()
    alt_ws = f"workspace-{random_str()}"

    with WorkspaceContext(DEFAULT_WORKSPACE_NAME):
        store.create_registered_model_permission(model_name, username, READ.name)

    with WorkspaceContext(alt_ws):
        created_alt = store.create_registered_model_permission(model_name, username, EDIT.name)
        assert created_alt.workspace == alt_ws

    # Each workspace resolves and lists only its own row.
    for ws, level in ((DEFAULT_WORKSPACE_NAME, READ), (alt_ws, EDIT)):
        with WorkspaceContext(ws):
            found = store.get_registered_model_permission(model_name, username)
            assert found.permission == level.name
            assert found.workspace == ws
            listed = store.list_registered_model_permissions(username)
            assert [p.permission for p in listed] == [level.name]

    # An update issued in the default workspace changes the default row...
    with WorkspaceContext(DEFAULT_WORKSPACE_NAME):
        changed = store.update_registered_model_permission(model_name, username, MANAGE.name)
        assert changed.permission == MANAGE.name
        assert changed.workspace == DEFAULT_WORKSPACE_NAME

    # ...and leaves the alternate workspace's permission as it was.
    with WorkspaceContext(alt_ws):
        alt_after_update = store.get_registered_model_permission(model_name, username)
        assert alt_after_update.permission == EDIT.name
        assert alt_after_update.workspace == alt_ws
287  
288  
def test_delete_registered_model_permissions_scoped_by_workspace(store, monkeypatch):
    # Deleting a model's permissions while workspace-a is active removes every user's
    # row there, while the row for the same model name in workspace-b survives.
    monkeypatch.setenv(MLFLOW_ENABLE_WORKSPACES.name, "true")
    first = random_str()
    second = random_str()
    for login in (first, second):
        _user_maker(store, login, random_str())

    model_name = random_str()

    with WorkspaceContext("workspace-a"):
        _rmp_maker(store, model_name, first, READ.name)
        _rmp_maker(store, model_name, second, EDIT.name)

    with WorkspaceContext("workspace-b"):
        _rmp_maker(store, model_name, first, MANAGE.name)

    not_found = "Registered model permission .* not found"
    with WorkspaceContext("workspace-a"):
        store.delete_registered_model_permissions(model_name)
        for login in (first, second):
            with pytest.raises(MlflowException, match=not_found):
                store.get_registered_model_permission(model_name, login)

    with WorkspaceContext("workspace-b"):
        survivor = store.get_registered_model_permission(model_name, first)
        assert survivor.permission == MANAGE.name
        assert survivor.workspace == "workspace-b"