test_sqlalchemy_store_rbac.py
"""Tests for the role-based access control (RBAC) methods of the auth ``SqlAlchemyStore``.

Covers role CRUD, role-permission CRUD, user-role assignment, and the
role-based permission resolver (``get_role_permission_for_resource`` and the
workspace-admin helpers).
"""

import pytest

from mlflow.exceptions import MlflowException
from mlflow.server.auth.entities import Role, RolePermission, UserRoleAssignment
from mlflow.server.auth.permissions import EDIT, MANAGE, READ, USE, VALID_RESOURCE_TYPES
from mlflow.server.auth.sqlalchemy_store import SqlAlchemyStore

from tests.helper_functions import random_str

# Every concrete resource type the resolver accepts, excluding the special
# ``"workspace"`` bucket which is exercised separately (it's not a real resource
# type — it's the workspace-wide grant form).
_CONCRETE_RESOURCE_TYPES = sorted(VALID_RESOURCE_TYPES - {"workspace"})

pytestmark = pytest.mark.notrackingurimock


@pytest.fixture
def store(tmp_sqlite_uri):
    """Return a ``SqlAlchemyStore`` initialised against the ``tmp_sqlite_uri`` database."""
    store = SqlAlchemyStore()
    store.init_db(tmp_sqlite_uri)
    return store


@pytest.fixture
def user(store):
    """Create a user in ``store`` from two random strings (presumably username/password)."""
    return store.create_user(random_str(), random_str())


@pytest.fixture
def user2(store):
    """Create a second, independent user for tests that need two distinct users."""
    return store.create_user(random_str(), random_str())


# ---- Role CRUD ----


def test_create_role(store):
    """A new role echoes back its fields and starts with no permissions."""
    role = store.create_role(name="viewer", workspace="ws1", description="Read-only access")
    assert isinstance(role, Role)
    assert role.name == "viewer"
    assert role.workspace == "ws1"
    assert role.description == "Read-only access"
    assert role.permissions == []


def test_create_role_duplicate(store):
    """Creating the same (name, workspace) pair twice is rejected."""
    store.create_role(name="viewer", workspace="ws1")
    with pytest.raises(MlflowException, match="already exists"):
        store.create_role(name="viewer", workspace="ws1")


def test_create_role_same_name_different_workspace(store):
    """The same role name may be reused in a different workspace."""
    r1 = store.create_role(name="viewer", workspace="ws1")
    r2 = store.create_role(name="viewer", workspace="ws2")
    assert r1.id != r2.id


def test_get_role(store):
    created = store.create_role(name="editor", workspace="ws1")
    fetched = store.get_role(created.id)
    assert fetched.id == created.id
    assert fetched.name == "editor"
    assert fetched.workspace == "ws1"


def test_get_role_not_found(store):
    with pytest.raises(MlflowException, match="not found"):
        store.get_role(99999)


def test_get_role_by_name(store):
    created = store.create_role(name="editor", workspace="ws1")
    fetched = store.get_role_by_name("ws1", "editor")
    assert fetched.id == created.id


def test_get_role_by_name_not_found(store):
    with pytest.raises(MlflowException, match="not found"):
        store.get_role_by_name("ws1", "nonexistent")


def test_list_roles(store):
    """``list_roles`` returns only the roles of the requested workspace."""
    store.create_role(name="viewer", workspace="ws1")
    store.create_role(name="editor", workspace="ws1")
    store.create_role(name="viewer", workspace="ws2")

    ws1_roles = store.list_roles("ws1")
    assert len(ws1_roles) == 2
    assert {r.name for r in ws1_roles} == {"viewer", "editor"}

    ws2_roles = store.list_roles("ws2")
    assert len(ws2_roles) == 1


def test_list_all_roles(store):
    """``list_all_roles`` spans every workspace."""
    store.create_role(name="viewer", workspace="ws1")
    store.create_role(name="editor", workspace="ws2")
    all_roles = store.list_all_roles()
    assert len(all_roles) == 2


def test_update_role(store):
    role = store.create_role(name="old-name", workspace="ws1", description="old desc")
    updated = store.update_role(role.id, name="new-name", description="new desc")
    assert updated.name == "new-name"
    assert updated.description == "new desc"


def test_update_role_name_conflict(store):
    """Renaming a role onto a name already used in its workspace is rejected."""
    store.create_role(name="existing", workspace="ws1")
    role2 = store.create_role(name="other", workspace="ws1")
    with pytest.raises(MlflowException, match="already exists"):
        store.update_role(role2.id, name="existing")


def test_delete_role(store):
    role = store.create_role(name="doomed", workspace="ws1")
    store.delete_role(role.id)
    with pytest.raises(MlflowException, match="not found"):
        store.get_role(role.id)


def test_delete_role_cascades_permissions_and_assignments(store, user):
    role = store.create_role(name="role1", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, role.id)

    store.delete_role(role.id)

    # Role no longer exists
    with pytest.raises(MlflowException, match="not found"):
        store.get_role(role.id)

    # User no longer has the role
    assert store.list_user_roles(user.id) == []


def test_delete_roles_for_workspace(store):
    """Bulk deletion removes the target workspace's roles and leaves others intact."""
    store.create_role(name="r1", workspace="ws1")
    store.create_role(name="r2", workspace="ws1")
    store.create_role(name="r3", workspace="ws2")

    store.delete_roles_for_workspace("ws1")
    assert store.list_roles("ws1") == []
    assert len(store.list_roles("ws2")) == 1


def test_delete_roles_for_workspace_cascades(store, user):
    """Bulk deletion also drops the user's assignments to the deleted roles."""
    role = store.create_role(name="r1", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, role.id)

    store.delete_roles_for_workspace("ws1")

    assert store.list_roles("ws1") == []
    assert store.list_user_roles(user.id) == []


# ---- RolePermission CRUD ----


def test_add_role_permission(store):
    role = store.create_role(name="viewer", workspace="ws1")
    rp = store.add_role_permission(role.id, "experiment", "123", "READ")
    assert isinstance(rp, RolePermission)
    assert rp.role_id == role.id
    assert rp.resource_type == "experiment"
    assert rp.resource_pattern == "123"
    assert rp.permission == "READ"


def test_add_role_permission_wildcard(store):
    role = store.create_role(name="viewer", workspace="ws1")
    rp = store.add_role_permission(role.id, "experiment", "*", "READ")
    assert rp.resource_pattern == "*"


def test_add_role_permission_duplicate(store):
    """A second grant on the same (role, type, pattern) is rejected even if the level differs."""
    role = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "123", "READ")
    with pytest.raises(MlflowException, match="already exists"):
        store.add_role_permission(role.id, "experiment", "123", "EDIT")


def test_add_role_permission_invalid_permission(store):
    role = store.create_role(name="viewer", workspace="ws1")
    with pytest.raises(MlflowException, match="Invalid permission"):
        store.add_role_permission(role.id, "experiment", "123", "INVALID")


def test_add_role_permission_invalid_resource_type(store):
    role = store.create_role(name="viewer", workspace="ws1")
    with pytest.raises(MlflowException, match="Invalid resource type"):
        store.add_role_permission(role.id, "invalid_type", "123", "READ")


def test_add_role_permission_workspace_requires_wildcard(store):
    """A ``workspace`` grant with a non-wildcard pattern (here ``"42"``) is rejected."""
    role = store.create_role(name="ws-role", workspace="ws1")
    with pytest.raises(MlflowException, match="resource_type='workspace' requires"):
        store.add_role_permission(role.id, "workspace", "42", "MANAGE")


def test_add_role_permission_nonexistent_role(store):
    with pytest.raises(MlflowException, match="not found"):
        store.add_role_permission(99999, "experiment", "123", "READ")


def test_remove_role_permission(store):
    role = store.create_role(name="viewer", workspace="ws1")
    rp = store.add_role_permission(role.id, "experiment", "123", "READ")
    store.remove_role_permission(rp.id)
    assert store.list_role_permissions(role.id) == []


def test_remove_role_permission_not_found(store):
    with pytest.raises(MlflowException, match="not found"):
        store.remove_role_permission(99999)


def test_list_role_permissions(store):
    role = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "1", "READ")
    store.add_role_permission(role.id, "experiment", "2", "EDIT")
    store.add_role_permission(role.id, "registered_model", "*", "READ")

    perms = store.list_role_permissions(role.id)
    assert len(perms) == 3


def test_update_role_permission(store):
    role = store.create_role(name="viewer", workspace="ws1")
    rp = store.add_role_permission(role.id, "experiment", "123", "READ")
    updated = store.update_role_permission(rp.id, "EDIT")
    assert updated.permission == "EDIT"


def test_update_role_permission_not_found(store):
    with pytest.raises(MlflowException, match="not found"):
        store.update_role_permission(99999, "READ")


def test_update_role_permission_invalid_permission(store):
    role = store.create_role(name="viewer", workspace="ws1")
    rp = store.add_role_permission(role.id, "experiment", "123", "READ")
    with pytest.raises(MlflowException, match="Invalid permission"):
        store.update_role_permission(rp.id, "INVALID")


# ---- UserRoleAssignment CRUD ----


def test_assign_role_to_user(store, user):
    role = store.create_role(name="viewer", workspace="ws1")
    assignment = store.assign_role_to_user(user.id, role.id)
    assert isinstance(assignment, UserRoleAssignment)
    assert assignment.user_id == user.id
    assert assignment.role_id == role.id


def test_assign_role_nonexistent_user(store):
    role = store.create_role(name="viewer", workspace="ws1")
    with pytest.raises(MlflowException, match="not found"):
        store.assign_role_to_user(99999, role.id)


def test_assign_role_duplicate(store, user):
    role = store.create_role(name="viewer", workspace="ws1")
    store.assign_role_to_user(user.id, role.id)
    with pytest.raises(MlflowException, match="already exists"):
        store.assign_role_to_user(user.id, role.id)


def test_unassign_role_from_user(store, user):
    role = store.create_role(name="viewer", workspace="ws1")
    store.assign_role_to_user(user.id, role.id)
    store.unassign_role_from_user(user.id, role.id)
    assert store.list_user_roles(user.id) == []


def test_unassign_role_not_found(store, user):
    with pytest.raises(MlflowException, match="not found"):
        store.unassign_role_from_user(user.id, 99999)


def test_list_user_roles(store, user):
    """``list_user_roles`` returns the user's roles across all workspaces."""
    r1 = store.create_role(name="viewer", workspace="ws1")
    r2 = store.create_role(name="editor", workspace="ws2")
    store.assign_role_to_user(user.id, r1.id)
    store.assign_role_to_user(user.id, r2.id)

    roles = store.list_user_roles(user.id)
    assert len(roles) == 2
    assert {r.name for r in roles} == {"viewer", "editor"}


def test_list_user_roles_for_workspace(store, user):
    """``list_user_roles_for_workspace`` filters the user's roles by workspace."""
    r1 = store.create_role(name="viewer", workspace="ws1")
    r2 = store.create_role(name="editor", workspace="ws1")
    r3 = store.create_role(name="viewer", workspace="ws2")
    store.assign_role_to_user(user.id, r1.id)
    store.assign_role_to_user(user.id, r2.id)
    store.assign_role_to_user(user.id, r3.id)

    ws1_roles = store.list_user_roles_for_workspace(user.id, "ws1")
    assert len(ws1_roles) == 2

    ws2_roles = store.list_user_roles_for_workspace(user.id, "ws2")
    assert len(ws2_roles) == 1


def test_list_role_users(store, user, user2):
    role = store.create_role(name="viewer", workspace="ws1")
    store.assign_role_to_user(user.id, role.id)
    store.assign_role_to_user(user2.id, role.id)

    users = store.list_role_users(role.id)
    assert len(users) == 2
    assert {u.user_id for u in users} == {user.id, user2.id}


# ---- Role-based permission resolution ----


def test_get_role_permission_no_roles(store, user):
    """A user with no role assignments resolves to ``None``."""
    result = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert result is None


def test_get_role_permission_specific_match(store, user):
    role = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "1", "READ")
    store.assign_role_to_user(user.id, role.id)

    result = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert result == READ


def test_get_role_permission_no_match(store, user):
    """A grant on experiment ``1`` does not cover experiment ``999``."""
    role = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "1", "READ")
    store.assign_role_to_user(user.id, role.id)

    result = store.get_role_permission_for_resource(user.id, "experiment", "999", "ws1")
    assert result is None


def test_get_role_permission_wildcard_match(store, user):
    role = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "EDIT")
    store.assign_role_to_user(user.id, role.id)

    result = store.get_role_permission_for_resource(user.id, "experiment", "any-id", "ws1")
    assert result == EDIT


def test_get_role_permission_union_of_multiple_roles(store, user):
    """READ from one role and EDIT from another on the same resource resolve to EDIT."""
    r1 = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(r1.id, "experiment", "1", "READ")
    store.assign_role_to_user(user.id, r1.id)

    r2 = store.create_role(name="editor", workspace="ws1")
    store.add_role_permission(r2.id, "experiment", "1", "EDIT")
    store.assign_role_to_user(user.id, r2.id)

    result = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert result == EDIT


def test_get_role_permission_wildcard_and_specific_union(store, user):
    role = store.create_role(name="mixed", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "READ")
    store.add_role_permission(role.id, "experiment", "1", "EDIT")
    store.assign_role_to_user(user.id, role.id)

    # Experiment 1 gets EDIT (higher of READ wildcard and EDIT specific)
    result = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert result == EDIT

    # Other experiments get READ from wildcard
    result = store.get_role_permission_for_resource(user.id, "experiment", "999", "ws1")
    assert result == READ


def test_get_role_permission_workspace_admin(store, user):
    role = store.create_role(name="ws-admin", workspace="ws1")
    store.add_role_permission(role.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, role.id)

    # Workspace-wide MANAGE applies to any resource type.
    result = store.get_role_permission_for_resource(user.id, "experiment", "any-id", "ws1")
    assert result == MANAGE

    result = store.get_role_permission_for_resource(user.id, "registered_model", "m1", "ws1")
    assert result == MANAGE


def test_workspace_permission_applies_across_resource_types(store, user):
    role = store.create_role(name="reader", workspace="ws1")
    store.add_role_permission(role.id, "workspace", "*", "READ")
    store.assign_role_to_user(user.id, role.id)

    # READ at the workspace level grants READ on every resource type in the workspace.
    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") == READ
    assert store.get_role_permission_for_resource(user.id, "registered_model", "m1", "ws1") == READ
    assert store.get_role_permission_for_resource(user.id, "gateway_endpoint", "e1", "ws1") == READ


def test_workspace_permission_respects_union_with_specific(store, user):
    role = store.create_role(name="mixed", workspace="ws1")
    store.add_role_permission(role.id, "workspace", "*", "READ")
    store.add_role_permission(role.id, "experiment", "42", "EDIT")
    store.assign_role_to_user(user.id, role.id)

    # Experiment 42: max(workspace READ, specific EDIT) = EDIT
    assert store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1") == EDIT
    # Other experiments: just workspace READ
    assert store.get_role_permission_for_resource(user.id, "experiment", "99", "ws1") == READ


def test_is_workspace_admin(store, user):
    """A ``(workspace, *, MANAGE)`` role grant makes the user admin of that workspace only."""
    role = store.create_role(name="ws-admin", workspace="ws1")
    store.add_role_permission(role.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, role.id)

    assert store.is_workspace_admin(user.id, "ws1") is True
    assert store.is_workspace_admin(user.id, "ws2") is False


def test_is_workspace_admin_requires_manage(store, user):
    # A non-MANAGE workspace permission does not make the user a workspace admin.
    role = store.create_role(name="ws-reader", workspace="ws1")
    store.add_role_permission(role.id, "workspace", "*", "READ")
    store.assign_role_to_user(user.id, role.id)

    assert store.is_workspace_admin(user.id, "ws1") is False


def test_list_role_grants_for_user_in_workspace(store, user):
    # Role with specific + wildcard experiment grants + workspace-wide grant.
    role = store.create_role(name="multi", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "42", "EDIT")
    store.add_role_permission(role.id, "experiment", "*", "READ")
    store.add_role_permission(role.id, "workspace", "*", "READ")
    # Unrelated grant on another resource type.
    store.add_role_permission(role.id, "registered_model", "*", "MANAGE")
    store.assign_role_to_user(user.id, role.id)

    grants = store.list_role_grants_for_user_in_workspace(user.id, "ws1", "experiment")
    # Should include specific experiment grant, wildcard experiment grant,
    # and the workspace-wide grant. Should NOT include the registered_model grant.
    assert sorted(grants) == sorted([("42", "EDIT"), ("*", "READ"), ("*", "READ")])


def test_list_role_grants_for_user_in_workspace_cross_workspace(store, user):
    # Grants in ws2 should not surface when querying ws1.
    role = store.create_role(name="other-ws", workspace="ws2")
    store.add_role_permission(role.id, "experiment", "99", "EDIT")
    store.assign_role_to_user(user.id, role.id)

    assert store.list_role_grants_for_user_in_workspace(user.id, "ws1", "experiment") == []


def test_list_role_grants_for_user_in_workspace_no_roles(store, user):
    assert store.list_role_grants_for_user_in_workspace(user.id, "ws1", "experiment") == []


def test_list_role_grants_for_user_in_workspace_rejects_invalid_resource_type(store, user):
    with pytest.raises(MlflowException, match="Invalid resource type"):
        store.list_role_grants_for_user_in_workspace(user.id, "ws1", "not_a_type")


def test_list_workspace_admin_workspaces(store, user):
    # Workspace admin in ws1 + ws3, regular member in ws2.
    admin_ws1 = store.create_role(name="wa1", workspace="ws1")
    store.add_role_permission(admin_ws1.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, admin_ws1.id)
    admin_ws3 = store.create_role(name="wa3", workspace="ws3")
    store.add_role_permission(admin_ws3.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, admin_ws3.id)
    member_ws2 = store.create_role(name="mem", workspace="ws2")
    store.add_role_permission(member_ws2.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, member_ws2.id)

    assert store.list_workspace_admin_workspaces(user.id) == {"ws1", "ws3"}


def test_list_workspace_admin_workspaces_ignores_non_manage(store, user):
    # A workspace-scope grant with a non-MANAGE permission should not count.
    role = store.create_role(name="reader", workspace="ws1")
    store.add_role_permission(role.id, "workspace", "*", "READ")
    store.assign_role_to_user(user.id, role.id)

    assert store.list_workspace_admin_workspaces(user.id) == set()


# ---- Resolver coverage: cross-workspace isolation, NO_PERMISSIONS, resource types ----


def test_resolver_cross_workspace_isolation(store, user):
    """A role scoped to ws1 must not grant anything when resolving in ws2,
    even if the user has the role assigned and the permission pattern matches.
    """
    role = store.create_role(name="editor", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "EDIT")
    store.assign_role_to_user(user.id, role.id)

    # ws1: resolver finds the role and returns EDIT.
    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") == EDIT
    # ws2: no role tied to ws2 for this user — resolver returns None.
    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws2") is None


def test_resolver_role_assignment_in_other_workspace_doesnt_leak(store, user):
    """Each workspace resolves from its own roles only; a workspace with no role yields None."""
    r_ws1 = store.create_role(name="ws1-editor", workspace="ws1")
    store.add_role_permission(r_ws1.id, "experiment", "*", "EDIT")
    store.assign_role_to_user(user.id, r_ws1.id)

    r_ws2 = store.create_role(name="ws2-reader", workspace="ws2")
    store.add_role_permission(r_ws2.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, r_ws2.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") == EDIT
    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws2") == READ
    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws3") is None


def test_resolver_returns_no_permissions_when_role_only_has_no_permissions(store, user):
    """If a user's only grant is NO_PERMISSIONS, ``get_role_permission_for_resource``
    returns that permission object — not ``None``. Documents the distinction at the
    store layer between 'no role grant found' (None) and 'role grant resolved to an
    explicit NO_PERMISSIONS' without implying different fallback behavior in the
    outer resolver — at that layer (_get_permission_from_store_or_default) both
    cases fall through to the direct grant / workspace / default chain identically.
    """
    role = store.create_role(name="locked", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "NO_PERMISSIONS")
    store.assign_role_to_user(user.id, role.id)

    result = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert result is not None
    assert result.name == "NO_PERMISSIONS"


def test_resolver_no_permissions_loses_to_any_positive_grant(store, user):
    """When a user has both NO_PERMISSIONS and a positive grant (from different
    roles or the same role), the positive grant wins. Reflects the
    ``max_permission`` policy where explicit grants outrank explicit denies.
    """
    r_deny = store.create_role(name="deny", workspace="ws1")
    store.add_role_permission(r_deny.id, "experiment", "*", "NO_PERMISSIONS")
    store.assign_role_to_user(user.id, r_deny.id)

    r_read = store.create_role(name="reader", workspace="ws1")
    store.add_role_permission(r_read.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, r_read.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") == READ


def test_resolver_unassigned_role_doesnt_grant(store, user):
    """A role in the workspace with the right permissions doesn't help if the
    user isn't assigned to it.
    """
    role = store.create_role(name="editor", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "EDIT")
    # Intentionally skip assign_role_to_user.

    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") is None


def test_resolver_resource_type_filter(store, user):
    """A grant on resource_type=registered_model does not satisfy an
    experiment lookup (and vice versa). Only the ``workspace`` resource type
    promotes across all types.
    """
    role = store.create_role(name="models-only", workspace="ws1")
    store.add_role_permission(role.id, "registered_model", "*", "EDIT")
    store.assign_role_to_user(user.id, role.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") is None
    assert store.get_role_permission_for_resource(user.id, "registered_model", "m1", "ws1") == EDIT


# ---- Resolver coverage: permission hierarchy matrix ----


@pytest.mark.parametrize("resource_type", _CONCRETE_RESOURCE_TYPES)
@pytest.mark.parametrize(
    ("granted", "expected"),
    [
        ("READ", READ),
        ("USE", USE),
        ("EDIT", EDIT),
        ("MANAGE", MANAGE),
    ],
)
def test_resolver_returns_granted_permission_for_each_resource_type(
    store, user, resource_type, granted, expected
):
    """For each (resource_type, granted_permission) pair, resolving the user's
    permission on a specific resource of that type returns exactly the granted
    permission. This ensures the resolver applies uniformly across every
    resource type the system knows about.
    """
    role = store.create_role(name=f"{resource_type}-{granted}", workspace="ws1")
    store.add_role_permission(role.id, resource_type, "*", granted)
    store.assign_role_to_user(user.id, role.id)

    assert store.get_role_permission_for_resource(user.id, resource_type, "id", "ws1") == expected


@pytest.mark.parametrize("resource_type", _CONCRETE_RESOURCE_TYPES)
def test_resolver_workspace_grant_promotes_to_every_resource_type(store, user, resource_type):
    """``(workspace, *, MANAGE)`` should grant MANAGE on every known resource
    type in the role's workspace. This is the workspace-admin short-circuit —
    if it regresses, workspace admins silently lose authority over specific
    resource types.
    """
    role = store.create_role(name="ws-admin", workspace="ws1")
    store.add_role_permission(role.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, role.id)

    assert store.get_role_permission_for_resource(user.id, resource_type, "any-id", "ws1") == MANAGE


@pytest.mark.parametrize(
    ("granted", "expected"),
    [
        ("READ", READ),
        ("USE", USE),
        ("EDIT", EDIT),
        ("MANAGE", MANAGE),
    ],
)
def test_resolver_workspace_grant_propagates_at_every_level(store, user, granted, expected):
    """``(workspace, *, X)`` where X ∈ {READ, USE, EDIT, MANAGE} promotes X to
    every resource type — not just MANAGE. This ensures workspace-wide grants
    work as a blanket baseline permission, which the UI relies on (e.g. the
    seeded ``viewer`` and ``editor`` roles use this form).
    """
    role = store.create_role(name=f"ws-{granted}", workspace="ws1")
    store.add_role_permission(role.id, "workspace", "*", granted)
    store.assign_role_to_user(user.id, role.id)

    for resource_type in _CONCRETE_RESOURCE_TYPES:
        assert (
            store.get_role_permission_for_resource(user.id, resource_type, "id", "ws1") == expected
        )


def test_resolver_workspace_grant_scoped_to_role_workspace(store, user):
    """A workspace-wide grant in ws1 has no effect when resolving in ws2 —
    the role's workspace scopes the grant.
    """
    role = store.create_role(name="ws1-admin", workspace="ws1")
    store.add_role_permission(role.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, role.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") == MANAGE
    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws2") is None


# ---- Resolver coverage: pattern matching completeness ----


def test_resolver_specific_pattern_does_not_apply_to_different_id(store, user):
    role = store.create_role(name="e42-editor", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "42", "EDIT")
    store.assign_role_to_user(user.id, role.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1") == EDIT
    assert store.get_role_permission_for_resource(user.id, "experiment", "99", "ws1") is None


def test_resolver_wildcard_applies_to_any_id(store, user):
    role = store.create_role(name="any-experiment", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, role.id)

    for eid in ["1", "42", "long-uuid-6a4c"]:
        assert store.get_role_permission_for_resource(user.id, "experiment", eid, "ws1") == READ


def test_resolver_specific_outranks_wildcard_when_higher(store, user):
    # Specific grant > wildcard grant → specific wins.
    role = store.create_role(name="mixed", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "READ")
    store.add_role_permission(role.id, "experiment", "42", "EDIT")
    store.assign_role_to_user(user.id, role.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1") == EDIT
    assert store.get_role_permission_for_resource(user.id, "experiment", "99", "ws1") == READ


def test_resolver_wildcard_outranks_specific_when_higher(store, user):
    """Wildcard grant > specific grant → wildcard wins (best grant policy,
    not "most specific wins"). This prevents an operator from accidentally
    *downgrading* a user's access by adding a narrower grant with a lower
    permission level.
    """
    role = store.create_role(name="mixed", workspace="ws1")
    store.add_role_permission(role.id, "experiment", "*", "EDIT")
    store.add_role_permission(role.id, "experiment", "42", "READ")
    store.assign_role_to_user(user.id, role.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1") == EDIT
    assert store.get_role_permission_for_resource(user.id, "experiment", "99", "ws1") == EDIT


# ---- Resolver coverage: multi-role union ----


def test_resolver_union_picks_max_across_roles(store, user):
    # Permissions union across all roles assigned to the user — max wins.
    r1 = store.create_role(name="r1", workspace="ws1")
    store.add_role_permission(r1.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, r1.id)

    r2 = store.create_role(name="r2", workspace="ws1")
    store.add_role_permission(r2.id, "experiment", "*", "USE")
    store.assign_role_to_user(user.id, r2.id)

    r3 = store.create_role(name="r3", workspace="ws1")
    store.add_role_permission(r3.id, "experiment", "*", "MANAGE")
    store.assign_role_to_user(user.id, r3.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") == MANAGE


def test_resolver_union_mixes_workspace_and_resource_grants(store, user):
    """A workspace-wide EDIT + a specific experiment READ → resolver still
    surfaces EDIT for that experiment, because the workspace grant already
    covers it. Specific grants only promote, never downgrade.
    """
    r_ws = store.create_role(name="ws-editor", workspace="ws1")
    store.add_role_permission(r_ws.id, "workspace", "*", "EDIT")
    store.assign_role_to_user(user.id, r_ws.id)

    r_specific = store.create_role(name="one-reader", workspace="ws1")
    store.add_role_permission(r_specific.id, "experiment", "42", "READ")
    store.assign_role_to_user(user.id, r_specific.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1") == EDIT


# ---- Legacy workspace_permissions as workspace-admin source ----
#
# Pre-RBAC operators relied on `workspace_permissions` MANAGE to convey workspace-wide
# admin authority. The workspace-admin helpers must still recognize that grant,
# otherwise operators mid-migration (or just not yet using roles) silently lose admin
# status behind RBAC-aware validators.
753 754 755 def test_is_workspace_admin_honors_legacy_workspace_permissions(store, user): 756 store.set_workspace_permission("ws1", user.username, "MANAGE") 757 758 assert store.is_workspace_admin(user.id, "ws1") is True 759 assert store.is_workspace_admin(user.id, "ws2") is False 760 761 762 def test_is_workspace_admin_ignores_non_manage_legacy(store, user): 763 store.set_workspace_permission("ws1", user.username, "READ") 764 765 assert store.is_workspace_admin(user.id, "ws1") is False 766 767 768 def test_list_workspace_admin_workspaces_unions_role_and_legacy(store, user): 769 # Role admin in ws1, legacy MANAGE in ws2, legacy READ in ws3 (should not count). 770 role = store.create_role(name="wa1", workspace="ws1") 771 store.add_role_permission(role.id, "workspace", "*", "MANAGE") 772 store.assign_role_to_user(user.id, role.id) 773 store.set_workspace_permission("ws2", user.username, "MANAGE") 774 store.set_workspace_permission("ws3", user.username, "READ") 775 776 assert store.list_workspace_admin_workspaces(user.id) == {"ws1", "ws2"} 777 778 779 def test_is_workspace_admin_of_any_of_users_workspaces_legacy_admin(store, user, user2): 780 # Admin authority via legacy, target presence via role. 781 store.set_workspace_permission("ws1", user.username, "MANAGE") 782 target_role = store.create_role(name="member", workspace="ws1") 783 store.add_role_permission(target_role.id, "experiment", "*", "READ") 784 store.assign_role_to_user(user2.id, target_role.id) 785 786 assert store.is_workspace_admin_of_any_of_users_workspaces(user.id, user2.id) is True 787 788 789 def test_is_workspace_admin_of_any_of_users_workspaces_legacy_target(store, user, user2): 790 # Admin authority via role, target presence via legacy. 
791 admin_role = store.create_role(name="wa", workspace="ws1") 792 store.add_role_permission(admin_role.id, "workspace", "*", "MANAGE") 793 store.assign_role_to_user(user.id, admin_role.id) 794 store.set_workspace_permission("ws1", user2.username, "READ") 795 796 assert store.is_workspace_admin_of_any_of_users_workspaces(user.id, user2.id) is True 797 798 799 def test_is_workspace_admin_of_any_of_users_workspaces_no_overlap(store, user, user2): 800 # Admin in ws1, target present only in ws2 → no intersection. 801 store.set_workspace_permission("ws1", user.username, "MANAGE") 802 store.set_workspace_permission("ws2", user2.username, "READ") 803 804 assert store.is_workspace_admin_of_any_of_users_workspaces(user.id, user2.id) is False 805 806 807 def test_get_role_permission_does_not_cross_workspace(store, user): 808 role = store.create_role(name="viewer", workspace="ws1") 809 store.add_role_permission(role.id, "experiment", "*", "READ") 810 store.assign_role_to_user(user.id, role.id) 811 812 # Should not apply in ws2 813 result = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws2") 814 assert result is None 815 816 817 def test_get_role_permission_different_resource_types(store, user): 818 role = store.create_role(name="viewer", workspace="ws1") 819 store.add_role_permission(role.id, "experiment", "*", "READ") 820 store.assign_role_to_user(user.id, role.id) 821 822 # Should not match registered_model 823 result = store.get_role_permission_for_resource(user.id, "registered_model", "m1", "ws1") 824 assert result is None 825 826 # Should match experiment 827 result = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") 828 assert result == READ 829 830 831 @pytest.mark.parametrize( 832 ("perms", "expected"), 833 [ 834 ([("experiment", "1", "READ"), ("experiment", "1", "MANAGE")], MANAGE), 835 ([("experiment", "1", "USE"), ("experiment", "1", "EDIT")], EDIT), 836 ([("experiment", "*", "READ"), ("experiment", "1", "USE")], USE), 
837 ], 838 ) 839 def test_get_role_permission_picks_highest(store, user, perms, expected): 840 for i, (rtype, pattern, perm) in enumerate(perms): 841 role = store.create_role(name=f"role-{i}", workspace="ws1") 842 store.add_role_permission(role.id, rtype, pattern, perm) 843 store.assign_role_to_user(user.id, role.id) 844 845 result = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") 846 assert result == expected