test_sqlalchemy_store_workspace.py
1 import pytest 2 3 from mlflow.environment_variables import MLFLOW_ENABLE_WORKSPACES 4 from mlflow.exceptions import MlflowException 5 from mlflow.server.auth.entities import WorkspacePermission 6 from mlflow.server.auth.permissions import EDIT, MANAGE, NO_PERMISSIONS, READ 7 from mlflow.server.auth.sqlalchemy_store import SqlAlchemyStore 8 from mlflow.utils.workspace_context import WorkspaceContext 9 from mlflow.utils.workspace_utils import DEFAULT_WORKSPACE_NAME 10 11 from tests.helper_functions import random_str 12 from tests.server.auth.test_sqlalchemy_store import _rmp_maker, _user_maker 13 14 pytest_plugins = ["tests.server.auth.test_sqlalchemy_store"] 15 16 17 pytestmark = pytest.mark.notrackingurimock 18 19 20 @pytest.fixture 21 def store(tmp_sqlite_uri): 22 store = SqlAlchemyStore() 23 store.init_db(tmp_sqlite_uri) 24 return store 25 26 27 def test_set_workspace_permission_creates_and_updates(store): 28 workspace = "team-alpha" 29 username = random_str() 30 user = store.create_user(username, random_str()) 31 32 perm = store.set_workspace_permission(workspace, username, READ.name) 33 assert isinstance(perm, WorkspacePermission) 34 assert perm.workspace == workspace 35 assert perm.user_id == user.id 36 assert perm.permission == READ.name 37 38 updated = store.set_workspace_permission(workspace, username, MANAGE.name) 39 assert updated.permission == MANAGE.name 40 41 42 def test_get_workspace_permission_precedence(store): 43 workspace = "team-beta" 44 username = random_str() 45 store.create_user(username, random_str()) 46 47 assert store.get_workspace_permission(workspace, username) is None 48 49 store.set_workspace_permission(workspace, username, READ.name) 50 perm = store.get_workspace_permission(workspace, username) 51 assert perm == READ 52 53 54 def test_list_workspace_permissions(store): 55 workspace = "team-gamma" 56 other_workspace = "team-delta" 57 username = random_str() 58 other_username = random_str() 59 user = store.create_user(username, random_str()) 60 other_user = store.create_user(other_username, random_str()) 61 62 p1 = store.set_workspace_permission(workspace, username, READ.name) 63 p2 = store.set_workspace_permission(workspace, other_username, EDIT.name) 64 p3 = store.set_workspace_permission(other_workspace, username, MANAGE.name) 65 66 perms = store.list_workspace_permissions(workspace) 67 actual = {(perm.workspace, perm.user_id, perm.permission) for perm in perms} 68 expected = { 69 (p1.workspace, user.id, p1.permission), 70 (p2.workspace, other_user.id, p2.permission), 71 } 72 assert actual == expected 73 74 perms_other = store.list_workspace_permissions(other_workspace) 75 assert {(perm.workspace, perm.user_id, perm.permission) for perm in perms_other} == { 76 (p3.workspace, user.id, p3.permission) 77 } 78 79 80 def test_delete_workspace_permission(store): 81 workspace = "workspace-delete" 82 username = random_str() 83 store.create_user(username, random_str()) 84 85 store.set_workspace_permission(workspace, username, READ.name) 86 87 store.delete_workspace_permission(workspace, username) 88 assert store.get_workspace_permission(workspace, username) is None 89 90 with pytest.raises( 91 MlflowException, 92 match=( 93 "Workspace permission does not exist for " 94 f"workspace='{workspace}', username='{username}'" 95 ), 96 ): 97 store.delete_workspace_permission(workspace, username) 98 99 100 def test_delete_workspace_permissions_for_workspace(store): 101 workspace = "workspace-delete-all" 102 other_workspace = "workspace-keep" 103 username = random_str() 104 store.create_user(username, random_str()) 105 106 store.set_workspace_permission(workspace, username, READ.name) 107 store.set_workspace_permission(other_workspace, username, EDIT.name) 108 109 store.delete_workspace_permissions_for_workspace(workspace) 110 111 assert store.list_workspace_permissions(workspace) == [] 112 remaining = store.list_workspace_permissions(other_workspace) 113 assert len(remaining) == 1 114 assert remaining[0].workspace == other_workspace 115 116 117 def test_list_accessible_workspace_names(store): 118 username = random_str() 119 other_user = random_str() 120 store.create_user(username, random_str()) 121 store.create_user(other_user, random_str()) 122 123 store.set_workspace_permission("workspace-read", username, READ.name) 124 store.set_workspace_permission("workspace-edit", username, EDIT.name) 125 store.set_workspace_permission("workspace-no-access", username, NO_PERMISSIONS.name) 126 store.set_workspace_permission("workspace-other", other_user, READ.name) 127 128 accessible = store.list_accessible_workspace_names(username) 129 assert accessible == {"workspace-read", "workspace-edit"} 130 131 assert store.list_accessible_workspace_names(other_user) == { 132 "workspace-other", 133 } 134 assert store.list_accessible_workspace_names(None) == set() 135 136 137 def test_list_accessible_workspace_names_includes_role_based_workspaces(store): 138 # Role assignment with permissions → workspace visible, even without a legacy 139 # workspace_permissions row. 140 username = random_str() 141 user = store.create_user(username, random_str()) 142 143 role = store.create_role(name="viewer", workspace="ws1") 144 store.add_role_permission(role.id, "experiment", "*", READ.name) 145 store.assign_role_to_user(user.id, role.id) 146 147 assert store.list_accessible_workspace_names(username) == {"ws1"} 148 149 150 def test_list_accessible_workspace_names_includes_role_with_no_permissions(store): 151 # Role membership alone implies visibility — the role has zero permission rows, 152 # but the user is still assigned to it. Documents the intentional design: a 153 # workspace admin can give someone "membership" without any capability and the 154 # UI still surfaces the workspace in their list. 155 username = random_str() 156 user = store.create_user(username, random_str()) 157 158 empty_role = store.create_role(name="shell", workspace="ws1") 159 store.assign_role_to_user(user.id, empty_role.id) 160 161 assert store.list_accessible_workspace_names(username) == {"ws1"} 162 163 164 def test_list_accessible_workspace_names_combines_legacy_and_role_sources(store): 165 # Legacy READ on ws1 + role assignment in ws2 → both surface. 166 username = random_str() 167 user = store.create_user(username, random_str()) 168 169 store.set_workspace_permission("ws1", username, READ.name) 170 role = store.create_role(name="viewer", workspace="ws2") 171 store.assign_role_to_user(user.id, role.id) 172 173 assert store.list_accessible_workspace_names(username) == {"ws1", "ws2"} 174 175 176 def test_list_accessible_workspace_names_legacy_no_permissions_still_hides(store): 177 # Legacy NO_PERMISSIONS row should not make a workspace visible — the legacy 178 # branch still filters by ``can_read``. Regression guard for the carve-out that 179 # only the role-based branch unconditionally counts membership. 180 username = random_str() 181 store.create_user(username, random_str()) 182 183 store.set_workspace_permission("ws1", username, NO_PERMISSIONS.name) 184 185 assert store.list_accessible_workspace_names(username) == set() 186 187 188 def test_list_accessible_workspace_names_role_in_other_workspace_doesnt_leak(store): 189 # A role assignment in one workspace must not surface unrelated workspaces. 190 username = random_str() 191 user = store.create_user(username, random_str()) 192 193 role_ws1 = store.create_role(name="viewer", workspace="ws1") 194 store.assign_role_to_user(user.id, role_ws1.id) 195 # ws2 and ws3 exist (via roles for other users) but the user has no assignment. 196 store.create_role(name="viewer", workspace="ws2") 197 store.create_role(name="viewer", workspace="ws3") 198 199 assert store.list_accessible_workspace_names(username) == {"ws1"} 200 201 202 def test_list_accessible_workspace_names_combines_legacy_and_role_same_workspace(store): 203 # Overlap: a user has BOTH a legacy READ and a role assignment in the same 204 # workspace. Deduplication should collapse to a single entry. 205 username = random_str() 206 user = store.create_user(username, random_str()) 207 208 store.set_workspace_permission("ws1", username, READ.name) 209 role = store.create_role(name="viewer", workspace="ws1") 210 store.assign_role_to_user(user.id, role.id) 211 212 accessible = store.list_accessible_workspace_names(username) 213 assert accessible == {"ws1"} 214 assert len(accessible) == 1 215 216 217 def test_rename_registered_model_permissions_scoped_by_workspace(store, monkeypatch): 218 monkeypatch.setenv(MLFLOW_ENABLE_WORKSPACES.name, "true") 219 username = random_str() 220 password = random_str() 221 _user_maker(store, username, password) 222 223 with WorkspaceContext("workspace-a"): 224 _rmp_maker(store, "model", username, READ.name) 225 with WorkspaceContext("workspace-b"): 226 _rmp_maker(store, "model", username, READ.name) 227 228 with WorkspaceContext("workspace-a"): 229 store.rename_registered_model_permissions("model", "model-renamed") 230 renamed = store.get_registered_model_permission("model-renamed", username) 231 assert renamed.name == "model-renamed" 232 assert renamed.workspace == "workspace-a" 233 with pytest.raises( 234 MlflowException, 235 match=( 236 "Registered model permission with workspace=workspace-a, name=model and username=" 237 ), 238 ): 239 store.get_registered_model_permission("model", username) 240 241 with WorkspaceContext("workspace-b"): 242 still_original = store.get_registered_model_permission("model", username) 243 assert still_original.name == "model" 244 assert still_original.workspace == "workspace-b" 245 246 247 def test_registered_model_permissions_are_workspace_scoped(store, monkeypatch): 248 monkeypatch.setenv(MLFLOW_ENABLE_WORKSPACES.name, "true") 249 username = random_str() 250 password = random_str() 251 _user_maker(store, username, password) 252 253 model_name = random_str() 254 workspace_alt = f"workspace-{random_str()}" 255 256 with WorkspaceContext(DEFAULT_WORKSPACE_NAME): 257 store.create_registered_model_permission(model_name, username, READ.name) 258 259 with WorkspaceContext(workspace_alt): 260 perm_alt = store.create_registered_model_permission(model_name, username, EDIT.name) 261 assert perm_alt.workspace == workspace_alt 262 263 with WorkspaceContext(DEFAULT_WORKSPACE_NAME): 264 perm_default = store.get_registered_model_permission(model_name, username) 265 assert perm_default.permission == READ.name 266 assert perm_default.workspace == DEFAULT_WORKSPACE_NAME 267 perms_default = store.list_registered_model_permissions(username) 268 assert [p.permission for p in perms_default] == [READ.name] 269 270 with WorkspaceContext(workspace_alt): 271 perm_alt_lookup = store.get_registered_model_permission(model_name, username) 272 assert perm_alt_lookup.permission == EDIT.name 273 assert perm_alt_lookup.workspace == workspace_alt 274 perms_alt = store.list_registered_model_permissions(username) 275 assert [p.permission for p in perms_alt] == [EDIT.name] 276 277 # Switching back to default workspace should not affect alternate workspace permission 278 with WorkspaceContext(DEFAULT_WORKSPACE_NAME): 279 updated = store.update_registered_model_permission(model_name, username, MANAGE.name) 280 assert updated.permission == MANAGE.name 281 assert updated.workspace == DEFAULT_WORKSPACE_NAME 282 283 with WorkspaceContext(workspace_alt): 284 perm_alt_post_update = store.get_registered_model_permission(model_name, username) 285 assert perm_alt_post_update.permission == EDIT.name 286 assert perm_alt_post_update.workspace == workspace_alt 287 288 289 def test_delete_registered_model_permissions_scoped_by_workspace(store, monkeypatch): 290 monkeypatch.setenv(MLFLOW_ENABLE_WORKSPACES.name, "true") 291 username1 = random_str() 292 username2 = random_str() 293 _user_maker(store, username1, random_str()) 294 _user_maker(store, username2, random_str()) 295 296 model_name = random_str() 297 298 with WorkspaceContext("workspace-a"): 299 _rmp_maker(store, model_name, username1, READ.name) 300 _rmp_maker(store, model_name, username2, EDIT.name) 301 302 with WorkspaceContext("workspace-b"): 303 _rmp_maker(store, model_name, username1, MANAGE.name) 304 305 with WorkspaceContext("workspace-a"): 306 store.delete_registered_model_permissions(model_name) 307 with pytest.raises(MlflowException, match="Registered model permission .* not found"): 308 store.get_registered_model_permission(model_name, username1) 309 with pytest.raises(MlflowException, match="Registered model permission .* not found"): 310 store.get_registered_model_permission(model_name, username2) 311 312 with WorkspaceContext("workspace-b"): 313 remaining = store.get_registered_model_permission(model_name, username1) 314 assert remaining.permission == MANAGE.name 315 assert remaining.workspace == "workspace-b"