/ tests / store / workspace / test_sqlalchemy_store.py
test_sqlalchemy_store.py
  1  import pytest
  2  import sqlalchemy as sa
  3  from sqlalchemy.exc import IntegrityError
  4  
  5  from mlflow.entities.workspace import Workspace, WorkspaceDeletionMode
  6  from mlflow.exceptions import MlflowException
  7  from mlflow.store.workspace.dbmodels.models import SqlWorkspace
  8  from mlflow.store.workspace.sqlalchemy_store import SqlAlchemyStore
  9  from mlflow.utils.workspace_utils import DEFAULT_WORKSPACE_NAME
 10  
 11  
 12  @pytest.fixture
 13  def workspace_store(db_uri, monkeypatch):
 14      monkeypatch.setenv("MLFLOW_ENABLE_WORKSPACES", "true")
 15  
 16      store = SqlAlchemyStore(db_uri)
 17  
 18      with store.ManagedSessionMaker() as session:
 19          try:
 20              session.add(
 21                  SqlWorkspace(
 22                      name=DEFAULT_WORKSPACE_NAME,
 23                      description="Default workspace",
 24                  )
 25              )
 26              session.commit()
 27          except IntegrityError:
 28              session.rollback()
 29  
 30      try:
 31          yield store
 32      finally:
 33          store._engine.dispose()
 34  
 35  
 36  def _workspace_rows(store):
 37      with store.ManagedSessionMaker() as session:
 38          return {
 39              (row.name, row.description)
 40              for row in session.query(SqlWorkspace).order_by(SqlWorkspace.name).all()
 41          }
 42  
 43  
 44  def test_list_workspaces_returns_all(workspace_store):
 45      workspace_store.create_workspace(Workspace(name="team-a", description="Team A"))
 46      workspace_store.create_workspace(Workspace(name="team-b", description=None))
 47  
 48      workspaces = workspace_store.list_workspaces()
 49      rows = {(ws.name, ws.description) for ws in workspaces}
 50      default_description = next(desc for name, desc in rows if name == DEFAULT_WORKSPACE_NAME)
 51      assert rows == {
 52          (DEFAULT_WORKSPACE_NAME, default_description),
 53          ("team-a", "Team A"),
 54          ("team-b", None),
 55      }
 56  
 57  
 58  def test_get_workspace_success(workspace_store):
 59      workspace_store.create_workspace(Workspace(name="team-a", description="Team A"))
 60  
 61      workspace = workspace_store.get_workspace("team-a")
 62      assert workspace.name == "team-a"
 63      assert workspace.description == "Team A"
 64  
 65  
 66  def test_get_workspace_not_found(workspace_store):
 67      with pytest.raises(MlflowException, match="Workspace 'unknown' not found") as exc:
 68          workspace_store.get_workspace("unknown")
 69      assert exc.value.error_code == "RESOURCE_DOES_NOT_EXIST"
 70  
 71  
 72  def test_create_workspace_persists_record(workspace_store):
 73      created = workspace_store.create_workspace(
 74          Workspace(name="team-a", description="Team A", default_artifact_root="s3://root/team-a"),
 75      )
 76      assert created.name == "team-a"
 77      assert created.description == "Team A"
 78      assert created.default_artifact_root == "s3://root/team-a"
 79      assert ("team-a", "Team A") in _workspace_rows(workspace_store)
 80  
 81  
 82  def test_create_workspace_duplicate_raises(workspace_store):
 83      workspace_store.create_workspace(Workspace(name="team-a", description=None))
 84  
 85      with pytest.raises(
 86          MlflowException,
 87          match="Workspace 'team-a' already exists\\.",
 88      ) as exc:
 89          workspace_store.create_workspace(Workspace(name="team-a", description=None))
 90      assert exc.value.error_code == "RESOURCE_ALREADY_EXISTS"
 91  
 92  
 93  def test_create_workspace_invalid_name_raises(workspace_store):
 94      with pytest.raises(
 95          MlflowException,
 96          match="Workspace name 'Team-A' must match the pattern",
 97      ) as exc:
 98          workspace_store.create_workspace(Workspace(name="Team-A", description=None))
 99      assert exc.value.error_code == "INVALID_PARAMETER_VALUE"
100  
101  
def test_update_workspace_changes_description(workspace_store):
    # Updating the description is reflected in the return value and in the stored row.
    workspace_store.create_workspace(Workspace(name="team-a", description="old"))

    new_description = "new description"
    changes = Workspace(name="team-a", description=new_description)
    updated = workspace_store.update_workspace(changes)

    assert updated.description == new_description
    assert ("team-a", new_description) in _workspace_rows(workspace_store)
110  
111  
def test_update_workspace_sets_default_artifact_root(workspace_store):
    # An artifact root set via update is returned by the update and by a later fetch.
    workspace_store.create_workspace(Workspace(name="team-a", description="old"))

    artifact_root = "s3://bucket/team-a"
    changes = Workspace(name="team-a", default_artifact_root=artifact_root)
    updated = workspace_store.update_workspace(changes)
    assert updated.default_artifact_root == artifact_root

    refetched = workspace_store.get_workspace("team-a")
    assert refetched.default_artifact_root == artifact_root
121  
122  
def test_update_workspace_can_clear_default_artifact_root(workspace_store):
    # Updating with an empty artifact root resets the stored value to None.
    initial = Workspace(
        name="team-a",
        description="old",
        default_artifact_root="s3://bucket/team-a",
    )
    workspace_store.create_workspace(initial)

    # The empty string is the "clear this field" signal.
    clear_request = Workspace(name="team-a", default_artifact_root="")
    cleared = workspace_store.update_workspace(clear_request)
    assert cleared.default_artifact_root is None

    refetched = workspace_store.get_workspace("team-a")
    assert refetched.default_artifact_root is None
135  
136  
def test_delete_workspace_removes_empty_workspace(workspace_store):
    # Deleting a workspace drops its row while the default workspace row stays in place.
    workspace_store.create_workspace(Workspace(name="team-a", description=None))
    workspace_store.delete_workspace("team-a")

    remaining = _workspace_rows(workspace_store)
    assert ("team-a", None) not in remaining

    default_workspace = workspace_store.get_default_workspace()
    assert (DEFAULT_WORKSPACE_NAME, default_workspace.description) in remaining
145  
146  
def test_delete_default_workspace_rejected(workspace_store):
    # Deleting the reserved default workspace must fail with INVALID_STATE.
    reserved = f"Cannot delete the reserved '{DEFAULT_WORKSPACE_NAME}' workspace"
    with pytest.raises(MlflowException, match=reserved) as excinfo:
        workspace_store.delete_workspace(DEFAULT_WORKSPACE_NAME)
    assert excinfo.value.error_code == "INVALID_STATE"
154  
155  
def test_update_workspace_not_found(workspace_store):
    # Updating a workspace that does not exist must raise RESOURCE_DOES_NOT_EXIST.
    not_found = "Workspace 'unknown' not found"
    changes = Workspace(name="unknown", description="new description")
    with pytest.raises(MlflowException, match=not_found) as excinfo:
        workspace_store.update_workspace(changes)
    assert excinfo.value.error_code == "RESOURCE_DOES_NOT_EXIST"
163  
164  
def test_delete_workspace_not_found(workspace_store):
    # Deleting a workspace that does not exist must raise RESOURCE_DOES_NOT_EXIST.
    not_found = "Workspace 'unknown' not found"
    with pytest.raises(MlflowException, match=not_found) as excinfo:
        workspace_store.delete_workspace("unknown")
    assert excinfo.value.error_code == "RESOURCE_DOES_NOT_EXIST"
172  
173  
def test_resolve_artifact_root_returns_default(workspace_store):
    # Without a workspace-level override, the supplied root comes back paired with True.
    default_root = "/default/path"

    for_default = workspace_store.resolve_artifact_root(default_root, DEFAULT_WORKSPACE_NAME)
    assert for_default == (default_root, True)

    workspace_store.create_workspace(Workspace(name="team-a", description=None))
    for_team = workspace_store.resolve_artifact_root(default_root, workspace_name="team-a")
    assert for_team == (default_root, True)
185  
186  
def test_resolve_artifact_root_prefers_workspace_override(workspace_store):
    # A workspace-level artifact root wins over the supplied default, with a falsy flag.
    with_override = Workspace(
        name="team-a",
        description=None,
        default_artifact_root="s3://team-a-artifacts",
    )
    workspace_store.create_workspace(with_override)

    root, append_flag = workspace_store.resolve_artifact_root(
        "/default/path", workspace_name="team-a"
    )
    assert root == "s3://team-a-artifacts"
    assert not append_flag
201  
202  
def test_resolve_artifact_root_cache_updates_on_override_change(workspace_store):
    # A resolution made before an override is set must not mask the override afterwards.
    default_root = "/default/path"
    workspace_store.create_workspace(Workspace(name="team-cache", description=None))

    before = workspace_store.resolve_artifact_root(default_root, "team-cache")
    assert before == (default_root, True)

    with_override = Workspace(name="team-cache", default_artifact_root="s3://cache/team")
    workspace_store.update_workspace(with_override)

    after = workspace_store.resolve_artifact_root(default_root, "team-cache")
    assert after == ("s3://cache/team", False)
220  
221  
def test_resolve_artifact_root_cache_handles_delete_and_recreate(workspace_store):
    # Recreating a deleted workspace with a new root must resolve to the new root.
    default_root = "/default/path"

    first = Workspace(name="team-cache", description=None, default_artifact_root="s3://cache/a")
    workspace_store.create_workspace(first)
    resolved_first = workspace_store.resolve_artifact_root(default_root, "team-cache")
    assert resolved_first == ("s3://cache/a", False)

    workspace_store.delete_workspace("team-cache")
    second = Workspace(name="team-cache", description=None, default_artifact_root="s3://cache/b")
    workspace_store.create_workspace(second)

    resolved_second = workspace_store.resolve_artifact_root(default_root, "team-cache")
    assert resolved_second == ("s3://cache/b", False)
242  
243  
def test_resolve_artifact_root_cache_clears_when_override_removed(workspace_store):
    # Once the override is cleared, resolution falls back to the supplied default root.
    default_root = "/default/path"

    initial = Workspace(name="team-cache", description=None, default_artifact_root="s3://cache/a")
    workspace_store.create_workspace(initial)
    with_override = workspace_store.resolve_artifact_root(default_root, "team-cache")
    assert with_override == ("s3://cache/a", False)

    clear_request = Workspace(name="team-cache", default_artifact_root="")
    workspace_store.update_workspace(clear_request)

    without_override = workspace_store.resolve_artifact_root(default_root, "team-cache")
    assert without_override == (default_root, True)
261  
262  
def test_get_default_workspace_returns_default(workspace_store):
    # The default workspace carries the reserved name and a non-None description.
    default_workspace = workspace_store.get_default_workspace()

    assert default_workspace.name == DEFAULT_WORKSPACE_NAME
    assert default_workspace.description is not None
267  
268  
def test_delete_workspace_reassigns_resources_to_default(workspace_store):
    # SET_DEFAULT deletion must move the workspace's experiment to the default workspace.
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    # Insert the experiment with raw SQL so only the workspace store is exercised.
    with workspace_store.ManagedSessionMaker() as session:
        session.execute(
            sa.text(
                "INSERT INTO experiments (name, workspace, lifecycle_stage) "
                "VALUES (:name, :ws, 'active')"
            ),
            {"name": "exp-in-team-a", "ws": "team-a"},
        )

    workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.SET_DEFAULT)

    with workspace_store.ManagedSessionMaker() as session:
        row = session.execute(
            sa.text("SELECT workspace FROM experiments WHERE name = :name"),
            {"name": "exp-in-team-a"},
        ).fetchone()
        # fetchone() returns None when no row matches; check that first so a missing
        # experiment is reported as an assertion failure rather than a TypeError.
        assert row is not None
        assert row[0] == DEFAULT_WORKSPACE_NAME
289  
290  
def test_delete_workspace_fails_on_naming_conflict(workspace_store):
    # SET_DEFAULT deletion must fail when an experiment name exists in both workspaces.
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    insert_experiment = sa.text(
        "INSERT INTO experiments (name, workspace, lifecycle_stage) "
        "VALUES (:name, :ws, 'active')"
    )
    # Same experiment name in "team-a" and in the default workspace.
    with workspace_store.ManagedSessionMaker() as session:
        for owner in ("team-a", DEFAULT_WORKSPACE_NAME):
            session.execute(insert_experiment, {"name": "shared-exp", "ws": owner})

    conflict = "already exist in the default workspace"
    with pytest.raises(MlflowException, match=conflict) as excinfo:
        workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.SET_DEFAULT)
    assert excinfo.value.error_code == "INVALID_STATE"

    # The failed delete must leave the workspace in place (transaction rolled back).
    survivor = workspace_store.get_workspace("team-a")
    assert survivor.name == "team-a"
317  
318  
def test_delete_workspace_cascade_removes_resources(workspace_store):
    # CASCADE deletion must remove both the workspace and the experiment inside it.
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    insert_experiment = sa.text(
        "INSERT INTO experiments (name, workspace, lifecycle_stage) "
        "VALUES (:name, :ws, 'active')"
    )
    with workspace_store.ManagedSessionMaker() as session:
        session.execute(insert_experiment, {"name": "exp-in-team-a", "ws": "team-a"})

    workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.CASCADE)

    count_experiments = sa.text("SELECT count(*) FROM experiments WHERE name = :name")
    with workspace_store.ManagedSessionMaker() as session:
        remaining = session.execute(count_experiments, {"name": "exp-in-team-a"}).scalar()
        assert remaining == 0

    with pytest.raises(MlflowException, match="not found"):
        workspace_store.get_workspace("team-a")
342  
343  
def test_delete_workspace_cascade_removes_experiment_with_runs(workspace_store):
    # CASCADE deletion must remove an experiment in the workspace together with its run.
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    insert_experiment = sa.text(
        "INSERT INTO experiments (experiment_id, name, workspace, lifecycle_stage) "
        "VALUES (:id, :name, :ws, 'active')"
    )
    insert_run = sa.text(
        "INSERT INTO runs (run_uuid, name, experiment_id, lifecycle_stage, status, "
        "source_type, start_time, end_time) "
        "VALUES (:run_id, :name, :exp_id, 'active', 'FINISHED', 'LOCAL', 0, 0)"
    )
    # The experiment id is fixed so the run row can reference it.
    with workspace_store.ManagedSessionMaker() as session:
        session.execute(
            insert_experiment,
            {"id": 999, "name": "exp-with-runs", "ws": "team-a"},
        )
        session.execute(
            insert_run,
            {"run_id": "run-in-team-a", "name": "test-run", "exp_id": 999},
        )

    workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.CASCADE)

    count_experiments = sa.text("SELECT count(*) FROM experiments WHERE name = :name")
    count_runs = sa.text("SELECT count(*) FROM runs WHERE run_uuid = :run_id")
    with workspace_store.ManagedSessionMaker() as session:
        experiments_left = session.execute(count_experiments, {"name": "exp-with-runs"}).scalar()
        assert experiments_left == 0
        runs_left = session.execute(count_runs, {"run_id": "run-in-team-a"}).scalar()
        assert runs_left == 0
377  
378  
def test_delete_workspace_restrict_blocks_when_resources_exist(workspace_store):
    # RESTRICT deletion must fail with INVALID_STATE while the workspace has an experiment.
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    with workspace_store.ManagedSessionMaker() as session:
        session.execute(
            sa.text(
                "INSERT INTO experiments (name, workspace, lifecycle_stage) "
                "VALUES (:name, :ws, 'active')"
            ),
            {"name": "exp-in-team-a", "ws": "team-a"},
        )

    with pytest.raises(MlflowException, match="still contains") as exc:
        workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.RESTRICT)
    assert exc.value.error_code == "INVALID_STATE"

    # Workspace and resources should still exist
    ws = workspace_store.get_workspace("team-a")
    assert ws.name == "team-a"
    with workspace_store.ManagedSessionMaker() as session:
        row = session.execute(
            sa.text("SELECT workspace FROM experiments WHERE name = :name"),
            {"name": "exp-in-team-a"},
        ).fetchone()
        # fetchone() returns None when no row matches; check that first so a missing
        # experiment is reported as an assertion failure rather than a TypeError.
        assert row is not None
        assert row[0] == "team-a"
404  
405  
def test_delete_workspace_restrict_allows_empty_workspace(workspace_store):
    # RESTRICT deletion succeeds when nothing was added to the workspace.
    empty_workspace = Workspace(name="team-a", description=None)
    workspace_store.create_workspace(empty_workspace)

    workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.RESTRICT)

    with pytest.raises(MlflowException, match="not found"):
        workspace_store.get_workspace("team-a")