# test_cli.py
"""Tests for the MLflow CLI: `mlflow server`, `mlflow gc`, and related commands.

Server option handling is exercised via Click's ``CliRunner`` with
``mlflow.server._run_server`` mocked out; garbage-collection behavior is
exercised against real SQLAlchemy-backed stores (FileStore tests are skipped
because FileStore is no longer supported).
"""

import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from unittest import mock
from urllib.parse import unquote, urlparse
from urllib.request import url2pathname

import click
import numpy as np
import pandas as pd
import pytest
import requests
from botocore.stub import Stubber
from click.testing import CliRunner

import mlflow
from mlflow import pyfunc
from mlflow.cli import cli, doctor, gc, server
from mlflow.data import numpy_dataset
from mlflow.entities import Metric, ViewType
from mlflow.entities.logged_model import LoggedModelParameter, LoggedModelTag
from mlflow.environment_variables import MLFLOW_ENABLE_WORKSPACES, MLFLOW_WORKSPACE_STORE_URI
from mlflow.exceptions import MlflowException
from mlflow.server import handlers
from mlflow.store.artifact.artifact_repository_registry import get_artifact_repository
from mlflow.store.jobs.sqlalchemy_store import SqlAlchemyJobStore
from mlflow.store.tracking.dbmodels.models import (
    SqlLoggedModelMetric,
    SqlLoggedModelParam,
    SqlLoggedModelTag,
)
from mlflow.store.tracking.file_store import FileStore
from mlflow.store.tracking.sqlalchemy_store import SqlAlchemyStore
from mlflow.utils.os import is_windows
from mlflow.utils.rest_utils import augmented_raise_for_status
from mlflow.utils.time import get_current_time_millis

from tests.helper_functions import (
    PROTOBUF_REQUIREMENT,
    get_safe_port,
    kill_process_tree,
    pyfunc_serve_and_score_model,
)
from tests.tracking.integration_test_utils import _await_server_up_or_die


@pytest.mark.parametrize("command", ["server"])
def test_mlflow_server_command(command):
    """Start a real server subprocess and verify the /health endpoint."""
    port = get_safe_port()
    with subprocess.Popen([
        sys.executable,
        "-m",
        "mlflow",
        command,
        "--port",
        str(port),
    ]) as process:
        try:
            _await_server_up_or_die(port)
            resp = requests.get(f"http://localhost:{port}/health")
            augmented_raise_for_status(resp)
            assert resp.text == "OK"
        finally:
            # Always tear down the server subprocess, even on assertion failure.
            kill_process_tree(process.pid)


def test_server_static_prefix_validation():
    """--static-prefix must start with '/' and must not end with '/'."""
    with mock.patch("mlflow.server._run_server") as run_server_mock:
        CliRunner().invoke(server)
        run_server_mock.assert_called_once()
    with mock.patch("mlflow.server._run_server") as run_server_mock:
        CliRunner().invoke(server, ["--static-prefix", "/mlflow"])
        run_server_mock.assert_called_once()
    with mock.patch("mlflow.server._run_server") as run_server_mock:
        result = CliRunner().invoke(server, ["--static-prefix", "mlflow/"])
        assert "--static-prefix must begin with a '/'." in result.output
        run_server_mock.assert_not_called()
    with mock.patch("mlflow.server._run_server") as run_server_mock:
        result = CliRunner().invoke(server, ["--static-prefix", "/mlflow/"])
        assert "--static-prefix should not end with a '/'." in result.output
        run_server_mock.assert_not_called()


def test_server_cli_fails_when_workspace_env_set():
    """The server refuses to start when the client-only MLFLOW_WORKSPACE is set."""
    env = os.environ.copy()
    # Trigger server-mode guard in mlflow.server.__init__
    env["_MLFLOW_SERVER_SERVE_ARTIFACTS"] = "1"
    env["MLFLOW_WORKSPACE"] = "team-a"

    result = subprocess.run(
        [sys.executable, "-m", "mlflow", "server", "--port", "0"],
        capture_output=True,
        text=True,
        env=env,
    )
    assert result.returncode != 0
    assert "MLFLOW_WORKSPACE=team-a is client-only" in result.stderr


def test_server_uvicorn_options():
    with mock.patch("mlflow.server._run_server") as run_server_mock:
        # Test default behavior (uvicorn should be used when no server options specified)
        CliRunner().invoke(server)
        run_server_mock.assert_called_once_with(
            file_store_path=mock.ANY,
            registry_store_uri=mock.ANY,
            default_artifact_root=mock.ANY,
            serve_artifacts=mock.ANY,
            artifacts_only=mock.ANY,
            artifacts_destination=mock.ANY,
            host="127.0.0.1",
            port=5000,
            static_prefix=None,
            workers=None,
            gunicorn_opts=None,
            waitress_opts=None,
            expose_prometheus=None,
            app_name=None,
            uvicorn_opts=None,
            env_file=None,
            secrets_cache_ttl=60,
            secrets_cache_max_size=1000,
        )

    with mock.patch("mlflow.server._run_server") as run_server_mock:
        # Test with uvicorn-opts - use different options than dev mode
        CliRunner().invoke(server, ["--uvicorn-opts", "--loop asyncio --limit-concurrency 100"])
        run_server_mock.assert_called_once_with(
            file_store_path=mock.ANY,
            registry_store_uri=mock.ANY,
            default_artifact_root=mock.ANY,
            serve_artifacts=mock.ANY,
            artifacts_only=mock.ANY,
            artifacts_destination=mock.ANY,
            host="127.0.0.1",
            port=5000,
            static_prefix=None,
            workers=None,
            gunicorn_opts=None,
            waitress_opts=None,
            expose_prometheus=None,
            app_name=None,
            uvicorn_opts="--loop asyncio --limit-concurrency 100",
            env_file=None,
            secrets_cache_ttl=60,
            secrets_cache_max_size=1000,
        )


@pytest.mark.skipif(is_windows(), reason="--dev mode is not supported on Windows")
def test_server_dev_mode():
    with mock.patch("mlflow.server._run_server") as run_server_mock:
        # Test with --dev flag (should set uvicorn opts)
        CliRunner().invoke(server, ["--dev"])
        run_server_mock.assert_called_once_with(
            file_store_path=mock.ANY,
            registry_store_uri=mock.ANY,
            default_artifact_root=mock.ANY,
            serve_artifacts=mock.ANY,
            artifacts_only=mock.ANY,
            artifacts_destination=mock.ANY,
            host="127.0.0.1",
            port=5000,
            static_prefix=None,
            workers=None,
            gunicorn_opts=None,
            waitress_opts=None,
            expose_prometheus=None,
            app_name=None,
            uvicorn_opts="--reload --log-level debug",
            env_file=None,
            secrets_cache_ttl=60,
            secrets_cache_max_size=1000,
        )


@pytest.mark.skipif(is_windows(), reason="Gunicorn is not supported on Windows")
def test_server_gunicorn_options():
    with mock.patch("mlflow.server._run_server") as run_server_mock:
        # Test that gunicorn-opts disables uvicorn
        CliRunner().invoke(server, ["--gunicorn-opts", "--timeout 120 --max-requests 1000"])
        run_server_mock.assert_called_once_with(
            file_store_path=mock.ANY,
            registry_store_uri=mock.ANY,
            default_artifact_root=mock.ANY,
            serve_artifacts=mock.ANY,
            artifacts_only=mock.ANY,
            artifacts_destination=mock.ANY,
            host="127.0.0.1",
            port=5000,
            static_prefix=None,
            workers=None,
            gunicorn_opts="--timeout 120 --max-requests 1000",
            waitress_opts=None,
            expose_prometheus=None,
            app_name=None,
            uvicorn_opts=None,
            env_file=None,
            secrets_cache_ttl=60,
            secrets_cache_max_size=1000,
        )

    # Test conflicting options
    result = CliRunner().invoke(
        server, ["--uvicorn-opts", "--reload", "--gunicorn-opts", "--log-level debug"]
    )
    assert result.exit_code != 0
    assert "Cannot specify multiple server options" in result.output


def test_server_initializes_backend_store_when_tracking_enabled():
    # Reset cached stores so the CLI re-initializes them.
    handlers._tracking_store = None
    handlers._model_registry_store = None
    runner = CliRunner()
    with runner.isolated_filesystem():
        with (
            mock.patch("mlflow.server.handlers.initialize_backend_stores") as init_backend_mock,
            mock.patch("mlflow.server._run_server") as run_server_mock,
        ):
            result = runner.invoke(server)
            assert result.exit_code == 0
            init_backend_mock.assert_called_once_with(
                mock.ANY, mock.ANY, mock.ANY, workspace_store_uri=None
            )
            run_server_mock.assert_called_once()


def test_server_skips_backend_store_init_in_artifacts_only_mode():
    # Reset cached stores so the CLI re-initializes them.
    handlers._tracking_store = None
    handlers._model_registry_store = None
    runner = CliRunner()
    with runner.isolated_filesystem():
        with (
            mock.patch("mlflow.server.handlers.initialize_backend_stores") as init_backend_mock,
            mock.patch("mlflow.server._run_server") as run_server_mock,
        ):
            result = runner.invoke(server, ["--artifacts-only"])
            assert result.exit_code == 0
            init_backend_mock.assert_not_called()
            run_server_mock.assert_called_once()


def test_server_mlflow_artifacts_options():
    """Each artifacts-related flag combination should reach _run_server."""
    handlers._tracking_store = None
    handlers._model_registry_store = None
    with mock.patch(
        "mlflow.tracking._tracking_service.utils._has_existing_mlruns_data", return_value=False
    ):
        runner = CliRunner()
        with runner.isolated_filesystem():
            with mock.patch("mlflow.server._run_server") as run_server_mock:
                runner.invoke(server, ["--artifacts-only"])
                run_server_mock.assert_called_once()
            with mock.patch("mlflow.server._run_server") as run_server_mock:
                runner.invoke(server, ["--serve-artifacts"])
                run_server_mock.assert_called_once()
            with mock.patch("mlflow.server._run_server") as run_server_mock:
                runner.invoke(server, ["--no-serve-artifacts"])
                run_server_mock.assert_called_once()
            with mock.patch("mlflow.server._run_server") as run_server_mock:
                runner.invoke(server, ["--artifacts-only"])
                run_server_mock.assert_called_once()


def test_server_artifacts_only_conflicts_with_enable_workspaces():
    with pytest.raises(
        click.UsageError, match="--enable-workspaces cannot be combined with --artifacts-only"
    ):
        CliRunner().invoke(
            server,
            ["--artifacts-only", "--enable-workspaces"],
            catch_exceptions=False,
            standalone_mode=False,
        )


def test_server_workspace_uri_sets_env_when_workspaces_enabled(tmp_path):
    """--enable-workspaces propagates the workspace store URI into env vars."""
    handlers._tracking_store = None
    handlers._model_registry_store = None
    workspace_uri = f"sqlite:///{tmp_path / 'workspace.db'}"
    backend_uri = f"sqlite:///{tmp_path / 'backend.db'}"
    artifact_root_path = tmp_path / "artifacts"
    artifact_root_path.mkdir()
    artifact_root = artifact_root_path.as_uri()

    MLFLOW_WORKSPACE_STORE_URI.unset()
    MLFLOW_ENABLE_WORKSPACES.unset()

    try:
        with (
            mock.patch("mlflow.server._run_server") as run_server_mock,
            mock.patch("mlflow.server.handlers.initialize_backend_stores") as init_backend,
        ):
            result = CliRunner().invoke(
                server,
                [
                    "--enable-workspaces",
                    "--workspace-store-uri",
                    workspace_uri,
                    "--backend-store-uri",
                    backend_uri,
                    "--registry-store-uri",
                    backend_uri,
                    "--default-artifact-root",
                    artifact_root,
                ],
                catch_exceptions=False,
                standalone_mode=False,
            )
            assert result.exit_code == 0
            run_server_mock.assert_called_once()
            init_backend.assert_called_once_with(
                backend_uri,
                backend_uri,
                artifact_root,
                workspace_store_uri=workspace_uri,
            )
            assert MLFLOW_WORKSPACE_STORE_URI.get() == workspace_uri
            assert MLFLOW_ENABLE_WORKSPACES.get() is True
    finally:
        # Don't leak workspace env vars into other tests.
        MLFLOW_WORKSPACE_STORE_URI.unset()
        MLFLOW_ENABLE_WORKSPACES.unset()


@pytest.mark.parametrize("command", [server])
def test_tracking_uri_validation_failure(command):
    handlers._tracking_store = None
    with mock.patch("mlflow.server._run_server") as run_server_mock:
        # SQLAlchemy expects postgresql:// not postgres://
        CliRunner().invoke(
            command,
            [
                "--backend-store-uri",
                "postgres://user:pwd@host:5432/mydb",
                "--default-artifact-root",
                "./mlruns",
            ],
        )
        run_server_mock.assert_not_called()


@pytest.mark.parametrize("command", [server])
def test_tracking_uri_validation_sql_driver_uris(command):
    handlers._tracking_store = None
    handlers._model_registry_store = None
    with (
        mock.patch("mlflow.server._run_server") as run_server_mock,
        mock.patch("mlflow.store.tracking.sqlalchemy_store.SqlAlchemyStore"),
        mock.patch("mlflow.store.model_registry.sqlalchemy_store.SqlAlchemyStore"),
    ):
        result = CliRunner().invoke(
            command,
            [
                "--backend-store-uri",
                "mysql+pymysql://user:pwd@host:5432/mydb",
                "--default-artifact-root",
                "./mlruns",
            ],
        )
        assert result.exit_code == 0
        run_server_mock.assert_called()
    # Clean up the global variables set by the server
    mlflow.set_tracking_uri(None)
    mlflow.set_registry_uri(None)


@pytest.mark.parametrize("command", [server])
def test_registry_store_uri_different_from_tracking_store(command):
    handlers._tracking_store = None
    handlers._model_registry_store = None

    from mlflow.server.handlers import (
        ModelRegistryStoreRegistryWrapper,
        TrackingStoreRegistryWrapper,
    )

    handlers._tracking_store_registry = TrackingStoreRegistryWrapper()
    handlers._model_registry_store_registry = ModelRegistryStoreRegistryWrapper()

    with (
        mock.patch("mlflow.server._run_server") as run_server_mock,
        mock.patch("mlflow.store.tracking.file_store.FileStore") as tracking_store,
        mock.patch(
            "mlflow.store.model_registry.sqlalchemy_store.SqlAlchemyStore"
        ) as registry_store,
    ):
        result = CliRunner().invoke(
            command,
            [
                "--backend-store-uri",
                "./mlruns",
                "--registry-store-uri",
                "mysql://user:pwd@host:5432/mydb",
            ],
        )
        assert result.exit_code == 0
        run_server_mock.assert_called()
        tracking_store.assert_called()
        registry_store.assert_called()
    # Clean up the global variables set by the server
    mlflow.set_tracking_uri(None)
    mlflow.set_registry_uri(None)


@pytest.fixture
def sqlite_store(db_uri: str, tmp_path: Path) -> tuple[SqlAlchemyStore, str]:
    # `db_uri` is presumably supplied by a conftest fixture — TODO confirm.
    artifact_uri = (tmp_path / "artifacts").as_uri()
    store = SqlAlchemyStore(db_uri, artifact_uri)
    return (store, db_uri)


@pytest.fixture
def file_store():
    pytest.skip("FileStore is no longer supported.")
    ROOT_LOCATION = os.path.join(tempfile.gettempdir(), "test_mlflow_gc")
    file_store_uri = f"file:///{ROOT_LOCATION}"
    yield (FileStore(ROOT_LOCATION), file_store_uri)
    shutil.rmtree(ROOT_LOCATION)


def _create_run_in_store(store, create_artifacts=True):
    """Create a run in `store`, optionally materializing its artifact dir on disk."""
    config = {
        "experiment_id": "0",
        "user_id": "Anderson",
        "start_time": get_current_time_millis(),
        "tags": [],
        "run_name": "name",
    }
    run = store.create_run(**config)
    if create_artifacts:
        # Convert the file:// artifact URI into a local filesystem path.
        artifact_path = url2pathname(unquote(urlparse(run.info.artifact_uri).path))
        if not os.path.exists(artifact_path):
            os.makedirs(artifact_path)
    return run


@pytest.mark.parametrize("create_artifacts_in_run", [True, False])
def test_mlflow_gc_sqlite(sqlite_store, create_artifacts_in_run):
    store = sqlite_store[0]
    run = _create_run_in_store(store, create_artifacts=create_artifacts_in_run)
    store.delete_run(run.info.run_id)
    subprocess.check_call([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        sqlite_store[1],
    ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0
    with pytest.raises(MlflowException, match=r"Run .+ not found"):
        store.get_run(run.info.run_id)

    artifact_path = url2pathname(unquote(urlparse(run.info.artifact_uri).path))
    assert not os.path.exists(artifact_path)


def test_mlflow_gc_sqlite_older_than(sqlite_store):
    store = sqlite_store[0]
    run = _create_run_in_store(store)
    store.delete_run(run.info.run_id)
    # A just-deleted run cannot satisfy a 10-day age requirement.
    with pytest.raises(subprocess.CalledProcessError, match=r".+") as exp:
        subprocess.run(
            [
                sys.executable,
                "-m",
                "mlflow",
                "gc",
                "--backend-store-uri",
                sqlite_store[1],
                "--older-than",
                "10d10h10m10s",
                "--run-ids",
                run.info.run_id,
            ],
            check=True,
            capture_output=True,
            text=True,
        )
    assert "is not older than the required age" in exp.value.stderr
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 1

    # After waiting, a 1-second threshold allows deletion.
    time.sleep(1)
    subprocess.check_output([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        sqlite_store[1],
        "--older-than",
        "1s",
        "--run-ids",
        run.info.run_id,
    ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0


@pytest.mark.parametrize("create_artifacts_in_run", [True, False])
def test_mlflow_gc_file_store(file_store, create_artifacts_in_run):
    pytest.skip("FileStore is no longer supported.")
    store = file_store[0]
    run = _create_run_in_store(store, create_artifacts=create_artifacts_in_run)
    store.delete_run(run.info.run_id)
    subprocess.check_output([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        file_store[1],
    ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0
    with pytest.raises(MlflowException, match=r"Run .+ not found"):
        store.get_run(run.info.run_id)

    artifact_path = url2pathname(unquote(urlparse(run.info.artifact_uri).path))
    assert not os.path.exists(artifact_path)


def test_mlflow_gc_file_store_passing_explicit_run_ids(file_store):
    pytest.skip("FileStore is no longer supported.")
    store = file_store[0]
    run = _create_run_in_store(store)
    store.delete_run(run.info.run_id)
    subprocess.check_output([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        file_store[1],
        "--run-ids",
        run.info.run_id,
    ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0
    with pytest.raises(MlflowException, match=r"Run .+ not found"):
        store.get_run(run.info.run_id)


def test_mlflow_gc_not_deleted_run(file_store):
    pytest.skip("FileStore is no longer supported.")
    store = file_store[0]
    run = _create_run_in_store(store)
    # gc must refuse to collect a run that was never deleted.
    with pytest.raises(subprocess.CalledProcessError, match=r".+"):
        subprocess.check_output([
            sys.executable,
            "-m",
            "mlflow",
            "gc",
            "--backend-store-uri",
            file_store[1],
            "--run-ids",
            run.info.run_id,
        ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 1


def test_mlflow_gc_file_store_older_than(file_store):
    pytest.skip("FileStore is no longer supported.")
    store = file_store[0]
    run = _create_run_in_store(store)
    store.delete_run(run.info.run_id)
    with pytest.raises(subprocess.CalledProcessError, match=r".+") as exp:
        subprocess.run(
            [
                sys.executable,
                "-m",
                "mlflow",
                "gc",
                "--backend-store-uri",
                file_store[1],
                "--older-than",
                "10d10h10m10s",
                "--run-ids",
                run.info.run_id,
            ],
            check=True,
            capture_output=True,
            text=True,
        )
    assert "is not older than the required age" in exp.value.stderr
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 1

    time.sleep(1)
    subprocess.check_output([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        file_store[1],
        "--older-than",
        "1s",
        "--run-ids",
        run.info.run_id,
    ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0


@pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"])
def test_mlflow_gc_experiments(get_store_details, request):
    if get_store_details == "file_store":
        pytest.skip("FileStore is no longer supported.")

    def invoke_gc(*args):
        return CliRunner().invoke(gc, args, catch_exceptions=False)

    store, uri = request.getfixturevalue(get_store_details)
    exp_id_1 = store.create_experiment("1")
    run_id_1 = store.create_run(exp_id_1, user_id="user", start_time=0, tags=[], run_name="1")
    invoke_gc("--backend-store-uri", uri)
    experiments = store.search_experiments(view_type=ViewType.ALL)
    exp_ids = [e.experiment_id for e in experiments]
    runs = store.search_runs(experiment_ids=exp_ids, filter_string="", run_view_type=ViewType.ALL)
    assert sorted(exp_ids) == sorted([exp_id_1, store.DEFAULT_EXPERIMENT_ID])
    assert [r.info.run_id for r in runs] == [run_id_1.info.run_id]

    store.delete_experiment(exp_id_1)
    invoke_gc("--backend-store-uri", uri)
    experiments = store.search_experiments(view_type=ViewType.ALL)
    runs = store.search_runs(experiment_ids=exp_ids, filter_string="", run_view_type=ViewType.ALL)
    assert [e.experiment_id for e in experiments] == [store.DEFAULT_EXPERIMENT_ID]
    assert runs == []

    exp_id_2 = store.create_experiment("2")
    exp_id_3 = store.create_experiment("3")
    store.delete_experiment(exp_id_2)
    store.delete_experiment(exp_id_3)
    # Only the explicitly listed experiment should be collected.
    invoke_gc("--backend-store-uri", uri, "--experiment-ids", exp_id_2)
    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert sorted([e.experiment_id for e in experiments]) == sorted([
        exp_id_3,
        store.DEFAULT_EXPERIMENT_ID,
    ])

    # Freeze time so exp_id_4's deletion timestamp is epoch 0 (very old).
    with mock.patch("time.time", return_value=0) as mock_time:
        exp_id_4 = store.create_experiment("4")
        store.delete_experiment(exp_id_4)
        mock_time.assert_called()

    invoke_gc("--backend-store-uri", uri, "--older-than", "1d")
    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert sorted([e.experiment_id for e in experiments]) == sorted([
        exp_id_3,
        store.DEFAULT_EXPERIMENT_ID,
    ])

    invoke_gc("--backend-store-uri", uri, "--experiment-ids", exp_id_3, "--older-than", "0s")
    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert [e.experiment_id for e in experiments] == [store.DEFAULT_EXPERIMENT_ID]

    exp_id_5 = store.create_experiment("5")
    store.delete_experiment(exp_id_5)
    with pytest.raises(MlflowException, match=r"Experiments .+ can be deleted."):
        invoke_gc(
            "--backend-store-uri",
            uri,
            "--experiment-ids",
            exp_id_5,
            "--older-than",
            "10d10h10m10s",
        )
    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert sorted([e.experiment_id for e in experiments]) == sorted([
        exp_id_5,
        store.DEFAULT_EXPERIMENT_ID,
    ])


def test_mlflow_gc_experiment_with_logged_model_params_tags_and_metrics(sqlite_store):
    """gc on a deleted experiment also purges logged-model params/tags/metrics rows."""
    store, uri = sqlite_store
    exp_id = store.create_experiment("exp_with_logged_model")
    run = store.create_run(exp_id, user_id="user", start_time=0, tags=[], run_name="run")
    run_id = run.info.run_id
    model = store.create_logged_model(experiment_id=exp_id)

    store.log_logged_model_params(
        model_id=model.model_id,
        params=[
            LoggedModelParameter(key="param1", value="value1"),
            LoggedModelParameter(key="param2", value="value2"),
        ],
    )
    store.set_logged_model_tags(
        model_id=model.model_id,
        tags=[
            LoggedModelTag(key="tag1", value="value1"),
            LoggedModelTag(key="tag2", value="value2"),
        ],
    )
    store._log_model_metrics(
        run_id=run_id,
        metrics=[Metric(key="m1", value=1.0, timestamp=0, step=0, model_id=model.model_id)],
        experiment_id=exp_id,
    )

    store.delete_experiment(exp_id)

    result = CliRunner().invoke(gc, ["--backend-store-uri", uri], catch_exceptions=False)
    assert result.exit_code == 0

    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert [e.experiment_id for e in experiments] == [store.DEFAULT_EXPERIMENT_ID]

    with pytest.raises(MlflowException, match=r".+ not found"):
        store.get_logged_model(model.model_id)

    with store.ManagedSessionMaker() as session:
        for table in [SqlLoggedModelParam, SqlLoggedModelTag, SqlLoggedModelMetric]:
            assert session.query(table).count() == 0
@pytest.fixture
def sqlite_store_with_s3_artifact_repository(
    tmp_path: Path,
) -> tuple[SqlAlchemyStore, str, str]:
    """SQLAlchemy store whose default artifact root points at a (stubbed) S3 bucket."""
    db_path = tmp_path / "mlflow.db"
    db_uri = f"sqlite:///{db_path}"
    s3_uri = "s3://mlflow"
    store = SqlAlchemyStore(db_uri, s3_uri)
    return (store, db_uri, s3_uri)


def test_mlflow_gc_sqlite_with_s3_artifact_repository(
    sqlite_store_with_s3_artifact_repository,
):
    store = sqlite_store_with_s3_artifact_repository[0]
    run = _create_run_in_store(store, create_artifacts=False)
    store.delete_run(run.info.run_id)

    artifact_repo = get_artifact_repository(run.info.artifact_uri)
    bucket, dest_path = artifact_repo.parse_s3_compliant_uri(run.info.artifact_uri)
    fake_artifact_path = os.path.join(dest_path, "fake_artifact.txt")
    # Stub out S3: gc should list then batch-delete the run's artifacts.
    with Stubber(artifact_repo._get_s3_client()) as s3_stubber:
        s3_stubber.add_response(
            "list_objects_v2",
            {"Contents": [{"Key": fake_artifact_path}]},
            {"Bucket": bucket, "Prefix": dest_path},
        )
        s3_stubber.add_response(
            "delete_objects",
            {"Deleted": [{"Key": fake_artifact_path}]},
            {"Bucket": bucket, "Delete": {"Objects": [{"Key": fake_artifact_path}]}},
        )

        CliRunner().invoke(
            gc,
            [
                "--backend-store-uri",
                sqlite_store_with_s3_artifact_repository[1],
                "--artifacts-destination",
                sqlite_store_with_s3_artifact_repository[2],
            ],
            catch_exceptions=False,
        )

    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0
    with pytest.raises(MlflowException, match=r"Run .+ not found"):
        store.get_run(run.info.run_id)


@pytest.mark.skip(reason="mlserver is incompatible with the latest version of pydantic")
@pytest.mark.parametrize(
    "enable_mlserver",
    [
        # MLServer is not supported in Windows yet, so let's skip this test in that case.
        # https://github.com/SeldonIO/MLServer/issues/361
        pytest.param(
            True,
            marks=pytest.mark.skipif(is_windows(), reason="MLServer is not supported in Windows"),
        ),
        False,
    ],
)
def test_mlflow_models_serve(enable_mlserver):
    class MyModel(pyfunc.PythonModel):
        def predict(self, context, model_input, params=None):
            return np.array([1, 2, 3])

    model = MyModel()

    with mlflow.start_run():
        if enable_mlserver:
            # We need MLServer to be present on the Conda environment, so we'll
            # add that as an extra requirement.
            mlflow.pyfunc.log_model(
                name="model",
                python_model=model,
                extra_pip_requirements=[
                    "mlserver>=1.2.0,!=1.3.1",
                    "mlserver-mlflow>=1.2.0,!=1.3.1",
                    PROTOBUF_REQUIREMENT,
                ],
            )
        else:
            mlflow.pyfunc.log_model(name="model", python_model=model)
        model_uri = mlflow.get_artifact_uri("model")

    data = pd.DataFrame({"a": [0]})

    extra_args = ["--env-manager", "local"]
    if enable_mlserver:
        extra_args.append("--enable-mlserver")

    scoring_response = pyfunc_serve_and_score_model(
        model_uri=model_uri,
        data=data,
        content_type=pyfunc.scoring_server.CONTENT_TYPE_JSON,
        extra_args=extra_args,
    )
    assert scoring_response.status_code == 200
    served_model_preds = np.array(json.loads(scoring_response.content)["predictions"])
    np.testing.assert_array_equal(served_model_preds, model.predict(data, None))


def test_mlflow_tracking_disabled_in_artifacts_only_mode(tmp_path: Path):
    port = get_safe_port()
    with subprocess.Popen(
        [sys.executable, "-m", "mlflow", "server", "--port", str(port), "--artifacts-only"],
        cwd=tmp_path,
    ) as process:
        try:
            _await_server_up_or_die(port)
            resp = requests.get(f"http://localhost:{port}/api/2.0/mlflow/experiments/search")
            assert (
                "Endpoint: /api/2.0/mlflow/experiments/search disabled due to the mlflow "
                "server running in `--artifacts-only` mode." in resp.text
            )
        finally:
            kill_process_tree(process.pid)


def test_mlflow_artifact_list_in_artifacts_only_mode(tmp_path: Path):
    port = get_safe_port()
    with subprocess.Popen(
        [sys.executable, "-m", "mlflow", "server", "--port", str(port), "--artifacts-only"],
        cwd=tmp_path,
    ) as process:
        try:
            _await_server_up_or_die(port)
            resp = requests.get(f"http://localhost:{port}/api/2.0/mlflow-artifacts/artifacts")
            augmented_raise_for_status(resp)
            assert resp.status_code == 200
            assert resp.text == "{}"
        finally:
            kill_process_tree(process.pid)


def test_mlflow_artifact_service_unavailable_when_no_server_artifacts_is_specified():
    port = get_safe_port()
    with subprocess.Popen([
        sys.executable,
        "-m",
        "mlflow",
        "server",
        "--port",
        str(port),
        "--no-serve-artifacts",
    ]) as process:
        try:
            _await_server_up_or_die(port)
            endpoint = "/api/2.0/mlflow-artifacts/artifacts"
            resp = requests.get(f"http://localhost:{port}{endpoint}")
            assert (
                f"Endpoint: {endpoint} disabled due to the mlflow server running with "
                "`--no-serve-artifacts`" in resp.text
            )
        finally:
            kill_process_tree(process.pid)


def test_mlflow_artifact_only_prints_warning_for_configs():
    with (
        mock.patch("mlflow.server._run_server") as run_server_mock,
        mock.patch("mlflow.store.tracking.sqlalchemy_store.SqlAlchemyStore"),
        mock.patch("mlflow.store.model_registry.sqlalchemy_store.SqlAlchemyStore"),
    ):
        result = CliRunner().invoke(
            server,
            ["--artifacts-only", "--backend-store-uri", "sqlite:///my.db"],
            catch_exceptions=False,
        )
        msg = (
            "Usage: server [OPTIONS]\nTry 'server --help' for help.\n\nError: You are starting a "
            "tracking server in `--artifacts-only` mode and have provided a value for "
            "`--backend_store_uri`"
        )
        assert msg in result.output
        assert result.exit_code != 0
        run_server_mock.assert_not_called()


def test_mlflow_ui_is_alias_for_mlflow_server():
    mlflow_ui_stdout = subprocess.check_output(
        [sys.executable, "-m", "mlflow", "ui", "--help"], text=True
    )
    mlflow_server_stdout = subprocess.check_output(
        [sys.executable, "-m", "mlflow", "server", "--help"], text=True
    )
    # Help text should be identical modulo the command name in the usage line.
    assert (
        mlflow_ui_stdout.replace("Usage: python -m mlflow ui", "Usage: python -m mlflow server")
        == mlflow_server_stdout
    )


def test_cli_with_python_mod():
    stdout = subprocess.check_output(
        [
            sys.executable,
            "-m",
            "mlflow",
            "--version",
        ],
        text=True,
    )
    assert stdout.rstrip().endswith(mlflow.__version__)
    stdout = subprocess.check_output(
        [
            sys.executable,
            "-m",
            "mlflow",
            "server",
            "--help",
        ],
        text=True,
    )
    assert "mlflow server" in stdout


def test_doctor():
    res = CliRunner().invoke(doctor, catch_exceptions=False)
    assert res.exit_code == 0


def test_env_file_loading(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    # Setup: Create an experiment using the Python SDK
    test_tracking_uri = f"sqlite:///{tmp_path / 'mlflow.db'}"
    test_experiment_name = "test_experiment_from_env"

    # Create experiment using SDK
    mlflow.set_tracking_uri(test_tracking_uri)
    mlflow.create_experiment(test_experiment_name)

    # Create a test .env file pointing to this tracking URI
    env_file_path = tmp_path / "test.env"
    env_file_path.write_text(f"MLFLOW_TRACKING_URI={test_tracking_uri}\n")

    runner = CliRunner()

    # Clear the MLflow environment variables to ensure we're testing the loading
    monkeypatch.delenv("MLFLOW_TRACKING_URI", raising=False)
    monkeypatch.delenv("MLFLOW_EXPERIMENT_NAME", raising=False)

    # Ensure variables are not set before running command
    assert "MLFLOW_TRACKING_URI" not in os.environ

    # Use the existing experiments search CLI command with --env-file
    result = runner.invoke(
        cli,
        ["--env-file", str(env_file_path), "experiments", "search"],
        catch_exceptions=False,
    )

    # Check that the command executed successfully
    assert result.exit_code == 0

    # Verify the experiment we created is found (proves env vars were loaded)
    assert test_experiment_name in result.output

    # Verify the loading message
    assert "Loaded environment variables from:" in result.output
    assert str(env_file_path) in result.output


def test_env_file_loading_invalid_path() -> None:
    runner = CliRunner()

    # Test error handling for non-existent file
    result = runner.invoke(
        cli,
        ["--env-file", "nonexistent.env", "experiments", "search"],
        catch_exceptions=False,
    )
    assert result.exit_code != 0
    assert "Environment file 'nonexistent.env' does not exist" in result.output


@pytest.mark.parametrize(
    ("args", "expected_max_results", "expected_view"),
    [
        (["experiments", "search"], None, ViewType.ACTIVE_ONLY),
        (["experiments", "search", "--max-results", "50"], 50, ViewType.ACTIVE_ONLY),
        (
            ["experiments", "search", "--view", "all", "--max-results", "100"],
            100,
            ViewType.ALL,
        ),
    ],
)
def test_experiments_search_max_results(args, expected_max_results, expected_view):
    with mock.patch("mlflow.search_experiments", return_value=[]) as mock_search:
        result = CliRunner().invoke(cli, args, catch_exceptions=False)
        assert result.exit_code == 0
        mock_search.assert_called_once()
        call_kwargs = mock_search.call_args[1]
        assert call_kwargs.get("max_results") == expected_max_results
        assert call_kwargs.get("view_type") == expected_view


def test_experiments_search_max_results_zero():
    with mock.patch("mlflow.search_experiments", return_value=[]) as mock_search:
        result = CliRunner().invoke(
            cli, ["experiments", "search", "--max-results", "0"], catch_exceptions=False
        )
        assert result.exit_code == 0
        mock_search.assert_called_once()
        call_kwargs = mock_search.call_args[1]
        assert call_kwargs.get("max_results") == 0


def test_experiments_search_max_results_negative():
    result = CliRunner().invoke(
        cli, ["experiments", "search", "--max-results", "-1"], catch_exceptions=False
    )
    assert result.exit_code != 0
    assert "max-results must be a non-negative integer" in result.output


def test_server_with_env_file(tmp_path):
    env_file = tmp_path / ".env"
    env_file.write_text("TEST_VAR=test_value\n")

    with mock.patch("mlflow.server._run_server") as run_server_mock:
        result = CliRunner().invoke(cli, ["--env-file", str(env_file), "server"])
        assert result.exit_code == 0
        run_server_mock.assert_called_once()
        # Verify env_file parameter is passed
        assert run_server_mock.call_args.kwargs["env_file"] == str(env_file)


def test_mlflow_gc_with_datasets(sqlite_store):
    store = sqlite_store[0]

    mlflow.set_tracking_uri(sqlite_store[1])
    mlflow.set_experiment("dataset")

    dataset = numpy_dataset.from_numpy(np.array([1, 2, 3]))

    with mlflow.start_run() as run:
        experiment_id = run.info.experiment_id
        mlflow.log_input(dataset)

    experiments = store.search_experiments(view_type=ViewType.ALL)

    # default and datasets
    assert len(experiments) == 2

    store.delete_experiment(experiment_id)

    # the new experiment is only marked as deleted, not removed
    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert len(experiments) == 2

    subprocess.check_call([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        sqlite_store[1],
    ])
    experiments = store.search_experiments(view_type=ViewType.ALL)

    # only default is left after GC
    # NOTE(review): the source text was truncated mid-assertion here; the comment
    # above implies exactly the default experiment remains — TODO confirm upstream.
    assert len(experiments) == 1
len(experiments) == 1 1091 assert experiments[0].experiment_id == "0" 1092 with pytest.raises(MlflowException, match=f"No Experiment with id={experiment_id} exists"): 1093 store.get_experiment(experiment_id) 1094 1095 1096 @pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"]) 1097 def test_mlflow_gc_logged_model(get_store_details, request): 1098 if get_store_details == "file_store": 1099 pytest.skip("FileStore is no longer supported.") 1100 store, uri = request.getfixturevalue(get_store_details) 1101 exp_id = store.create_experiment("exp") 1102 model = store.create_logged_model(experiment_id=exp_id) 1103 1104 store.delete_logged_model(model.model_id) 1105 subprocess.check_output([sys.executable, "-m", "mlflow", "gc", "--backend-store-uri", uri]) 1106 1107 with pytest.raises(MlflowException, match=r".+ not found"): 1108 store.get_logged_model(model.model_id) 1109 1110 1111 @pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"]) 1112 def test_mlflow_gc_logged_models_older_than(get_store_details, request): 1113 if get_store_details == "file_store": 1114 pytest.skip("FileStore is no longer supported.") 1115 store, uri = request.getfixturevalue(get_store_details) 1116 exp_id = store.create_experiment("exp") 1117 old_time = time.time() - (2 * 24 * 60 * 60) 1118 with mock.patch("time.time", return_value=old_time): 1119 model = store.create_logged_model(experiment_id=exp_id) 1120 1121 store.delete_logged_model(model.model_id) 1122 1123 subprocess.check_call([ 1124 sys.executable, 1125 "-m", 1126 "mlflow", 1127 "gc", 1128 "--backend-store-uri", 1129 uri, 1130 "--older-than", 1131 "1d", 1132 ]) 1133 1134 retrieved_model = store.get_logged_model(model.model_id, allow_deleted=True) 1135 assert retrieved_model.model_id == model.model_id 1136 1137 1138 @pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"]) 1139 def test_mlflow_gc_logged_models_deletes_when_older_than(get_store_details, request): 1140 if 
get_store_details == "file_store": 1141 pytest.skip("FileStore is no longer supported.") 1142 store, uri = request.getfixturevalue(get_store_details) 1143 exp_id = store.create_experiment("exp") 1144 model = store.create_logged_model(experiment_id=exp_id) 1145 1146 old_deletion_ms = int((time.time() - (2 * 24 * 60 * 60)) * 1000) 1147 with mock.patch("mlflow.utils.time.get_current_time_millis", return_value=old_deletion_ms): 1148 store.delete_logged_model(model.model_id) 1149 1150 subprocess.check_call([ 1151 sys.executable, 1152 "-m", 1153 "mlflow", 1154 "gc", 1155 "--backend-store-uri", 1156 uri, 1157 "--older-than", 1158 "1d", 1159 ]) 1160 1161 with pytest.raises(MlflowException, match=r".+ not found"): 1162 store.get_logged_model(model.model_id) 1163 1164 1165 @pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"]) 1166 def test_mlflow_gc_logged_models_mixed_time(get_store_details, request): 1167 if get_store_details == "file_store": 1168 pytest.skip("FileStore is no longer supported.") 1169 store, uri = request.getfixturevalue(get_store_details) 1170 exp_id = store.create_experiment("exp") 1171 old_model = store.create_logged_model(experiment_id=exp_id) 1172 recent_model = store.create_logged_model(experiment_id=exp_id) 1173 1174 old_deletion_ms = int((time.time() - (3 * 24 * 60 * 60)) * 1000) 1175 with mock.patch("mlflow.utils.time.get_current_time_millis", return_value=old_deletion_ms): 1176 store.delete_logged_model(old_model.model_id) 1177 1178 with mock.patch("mlflow.utils.time.get_current_time_millis") as current_time_mock: 1179 current_time_mock.return_value = int(time.time() * 1000) 1180 store.delete_logged_model(recent_model.model_id) 1181 1182 subprocess.check_call([ 1183 sys.executable, 1184 "-m", 1185 "mlflow", 1186 "gc", 1187 "--backend-store-uri", 1188 uri, 1189 "--older-than", 1190 "1d", 1191 ]) 1192 1193 with pytest.raises(MlflowException, match=r".+ not found"): 1194 store.get_logged_model(old_model.model_id) 1195 1196 
retrieved_model = store.get_logged_model(recent_model.model_id, allow_deleted=True) 1197 assert retrieved_model.model_id == recent_model.model_id 1198 1199 1200 @pytest.fixture 1201 def sqlite_store_with_jobs( 1202 db_uri: str, tmp_path: Path 1203 ) -> tuple[SqlAlchemyStore, SqlAlchemyJobStore, str]: 1204 artifact_uri = (tmp_path / "artifacts").as_uri() 1205 tracking_store = SqlAlchemyStore(db_uri, artifact_uri) 1206 job_store = SqlAlchemyJobStore(db_uri) 1207 return (tracking_store, job_store, db_uri) 1208 1209 1210 def _create_test_job(job_store, job_name="test_job", finalize=True): 1211 job = job_store.create_job(job_name=job_name, params="{}") 1212 if finalize: 1213 job_store.start_job(job.job_id) 1214 job_store.finish_job(job.job_id, result="done") 1215 return job_store.get_job(job.job_id) 1216 return job 1217 1218 1219 def _set_job_creation_time(job_store, job_id, creation_time_ms): 1220 from mlflow.store.tracking.dbmodels.models import SqlJob 1221 1222 with job_store.ManagedSessionMaker() as session: 1223 job = session.query(SqlJob).filter(SqlJob.id == job_id).first() 1224 job.creation_time = creation_time_ms 1225 1226 1227 def test_mlflow_gc_no_jobs_deleted_without_jobs_flag(sqlite_store_with_jobs): 1228 _, job_store, db_uri = sqlite_store_with_jobs 1229 1230 job1 = _create_test_job(job_store, "job1") 1231 job2 = _create_test_job(job_store, "job2") 1232 1233 subprocess.check_call([ 1234 sys.executable, 1235 "-m", 1236 "mlflow", 1237 "gc", 1238 "--backend-store-uri", 1239 db_uri, 1240 ]) 1241 1242 retrieved_job1 = job_store.get_job(job1.job_id) 1243 retrieved_job2 = job_store.get_job(job2.job_id) 1244 assert retrieved_job1.job_id == job1.job_id 1245 assert retrieved_job2.job_id == job2.job_id 1246 1247 1248 def test_mlflow_gc_all_jobs_deleted_with_jobs_flag(sqlite_store_with_jobs): 1249 _, job_store, db_uri = sqlite_store_with_jobs 1250 1251 job1 = _create_test_job(job_store, "job1") 1252 job2 = _create_test_job(job_store, "job2") 1253 job3 = 
_create_test_job(job_store, "job3") 1254 1255 subprocess.check_call([ 1256 sys.executable, 1257 "-m", 1258 "mlflow", 1259 "gc", 1260 "--backend-store-uri", 1261 db_uri, 1262 "--jobs", 1263 ]) 1264 1265 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1266 job_store.get_job(job1.job_id) 1267 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1268 job_store.get_job(job2.job_id) 1269 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1270 job_store.get_job(job3.job_id) 1271 1272 1273 def test_mlflow_gc_specific_jobs_deleted_with_job_ids(sqlite_store_with_jobs): 1274 _, job_store, db_uri = sqlite_store_with_jobs 1275 1276 job1 = _create_test_job(job_store, "job1") 1277 job2 = _create_test_job(job_store, "job2") 1278 job3 = _create_test_job(job_store, "job3") 1279 1280 subprocess.check_call([ 1281 sys.executable, 1282 "-m", 1283 "mlflow", 1284 "gc", 1285 "--backend-store-uri", 1286 db_uri, 1287 "--job-ids", 1288 f"{job1.job_id},{job2.job_id}", 1289 ]) 1290 1291 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1292 job_store.get_job(job1.job_id) 1293 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1294 job_store.get_job(job2.job_id) 1295 1296 retrieved_job3 = job_store.get_job(job3.job_id) 1297 assert retrieved_job3.job_id == job3.job_id 1298 1299 1300 def test_mlflow_gc_jobs_deleted_with_older_than_flag(sqlite_store_with_jobs): 1301 _, job_store, db_uri = sqlite_store_with_jobs 1302 1303 old_job = _create_test_job(job_store, "old_job") 1304 new_job = _create_test_job(job_store, "new_job") 1305 1306 one_hour_ago = get_current_time_millis() - (60 * 60 * 1000) 1307 _set_job_creation_time(job_store, old_job.job_id, one_hour_ago) 1308 1309 subprocess.check_call([ 1310 sys.executable, 1311 "-m", 1312 "mlflow", 1313 "gc", 1314 "--backend-store-uri", 1315 db_uri, 1316 "--jobs", 1317 "--older-than", 1318 "30m", 1319 ]) 1320 1321 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1322 
job_store.get_job(old_job.job_id) 1323 1324 retrieved_new_job = job_store.get_job(new_job.job_id) 1325 assert retrieved_new_job.job_id == new_job.job_id 1326 1327 1328 def test_mlflow_gc_job_ids_with_older_than_filter(sqlite_store_with_jobs): 1329 _, job_store, db_uri = sqlite_store_with_jobs 1330 1331 old_job = _create_test_job(job_store, "old_job") 1332 new_job = _create_test_job(job_store, "new_job") 1333 1334 one_hour_ago = get_current_time_millis() - (60 * 60 * 1000) 1335 _set_job_creation_time(job_store, old_job.job_id, one_hour_ago) 1336 1337 subprocess.check_call([ 1338 sys.executable, 1339 "-m", 1340 "mlflow", 1341 "gc", 1342 "--backend-store-uri", 1343 db_uri, 1344 "--job-ids", 1345 new_job.job_id, 1346 "--older-than", 1347 "30m", 1348 ]) 1349 1350 retrieved_new_job = job_store.get_job(new_job.job_id) 1351 assert retrieved_new_job.job_id == new_job.job_id 1352 1353 retrieved_old_job = job_store.get_job(old_job.job_id) 1354 assert retrieved_old_job.job_id == old_job.job_id 1355 1356 1357 def test_mlflow_gc_only_deletes_finalized_jobs(sqlite_store_with_jobs): 1358 from mlflow.entities._job_status import JobStatus 1359 1360 _, job_store, db_uri = sqlite_store_with_jobs 1361 1362 # Create jobs with different statuses 1363 pending_job = _create_test_job(job_store, "pending_job", finalize=False) 1364 assert pending_job.status == JobStatus.PENDING 1365 1366 running_job = _create_test_job(job_store, "running_job", finalize=False) 1367 job_store.start_job(running_job.job_id) 1368 running_job = job_store.get_job(running_job.job_id) 1369 assert running_job.status == JobStatus.RUNNING 1370 1371 succeeded_job = _create_test_job(job_store, "succeeded_job", finalize=True) 1372 assert succeeded_job.status == JobStatus.SUCCEEDED 1373 1374 failed_job = _create_test_job(job_store, "failed_job", finalize=False) 1375 job_store.start_job(failed_job.job_id) 1376 job_store.fail_job(failed_job.job_id, "error") 1377 failed_job = job_store.get_job(failed_job.job_id) 1378 assert 
failed_job.status == JobStatus.FAILED 1379 1380 timeout_job = _create_test_job(job_store, "timeout_job", finalize=False) 1381 job_store.start_job(timeout_job.job_id) 1382 job_store.mark_job_timed_out(timeout_job.job_id) 1383 timeout_job = job_store.get_job(timeout_job.job_id) 1384 assert timeout_job.status == JobStatus.TIMEOUT 1385 1386 canceled_job = _create_test_job(job_store, "canceled_job", finalize=False) 1387 job_store.cancel_job(canceled_job.job_id) 1388 canceled_job = job_store.get_job(canceled_job.job_id) 1389 assert canceled_job.status == JobStatus.CANCELED 1390 1391 subprocess.check_call([ 1392 sys.executable, 1393 "-m", 1394 "mlflow", 1395 "gc", 1396 "--backend-store-uri", 1397 db_uri, 1398 "--jobs", 1399 ]) 1400 1401 retrieved_pending = job_store.get_job(pending_job.job_id) 1402 assert retrieved_pending.job_id == pending_job.job_id 1403 assert retrieved_pending.status == JobStatus.PENDING 1404 1405 retrieved_running = job_store.get_job(running_job.job_id) 1406 assert retrieved_running.job_id == running_job.job_id 1407 assert retrieved_running.status == JobStatus.RUNNING 1408 1409 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1410 job_store.get_job(succeeded_job.job_id) 1411 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1412 job_store.get_job(failed_job.job_id) 1413 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1414 job_store.get_job(timeout_job.job_id) 1415 with pytest.raises(MlflowException, match=r"Job .+ not found"): 1416 job_store.get_job(canceled_job.job_id)