/ tests / server / auth / test_sqlalchemy_store_rbac.py
test_sqlalchemy_store_rbac.py
  1  import pytest
  2  
  3  from mlflow.exceptions import MlflowException
  4  from mlflow.server.auth.entities import Role, RolePermission, UserRoleAssignment
  5  from mlflow.server.auth.permissions import EDIT, MANAGE, READ, USE, VALID_RESOURCE_TYPES
  6  
  7  # Every concrete resource type the resolver accepts, excluding the special
  8  # ``"workspace"`` bucket which is exercised separately (it's not a real resource
  9  # type — it's the workspace-wide grant form).
 10  _CONCRETE_RESOURCE_TYPES = sorted(VALID_RESOURCE_TYPES - {"workspace"})
 11  from mlflow.server.auth.sqlalchemy_store import SqlAlchemyStore
 12  
 13  from tests.helper_functions import random_str
 14  
 15  pytestmark = pytest.mark.notrackingurimock
 16  
 17  
 18  @pytest.fixture
 19  def store(tmp_sqlite_uri):
 20      store = SqlAlchemyStore()
 21      store.init_db(tmp_sqlite_uri)
 22      return store
 23  
 24  
 25  @pytest.fixture
 26  def user(store):
 27      return store.create_user(random_str(), random_str())
 28  
 29  
 30  @pytest.fixture
 31  def user2(store):
 32      return store.create_user(random_str(), random_str())
 33  
 34  
 35  # ---- Role CRUD ----
 36  
 37  
 38  def test_create_role(store):
 39      role = store.create_role(name="viewer", workspace="ws1", description="Read-only access")
 40      assert isinstance(role, Role)
 41      assert role.name == "viewer"
 42      assert role.workspace == "ws1"
 43      assert role.description == "Read-only access"
 44      assert role.permissions == []
 45  
 46  
 47  def test_create_role_duplicate(store):
 48      store.create_role(name="viewer", workspace="ws1")
 49      with pytest.raises(MlflowException, match="already exists"):
 50          store.create_role(name="viewer", workspace="ws1")
 51  
 52  
 53  def test_create_role_same_name_different_workspace(store):
 54      r1 = store.create_role(name="viewer", workspace="ws1")
 55      r2 = store.create_role(name="viewer", workspace="ws2")
 56      assert r1.id != r2.id
 57  
 58  
 59  def test_get_role(store):
 60      created = store.create_role(name="editor", workspace="ws1")
 61      fetched = store.get_role(created.id)
 62      assert fetched.id == created.id
 63      assert fetched.name == "editor"
 64      assert fetched.workspace == "ws1"
 65  
 66  
 67  def test_get_role_not_found(store):
 68      with pytest.raises(MlflowException, match="not found"):
 69          store.get_role(99999)
 70  
 71  
 72  def test_get_role_by_name(store):
 73      created = store.create_role(name="editor", workspace="ws1")
 74      fetched = store.get_role_by_name("ws1", "editor")
 75      assert fetched.id == created.id
 76  
 77  
 78  def test_get_role_by_name_not_found(store):
 79      with pytest.raises(MlflowException, match="not found"):
 80          store.get_role_by_name("ws1", "nonexistent")
 81  
 82  
 83  def test_list_roles(store):
 84      store.create_role(name="viewer", workspace="ws1")
 85      store.create_role(name="editor", workspace="ws1")
 86      store.create_role(name="viewer", workspace="ws2")
 87  
 88      ws1_roles = store.list_roles("ws1")
 89      assert len(ws1_roles) == 2
 90      assert {r.name for r in ws1_roles} == {"viewer", "editor"}
 91  
 92      ws2_roles = store.list_roles("ws2")
 93      assert len(ws2_roles) == 1
 94  
 95  
 96  def test_list_all_roles(store):
 97      store.create_role(name="viewer", workspace="ws1")
 98      store.create_role(name="editor", workspace="ws2")
 99      all_roles = store.list_all_roles()
100      assert len(all_roles) == 2
101  
102  
def test_update_role(store):
    """Updating a role changes both its name and its description."""
    original = store.create_role(name="old-name", workspace="ws1", description="old desc")

    changed = store.update_role(original.id, name="new-name", description="new desc")

    assert changed.name == "new-name"
    assert changed.description == "new desc"
108  
109  
def test_update_role_name_conflict(store):
    """Renaming a role to a name already used in the same workspace is rejected."""
    store.create_role(name="existing", workspace="ws1")
    to_rename = store.create_role(name="other", workspace="ws1")

    with pytest.raises(MlflowException, match="already exists"):
        store.update_role(to_rename.id, name="existing")
115  
116  
def test_delete_role(store):
    """A deleted role can no longer be fetched by id."""
    doomed = store.create_role(name="doomed", workspace="ws1")

    store.delete_role(doomed.id)

    with pytest.raises(MlflowException, match="not found"):
        store.get_role(doomed.id)
122  
123  
def test_delete_role_cascades_permissions_and_assignments(store, user):
    """Deleting a role also removes the user's assignment to it."""
    target = store.create_role(name="role1", workspace="ws1")
    store.add_role_permission(target.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, target.id)

    store.delete_role(target.id)

    # The role itself is gone.
    with pytest.raises(MlflowException, match="not found"):
        store.get_role(target.id)

    # The assignment went with it.
    remaining_roles = store.list_user_roles(user.id)
    assert remaining_roles == []
137  
138  
def test_delete_roles_for_workspace(store):
    """Bulk deletion empties the target workspace and leaves other workspaces alone."""
    for role_name, workspace in [("r1", "ws1"), ("r2", "ws1"), ("r3", "ws2")]:
        store.create_role(name=role_name, workspace=workspace)

    store.delete_roles_for_workspace("ws1")

    assert store.list_roles("ws1") == []
    untouched = store.list_roles("ws2")
    assert len(untouched) == 1
147  
148  
def test_delete_roles_for_workspace_cascades(store, user):
    """Bulk deletion of a workspace's roles also drops user assignments to those roles."""
    target = store.create_role(name="r1", workspace="ws1")
    store.add_role_permission(target.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, target.id)

    store.delete_roles_for_workspace("ws1")

    remaining_in_ws = store.list_roles("ws1")
    assert remaining_in_ws == []
    remaining_for_user = store.list_user_roles(user.id)
    assert remaining_for_user == []
158  
159  
160  # ---- RolePermission CRUD ----
161  
162  
def test_add_role_permission(store):
    """Adding a permission returns a ``RolePermission`` that mirrors the inputs."""
    owner = store.create_role(name="viewer", workspace="ws1")

    grant = store.add_role_permission(owner.id, "experiment", "123", "READ")

    assert isinstance(grant, RolePermission)
    assert grant.role_id == owner.id
    assert grant.resource_type == "experiment"
    assert grant.resource_pattern == "123"
    assert grant.permission == "READ"
171  
172  
def test_add_role_permission_wildcard(store):
    """The ``"*"`` resource pattern is stored verbatim."""
    owner = store.create_role(name="viewer", workspace="ws1")

    grant = store.add_role_permission(owner.id, "experiment", "*", "READ")

    assert grant.resource_pattern == "*"
177  
178  
def test_add_role_permission_duplicate(store):
    """A second grant on the same (role, type, pattern) is rejected, even at another level."""
    owner = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(owner.id, "experiment", "123", "READ")

    with pytest.raises(MlflowException, match="already exists"):
        store.add_role_permission(owner.id, "experiment", "123", "EDIT")
184  
185  
def test_add_role_permission_invalid_permission(store):
    """An unknown permission name is rejected."""
    owner = store.create_role(name="viewer", workspace="ws1")

    with pytest.raises(MlflowException, match="Invalid permission"):
        store.add_role_permission(owner.id, "experiment", "123", "INVALID")
190  
191  
def test_add_role_permission_invalid_resource_type(store):
    """An unknown resource type is rejected."""
    owner = store.create_role(name="viewer", workspace="ws1")

    with pytest.raises(MlflowException, match="Invalid resource type"):
        store.add_role_permission(owner.id, "invalid_type", "123", "READ")
196  
197  
def test_add_role_permission_workspace_requires_wildcard(store):
    """A ``workspace`` grant with a non-wildcard pattern is rejected."""
    owner = store.create_role(name="ws-role", workspace="ws1")

    with pytest.raises(MlflowException, match="resource_type='workspace' requires"):
        store.add_role_permission(owner.id, "workspace", "42", "MANAGE")
202  
203  
def test_add_role_permission_nonexistent_role(store):
    """Granting a permission to a role id that does not exist raises "not found"."""
    missing_role_id = 99999
    with pytest.raises(MlflowException, match="not found"):
        store.add_role_permission(missing_role_id, "experiment", "123", "READ")
207  
208  
def test_remove_role_permission(store):
    """Removing a role's only permission leaves the role with none."""
    owner = store.create_role(name="viewer", workspace="ws1")
    grant = store.add_role_permission(owner.id, "experiment", "123", "READ")

    store.remove_role_permission(grant.id)

    remaining = store.list_role_permissions(owner.id)
    assert remaining == []
214  
215  
def test_remove_role_permission_not_found(store):
    """Removing a permission id that does not exist raises "not found"."""
    missing_permission_id = 99999
    with pytest.raises(MlflowException, match="not found"):
        store.remove_role_permission(missing_permission_id)
219  
220  
def test_list_role_permissions(store):
    """Every permission added to a role is returned by ``list_role_permissions``."""
    owner = store.create_role(name="viewer", workspace="ws1")
    grants_to_add = [
        ("experiment", "1", "READ"),
        ("experiment", "2", "EDIT"),
        ("registered_model", "*", "READ"),
    ]
    for resource_type, pattern, level in grants_to_add:
        store.add_role_permission(owner.id, resource_type, pattern, level)

    listed = store.list_role_permissions(owner.id)

    assert len(listed) == 3
229  
230  
def test_update_role_permission(store):
    """Updating a grant changes its permission level."""
    owner = store.create_role(name="viewer", workspace="ws1")
    grant = store.add_role_permission(owner.id, "experiment", "123", "READ")

    changed = store.update_role_permission(grant.id, "EDIT")

    assert changed.permission == "EDIT"
236  
237  
def test_update_role_permission_not_found(store):
    """Updating a permission id that does not exist raises "not found"."""
    missing_permission_id = 99999
    with pytest.raises(MlflowException, match="not found"):
        store.update_role_permission(missing_permission_id, "READ")
241  
242  
def test_update_role_permission_invalid_permission(store):
    """Updating an existing grant to an unknown permission name is rejected."""
    owner = store.create_role(name="viewer", workspace="ws1")
    grant = store.add_role_permission(owner.id, "experiment", "123", "READ")

    with pytest.raises(MlflowException, match="Invalid permission"):
        store.update_role_permission(grant.id, "INVALID")
248  
249  
250  # ---- UserRoleAssignment CRUD ----
251  
252  
def test_assign_role_to_user(store, user):
    """Assigning a role returns a ``UserRoleAssignment`` linking the user and the role."""
    target = store.create_role(name="viewer", workspace="ws1")

    link = store.assign_role_to_user(user.id, target.id)

    assert isinstance(link, UserRoleAssignment)
    assert link.user_id == user.id
    assert link.role_id == target.id
259  
260  
def test_assign_role_nonexistent_user(store):
    """Assigning a role to a user id that does not exist raises "not found"."""
    target = store.create_role(name="viewer", workspace="ws1")
    missing_user_id = 99999

    with pytest.raises(MlflowException, match="not found"):
        store.assign_role_to_user(missing_user_id, target.id)
265  
266  
def test_assign_role_duplicate(store, user):
    """Assigning the same role to the same user twice is rejected."""
    target = store.create_role(name="viewer", workspace="ws1")
    store.assign_role_to_user(user.id, target.id)

    with pytest.raises(MlflowException, match="already exists"):
        store.assign_role_to_user(user.id, target.id)
272  
273  
def test_unassign_role_from_user(store, user):
    """After unassigning the user's only role, the user has no roles."""
    target = store.create_role(name="viewer", workspace="ws1")
    store.assign_role_to_user(user.id, target.id)

    store.unassign_role_from_user(user.id, target.id)

    remaining = store.list_user_roles(user.id)
    assert remaining == []
279  
280  
def test_unassign_role_not_found(store, user):
    """Unassigning a role id the user was never given raises "not found"."""
    missing_role_id = 99999
    with pytest.raises(MlflowException, match="not found"):
        store.unassign_role_from_user(user.id, missing_role_id)
284  
285  
def test_list_user_roles(store, user):
    """``list_user_roles`` returns the user's roles from every workspace."""
    viewer_ws1 = store.create_role(name="viewer", workspace="ws1")
    editor_ws2 = store.create_role(name="editor", workspace="ws2")
    for assigned in (viewer_ws1, editor_ws2):
        store.assign_role_to_user(user.id, assigned.id)

    listed = store.list_user_roles(user.id)

    assert len(listed) == 2
    assert {role.name for role in listed} == {"viewer", "editor"}
295  
296  
def test_list_user_roles_for_workspace(store, user):
    """The per-workspace listing returns only the user's roles in that workspace."""
    role_specs = [("viewer", "ws1"), ("editor", "ws1"), ("viewer", "ws2")]
    created = [store.create_role(name=name, workspace=ws) for name, ws in role_specs]
    for role in created:
        store.assign_role_to_user(user.id, role.id)

    in_ws1 = store.list_user_roles_for_workspace(user.id, "ws1")
    assert len(in_ws1) == 2

    in_ws2 = store.list_user_roles_for_workspace(user.id, "ws2")
    assert len(in_ws2) == 1
310  
311  
def test_list_role_users(store, user, user2):
    """``list_role_users`` returns one entry for each user assigned to the role."""
    shared = store.create_role(name="viewer", workspace="ws1")
    for member in (user, user2):
        store.assign_role_to_user(member.id, shared.id)

    members = store.list_role_users(shared.id)

    assert len(members) == 2
    assert {entry.user_id for entry in members} == {user.id, user2.id}
320  
321  
322  # ---- Role-based permission resolution ----
323  
324  
def test_get_role_permission_no_roles(store, user):
    """A user with no roles resolves to ``None``."""
    resolved = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert resolved is None
328  
329  
def test_get_role_permission_specific_match(store, user):
    """A grant on an exact resource id resolves to that grant's permission."""
    viewer = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(viewer.id, "experiment", "1", "READ")
    store.assign_role_to_user(user.id, viewer.id)

    resolved = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")

    assert resolved == READ
337  
338  
def test_get_role_permission_no_match(store, user):
    """A grant on one resource id does not resolve for a different id."""
    viewer = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(viewer.id, "experiment", "1", "READ")
    store.assign_role_to_user(user.id, viewer.id)

    resolved = store.get_role_permission_for_resource(user.id, "experiment", "999", "ws1")

    assert resolved is None
346  
347  
def test_get_role_permission_wildcard_match(store, user):
    """A ``"*"`` grant resolves for an arbitrary resource id."""
    viewer = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(viewer.id, "experiment", "*", "EDIT")
    store.assign_role_to_user(user.id, viewer.id)

    resolved = store.get_role_permission_for_resource(user.id, "experiment", "any-id", "ws1")

    assert resolved == EDIT
355  
356  
def test_get_role_permission_union_of_multiple_roles(store, user):
    """With READ from one role and EDIT from another, the resolver returns EDIT."""
    for role_name, level in [("viewer", "READ"), ("editor", "EDIT")]:
        role = store.create_role(name=role_name, workspace="ws1")
        store.add_role_permission(role.id, "experiment", "1", level)
        store.assign_role_to_user(user.id, role.id)

    resolved = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")

    assert resolved == EDIT
368  
369  
def test_get_role_permission_wildcard_and_specific_union(store, user):
    """A wildcard READ plus a specific EDIT in one role resolves per resource id."""
    mixed = store.create_role(name="mixed", workspace="ws1")
    store.add_role_permission(mixed.id, "experiment", "*", "READ")
    store.add_role_permission(mixed.id, "experiment", "1", "EDIT")
    store.assign_role_to_user(user.id, mixed.id)

    # Experiment 1 is covered by both grants; the higher one (EDIT) is returned.
    on_specific = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert on_specific == EDIT

    # Any other experiment is covered by the wildcard only.
    on_other = store.get_role_permission_for_resource(user.id, "experiment", "999", "ws1")
    assert on_other == READ
383  
384  
def test_get_role_permission_workspace_admin(store, user):
    """A workspace-wide MANAGE grant resolves to MANAGE for different resource types."""
    admin_role = store.create_role(name="ws-admin", workspace="ws1")
    store.add_role_permission(admin_role.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, admin_role.id)

    # The workspace grant is not tied to a single resource type.
    for resource_type, resource_id in [("experiment", "any-id"), ("registered_model", "m1")]:
        resolved = store.get_role_permission_for_resource(
            user.id, resource_type, resource_id, "ws1"
        )
        assert resolved == MANAGE
396  
397  
def test_workspace_permission_applies_across_resource_types(store, user):
    """A workspace-level READ grant resolves to READ for several resource types."""
    reader = store.create_role(name="reader", workspace="ws1")
    store.add_role_permission(reader.id, "workspace", "*", "READ")
    store.assign_role_to_user(user.id, reader.id)

    probes = [("experiment", "1"), ("registered_model", "m1"), ("gateway_endpoint", "e1")]
    for resource_type, resource_id in probes:
        resolved = store.get_role_permission_for_resource(
            user.id, resource_type, resource_id, "ws1"
        )
        assert resolved == READ
407  
408  
def test_workspace_permission_respects_union_with_specific(store, user):
    """A workspace READ combined with a specific EDIT resolves to the higher of the two."""
    mixed = store.create_role(name="mixed", workspace="ws1")
    store.add_role_permission(mixed.id, "workspace", "*", "READ")
    store.add_role_permission(mixed.id, "experiment", "42", "EDIT")
    store.assign_role_to_user(user.id, mixed.id)

    # Experiment 42 has both grants, so EDIT is returned.
    on_42 = store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1")
    assert on_42 == EDIT
    # Experiment 99 only has the workspace-level READ.
    on_99 = store.get_role_permission_for_resource(user.id, "experiment", "99", "ws1")
    assert on_99 == READ
419  
420  
def test_is_workspace_admin(store, user):
    """A workspace MANAGE grant makes the user an admin of that workspace only."""
    admin_role = store.create_role(name="ws-admin", workspace="ws1")
    store.add_role_permission(admin_role.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, admin_role.id)

    admin_of_ws1 = store.is_workspace_admin(user.id, "ws1")
    assert admin_of_ws1 is True
    admin_of_ws2 = store.is_workspace_admin(user.id, "ws2")
    assert admin_of_ws2 is False
428  
429  
def test_is_workspace_admin_requires_manage(store, user):
    """A workspace grant below MANAGE (here READ) does not make the user a workspace admin."""
    reader = store.create_role(name="ws-reader", workspace="ws1")
    store.add_role_permission(reader.id, "workspace", "*", "READ")
    store.assign_role_to_user(user.id, reader.id)

    is_admin = store.is_workspace_admin(user.id, "ws1")

    assert is_admin is False
437  
438  
def test_list_role_grants_for_user_in_workspace(store, user):
    """Listing experiment grants returns experiment and workspace grants, nothing else."""
    multi = store.create_role(name="multi", workspace="ws1")
    # Specific and wildcard experiment grants, plus a workspace-wide grant.
    store.add_role_permission(multi.id, "experiment", "42", "EDIT")
    store.add_role_permission(multi.id, "experiment", "*", "READ")
    store.add_role_permission(multi.id, "workspace", "*", "READ")
    # A grant on a different resource type, which must not show up below.
    store.add_role_permission(multi.id, "registered_model", "*", "MANAGE")
    store.assign_role_to_user(user.id, multi.id)

    listed = store.list_role_grants_for_user_in_workspace(user.id, "ws1", "experiment")

    # Both experiment grants and the workspace grant; the registered_model grant is absent.
    expected = [("42", "EDIT"), ("*", "READ"), ("*", "READ")]
    assert sorted(listed) == sorted(expected)
453  
454  
def test_list_role_grants_for_user_in_workspace_cross_workspace(store, user):
    """Grants held through a ws2 role are not listed when ws1 is queried."""
    other_ws_role = store.create_role(name="other-ws", workspace="ws2")
    store.add_role_permission(other_ws_role.id, "experiment", "99", "EDIT")
    store.assign_role_to_user(user.id, other_ws_role.id)

    listed = store.list_role_grants_for_user_in_workspace(user.id, "ws1", "experiment")

    assert listed == []
462  
463  
def test_list_role_grants_for_user_in_workspace_no_roles(store, user):
    """A user with no roles has an empty grant list."""
    listed = store.list_role_grants_for_user_in_workspace(user.id, "ws1", "experiment")
    assert listed == []
466  
467  
def test_list_role_grants_for_user_in_workspace_rejects_invalid_resource_type(store, user):
    """An unknown resource type is rejected when listing grants."""
    unknown_type = "not_a_type"
    with pytest.raises(MlflowException, match="Invalid resource type"):
        store.list_role_grants_for_user_in_workspace(user.id, "ws1", unknown_type)
471  
472  
def test_list_workspace_admin_workspaces(store, user):
    """Only workspaces where the user holds a workspace MANAGE grant are reported."""
    # (role name, workspace, resource type, permission): the user is a workspace
    # admin in ws1 and ws3, and an ordinary member in ws2.
    role_specs = [
        ("wa1", "ws1", "workspace", "MANAGE"),
        ("wa3", "ws3", "workspace", "MANAGE"),
        ("mem", "ws2", "experiment", "READ"),
    ]
    for role_name, workspace, resource_type, level in role_specs:
        role = store.create_role(name=role_name, workspace=workspace)
        store.add_role_permission(role.id, resource_type, "*", level)
        store.assign_role_to_user(user.id, role.id)

    admin_workspaces = store.list_workspace_admin_workspaces(user.id)

    assert admin_workspaces == {"ws1", "ws3"}
486  
487  
def test_list_workspace_admin_workspaces_ignores_non_manage(store, user):
    """A workspace-scope grant below MANAGE does not count as workspace admin."""
    reader = store.create_role(name="reader", workspace="ws1")
    store.add_role_permission(reader.id, "workspace", "*", "READ")
    store.assign_role_to_user(user.id, reader.id)

    admin_workspaces = store.list_workspace_admin_workspaces(user.id)

    assert admin_workspaces == set()
495  
496  
497  # ---- Resolver coverage: cross-workspace isolation, NO_PERMISSIONS, resource types ----
498  
499  
def test_resolver_cross_workspace_isolation(store, user):
    """A role that lives in ws1 grants nothing when the lookup targets ws2, even
    though the user holds the role and its pattern matches the resource.
    """
    editor = store.create_role(name="editor", workspace="ws1")
    store.add_role_permission(editor.id, "experiment", "*", "EDIT")
    store.assign_role_to_user(user.id, editor.id)

    # In ws1 the role applies.
    in_ws1 = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert in_ws1 == EDIT
    # In ws2 the user has no role at all, so nothing resolves.
    in_ws2 = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws2")
    assert in_ws2 is None
512  
513  
def test_resolver_role_assignment_in_other_workspace_doesnt_leak(store, user):
    """Each workspace resolves from its own role; a workspace with no role gives ``None``."""
    ws1_editor = store.create_role(name="ws1-editor", workspace="ws1")
    store.add_role_permission(ws1_editor.id, "experiment", "*", "EDIT")
    store.assign_role_to_user(user.id, ws1_editor.id)

    ws2_reader = store.create_role(name="ws2-reader", workspace="ws2")
    store.add_role_permission(ws2_reader.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, ws2_reader.id)

    in_ws1 = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert in_ws1 == EDIT
    in_ws2 = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws2")
    assert in_ws2 == READ
    in_ws3 = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws3")
    assert in_ws3 is None
526  
527  
def test_resolver_returns_no_permissions_when_role_only_has_no_permissions(store, user):
    """When the only grant is NO_PERMISSIONS, the store returns that permission
    object rather than ``None``.

    This pins the store-layer distinction between "no role grant found" (``None``)
    and "a role grant resolved to an explicit NO_PERMISSIONS". It does not imply
    different fallback behavior in the outer resolver: at that layer
    (_get_permission_from_store_or_default) both cases fall through to the
    direct grant / workspace / default chain identically.
    """
    locked = store.create_role(name="locked", workspace="ws1")
    store.add_role_permission(locked.id, "experiment", "*", "NO_PERMISSIONS")
    store.assign_role_to_user(user.id, locked.id)

    resolved = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")

    assert resolved is not None
    assert resolved.name == "NO_PERMISSIONS"
543  
544  
def test_resolver_no_permissions_loses_to_any_positive_grant(store, user):
    """A positive grant wins over NO_PERMISSIONS when the user holds both, whether
    they come from different roles or the same one. This reflects the
    ``max_permission`` policy, under which explicit grants outrank explicit denies.
    """
    for role_name, level in [("deny", "NO_PERMISSIONS"), ("reader", "READ")]:
        role = store.create_role(name=role_name, workspace="ws1")
        store.add_role_permission(role.id, "experiment", "*", level)
        store.assign_role_to_user(user.id, role.id)

    resolved = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")

    assert resolved == READ
559  
560  
def test_resolver_unassigned_role_doesnt_grant(store, user):
    """A role with matching permissions grants nothing to a user who is not assigned to it."""
    editor = store.create_role(name="editor", workspace="ws1")
    store.add_role_permission(editor.id, "experiment", "*", "EDIT")
    # No assign_role_to_user call here: the missing assignment is the point of the test.

    resolved = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")

    assert resolved is None
570  
571  
def test_resolver_resource_type_filter(store, user):
    """A ``registered_model`` grant does not satisfy an ``experiment`` lookup, and
    vice versa. Only the ``workspace`` resource type promotes across all types.
    """
    models_only = store.create_role(name="models-only", workspace="ws1")
    store.add_role_permission(models_only.id, "registered_model", "*", "EDIT")
    store.assign_role_to_user(user.id, models_only.id)

    on_experiment = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert on_experiment is None
    on_model = store.get_role_permission_for_resource(user.id, "registered_model", "m1", "ws1")
    assert on_model == EDIT
583  
584  
585  # ---- Resolver coverage: permission hierarchy matrix ----
586  
587  
@pytest.mark.parametrize("resource_type", _CONCRETE_RESOURCE_TYPES)
@pytest.mark.parametrize(
    ("granted", "expected"),
    [
        ("READ", READ),
        ("USE", USE),
        ("EDIT", EDIT),
        ("MANAGE", MANAGE),
    ],
)
def test_resolver_returns_granted_permission_for_each_resource_type(
    store, user, resource_type, granted, expected
):
    """For every (resource_type, granted) pair, a wildcard grant of that level
    resolves to exactly that level on a resource of that type. This checks that
    the resolver behaves the same for every concrete resource type.
    """
    role_name = f"{resource_type}-{granted}"
    role = store.create_role(name=role_name, workspace="ws1")
    store.add_role_permission(role.id, resource_type, "*", granted)
    store.assign_role_to_user(user.id, role.id)

    resolved = store.get_role_permission_for_resource(user.id, resource_type, "id", "ws1")

    assert resolved == expected
611  
612  
@pytest.mark.parametrize("resource_type", _CONCRETE_RESOURCE_TYPES)
def test_resolver_workspace_grant_promotes_to_every_resource_type(store, user, resource_type):
    """``(workspace, *, MANAGE)`` resolves to MANAGE for every concrete resource
    type in the role's workspace. This is the workspace-admin short-circuit: if
    it regresses, workspace admins silently lose authority over specific
    resource types.
    """
    admin_role = store.create_role(name="ws-admin", workspace="ws1")
    store.add_role_permission(admin_role.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, admin_role.id)

    resolved = store.get_role_permission_for_resource(user.id, resource_type, "any-id", "ws1")

    assert resolved == MANAGE
625  
626  
@pytest.mark.parametrize(
    ("granted", "expected"),
    [
        ("READ", READ),
        ("USE", USE),
        ("EDIT", EDIT),
        ("MANAGE", MANAGE),
    ],
)
def test_resolver_workspace_grant_propagates_at_every_level(store, user, granted, expected):
    """``(workspace, *, X)`` promotes X to every resource type for each X in
    {READ, USE, EDIT, MANAGE}, not only for MANAGE. Workspace-wide grants thus
    act as a blanket baseline permission, which the UI relies on (e.g. the
    seeded ``viewer`` and ``editor`` roles use this form).
    """
    ws_role = store.create_role(name=f"ws-{granted}", workspace="ws1")
    store.add_role_permission(ws_role.id, "workspace", "*", granted)
    store.assign_role_to_user(user.id, ws_role.id)

    for concrete_type in _CONCRETE_RESOURCE_TYPES:
        resolved = store.get_role_permission_for_resource(user.id, concrete_type, "id", "ws1")
        assert resolved == expected
650  
651  
def test_resolver_workspace_grant_scoped_to_role_workspace(store, user):
    """A workspace-wide grant held in ws1 does nothing for lookups in ws2; the
    role's own workspace bounds the grant.
    """
    ws1_admin = store.create_role(name="ws1-admin", workspace="ws1")
    store.add_role_permission(ws1_admin.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, ws1_admin.id)

    in_ws1 = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert in_ws1 == MANAGE
    in_ws2 = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws2")
    assert in_ws2 is None
662  
663  
664  # ---- Resolver coverage: pattern matching completeness ----
665  
666  
def test_resolver_specific_pattern_does_not_apply_to_different_id(store, user):
    """A grant pinned to experiment 42 resolves for 42 and for no other id."""
    pinned = store.create_role(name="e42-editor", workspace="ws1")
    store.add_role_permission(pinned.id, "experiment", "42", "EDIT")
    store.assign_role_to_user(user.id, pinned.id)

    on_42 = store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1")
    assert on_42 == EDIT
    on_99 = store.get_role_permission_for_resource(user.id, "experiment", "99", "ws1")
    assert on_99 is None
674  
675  
def test_resolver_wildcard_applies_to_any_id(store, user):
    """A ``"*"`` grant resolves for several differently shaped resource ids."""
    wildcard_role = store.create_role(name="any-experiment", workspace="ws1")
    store.add_role_permission(wildcard_role.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, wildcard_role.id)

    for experiment_id in ("1", "42", "long-uuid-6a4c"):
        resolved = store.get_role_permission_for_resource(
            user.id, "experiment", experiment_id, "ws1"
        )
        assert resolved == READ
683  
684  
def test_resolver_specific_outranks_wildcard_when_higher(store, user):
    """When the specific grant is higher than the wildcard grant, the specific one wins."""
    mixed = store.create_role(name="mixed", workspace="ws1")
    store.add_role_permission(mixed.id, "experiment", "*", "READ")
    store.add_role_permission(mixed.id, "experiment", "42", "EDIT")
    store.assign_role_to_user(user.id, mixed.id)

    on_42 = store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1")
    assert on_42 == EDIT
    on_99 = store.get_role_permission_for_resource(user.id, "experiment", "99", "ws1")
    assert on_99 == READ
694  
695  
def test_resolver_wildcard_outranks_specific_when_higher(store, user):
    """When the wildcard grant is higher than the specific grant, the wildcard
    wins: the policy is "best grant", not "most specific wins". This keeps an
    operator from accidentally *downgrading* a user's access by adding a
    narrower grant with a lower permission level.
    """
    mixed = store.create_role(name="mixed", workspace="ws1")
    store.add_role_permission(mixed.id, "experiment", "*", "EDIT")
    store.add_role_permission(mixed.id, "experiment", "42", "READ")
    store.assign_role_to_user(user.id, mixed.id)

    on_42 = store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1")
    assert on_42 == EDIT
    on_99 = store.get_role_permission_for_resource(user.id, "experiment", "99", "ws1")
    assert on_99 == EDIT
709  
710  
711  # ---- Resolver coverage: multi-role union ----
712  
713  
def test_resolver_union_picks_max_across_roles(store, user):
    """Grants from every role assigned to the user are combined and the highest wins."""
    # Three separate roles, each carrying a wildcard experiment grant at a different level.
    for role_name, level in (("r1", "READ"), ("r2", "USE"), ("r3", "MANAGE")):
        granted_role = store.create_role(name=role_name, workspace="ws1")
        store.add_role_permission(granted_role.id, "experiment", "*", level)
        store.assign_role_to_user(user.id, granted_role.id)

    resolved = store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1")
    assert resolved == MANAGE
729  
730  
def test_resolver_union_mixes_workspace_and_resource_grants(store, user):
    """A workspace-wide EDIT combined with an experiment-specific READ resolves to EDIT.

    The workspace grant already covers the experiment, so the narrower READ
    can only promote access, never downgrade it.
    """
    grants = (
        ("ws-editor", "workspace", "*", "EDIT"),
        ("one-reader", "experiment", "42", "READ"),
    )
    for role_name, resource_type, pattern, level in grants:
        granted_role = store.create_role(name=role_name, workspace="ws1")
        store.add_role_permission(granted_role.id, resource_type, pattern, level)
        store.assign_role_to_user(user.id, granted_role.id)

    resolved = store.get_role_permission_for_resource(user.id, "experiment", "42", "ws1")
    assert resolved == EDIT
745  
746  
747  # ---- Legacy workspace_permissions as workspace-admin source ----
748  #
749  # Pre-RBAC operators relied on `workspace_permissions` MANAGE to convey workspace-wide
750  # admin authority. The workspace-admin helpers must still recognize that grant,
751  # otherwise operators mid-migration (or just not yet using roles) silently lose admin
752  # status behind RBAC-aware validators.
753  
754  
def test_is_workspace_admin_honors_legacy_workspace_permissions(store, user):
    """A legacy MANAGE workspace permission makes the user an admin of that workspace only."""
    store.set_workspace_permission("ws1", user.username, "MANAGE")

    admin_of_ws1 = store.is_workspace_admin(user.id, "ws1")
    admin_of_ws2 = store.is_workspace_admin(user.id, "ws2")

    assert admin_of_ws1 is True
    assert admin_of_ws2 is False
760  
761  
def test_is_workspace_admin_ignores_non_manage_legacy(store, user):
    """A legacy READ workspace permission does not grant workspace-admin status."""
    store.set_workspace_permission("ws1", user.username, "READ")

    admin_of_ws1 = store.is_workspace_admin(user.id, "ws1")
    assert admin_of_ws1 is False
766  
767  
def test_list_workspace_admin_workspaces_unions_role_and_legacy(store, user):
    """Admin workspaces combine role-based MANAGE and legacy MANAGE; legacy READ is excluded."""
    # ws1: admin through a role carrying a workspace-wide MANAGE grant.
    admin_role = store.create_role(name="wa1", workspace="ws1")
    store.add_role_permission(admin_role.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, admin_role.id)

    # ws2: legacy MANAGE (counts). ws3: legacy READ (must not count).
    for workspace_name, level in (("ws2", "MANAGE"), ("ws3", "READ")):
        store.set_workspace_permission(workspace_name, user.username, level)

    admin_workspaces = store.list_workspace_admin_workspaces(user.id)
    assert admin_workspaces == {"ws1", "ws2"}
777  
778  
def test_is_workspace_admin_of_any_of_users_workspaces_legacy_admin(store, user, user2):
    """Legacy MANAGE supplies the admin side; a role assignment places the target in ws1."""
    store.set_workspace_permission("ws1", user.username, "MANAGE")

    member_role = store.create_role(name="member", workspace="ws1")
    store.add_role_permission(member_role.id, "experiment", "*", "READ")
    store.assign_role_to_user(user2.id, member_role.id)

    is_admin_over_target = store.is_workspace_admin_of_any_of_users_workspaces(user.id, user2.id)
    assert is_admin_over_target is True
787  
788  
def test_is_workspace_admin_of_any_of_users_workspaces_legacy_target(store, user, user2):
    """A role supplies the admin side; a legacy READ permission places the target in ws1."""
    workspace_admin_role = store.create_role(name="wa", workspace="ws1")
    store.add_role_permission(workspace_admin_role.id, "workspace", "*", "MANAGE")
    store.assign_role_to_user(user.id, workspace_admin_role.id)

    store.set_workspace_permission("ws1", user2.username, "READ")

    is_admin_over_target = store.is_workspace_admin_of_any_of_users_workspaces(user.id, user2.id)
    assert is_admin_over_target is True
797  
798  
def test_is_workspace_admin_of_any_of_users_workspaces_no_overlap(store, user, user2):
    """An admin of ws1 has no authority over a target who is present only in ws2."""
    legacy_grants = (
        ("ws1", user.username, "MANAGE"),
        ("ws2", user2.username, "READ"),
    )
    for workspace_name, username, level in legacy_grants:
        store.set_workspace_permission(workspace_name, username, level)

    is_admin_over_target = store.is_workspace_admin_of_any_of_users_workspaces(user.id, user2.id)
    assert is_admin_over_target is False
805  
806  
def test_get_role_permission_does_not_cross_workspace(store, user):
    """A grant held through a ws1 role yields no permission when resolving against ws2."""
    ws1_viewer = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(ws1_viewer.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, ws1_viewer.id)

    # Same user, same resource, but a different workspace than the role's.
    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws2") is None
815  
816  
def test_get_role_permission_different_resource_types(store, user):
    """An experiment wildcard grant applies to experiments only, not to registered models."""
    viewer_role = store.create_role(name="viewer", workspace="ws1")
    store.add_role_permission(viewer_role.id, "experiment", "*", "READ")
    store.assign_role_to_user(user.id, viewer_role.id)

    # A different resource type is not covered by the experiment grant.
    model_permission = store.get_role_permission_for_resource(
        user.id, "registered_model", "m1", "ws1"
    )
    assert model_permission is None

    # The granted resource type is covered.
    experiment_permission = store.get_role_permission_for_resource(
        user.id, "experiment", "1", "ws1"
    )
    assert experiment_permission == READ
829  
830  
@pytest.mark.parametrize(
    ("perms", "expected"),
    [
        ([("experiment", "1", "READ"), ("experiment", "1", "MANAGE")], MANAGE),
        ([("experiment", "1", "USE"), ("experiment", "1", "EDIT")], EDIT),
        ([("experiment", "*", "READ"), ("experiment", "1", "USE")], USE),
    ],
)
def test_get_role_permission_picks_highest(store, user, perms, expected):
    """Each grant lives in its own role; the resolver must return the highest level.

    ``perms`` is a list of ``(resource_type, resource_pattern, permission)``
    tuples and ``expected`` is the permission that should be resolved for
    experiment ``"1"`` in workspace ``ws1``.
    """
    for index, grant in enumerate(perms):
        resource_type, resource_pattern, permission = grant
        grant_role = store.create_role(name=f"role-{index}", workspace="ws1")
        store.add_role_permission(grant_role.id, resource_type, resource_pattern, permission)
        store.assign_role_to_user(user.id, grant_role.id)

    assert store.get_role_permission_for_resource(user.id, "experiment", "1", "ws1") == expected