# test_sqlalchemy_store.py
import pytest
import sqlalchemy as sa
from sqlalchemy.exc import IntegrityError

from mlflow.entities.workspace import Workspace, WorkspaceDeletionMode
from mlflow.exceptions import MlflowException
from mlflow.store.workspace.dbmodels.models import SqlWorkspace
from mlflow.store.workspace.sqlalchemy_store import SqlAlchemyStore
from mlflow.utils.workspace_utils import DEFAULT_WORKSPACE_NAME

# Raw SQL shared by the deletion-mode tests to place an experiment row directly
# into a given workspace.
_INSERT_EXPERIMENT_SQL = (
    "INSERT INTO experiments (name, workspace, lifecycle_stage) "
    "VALUES (:name, :ws, 'active')"
)


@pytest.fixture
def workspace_store(db_uri, monkeypatch):
    """Yield a ``SqlAlchemyStore`` for ``db_uri`` with the default workspace row present.

    Sets ``MLFLOW_ENABLE_WORKSPACES=true`` for the duration of the test, tries to
    insert the default workspace row, and disposes the store's engine on teardown.
    ``db_uri`` is a fixture defined outside this module.
    """
    monkeypatch.setenv("MLFLOW_ENABLE_WORKSPACES", "true")

    store = SqlAlchemyStore(db_uri)

    with store.ManagedSessionMaker() as session:
        try:
            session.add(
                SqlWorkspace(
                    name=DEFAULT_WORKSPACE_NAME,
                    description="Default workspace",
                )
            )
            session.commit()
        except IntegrityError:
            # Presumably the default row already exists (e.g. created by the
            # schema setup); roll back and carry on with the existing row.
            session.rollback()

    try:
        yield store
    finally:
        store._engine.dispose()


def _workspace_rows(store):
    """Return the ``(name, description)`` pairs of all persisted workspace rows as a set."""
    with store.ManagedSessionMaker() as session:
        # No ORDER BY: the rows are collected into a set, which discards ordering.
        return {(row.name, row.description) for row in session.query(SqlWorkspace).all()}


def _insert_experiment(session, name, workspace):
    """Insert an active experiment row named ``name`` into ``workspace`` via raw SQL."""
    session.execute(sa.text(_INSERT_EXPERIMENT_SQL), {"name": name, "ws": workspace})


def _experiment_workspace(store, name):
    """Return the ``workspace`` column of the experiment named ``name``.

    Fails with a readable assertion (instead of a ``TypeError`` on ``None[0]``)
    when no such experiment row exists.
    """
    with store.ManagedSessionMaker() as session:
        row = session.execute(
            sa.text("SELECT workspace FROM experiments WHERE name = :name"),
            {"name": name},
        ).fetchone()
        assert row is not None, f"experiment {name!r} not found"
        return row[0]


def test_list_workspaces_returns_all(workspace_store):
    """``list_workspaces`` returns the default workspace plus every created one."""
    workspace_store.create_workspace(Workspace(name="team-a", description="Team A"))
    workspace_store.create_workspace(Workspace(name="team-b", description=None))

    workspaces = workspace_store.list_workspaces()
    rows = {(ws.name, ws.description) for ws in workspaces}
    # The default workspace's description is not pinned here; only its presence is.
    default_description = next(desc for name, desc in rows if name == DEFAULT_WORKSPACE_NAME)
    assert rows == {
        (DEFAULT_WORKSPACE_NAME, default_description),
        ("team-a", "Team A"),
        ("team-b", None),
    }


def test_get_workspace_success(workspace_store):
    """``get_workspace`` returns the name and description that were created."""
    workspace_store.create_workspace(Workspace(name="team-a", description="Team A"))

    workspace = workspace_store.get_workspace("team-a")
    assert workspace.name == "team-a"
    assert workspace.description == "Team A"


def test_get_workspace_not_found(workspace_store):
    """``get_workspace`` on an unknown name raises RESOURCE_DOES_NOT_EXIST."""
    with pytest.raises(MlflowException, match="Workspace 'unknown' not found") as exc:
        workspace_store.get_workspace("unknown")
    assert exc.value.error_code == "RESOURCE_DOES_NOT_EXIST"


def test_create_workspace_persists_record(workspace_store):
    """``create_workspace`` echoes the fields back and writes the row."""
    created = workspace_store.create_workspace(
        Workspace(name="team-a", description="Team A", default_artifact_root="s3://root/team-a"),
    )
    assert created.name == "team-a"
    assert created.description == "Team A"
    assert created.default_artifact_root == "s3://root/team-a"
    assert ("team-a", "Team A") in _workspace_rows(workspace_store)


def test_create_workspace_duplicate_raises(workspace_store):
    """Creating the same workspace name twice raises RESOURCE_ALREADY_EXISTS."""
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    with pytest.raises(
        MlflowException,
        match="Workspace 'team-a' already exists\\.",
    ) as exc:
        workspace_store.create_workspace(Workspace(name="team-a", description=None))
    assert exc.value.error_code == "RESOURCE_ALREADY_EXISTS"


def test_create_workspace_invalid_name_raises(workspace_store):
    """The name 'Team-A' is rejected with INVALID_PARAMETER_VALUE."""
    with pytest.raises(
        MlflowException,
        match="Workspace name 'Team-A' must match the pattern",
    ) as exc:
        workspace_store.create_workspace(Workspace(name="Team-A", description=None))
    assert exc.value.error_code == "INVALID_PARAMETER_VALUE"


def test_update_workspace_changes_description(workspace_store):
    """``update_workspace`` returns and persists the new description."""
    workspace_store.create_workspace(Workspace(name="team-a", description="old"))

    updated = workspace_store.update_workspace(
        Workspace(name="team-a", description="new description"),
    )
    assert updated.description == "new description"
    assert ("team-a", "new description") in _workspace_rows(workspace_store)


def test_update_workspace_sets_default_artifact_root(workspace_store):
    """``update_workspace`` can set ``default_artifact_root`` on an existing workspace."""
    workspace_store.create_workspace(Workspace(name="team-a", description="old"))

    updated = workspace_store.update_workspace(
        Workspace(name="team-a", default_artifact_root="s3://bucket/team-a"),
    )
    assert updated.default_artifact_root == "s3://bucket/team-a"
    fetched = workspace_store.get_workspace("team-a")
    assert fetched.default_artifact_root == "s3://bucket/team-a"


def test_update_workspace_can_clear_default_artifact_root(workspace_store):
    """Updating with an empty ``default_artifact_root`` resets the field to ``None``."""
    workspace_store.create_workspace(
        Workspace(name="team-a", description="old", default_artifact_root="s3://bucket/team-a")
    )

    # Empty string signals "clear this field"
    cleared = workspace_store.update_workspace(
        Workspace(name="team-a", default_artifact_root=""),
    )
    assert cleared.default_artifact_root is None
    fetched = workspace_store.get_workspace("team-a")
    assert fetched.default_artifact_root is None


def test_delete_workspace_removes_empty_workspace(workspace_store):
    """Deleting an empty workspace removes its row and leaves the default row intact."""
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    workspace_store.delete_workspace("team-a")
    rows = _workspace_rows(workspace_store)
    assert ("team-a", None) not in rows
    default_ws = workspace_store.get_default_workspace()
    assert (DEFAULT_WORKSPACE_NAME, default_ws.description) in rows


def test_delete_default_workspace_rejected(workspace_store):
    """Deleting the default workspace raises INVALID_STATE."""
    with pytest.raises(
        MlflowException,
        match=f"Cannot delete the reserved '{DEFAULT_WORKSPACE_NAME}' workspace",
    ) as exc:
        workspace_store.delete_workspace(DEFAULT_WORKSPACE_NAME)
    assert exc.value.error_code == "INVALID_STATE"


def test_update_workspace_not_found(workspace_store):
    """``update_workspace`` on an unknown name raises RESOURCE_DOES_NOT_EXIST."""
    with pytest.raises(
        MlflowException,
        match="Workspace 'unknown' not found",
    ) as exc:
        workspace_store.update_workspace(Workspace(name="unknown", description="new description"))
    assert exc.value.error_code == "RESOURCE_DOES_NOT_EXIST"


def test_delete_workspace_not_found(workspace_store):
    """``delete_workspace`` on an unknown name raises RESOURCE_DOES_NOT_EXIST."""
    with pytest.raises(
        MlflowException,
        match="Workspace 'unknown' not found",
    ) as exc:
        workspace_store.delete_workspace("unknown")
    assert exc.value.error_code == "RESOURCE_DOES_NOT_EXIST"


def test_resolve_artifact_root_returns_default(workspace_store):
    """Without an override, ``resolve_artifact_root`` returns ``(default_root, True)``."""
    default_root = "/default/path"
    assert workspace_store.resolve_artifact_root(default_root, DEFAULT_WORKSPACE_NAME) == (
        default_root,
        True,
    )
    workspace_store.create_workspace(Workspace(name="team-a", description=None))
    assert workspace_store.resolve_artifact_root(default_root, workspace_name="team-a") == (
        default_root,
        True,
    )


def test_resolve_artifact_root_prefers_workspace_override(workspace_store):
    """A workspace-level ``default_artifact_root`` wins and the append flag is falsy."""
    workspace_store.create_workspace(
        Workspace(
            name="team-a",
            description=None,
            default_artifact_root="s3://team-a-artifacts",
        )
    )

    resolved_root, should_append = workspace_store.resolve_artifact_root(
        "/default/path", workspace_name="team-a"
    )
    assert resolved_root == "s3://team-a-artifacts"
    assert not should_append


def test_resolve_artifact_root_cache_updates_on_override_change(workspace_store):
    """A resolution made before an override is set does not mask the later override."""
    default_root = "/default/path"
    workspace_store.create_workspace(Workspace(name="team-cache", description=None))

    assert workspace_store.resolve_artifact_root(default_root, "team-cache") == (
        default_root,
        True,
    )

    workspace_store.update_workspace(
        Workspace(name="team-cache", default_artifact_root="s3://cache/team")
    )

    assert workspace_store.resolve_artifact_root(default_root, "team-cache") == (
        "s3://cache/team",
        False,
    )


def test_resolve_artifact_root_cache_handles_delete_and_recreate(workspace_store):
    """After delete + recreate with a new root, resolution returns the new root."""
    default_root = "/default/path"
    workspace_store.create_workspace(
        Workspace(name="team-cache", description=None, default_artifact_root="s3://cache/a")
    )

    assert workspace_store.resolve_artifact_root(default_root, "team-cache") == (
        "s3://cache/a",
        False,
    )

    workspace_store.delete_workspace("team-cache")
    workspace_store.create_workspace(
        Workspace(name="team-cache", description=None, default_artifact_root="s3://cache/b")
    )

    assert workspace_store.resolve_artifact_root(default_root, "team-cache") == (
        "s3://cache/b",
        False,
    )


def test_resolve_artifact_root_cache_clears_when_override_removed(workspace_store):
    """Clearing the override makes resolution fall back to ``(default_root, True)``."""
    default_root = "/default/path"
    workspace_store.create_workspace(
        Workspace(name="team-cache", description=None, default_artifact_root="s3://cache/a")
    )

    assert workspace_store.resolve_artifact_root(default_root, "team-cache") == (
        "s3://cache/a",
        False,
    )

    workspace_store.update_workspace(Workspace(name="team-cache", default_artifact_root=""))

    assert workspace_store.resolve_artifact_root(default_root, "team-cache") == (
        default_root,
        True,
    )


def test_get_default_workspace_returns_default(workspace_store):
    """``get_default_workspace`` returns the default name with a non-null description."""
    default_ws = workspace_store.get_default_workspace()
    assert default_ws.name == DEFAULT_WORKSPACE_NAME
    assert default_ws.description is not None


def test_delete_workspace_reassigns_resources_to_default(workspace_store):
    """SET_DEFAULT deletion moves the workspace's experiment to the default workspace."""
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    with workspace_store.ManagedSessionMaker() as session:
        _insert_experiment(session, "exp-in-team-a", "team-a")

    workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.SET_DEFAULT)

    assert _experiment_workspace(workspace_store, "exp-in-team-a") == DEFAULT_WORKSPACE_NAME


def test_delete_workspace_fails_on_naming_conflict(workspace_store):
    """SET_DEFAULT deletion fails when an experiment name clashes with the default workspace."""
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    # Same experiment name in both workspaces, inserted within one session.
    with workspace_store.ManagedSessionMaker() as session:
        _insert_experiment(session, "shared-exp", "team-a")
        _insert_experiment(session, "shared-exp", DEFAULT_WORKSPACE_NAME)

    with pytest.raises(MlflowException, match="already exist in the default workspace") as exc:
        workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.SET_DEFAULT)
    assert exc.value.error_code == "INVALID_STATE"

    # Workspace should still exist (transaction rolled back)
    ws = workspace_store.get_workspace("team-a")
    assert ws.name == "team-a"


def test_delete_workspace_cascade_removes_resources(workspace_store):
    """CASCADE deletion removes both the workspace and its experiment row."""
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    with workspace_store.ManagedSessionMaker() as session:
        _insert_experiment(session, "exp-in-team-a", "team-a")

    workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.CASCADE)

    with workspace_store.ManagedSessionMaker() as session:
        remaining = session.execute(
            sa.text("SELECT count(*) FROM experiments WHERE name = :name"),
            {"name": "exp-in-team-a"},
        ).scalar()
        assert remaining == 0

    with pytest.raises(MlflowException, match="not found"):
        workspace_store.get_workspace("team-a")


def test_delete_workspace_cascade_removes_experiment_with_runs(workspace_store):
    """CASCADE deletion also removes runs that belong to the workspace's experiment."""
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    with workspace_store.ManagedSessionMaker() as session:
        # An explicit experiment_id is needed so the run row below can reference it.
        session.execute(
            sa.text(
                "INSERT INTO experiments (experiment_id, name, workspace, lifecycle_stage) "
                "VALUES (:id, :name, :ws, 'active')"
            ),
            {"id": 999, "name": "exp-with-runs", "ws": "team-a"},
        )
        session.execute(
            sa.text(
                "INSERT INTO runs (run_uuid, name, experiment_id, lifecycle_stage, status, "
                "source_type, start_time, end_time) "
                "VALUES (:run_id, :name, :exp_id, 'active', 'FINISHED', 'LOCAL', 0, 0)"
            ),
            {"run_id": "run-in-team-a", "name": "test-run", "exp_id": 999},
        )

    workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.CASCADE)

    with workspace_store.ManagedSessionMaker() as session:
        exp_count = session.execute(
            sa.text("SELECT count(*) FROM experiments WHERE name = :name"),
            {"name": "exp-with-runs"},
        ).scalar()
        assert exp_count == 0
        run_count = session.execute(
            sa.text("SELECT count(*) FROM runs WHERE run_uuid = :run_id"),
            {"run_id": "run-in-team-a"},
        ).scalar()
        assert run_count == 0


def test_delete_workspace_restrict_blocks_when_resources_exist(workspace_store):
    """RESTRICT deletion raises INVALID_STATE and leaves workspace and experiment untouched."""
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    with workspace_store.ManagedSessionMaker() as session:
        _insert_experiment(session, "exp-in-team-a", "team-a")

    with pytest.raises(MlflowException, match="still contains") as exc:
        workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.RESTRICT)
    assert exc.value.error_code == "INVALID_STATE"

    # Workspace and resources should still exist
    ws = workspace_store.get_workspace("team-a")
    assert ws.name == "team-a"
    assert _experiment_workspace(workspace_store, "exp-in-team-a") == "team-a"


def test_delete_workspace_restrict_allows_empty_workspace(workspace_store):
    """RESTRICT deletion succeeds when the workspace holds no resources."""
    workspace_store.create_workspace(Workspace(name="team-a", description=None))

    workspace_store.delete_workspace("team-a", mode=WorkspaceDeletionMode.RESTRICT)

    with pytest.raises(MlflowException, match="not found"):
        workspace_store.get_workspace("team-a")