# tests/test_cli.py
   1  import json
   2  import os
   3  import shutil
   4  import subprocess
   5  import sys
   6  import tempfile
   7  import time
   8  from pathlib import Path
   9  from unittest import mock
  10  from urllib.parse import unquote, urlparse
  11  from urllib.request import url2pathname
  12  
  13  import click
  14  import numpy as np
  15  import pandas as pd
  16  import pytest
  17  import requests
  18  from botocore.stub import Stubber
  19  from click.testing import CliRunner
  20  
  21  import mlflow
  22  from mlflow import pyfunc
  23  from mlflow.cli import cli, doctor, gc, server
  24  from mlflow.data import numpy_dataset
  25  from mlflow.entities import Metric, ViewType
  26  from mlflow.entities.logged_model import LoggedModelParameter, LoggedModelTag
  27  from mlflow.environment_variables import MLFLOW_ENABLE_WORKSPACES, MLFLOW_WORKSPACE_STORE_URI
  28  from mlflow.exceptions import MlflowException
  29  from mlflow.server import handlers
  30  from mlflow.store.artifact.artifact_repository_registry import get_artifact_repository
  31  from mlflow.store.jobs.sqlalchemy_store import SqlAlchemyJobStore
  32  from mlflow.store.tracking.dbmodels.models import (
  33      SqlLoggedModelMetric,
  34      SqlLoggedModelParam,
  35      SqlLoggedModelTag,
  36  )
  37  from mlflow.store.tracking.file_store import FileStore
  38  from mlflow.store.tracking.sqlalchemy_store import SqlAlchemyStore
  39  from mlflow.utils.os import is_windows
  40  from mlflow.utils.rest_utils import augmented_raise_for_status
  41  from mlflow.utils.time import get_current_time_millis
  42  
  43  from tests.helper_functions import (
  44      PROTOBUF_REQUIREMENT,
  45      get_safe_port,
  46      kill_process_tree,
  47      pyfunc_serve_and_score_model,
  48  )
  49  from tests.tracking.integration_test_utils import _await_server_up_or_die
  50  
  51  
  52  @pytest.mark.parametrize("command", ["server"])
  53  def test_mlflow_server_command(command):
  54      port = get_safe_port()
  55      with subprocess.Popen([
  56          sys.executable,
  57          "-m",
  58          "mlflow",
  59          command,
  60          "--port",
  61          str(port),
  62      ]) as process:
  63          try:
  64              _await_server_up_or_die(port)
  65              resp = requests.get(f"http://localhost:{port}/health")
  66              augmented_raise_for_status(resp)
  67              assert resp.text == "OK"
  68          finally:
  69              kill_process_tree(process.pid)
  70  
  71  
  72  def test_server_static_prefix_validation():
  73      with mock.patch("mlflow.server._run_server") as run_server_mock:
  74          CliRunner().invoke(server)
  75          run_server_mock.assert_called_once()
  76      with mock.patch("mlflow.server._run_server") as run_server_mock:
  77          CliRunner().invoke(server, ["--static-prefix", "/mlflow"])
  78          run_server_mock.assert_called_once()
  79      with mock.patch("mlflow.server._run_server") as run_server_mock:
  80          result = CliRunner().invoke(server, ["--static-prefix", "mlflow/"])
  81          assert "--static-prefix must begin with a '/'." in result.output
  82          run_server_mock.assert_not_called()
  83      with mock.patch("mlflow.server._run_server") as run_server_mock:
  84          result = CliRunner().invoke(server, ["--static-prefix", "/mlflow/"])
  85          assert "--static-prefix should not end with a '/'." in result.output
  86          run_server_mock.assert_not_called()
  87  
  88  
  89  def test_server_cli_fails_when_workspace_env_set():
  90      env = os.environ.copy()
  91      # Trigger server-mode guard in mlflow.server.__init__
  92      env["_MLFLOW_SERVER_SERVE_ARTIFACTS"] = "1"
  93      env["MLFLOW_WORKSPACE"] = "team-a"
  94  
  95      result = subprocess.run(
  96          [sys.executable, "-m", "mlflow", "server", "--port", "0"],
  97          capture_output=True,
  98          text=True,
  99          env=env,
 100      )
 101      assert result.returncode != 0
 102      assert "MLFLOW_WORKSPACE=team-a is client-only" in result.stderr
 103  
 104  
 105  def test_server_uvicorn_options():
 106      with mock.patch("mlflow.server._run_server") as run_server_mock:
 107          # Test default behavior (uvicorn should be used when no server options specified)
 108          CliRunner().invoke(server)
 109          run_server_mock.assert_called_once_with(
 110              file_store_path=mock.ANY,
 111              registry_store_uri=mock.ANY,
 112              default_artifact_root=mock.ANY,
 113              serve_artifacts=mock.ANY,
 114              artifacts_only=mock.ANY,
 115              artifacts_destination=mock.ANY,
 116              host="127.0.0.1",
 117              port=5000,
 118              static_prefix=None,
 119              workers=None,
 120              gunicorn_opts=None,
 121              waitress_opts=None,
 122              expose_prometheus=None,
 123              app_name=None,
 124              uvicorn_opts=None,
 125              env_file=None,
 126              secrets_cache_ttl=60,
 127              secrets_cache_max_size=1000,
 128          )
 129  
 130      with mock.patch("mlflow.server._run_server") as run_server_mock:
 131          # Test with uvicorn-opts - use different options than dev mode
 132          CliRunner().invoke(server, ["--uvicorn-opts", "--loop asyncio --limit-concurrency 100"])
 133          run_server_mock.assert_called_once_with(
 134              file_store_path=mock.ANY,
 135              registry_store_uri=mock.ANY,
 136              default_artifact_root=mock.ANY,
 137              serve_artifacts=mock.ANY,
 138              artifacts_only=mock.ANY,
 139              artifacts_destination=mock.ANY,
 140              host="127.0.0.1",
 141              port=5000,
 142              static_prefix=None,
 143              workers=None,
 144              gunicorn_opts=None,
 145              waitress_opts=None,
 146              expose_prometheus=None,
 147              app_name=None,
 148              uvicorn_opts="--loop asyncio --limit-concurrency 100",
 149              env_file=None,
 150              secrets_cache_ttl=60,
 151              secrets_cache_max_size=1000,
 152          )
 153  
 154  
 155  @pytest.mark.skipif(is_windows(), reason="--dev mode is not supported on Windows")
 156  def test_server_dev_mode():
 157      with mock.patch("mlflow.server._run_server") as run_server_mock:
 158          # Test with --dev flag (should set uvicorn opts)
 159          CliRunner().invoke(server, ["--dev"])
 160          run_server_mock.assert_called_once_with(
 161              file_store_path=mock.ANY,
 162              registry_store_uri=mock.ANY,
 163              default_artifact_root=mock.ANY,
 164              serve_artifacts=mock.ANY,
 165              artifacts_only=mock.ANY,
 166              artifacts_destination=mock.ANY,
 167              host="127.0.0.1",
 168              port=5000,
 169              static_prefix=None,
 170              workers=None,
 171              gunicorn_opts=None,
 172              waitress_opts=None,
 173              expose_prometheus=None,
 174              app_name=None,
 175              uvicorn_opts="--reload --log-level debug",
 176              env_file=None,
 177              secrets_cache_ttl=60,
 178              secrets_cache_max_size=1000,
 179          )
 180  
 181  
 182  @pytest.mark.skipif(is_windows(), reason="Gunicorn is not supported on Windows")
 183  def test_server_gunicorn_options():
 184      with mock.patch("mlflow.server._run_server") as run_server_mock:
 185          # Test that gunicorn-opts disables uvicorn
 186          CliRunner().invoke(server, ["--gunicorn-opts", "--timeout 120 --max-requests 1000"])
 187          run_server_mock.assert_called_once_with(
 188              file_store_path=mock.ANY,
 189              registry_store_uri=mock.ANY,
 190              default_artifact_root=mock.ANY,
 191              serve_artifacts=mock.ANY,
 192              artifacts_only=mock.ANY,
 193              artifacts_destination=mock.ANY,
 194              host="127.0.0.1",
 195              port=5000,
 196              static_prefix=None,
 197              workers=None,
 198              gunicorn_opts="--timeout 120 --max-requests 1000",
 199              waitress_opts=None,
 200              expose_prometheus=None,
 201              app_name=None,
 202              uvicorn_opts=None,
 203              env_file=None,
 204              secrets_cache_ttl=60,
 205              secrets_cache_max_size=1000,
 206          )
 207  
 208      # Test conflicting options
 209      result = CliRunner().invoke(
 210          server, ["--uvicorn-opts", "--reload", "--gunicorn-opts", "--log-level debug"]
 211      )
 212      assert result.exit_code != 0
 213      assert "Cannot specify multiple server options" in result.output
 214  
 215  
 216  def test_server_initializes_backend_store_when_tracking_enabled():
 217      handlers._tracking_store = None
 218      handlers._model_registry_store = None
 219      runner = CliRunner()
 220      with runner.isolated_filesystem():
 221          with (
 222              mock.patch("mlflow.server.handlers.initialize_backend_stores") as init_backend_mock,
 223              mock.patch("mlflow.server._run_server") as run_server_mock,
 224          ):
 225              result = runner.invoke(server)
 226      assert result.exit_code == 0
 227      init_backend_mock.assert_called_once_with(
 228          mock.ANY, mock.ANY, mock.ANY, workspace_store_uri=None
 229      )
 230      run_server_mock.assert_called_once()
 231  
 232  
 233  def test_server_skips_backend_store_init_in_artifacts_only_mode():
 234      handlers._tracking_store = None
 235      handlers._model_registry_store = None
 236      runner = CliRunner()
 237      with runner.isolated_filesystem():
 238          with (
 239              mock.patch("mlflow.server.handlers.initialize_backend_stores") as init_backend_mock,
 240              mock.patch("mlflow.server._run_server") as run_server_mock,
 241          ):
 242              result = runner.invoke(server, ["--artifacts-only"])
 243      assert result.exit_code == 0
 244      init_backend_mock.assert_not_called()
 245      run_server_mock.assert_called_once()
 246  
 247  
 248  def test_server_mlflow_artifacts_options():
 249      handlers._tracking_store = None
 250      handlers._model_registry_store = None
 251      with mock.patch(
 252          "mlflow.tracking._tracking_service.utils._has_existing_mlruns_data", return_value=False
 253      ):
 254          runner = CliRunner()
 255          with runner.isolated_filesystem():
 256              with mock.patch("mlflow.server._run_server") as run_server_mock:
 257                  runner.invoke(server, ["--artifacts-only"])
 258                  run_server_mock.assert_called_once()
 259              with mock.patch("mlflow.server._run_server") as run_server_mock:
 260                  runner.invoke(server, ["--serve-artifacts"])
 261                  run_server_mock.assert_called_once()
 262              with mock.patch("mlflow.server._run_server") as run_server_mock:
 263                  runner.invoke(server, ["--no-serve-artifacts"])
 264                  run_server_mock.assert_called_once()
 265              with mock.patch("mlflow.server._run_server") as run_server_mock:
 266                  runner.invoke(server, ["--artifacts-only"])
 267                  run_server_mock.assert_called_once()
 268  
 269  
 270  def test_server_artifacts_only_conflicts_with_enable_workspaces():
 271      with pytest.raises(
 272          click.UsageError, match="--enable-workspaces cannot be combined with --artifacts-only"
 273      ):
 274          CliRunner().invoke(
 275              server,
 276              ["--artifacts-only", "--enable-workspaces"],
 277              catch_exceptions=False,
 278              standalone_mode=False,
 279          )
 280  
 281  
def test_server_workspace_uri_sets_env_when_workspaces_enabled(tmp_path):
    # Verifies that `mlflow server --enable-workspaces --workspace-store-uri ...`
    # (a) forwards the workspace store URI to initialize_backend_stores and
    # (b) exports MLFLOW_WORKSPACE_STORE_URI / MLFLOW_ENABLE_WORKSPACES.
    handlers._tracking_store = None
    handlers._model_registry_store = None
    workspace_uri = f"sqlite:///{tmp_path / 'workspace.db'}"
    backend_uri = f"sqlite:///{tmp_path / 'backend.db'}"
    artifact_root_path = tmp_path / "artifacts"
    artifact_root_path.mkdir()
    artifact_root = artifact_root_path.as_uri()

    # Start from a clean environment so the assertions below observe only the
    # effect of this CLI invocation.
    MLFLOW_WORKSPACE_STORE_URI.unset()
    MLFLOW_ENABLE_WORKSPACES.unset()

    try:
        with (
            mock.patch("mlflow.server._run_server") as run_server_mock,
            mock.patch("mlflow.server.handlers.initialize_backend_stores") as init_backend,
        ):
            result = CliRunner().invoke(
                server,
                [
                    "--enable-workspaces",
                    "--workspace-store-uri",
                    workspace_uri,
                    "--backend-store-uri",
                    backend_uri,
                    "--registry-store-uri",
                    backend_uri,
                    "--default-artifact-root",
                    artifact_root,
                ],
                catch_exceptions=False,
                standalone_mode=False,
            )
        assert result.exit_code == 0
        run_server_mock.assert_called_once()
        init_backend.assert_called_once_with(
            backend_uri,
            backend_uri,
            artifact_root,
            workspace_store_uri=workspace_uri,
        )
        assert MLFLOW_WORKSPACE_STORE_URI.get() == workspace_uri
        assert MLFLOW_ENABLE_WORKSPACES.get() is True
    finally:
        # Always restore the environment; leaking these variables would affect
        # other tests in the session.
        MLFLOW_WORKSPACE_STORE_URI.unset()
        MLFLOW_ENABLE_WORKSPACES.unset()
 328  
 329  
 330  @pytest.mark.parametrize("command", [server])
 331  def test_tracking_uri_validation_failure(command):
 332      handlers._tracking_store = None
 333      with mock.patch("mlflow.server._run_server") as run_server_mock:
 334          # SQLAlchemy expects postgresql:// not postgres://
 335          CliRunner().invoke(
 336              command,
 337              [
 338                  "--backend-store-uri",
 339                  "postgres://user:pwd@host:5432/mydb",
 340                  "--default-artifact-root",
 341                  "./mlruns",
 342              ],
 343          )
 344          run_server_mock.assert_not_called()
 345  
 346  
 347  @pytest.mark.parametrize("command", [server])
 348  def test_tracking_uri_validation_sql_driver_uris(command):
 349      handlers._tracking_store = None
 350      handlers._model_registry_store = None
 351      with (
 352          mock.patch("mlflow.server._run_server") as run_server_mock,
 353          mock.patch("mlflow.store.tracking.sqlalchemy_store.SqlAlchemyStore"),
 354          mock.patch("mlflow.store.model_registry.sqlalchemy_store.SqlAlchemyStore"),
 355      ):
 356          result = CliRunner().invoke(
 357              command,
 358              [
 359                  "--backend-store-uri",
 360                  "mysql+pymysql://user:pwd@host:5432/mydb",
 361                  "--default-artifact-root",
 362                  "./mlruns",
 363              ],
 364          )
 365          assert result.exit_code == 0
 366          run_server_mock.assert_called()
 367      # Clean up the global variables set by the server
 368      mlflow.set_tracking_uri(None)
 369      mlflow.set_registry_uri(None)
 370  
 371  
 372  @pytest.mark.parametrize("command", [server])
 373  def test_registry_store_uri_different_from_tracking_store(command):
 374      handlers._tracking_store = None
 375      handlers._model_registry_store = None
 376  
 377      from mlflow.server.handlers import (
 378          ModelRegistryStoreRegistryWrapper,
 379          TrackingStoreRegistryWrapper,
 380      )
 381  
 382      handlers._tracking_store_registry = TrackingStoreRegistryWrapper()
 383      handlers._model_registry_store_registry = ModelRegistryStoreRegistryWrapper()
 384  
 385      with (
 386          mock.patch("mlflow.server._run_server") as run_server_mock,
 387          mock.patch("mlflow.store.tracking.file_store.FileStore") as tracking_store,
 388          mock.patch(
 389              "mlflow.store.model_registry.sqlalchemy_store.SqlAlchemyStore"
 390          ) as registry_store,
 391      ):
 392          result = CliRunner().invoke(
 393              command,
 394              [
 395                  "--backend-store-uri",
 396                  "./mlruns",
 397                  "--registry-store-uri",
 398                  "mysql://user:pwd@host:5432/mydb",
 399              ],
 400          )
 401          assert result.exit_code == 0
 402          run_server_mock.assert_called()
 403          tracking_store.assert_called()
 404          registry_store.assert_called()
 405      # Clean up the global variables set by the server
 406      mlflow.set_tracking_uri(None)
 407      mlflow.set_registry_uri(None)
 408  
 409  
 410  @pytest.fixture
 411  def sqlite_store(db_uri: str, tmp_path: Path) -> tuple[SqlAlchemyStore, str]:
 412      artifact_uri = (tmp_path / "artifacts").as_uri()
 413      store = SqlAlchemyStore(db_uri, artifact_uri)
 414      return (store, db_uri)
 415  
 416  
@pytest.fixture
def file_store():
    # FileStore-backed gc tests are retired; skip unconditionally. pytest.skip
    # raises, so every line below is unreachable and kept only as a reference
    # for the retired setup/teardown.
    pytest.skip("FileStore is no longer supported.")
    ROOT_LOCATION = os.path.join(tempfile.gettempdir(), "test_mlflow_gc")
    file_store_uri = f"file:///{ROOT_LOCATION}"
    yield (FileStore(ROOT_LOCATION), file_store_uri)
    shutil.rmtree(ROOT_LOCATION)
 424  
 425  
 426  def _create_run_in_store(store, create_artifacts=True):
 427      config = {
 428          "experiment_id": "0",
 429          "user_id": "Anderson",
 430          "start_time": get_current_time_millis(),
 431          "tags": [],
 432          "run_name": "name",
 433      }
 434      run = store.create_run(**config)
 435      if create_artifacts:
 436          artifact_path = url2pathname(unquote(urlparse(run.info.artifact_uri).path))
 437          if not os.path.exists(artifact_path):
 438              os.makedirs(artifact_path)
 439      return run
 440  
 441  
 442  @pytest.mark.parametrize("create_artifacts_in_run", [True, False])
 443  def test_mlflow_gc_sqlite(sqlite_store, create_artifacts_in_run):
 444      store = sqlite_store[0]
 445      run = _create_run_in_store(store, create_artifacts=create_artifacts_in_run)
 446      store.delete_run(run.info.run_id)
 447      subprocess.check_call([
 448          sys.executable,
 449          "-m",
 450          "mlflow",
 451          "gc",
 452          "--backend-store-uri",
 453          sqlite_store[1],
 454      ])
 455      runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
 456      assert len(runs) == 0
 457      with pytest.raises(MlflowException, match=r"Run .+ not found"):
 458          store.get_run(run.info.run_id)
 459  
 460      artifact_path = url2pathname(unquote(urlparse(run.info.artifact_uri).path))
 461      assert not os.path.exists(artifact_path)
 462  
 463  
def test_mlflow_gc_sqlite_older_than(sqlite_store):
    """`mlflow gc --older-than` must refuse runs deleted more recently than the cutoff."""
    store = sqlite_store[0]
    run = _create_run_in_store(store)
    store.delete_run(run.info.run_id)
    # The run was deleted a moment ago, so a ~10-day threshold must reject it
    # and the gc subprocess exits non-zero.
    with pytest.raises(subprocess.CalledProcessError, match=r".+") as exp:
        subprocess.run(
            [
                sys.executable,
                "-m",
                "mlflow",
                "gc",
                "--backend-store-uri",
                sqlite_store[1],
                "--older-than",
                "10d10h10m10s",
                "--run-ids",
                run.info.run_id,
            ],
            check=True,
            capture_output=True,
            text=True,
        )
    assert "is not older than the required age" in exp.value.stderr
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 1

    # After one second the run satisfies a 1-second threshold and is removed.
    time.sleep(1)
    subprocess.check_output([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        sqlite_store[1],
        "--older-than",
        "1s",
        "--run-ids",
        run.info.run_id,
    ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0
 505  
 506  
@pytest.mark.parametrize("create_artifacts_in_run", [True, False])
def test_mlflow_gc_file_store(file_store, create_artifacts_in_run):
    # Retired FileStore scenario: the skip below raises immediately, so the
    # remainder of the body is unreachable and kept for historical reference.
    pytest.skip("FileStore is no longer supported.")
    store = file_store[0]
    run = _create_run_in_store(store, create_artifacts=create_artifacts_in_run)
    store.delete_run(run.info.run_id)
    subprocess.check_output([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        file_store[1],
    ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0
    with pytest.raises(MlflowException, match=r"Run .+ not found"):
        store.get_run(run.info.run_id)

    artifact_path = url2pathname(unquote(urlparse(run.info.artifact_uri).path))
    assert not os.path.exists(artifact_path)
 528  
 529  
def test_mlflow_gc_file_store_passing_explicit_run_ids(file_store):
    # Retired FileStore scenario: the skip below raises immediately, so the
    # remainder of the body is unreachable and kept for historical reference.
    pytest.skip("FileStore is no longer supported.")
    store = file_store[0]
    run = _create_run_in_store(store)
    store.delete_run(run.info.run_id)
    subprocess.check_output([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        file_store[1],
        "--run-ids",
        run.info.run_id,
    ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0
    with pytest.raises(MlflowException, match=r"Run .+ not found"):
        store.get_run(run.info.run_id)
 549  
 550  
def test_mlflow_gc_not_deleted_run(file_store):
    # Retired FileStore scenario: the skip below raises immediately, so the
    # remainder of the body is unreachable and kept for historical reference.
    # It asserted that gc refuses to purge a run that was never soft-deleted.
    pytest.skip("FileStore is no longer supported.")
    store = file_store[0]
    run = _create_run_in_store(store)
    with pytest.raises(subprocess.CalledProcessError, match=r".+"):
        subprocess.check_output([
            sys.executable,
            "-m",
            "mlflow",
            "gc",
            "--backend-store-uri",
            file_store[1],
            "--run-ids",
            run.info.run_id,
        ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 1
 568  
 569  
def test_mlflow_gc_file_store_older_than(file_store):
    # Retired FileStore scenario: the skip below raises immediately, so the
    # remainder of the body is unreachable and kept for historical reference.
    # It mirrored test_mlflow_gc_sqlite_older_than for the file backend.
    pytest.skip("FileStore is no longer supported.")
    store = file_store[0]
    run = _create_run_in_store(store)
    store.delete_run(run.info.run_id)
    with pytest.raises(subprocess.CalledProcessError, match=r".+") as exp:
        subprocess.run(
            [
                sys.executable,
                "-m",
                "mlflow",
                "gc",
                "--backend-store-uri",
                file_store[1],
                "--older-than",
                "10d10h10m10s",
                "--run-ids",
                run.info.run_id,
            ],
            check=True,
            capture_output=True,
            text=True,
        )
    assert "is not older than the required age" in exp.value.stderr
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 1

    time.sleep(1)
    subprocess.check_output([
        sys.executable,
        "-m",
        "mlflow",
        "gc",
        "--backend-store-uri",
        file_store[1],
        "--older-than",
        "1s",
        "--run-ids",
        run.info.run_id,
    ])
    runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
    assert len(runs) == 0
 612  
 613  
@pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"])
def test_mlflow_gc_experiments(get_store_details, request):
    """End-to-end gc behavior for deleted experiments: default sweep,
    explicit --experiment-ids, and --older-than age filtering."""
    if get_store_details == "file_store":
        pytest.skip("FileStore is no longer supported.")

    def invoke_gc(*args):
        return CliRunner().invoke(gc, args, catch_exceptions=False)

    store, uri = request.getfixturevalue(get_store_details)
    # An active experiment with a run must survive a plain gc sweep.
    exp_id_1 = store.create_experiment("1")
    run_id_1 = store.create_run(exp_id_1, user_id="user", start_time=0, tags=[], run_name="1")
    invoke_gc("--backend-store-uri", uri)
    experiments = store.search_experiments(view_type=ViewType.ALL)
    exp_ids = [e.experiment_id for e in experiments]
    runs = store.search_runs(experiment_ids=exp_ids, filter_string="", run_view_type=ViewType.ALL)
    assert sorted(exp_ids) == sorted([exp_id_1, store.DEFAULT_EXPERIMENT_ID])
    assert [r.info.run_id for r in runs] == [run_id_1.info.run_id]

    # Once soft-deleted, a plain sweep purges the experiment and its runs.
    store.delete_experiment(exp_id_1)
    invoke_gc("--backend-store-uri", uri)
    experiments = store.search_experiments(view_type=ViewType.ALL)
    runs = store.search_runs(experiment_ids=exp_ids, filter_string="", run_view_type=ViewType.ALL)
    assert [e.experiment_id for e in experiments] == [store.DEFAULT_EXPERIMENT_ID]
    assert runs == []

    # --experiment-ids limits the purge to the listed experiments only.
    exp_id_2 = store.create_experiment("2")
    exp_id_3 = store.create_experiment("3")
    store.delete_experiment(exp_id_2)
    store.delete_experiment(exp_id_3)
    invoke_gc("--backend-store-uri", uri, "--experiment-ids", exp_id_2)
    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert sorted([e.experiment_id for e in experiments]) == sorted([
        exp_id_3,
        store.DEFAULT_EXPERIMENT_ID,
    ])

    # Freeze time.time() at 0 so exp_id_4's deletion timestamp is the epoch,
    # making it arbitrarily "old" for the --older-than sweep below.
    with mock.patch("time.time", return_value=0) as mock_time:
        exp_id_4 = store.create_experiment("4")
        store.delete_experiment(exp_id_4)
        mock_time.assert_called()

    # Only the epoch-old exp_id_4 exceeds 1 day; exp_id_3 must survive.
    invoke_gc("--backend-store-uri", uri, "--older-than", "1d")
    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert sorted([e.experiment_id for e in experiments]) == sorted([
        exp_id_3,
        store.DEFAULT_EXPERIMENT_ID,
    ])

    # A zero-second threshold purges exp_id_3 when named explicitly.
    invoke_gc("--backend-store-uri", uri, "--experiment-ids", exp_id_3, "--older-than", "0s")
    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert [e.experiment_id for e in experiments] == [store.DEFAULT_EXPERIMENT_ID]

    # Naming an experiment that fails the age check must raise and leave it.
    exp_id_5 = store.create_experiment("5")
    store.delete_experiment(exp_id_5)
    with pytest.raises(MlflowException, match=r"Experiments .+ can be deleted."):
        invoke_gc(
            "--backend-store-uri",
            uri,
            "--experiment-ids",
            exp_id_5,
            "--older-than",
            "10d10h10m10s",
        )
    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert sorted([e.experiment_id for e in experiments]) == sorted([
        exp_id_5,
        store.DEFAULT_EXPERIMENT_ID,
    ])
 682  
 683  
def test_mlflow_gc_experiment_with_logged_model_params_tags_and_metrics(sqlite_store):
    """Purging an experiment must cascade to logged models and their
    params/tags/metrics rows in the backing database."""
    store, uri = sqlite_store
    exp_id = store.create_experiment("exp_with_logged_model")
    run = store.create_run(exp_id, user_id="user", start_time=0, tags=[], run_name="run")
    run_id = run.info.run_id
    model = store.create_logged_model(experiment_id=exp_id)

    # Attach params, tags, and a metric so each association table is populated.
    store.log_logged_model_params(
        model_id=model.model_id,
        params=[
            LoggedModelParameter(key="param1", value="value1"),
            LoggedModelParameter(key="param2", value="value2"),
        ],
    )
    store.set_logged_model_tags(
        model_id=model.model_id,
        tags=[
            LoggedModelTag(key="tag1", value="value1"),
            LoggedModelTag(key="tag2", value="value2"),
        ],
    )
    store._log_model_metrics(
        run_id=run_id,
        metrics=[Metric(key="m1", value=1.0, timestamp=0, step=0, model_id=model.model_id)],
        experiment_id=exp_id,
    )

    store.delete_experiment(exp_id)

    result = CliRunner().invoke(gc, ["--backend-store-uri", uri], catch_exceptions=False)
    assert result.exit_code == 0

    experiments = store.search_experiments(view_type=ViewType.ALL)
    assert [e.experiment_id for e in experiments] == [store.DEFAULT_EXPERIMENT_ID]

    with pytest.raises(MlflowException, match=r".+ not found"):
        store.get_logged_model(model.model_id)

    # Verify no orphaned rows remain in any of the logged-model tables.
    with store.ManagedSessionMaker() as session:
        for table in [SqlLoggedModelParam, SqlLoggedModelTag, SqlLoggedModelMetric]:
            assert session.query(table).count() == 0
 725  
 726  
 727  @pytest.fixture
 728  def sqlite_store_with_s3_artifact_repository(
 729      tmp_path: Path,
 730  ) -> tuple[SqlAlchemyStore, str, str]:
 731      db_path = tmp_path / "mlflow.db"
 732      db_uri = f"sqlite:///{db_path}"
 733      s3_uri = "s3://mlflow"
 734      store = SqlAlchemyStore(db_uri, s3_uri)
 735      return (store, db_uri, s3_uri)
 736  
 737  
def test_mlflow_gc_sqlite_with_s3_artifact_repository(
    sqlite_store_with_s3_artifact_repository,
):
    """gc must delete S3-hosted artifacts of purged runs via the S3 client."""
    store = sqlite_store_with_s3_artifact_repository[0]
    run = _create_run_in_store(store, create_artifacts=False)
    store.delete_run(run.info.run_id)

    artifact_repo = get_artifact_repository(run.info.artifact_uri)
    bucket, dest_path = artifact_repo.parse_s3_compliant_uri(run.info.artifact_uri)
    fake_artifact_path = os.path.join(dest_path, "fake_artifact.txt")
    # Stub the exact S3 calls gc is expected to make, in order: list the run's
    # artifact prefix, then bulk-delete the listed keys. Stubber raises if the
    # calls (or their parameters) deviate.
    with Stubber(artifact_repo._get_s3_client()) as s3_stubber:
        s3_stubber.add_response(
            "list_objects_v2",
            {"Contents": [{"Key": fake_artifact_path}]},
            {"Bucket": bucket, "Prefix": dest_path},
        )
        s3_stubber.add_response(
            "delete_objects",
            {"Deleted": [{"Key": fake_artifact_path}]},
            {"Bucket": bucket, "Delete": {"Objects": [{"Key": fake_artifact_path}]}},
        )

        CliRunner().invoke(
            gc,
            [
                "--backend-store-uri",
                sqlite_store_with_s3_artifact_repository[1],
                "--artifacts-destination",
                sqlite_store_with_s3_artifact_repository[2],
            ],
            catch_exceptions=False,
        )

        runs = store.search_runs(experiment_ids=["0"], filter_string="", run_view_type=ViewType.ALL)
        assert len(runs) == 0
        with pytest.raises(MlflowException, match=r"Run .+ not found"):
            store.get_run(run.info.run_id)
 775  
 776  
 777  @pytest.mark.skip(reason="mlserver is incompatible with the latest version of pydantic")
 778  @pytest.mark.parametrize(
 779      "enable_mlserver",
 780      [
 781          # MLServer is not supported in Windows yet, so let's skip this test in that case.
 782          # https://github.com/SeldonIO/MLServer/issues/361
 783          pytest.param(
 784              True,
 785              marks=pytest.mark.skipif(is_windows(), reason="MLServer is not supported in Windows"),
 786          ),
 787          False,
 788      ],
 789  )
 790  def test_mlflow_models_serve(enable_mlserver):
 791      class MyModel(pyfunc.PythonModel):
 792          def predict(self, context, model_input, params=None):
 793              return np.array([1, 2, 3])
 794  
 795      model = MyModel()
 796  
 797      with mlflow.start_run():
 798          if enable_mlserver:
 799              # We need MLServer to be present on the Conda environment, so we'll
 800              # add that as an extra requirement.
 801              mlflow.pyfunc.log_model(
 802                  name="model",
 803                  python_model=model,
 804                  extra_pip_requirements=[
 805                      "mlserver>=1.2.0,!=1.3.1",
 806                      "mlserver-mlflow>=1.2.0,!=1.3.1",
 807                      PROTOBUF_REQUIREMENT,
 808                  ],
 809              )
 810          else:
 811              mlflow.pyfunc.log_model(name="model", python_model=model)
 812          model_uri = mlflow.get_artifact_uri("model")
 813  
 814      data = pd.DataFrame({"a": [0]})
 815  
 816      extra_args = ["--env-manager", "local"]
 817      if enable_mlserver:
 818          extra_args.append("--enable-mlserver")
 819  
 820      scoring_response = pyfunc_serve_and_score_model(
 821          model_uri=model_uri,
 822          data=data,
 823          content_type=pyfunc.scoring_server.CONTENT_TYPE_JSON,
 824          extra_args=extra_args,
 825      )
 826      assert scoring_response.status_code == 200
 827      served_model_preds = np.array(json.loads(scoring_response.content)["predictions"])
 828      np.testing.assert_array_equal(served_model_preds, model.predict(data, None))
 829  
 830  
 831  def test_mlflow_tracking_disabled_in_artifacts_only_mode(tmp_path: Path):
 832      port = get_safe_port()
 833      with subprocess.Popen(
 834          [sys.executable, "-m", "mlflow", "server", "--port", str(port), "--artifacts-only"],
 835          cwd=tmp_path,
 836      ) as process:
 837          try:
 838              _await_server_up_or_die(port)
 839              resp = requests.get(f"http://localhost:{port}/api/2.0/mlflow/experiments/search")
 840              assert (
 841                  "Endpoint: /api/2.0/mlflow/experiments/search disabled due to the mlflow "
 842                  "server running in `--artifacts-only` mode." in resp.text
 843              )
 844          finally:
 845              kill_process_tree(process.pid)
 846  
 847  
 848  def test_mlflow_artifact_list_in_artifacts_only_mode(tmp_path: Path):
 849      port = get_safe_port()
 850      with subprocess.Popen(
 851          [sys.executable, "-m", "mlflow", "server", "--port", str(port), "--artifacts-only"],
 852          cwd=tmp_path,
 853      ) as process:
 854          try:
 855              _await_server_up_or_die(port)
 856              resp = requests.get(f"http://localhost:{port}/api/2.0/mlflow-artifacts/artifacts")
 857              augmented_raise_for_status(resp)
 858              assert resp.status_code == 200
 859              assert resp.text == "{}"
 860          finally:
 861              kill_process_tree(process.pid)
 862  
 863  
 864  def test_mlflow_artifact_service_unavailable_when_no_server_artifacts_is_specified():
 865      port = get_safe_port()
 866      with subprocess.Popen([
 867          sys.executable,
 868          "-m",
 869          "mlflow",
 870          "server",
 871          "--port",
 872          str(port),
 873          "--no-serve-artifacts",
 874      ]) as process:
 875          try:
 876              _await_server_up_or_die(port)
 877              endpoint = "/api/2.0/mlflow-artifacts/artifacts"
 878              resp = requests.get(f"http://localhost:{port}{endpoint}")
 879              assert (
 880                  f"Endpoint: {endpoint} disabled due to the mlflow server running with "
 881                  "`--no-serve-artifacts`" in resp.text
 882              )
 883          finally:
 884              kill_process_tree(process.pid)
 885  
 886  
 887  def test_mlflow_artifact_only_prints_warning_for_configs():
 888      with (
 889          mock.patch("mlflow.server._run_server") as run_server_mock,
 890          mock.patch("mlflow.store.tracking.sqlalchemy_store.SqlAlchemyStore"),
 891          mock.patch("mlflow.store.model_registry.sqlalchemy_store.SqlAlchemyStore"),
 892      ):
 893          result = CliRunner().invoke(
 894              server,
 895              ["--artifacts-only", "--backend-store-uri", "sqlite:///my.db"],
 896              catch_exceptions=False,
 897          )
 898          msg = (
 899              "Usage: server [OPTIONS]\nTry 'server --help' for help.\n\nError: You are starting a "
 900              "tracking server in `--artifacts-only` mode and have provided a value for "
 901              "`--backend_store_uri`"
 902          )
 903          assert msg in result.output
 904          assert result.exit_code != 0
 905          run_server_mock.assert_not_called()
 906  
 907  
 908  def test_mlflow_ui_is_alias_for_mlflow_server():
 909      mlflow_ui_stdout = subprocess.check_output(
 910          [sys.executable, "-m", "mlflow", "ui", "--help"], text=True
 911      )
 912      mlflow_server_stdout = subprocess.check_output(
 913          [sys.executable, "-m", "mlflow", "server", "--help"], text=True
 914      )
 915      assert (
 916          mlflow_ui_stdout.replace("Usage: python -m mlflow ui", "Usage: python -m mlflow server")
 917          == mlflow_server_stdout
 918      )
 919  
 920  
 921  def test_cli_with_python_mod():
 922      stdout = subprocess.check_output(
 923          [
 924              sys.executable,
 925              "-m",
 926              "mlflow",
 927              "--version",
 928          ],
 929          text=True,
 930      )
 931      assert stdout.rstrip().endswith(mlflow.__version__)
 932      stdout = subprocess.check_output(
 933          [
 934              sys.executable,
 935              "-m",
 936              "mlflow",
 937              "server",
 938              "--help",
 939          ],
 940          text=True,
 941      )
 942      assert "mlflow server" in stdout
 943  
 944  
 945  def test_doctor():
 946      res = CliRunner().invoke(doctor, catch_exceptions=False)
 947      assert res.exit_code == 0
 948  
 949  
 950  def test_env_file_loading(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
 951      # Setup: Create an experiment using the Python SDK
 952      test_tracking_uri = f"sqlite:///{tmp_path / 'mlflow.db'}"
 953      test_experiment_name = "test_experiment_from_env"
 954  
 955      # Create experiment using SDK
 956      mlflow.set_tracking_uri(test_tracking_uri)
 957      mlflow.create_experiment(test_experiment_name)
 958  
 959      # Create a test .env file pointing to this tracking URI
 960      env_file_path = tmp_path / "test.env"
 961      env_file_path.write_text(f"MLFLOW_TRACKING_URI={test_tracking_uri}\n")
 962  
 963      runner = CliRunner()
 964  
 965      # Clear the MLflow environment variables to ensure we're testing the loading
 966      monkeypatch.delenv("MLFLOW_TRACKING_URI", raising=False)
 967      monkeypatch.delenv("MLFLOW_EXPERIMENT_NAME", raising=False)
 968  
 969      # Ensure variables are not set before running command
 970      assert "MLFLOW_TRACKING_URI" not in os.environ
 971  
 972      # Use the existing experiments search CLI command with --env-file
 973      result = runner.invoke(
 974          cli,
 975          ["--env-file", str(env_file_path), "experiments", "search"],
 976          catch_exceptions=False,
 977      )
 978  
 979      # Check that the command executed successfully
 980      assert result.exit_code == 0
 981  
 982      # Verify the experiment we created is found (proves env vars were loaded)
 983      assert test_experiment_name in result.output
 984  
 985      # Verify the loading message
 986      assert "Loaded environment variables from:" in result.output
 987      assert str(env_file_path) in result.output
 988  
 989  
 990  def test_env_file_loading_invalid_path() -> None:
 991      runner = CliRunner()
 992  
 993      # Test error handling for non-existent file
 994      result = runner.invoke(
 995          cli,
 996          ["--env-file", "nonexistent.env", "experiments", "search"],
 997          catch_exceptions=False,
 998      )
 999      assert result.exit_code != 0
1000      assert "Environment file 'nonexistent.env' does not exist" in result.output
1001  
1002  
@pytest.mark.parametrize(
    ("args", "expected_max_results", "expected_view"),
    [
        (["experiments", "search"], None, ViewType.ACTIVE_ONLY),
        (["experiments", "search", "--max-results", "50"], 50, ViewType.ACTIVE_ONLY),
        (
            ["experiments", "search", "--view", "all", "--max-results", "100"],
            100,
            ViewType.ALL,
        ),
    ],
)
def test_experiments_search_max_results(args, expected_max_results, expected_view):
    """CLI search flags are forwarded to mlflow.search_experiments as keyword args."""
    with mock.patch("mlflow.search_experiments", return_value=[]) as search_mock:
        result = CliRunner().invoke(cli, args, catch_exceptions=False)
        assert result.exit_code == 0
        search_mock.assert_called_once()
        kwargs = search_mock.call_args[1]
        assert kwargs.get("max_results") == expected_max_results
        assert kwargs.get("view_type") == expected_view
1023  
1024  
def test_experiments_search_max_results_zero():
    """--max-results 0 is accepted and forwarded unchanged."""
    with mock.patch("mlflow.search_experiments", return_value=[]) as search_mock:
        result = CliRunner().invoke(
            cli, ["experiments", "search", "--max-results", "0"], catch_exceptions=False
        )
        assert result.exit_code == 0
        search_mock.assert_called_once()
        assert search_mock.call_args[1].get("max_results") == 0
1034  
1035  
def test_experiments_search_max_results_negative():
    """A negative --max-results is rejected with a usage error."""
    result = CliRunner().invoke(
        cli, ["experiments", "search", "--max-results", "-1"], catch_exceptions=False
    )
    assert result.exit_code != 0
    assert "max-results must be a non-negative integer" in result.output
1042  
1043  
def test_server_with_env_file(tmp_path):
    """`mlflow --env-file server` forwards the env-file path to _run_server."""
    env_file = tmp_path / ".env"
    env_file.write_text("TEST_VAR=test_value\n")

    with mock.patch("mlflow.server._run_server") as run_server_mock:
        result = CliRunner().invoke(cli, ["--env-file", str(env_file), "server"])
        assert result.exit_code == 0
        run_server_mock.assert_called_once()
        # The server process re-loads the env file itself, hence the pass-through.
        assert run_server_mock.call_args.kwargs["env_file"] == str(env_file)
1054  
1055  
def test_mlflow_gc_with_datasets(sqlite_store):
    """`mlflow gc` permanently removes a deleted experiment that logged a dataset."""
    store, backend_uri = sqlite_store[0], sqlite_store[1]

    mlflow.set_tracking_uri(backend_uri)
    mlflow.set_experiment("dataset")

    dataset = numpy_dataset.from_numpy(np.array([1, 2, 3]))

    with mlflow.start_run() as run:
        experiment_id = run.info.experiment_id
        mlflow.log_input(dataset)

    # Both the default experiment and "dataset" exist.
    assert len(store.search_experiments(view_type=ViewType.ALL)) == 2

    store.delete_experiment(experiment_id)

    # Deletion is soft: the experiment is still present, just marked deleted.
    assert len(store.search_experiments(view_type=ViewType.ALL)) == 2

    subprocess.check_call(
        [sys.executable, "-m", "mlflow", "gc", "--backend-store-uri", backend_uri]
    )

    # After gc only the default experiment survives.
    remaining = store.search_experiments(view_type=ViewType.ALL)
    assert len(remaining) == 1
    assert remaining[0].experiment_id == "0"
    with pytest.raises(MlflowException, match=f"No Experiment with id={experiment_id} exists"):
        store.get_experiment(experiment_id)
1094  
1095  
@pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"])
def test_mlflow_gc_logged_model(get_store_details, request):
    """`mlflow gc` hard-deletes a soft-deleted logged model."""
    if get_store_details == "file_store":
        pytest.skip("FileStore is no longer supported.")
    store, uri = request.getfixturevalue(get_store_details)
    exp_id = store.create_experiment("exp")
    model = store.create_logged_model(experiment_id=exp_id)
    store.delete_logged_model(model.model_id)

    subprocess.check_output([sys.executable, "-m", "mlflow", "gc", "--backend-store-uri", uri])

    with pytest.raises(MlflowException, match=r".+ not found"):
        store.get_logged_model(model.model_id)
1109  
1110  
@pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"])
def test_mlflow_gc_logged_models_older_than(get_store_details, request):
    """gc --older-than keeps models whose deletion is more recent than the cutoff."""
    if get_store_details == "file_store":
        pytest.skip("FileStore is no longer supported.")
    store, uri = request.getfixturevalue(get_store_details)
    exp_id = store.create_experiment("exp")

    # Creation happened two days ago, but the deletion below happens "now",
    # so a 1-day cutoff must not collect the model.
    two_days_ago = time.time() - (2 * 24 * 60 * 60)
    with mock.patch("time.time", return_value=two_days_ago):
        model = store.create_logged_model(experiment_id=exp_id)

    store.delete_logged_model(model.model_id)

    subprocess.check_call(
        [
            sys.executable,
            "-m",
            "mlflow",
            "gc",
            "--backend-store-uri",
            uri,
            "--older-than",
            "1d",
        ]
    )

    retrieved = store.get_logged_model(model.model_id, allow_deleted=True)
    assert retrieved.model_id == model.model_id
1136  
1137  
@pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"])
def test_mlflow_gc_logged_models_deletes_when_older_than(get_store_details, request):
    """gc --older-than removes models that were deleted before the cutoff."""
    if get_store_details == "file_store":
        pytest.skip("FileStore is no longer supported.")
    store, uri = request.getfixturevalue(get_store_details)
    exp_id = store.create_experiment("exp")
    model = store.create_logged_model(experiment_id=exp_id)

    # Pretend the deletion happened two days ago so a 1-day cutoff collects it.
    two_days_ago_ms = int((time.time() - (2 * 24 * 60 * 60)) * 1000)
    with mock.patch("mlflow.utils.time.get_current_time_millis", return_value=two_days_ago_ms):
        store.delete_logged_model(model.model_id)

    subprocess.check_call(
        [
            sys.executable,
            "-m",
            "mlflow",
            "gc",
            "--backend-store-uri",
            uri,
            "--older-than",
            "1d",
        ]
    )

    with pytest.raises(MlflowException, match=r".+ not found"):
        store.get_logged_model(model.model_id)
1163  
1164  
@pytest.mark.parametrize("get_store_details", ["file_store", "sqlite_store"])
def test_mlflow_gc_logged_models_mixed_time(get_store_details, request):
    """gc --older-than collects only the model deleted before the cutoff."""
    if get_store_details == "file_store":
        pytest.skip("FileStore is no longer supported.")
    store, uri = request.getfixturevalue(get_store_details)
    exp_id = store.create_experiment("exp")
    old_model = store.create_logged_model(experiment_id=exp_id)
    recent_model = store.create_logged_model(experiment_id=exp_id)

    # old_model was deleted three days ago ...
    three_days_ago_ms = int((time.time() - (3 * 24 * 60 * 60)) * 1000)
    with mock.patch("mlflow.utils.time.get_current_time_millis", return_value=three_days_ago_ms):
        store.delete_logged_model(old_model.model_id)

    # ... while recent_model was deleted just now.
    with mock.patch(
        "mlflow.utils.time.get_current_time_millis", return_value=int(time.time() * 1000)
    ):
        store.delete_logged_model(recent_model.model_id)

    subprocess.check_call(
        [
            sys.executable,
            "-m",
            "mlflow",
            "gc",
            "--backend-store-uri",
            uri,
            "--older-than",
            "1d",
        ]
    )

    with pytest.raises(MlflowException, match=r".+ not found"):
        store.get_logged_model(old_model.model_id)

    survivor = store.get_logged_model(recent_model.model_id, allow_deleted=True)
    assert survivor.model_id == recent_model.model_id
1198  
1199  
@pytest.fixture
def sqlite_store_with_jobs(
    db_uri: str, tmp_path: Path
) -> tuple[SqlAlchemyStore, SqlAlchemyJobStore, str]:
    """Provide a tracking store and a job store sharing one SQLite database."""
    artifact_root = (tmp_path / "artifacts").as_uri()
    return (SqlAlchemyStore(db_uri, artifact_root), SqlAlchemyJobStore(db_uri), db_uri)
1208  
1209  
def _create_test_job(job_store, job_name="test_job", finalize=True):
    """Create a job; if `finalize`, run it to SUCCEEDED and return the fresh record."""
    job = job_store.create_job(job_name=job_name, params="{}")
    if not finalize:
        return job
    job_store.start_job(job.job_id)
    job_store.finish_job(job.job_id, result="done")
    # Re-fetch so the returned entity reflects the finalized status.
    return job_store.get_job(job.job_id)
1217  
1218  
def _set_job_creation_time(job_store, job_id, creation_time_ms):
    """Backdate a job's creation_time (epoch millis) directly in the database."""
    from mlflow.store.tracking.dbmodels.models import SqlJob

    with job_store.ManagedSessionMaker() as session:
        record = session.query(SqlJob).filter(SqlJob.id == job_id).first()
        record.creation_time = creation_time_ms
1225  
1226  
def test_mlflow_gc_no_jobs_deleted_without_jobs_flag(sqlite_store_with_jobs):
    """Without --jobs/--job-ids, `mlflow gc` leaves jobs untouched."""
    _, job_store, db_uri = sqlite_store_with_jobs

    jobs = [_create_test_job(job_store, name) for name in ("job1", "job2")]

    subprocess.check_call(
        [sys.executable, "-m", "mlflow", "gc", "--backend-store-uri", db_uri]
    )

    # Every job must still be retrievable.
    for job in jobs:
        assert job_store.get_job(job.job_id).job_id == job.job_id
1246  
1247  
def test_mlflow_gc_all_jobs_deleted_with_jobs_flag(sqlite_store_with_jobs):
    """`mlflow gc --jobs` removes every finalized job."""
    _, job_store, db_uri = sqlite_store_with_jobs

    jobs = [_create_test_job(job_store, name) for name in ("job1", "job2", "job3")]

    subprocess.check_call(
        [sys.executable, "-m", "mlflow", "gc", "--backend-store-uri", db_uri, "--jobs"]
    )

    for job in jobs:
        with pytest.raises(MlflowException, match=r"Job .+ not found"):
            job_store.get_job(job.job_id)
1271  
1272  
def test_mlflow_gc_specific_jobs_deleted_with_job_ids(sqlite_store_with_jobs):
    """`mlflow gc --job-ids` deletes only the listed jobs."""
    _, job_store, db_uri = sqlite_store_with_jobs

    job1 = _create_test_job(job_store, "job1")
    job2 = _create_test_job(job_store, "job2")
    job3 = _create_test_job(job_store, "job3")

    subprocess.check_call(
        [
            sys.executable,
            "-m",
            "mlflow",
            "gc",
            "--backend-store-uri",
            db_uri,
            "--job-ids",
            f"{job1.job_id},{job2.job_id}",
        ]
    )

    for deleted in (job1, job2):
        with pytest.raises(MlflowException, match=r"Job .+ not found"):
            job_store.get_job(deleted.job_id)

    # job3 was not listed, so it survives.
    assert job_store.get_job(job3.job_id).job_id == job3.job_id
1298  
1299  
def test_mlflow_gc_jobs_deleted_with_older_than_flag(sqlite_store_with_jobs):
    """`mlflow gc --jobs --older-than` removes only jobs created before the cutoff."""
    _, job_store, db_uri = sqlite_store_with_jobs

    old_job = _create_test_job(job_store, "old_job")
    new_job = _create_test_job(job_store, "new_job")

    # Backdate old_job by one hour so the 30-minute cutoff collects it.
    _set_job_creation_time(
        job_store, old_job.job_id, get_current_time_millis() - (60 * 60 * 1000)
    )

    subprocess.check_call(
        [
            sys.executable,
            "-m",
            "mlflow",
            "gc",
            "--backend-store-uri",
            db_uri,
            "--jobs",
            "--older-than",
            "30m",
        ]
    )

    with pytest.raises(MlflowException, match=r"Job .+ not found"):
        job_store.get_job(old_job.job_id)

    assert job_store.get_job(new_job.job_id).job_id == new_job.job_id
1326  
1327  
def test_mlflow_gc_job_ids_with_older_than_filter(sqlite_store_with_jobs):
    """--older-than further filters --job-ids: a listed-but-recent job survives."""
    _, job_store, db_uri = sqlite_store_with_jobs

    old_job = _create_test_job(job_store, "old_job")
    new_job = _create_test_job(job_store, "new_job")

    _set_job_creation_time(
        job_store, old_job.job_id, get_current_time_millis() - (60 * 60 * 1000)
    )

    # Only new_job is listed, but it is newer than the cutoff -> nothing deleted.
    subprocess.check_call(
        [
            sys.executable,
            "-m",
            "mlflow",
            "gc",
            "--backend-store-uri",
            db_uri,
            "--job-ids",
            new_job.job_id,
            "--older-than",
            "30m",
        ]
    )

    assert job_store.get_job(new_job.job_id).job_id == new_job.job_id
    # old_job is old enough but was not listed, so it also survives.
    assert job_store.get_job(old_job.job_id).job_id == old_job.job_id
1355  
1356  
def test_mlflow_gc_only_deletes_finalized_jobs(sqlite_store_with_jobs):
    """`mlflow gc --jobs` skips PENDING/RUNNING jobs and removes all finalized ones."""
    from mlflow.entities._job_status import JobStatus

    _, job_store, db_uri = sqlite_store_with_jobs

    # Jobs that must survive gc: not yet finalized.
    pending_job = _create_test_job(job_store, "pending_job", finalize=False)
    assert pending_job.status == JobStatus.PENDING

    running_job = _create_test_job(job_store, "running_job", finalize=False)
    job_store.start_job(running_job.job_id)
    running_job = job_store.get_job(running_job.job_id)
    assert running_job.status == JobStatus.RUNNING

    # Jobs in every finalized state: all must be collected.
    succeeded_job = _create_test_job(job_store, "succeeded_job", finalize=True)
    assert succeeded_job.status == JobStatus.SUCCEEDED

    failed_job = _create_test_job(job_store, "failed_job", finalize=False)
    job_store.start_job(failed_job.job_id)
    job_store.fail_job(failed_job.job_id, "error")
    failed_job = job_store.get_job(failed_job.job_id)
    assert failed_job.status == JobStatus.FAILED

    timeout_job = _create_test_job(job_store, "timeout_job", finalize=False)
    job_store.start_job(timeout_job.job_id)
    job_store.mark_job_timed_out(timeout_job.job_id)
    timeout_job = job_store.get_job(timeout_job.job_id)
    assert timeout_job.status == JobStatus.TIMEOUT

    canceled_job = _create_test_job(job_store, "canceled_job", finalize=False)
    job_store.cancel_job(canceled_job.job_id)
    canceled_job = job_store.get_job(canceled_job.job_id)
    assert canceled_job.status == JobStatus.CANCELED

    subprocess.check_call(
        [sys.executable, "-m", "mlflow", "gc", "--backend-store-uri", db_uri, "--jobs"]
    )

    # Non-finalized jobs survive with their status intact.
    for survivor, status in ((pending_job, JobStatus.PENDING), (running_job, JobStatus.RUNNING)):
        retrieved = job_store.get_job(survivor.job_id)
        assert retrieved.job_id == survivor.job_id
        assert retrieved.status == status

    # Finalized jobs are gone.
    for collected in (succeeded_job, failed_job, timeout_job, canceled_job):
        with pytest.raises(MlflowException, match=r"Job .+ not found"):
            job_store.get_job(collected.job_id)