"""
The ``mlflow.projects`` module provides an API for running MLflow projects locally or remotely.
"""

import json
import logging
import os

import yaml

import mlflow.projects.databricks
from mlflow import tracking
from mlflow.entities import RunStatus
from mlflow.exceptions import ExecutionException, MlflowException
from mlflow.projects.backend import loader
from mlflow.projects.submitted_run import SubmittedRun
from mlflow.projects.utils import (
    MLFLOW_LOCAL_BACKEND_RUN_ID_CONFIG,
    PROJECT_BUILD_IMAGE,
    PROJECT_DOCKER_ARGS,
    PROJECT_DOCKER_AUTH,
    PROJECT_ENV_MANAGER,
    PROJECT_STORAGE_DIR,
    PROJECT_SYNCHRONOUS,
    fetch_and_validate_project,
    get_entry_point_command,
    get_or_create_run,
    get_run_env_vars,
    load_project,
)
from mlflow.tracking.fluent import _get_experiment_id
from mlflow.utils import env_manager as _EnvManager
from mlflow.utils.mlflow_tags import (
    MLFLOW_DOCKER_IMAGE_ID,
    MLFLOW_PROJECT_BACKEND,
    MLFLOW_PROJECT_ENV,
    MLFLOW_RUN_NAME,
)

_logger = logging.getLogger(__name__)


def _resolve_experiment_id(experiment_name=None, experiment_id=None):
    """
    Resolve experiment.

    Verifies either one or other is specified - cannot be both selected.

    If ``experiment_name`` is provided and does not exist, an experiment
    of that name is created and its id is returned.

    Args:
        experiment_name: Name of experiment under which to launch the run.
        experiment_id: ID of experiment under which to launch the run.

    Returns:
        str
    """

    if experiment_name and experiment_id:
        raise MlflowException("Specify only one of 'experiment_name' or 'experiment_id'.")

    if experiment_id:
        return str(experiment_id)

    if experiment_name:
        client = tracking.MlflowClient()
        if exp := client.get_experiment_by_name(experiment_name):
            return exp.experiment_id
        else:
            _logger.info("'%s' does not exist. Creating a new experiment", experiment_name)
            return client.create_experiment(experiment_name)

    # Neither name nor id supplied: fall back to the active/default experiment.
    return _get_experiment_id()


def _run(
    uri,
    experiment_id,
    entry_point,
    version,
    parameters,
    docker_args,
    backend_name,
    backend_config,
    storage_dir,
    env_manager,
    synchronous,
    run_name,
    build_image,
    docker_auth,
):
    """
    Helper that delegates to the project-running method corresponding to the passed-in backend.
    Returns a ``SubmittedRun`` corresponding to the project run.
    """
    tracking_store_uri = tracking.get_tracking_uri()
    # Thread common execution options through to the backend via its config dict.
    backend_config[PROJECT_ENV_MANAGER] = env_manager
    backend_config[PROJECT_SYNCHRONOUS] = synchronous
    backend_config[PROJECT_DOCKER_ARGS] = docker_args
    backend_config[PROJECT_STORAGE_DIR] = storage_dir
    backend_config[PROJECT_BUILD_IMAGE] = build_image
    backend_config[PROJECT_DOCKER_AUTH] = docker_auth
    # TODO: remove this check once kubernetes execution has been refactored
    if backend_name not in {"databricks", "kubernetes"}:
        if backend := loader.load_backend(backend_name):
            submitted_run = backend.run(
                uri,
                entry_point,
                parameters,
                version,
                backend_config,
                tracking_store_uri,
                experiment_id,
            )
            tracking.MlflowClient().set_tag(
                submitted_run.run_id, MLFLOW_PROJECT_BACKEND, backend_name
            )
            if run_name is not None:
                tracking.MlflowClient().set_tag(submitted_run.run_id, MLFLOW_RUN_NAME, run_name)
            return submitted_run

    work_dir = fetch_and_validate_project(uri, version, entry_point, parameters)
    project = load_project(work_dir)
    _validate_execution_environment(project, backend_name)

    active_run = get_or_create_run(
        None, uri, experiment_id, work_dir, version, entry_point, parameters
    )

    if run_name is not None:
        tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_RUN_NAME, run_name)

    if backend_name == "databricks":
        tracking.MlflowClient().set_tag(
            active_run.info.run_id, MLFLOW_PROJECT_BACKEND, "databricks"
        )
        # Imported lazily to avoid paying the import cost on non-Databricks runs.
        from mlflow.projects.databricks import run_databricks, run_databricks_spark_job

        if project.databricks_spark_job_spec is not None:
            return run_databricks_spark_job(
                remote_run=active_run,
                uri=uri,
                work_dir=work_dir,
                experiment_id=experiment_id,
                cluster_spec=backend_config,
                project_spec=project,
                entry_point=entry_point,
                parameters=parameters,
            )

        return run_databricks(
            remote_run=active_run,
            uri=uri,
            entry_point=entry_point,
            work_dir=work_dir,
            parameters=parameters,
            experiment_id=experiment_id,
            cluster_spec=backend_config,
            env_manager=env_manager,
        )

    elif backend_name == "kubernetes":
        from mlflow.projects import kubernetes as kb
        from mlflow.projects.docker import (
            build_docker_image,
            validate_docker_env,
            validate_docker_installation,
        )

        tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_ENV, "docker")
        tracking.MlflowClient().set_tag(
            active_run.info.run_id, MLFLOW_PROJECT_BACKEND, "kubernetes"
        )
        validate_docker_env(project)
        validate_docker_installation()
        kube_config = _parse_kubernetes_config(backend_config)
        image = build_docker_image(
            work_dir=work_dir,
            repository_uri=kube_config["repository-uri"],
            base_image=project.docker_env.get("image"),
            run_id=active_run.info.run_id,
            build_image=build_image,
            docker_auth=docker_auth,
        )
        image_digest = kb.push_image_to_registry(image.tags[0])
        tracking.MlflowClient().set_tag(
            active_run.info.run_id, MLFLOW_DOCKER_IMAGE_ID, image_digest
        )
        return kb.run_kubernetes_job(
            project.name,
            active_run,
            image.tags[0],
            image_digest,
            get_entry_point_command(project, entry_point, parameters, storage_dir),
            get_run_env_vars(
                run_id=active_run.info.run_id, experiment_id=active_run.info.experiment_id
            ),
            kube_config.get("kube-context", None),
            kube_config["kube-job-template"],
        )

    supported_backends = ["databricks", "kubernetes"] + list(loader.MLFLOW_BACKENDS.keys())
    raise ExecutionException(
        f"Got unsupported execution mode {backend_name}. Supported values: {supported_backends}"
    )


def run(
    uri,
    entry_point="main",
    version=None,
    parameters=None,
    docker_args=None,
    experiment_name=None,
    experiment_id=None,
    backend="local",
    backend_config=None,
    storage_dir=None,
    synchronous=True,
    run_id=None,
    run_name=None,
    env_manager=None,
    build_image=False,
    docker_auth=None,
):
    """
    Run an MLflow project. The project can be local or stored at a Git URI.

    MLflow provides built-in support for running projects locally or remotely on a Databricks or
    Kubernetes cluster. You can also run projects against other targets by installing an appropriate
    third-party plugin. See `Community Plugins <../plugins.html#community-plugins>`_ for more
    information.

    For information on using this method in chained workflows, see `Building Multistep Workflows
    <../projects.html#building-multistep-workflows>`_.

    Raises:
        :py:class:`mlflow.exceptions.ExecutionException` If a run launched in blocking mode
            is unsuccessful.

    Args:
        uri: URI of project to run. A local filesystem path
            or a Git repository URI (e.g. https://github.com/mlflow/mlflow-example)
            pointing to a project directory containing an MLproject file.
        entry_point: Entry point to run within the project. If no entry point with the specified
            name is found, runs the project file ``entry_point`` as a script,
            using "python" to run ``.py`` files and the default shell (specified by
            environment variable ``$SHELL``) to run ``.sh`` files.
        version: For Git-based projects, either a commit hash or a branch name.
        parameters: Parameters (dictionary) for the entry point command.
        docker_args: Arguments (dictionary) for the docker command.
        experiment_name: Name of experiment under which to launch the run.
        experiment_id: ID of experiment under which to launch the run.
        backend: Execution backend for the run: MLflow provides built-in support for "local",
            "databricks", and "kubernetes" (experimental) backends. If running against
            Databricks, will run against a Databricks workspace determined as follows:
            if a Databricks tracking URI of the form ``databricks://profile`` has been set
            (e.g. by setting the MLFLOW_TRACKING_URI environment variable), will run
            against the workspace specified by <profile>. Otherwise, runs against the
            workspace specified by the default Databricks CLI profile.
        backend_config: A dictionary, or a path to a JSON file (must end in '.json'), which will
            be passed as config to the backend. The exact content which should be
            provided is different for each execution backend and is documented
            at https://www.mlflow.org/docs/latest/projects.html.
        storage_dir: Used only if ``backend`` is "local". MLflow downloads artifacts from
            distributed URIs passed to parameters of type ``path`` to subdirectories of
            ``storage_dir``.
        synchronous: Whether to block while waiting for a run to complete. Defaults to True.
            Note that if ``synchronous`` is False and ``backend`` is "local", this
            method will return, but the current process will block when exiting until
            the local run completes. If the current process is interrupted, any
            asynchronous runs launched via this method will be terminated. If
            ``synchronous`` is True and the run fails, the current process will
            error out as well.
        run_id: Note: this argument is used internally by the MLflow project APIs and should
            not be specified. If specified, the run ID will be used instead of
            creating a new run.
        run_name: The name to give the MLflow Run associated with the project execution.
            If ``None``, the MLflow Run name is left unset.
        env_manager: Specify an environment manager to create a new environment for the run and
            install project dependencies within that environment. The following values
            are supported:

            - local: use the local environment
            - virtualenv: use virtualenv (and pyenv for Python version management)
            - uv: use uv
            - conda: use conda

            If unspecified, MLflow automatically determines the environment manager to
            use by inspecting files in the project directory. For example, if
            ``python_env.yaml`` is present, virtualenv will be used.
        build_image: Whether to build a new docker image of the project or to reuse an existing
            image. Default: False (reuse an existing image)
        docker_auth: A dictionary representing information to authenticate with a Docker
            registry. See `docker.client.DockerClient.login
            <https://docker-py.readthedocs.io/en/stable/client.html#docker.client.DockerClient.login>`_
            for available options.

    Returns:
        :py:class:`mlflow.projects.SubmittedRun` exposing information (e.g. run ID)
        about the launched run.

    .. code-block:: python
        :caption: Example

        import mlflow

        project_uri = "https://github.com/mlflow/mlflow-example"
        params = {"alpha": 0.5, "l1_ratio": 0.01}

        # Run MLflow project and create a reproducible conda environment
        # on a local host
        mlflow.run(project_uri, parameters=params)

    .. code-block:: text
        :caption: Output

        ...
        ...
        Elasticnet model (alpha=0.500000, l1_ratio=0.010000):
        RMSE: 0.788347345611717
        MAE: 0.6155576449938276
        R2: 0.19729662005412607
        ...
        mlflow.projects: === Run (ID '6a5109febe5e4a549461e149590d0a7c') succeeded ===
    """
    backend_config_dict = backend_config if backend_config is not None else {}
    # A string backend_config ending in ".json" is treated as a path to a JSON spec file.
    if (
        backend_config
        and not isinstance(backend_config, dict)
        and os.path.splitext(backend_config)[-1] == ".json"
    ):
        with open(backend_config) as handle:
            try:
                backend_config_dict = json.load(handle)
            except ValueError:
                _logger.error(
                    "Error when attempting to load and parse JSON cluster spec from file %s",
                    backend_config,
                )
                raise

    if env_manager is not None:
        _EnvManager.validate(env_manager)

    if backend == "databricks":
        mlflow.projects.databricks.before_run_validations(mlflow.get_tracking_uri(), backend_config)
    elif backend == "local" and run_id is not None:
        backend_config_dict[MLFLOW_LOCAL_BACKEND_RUN_ID_CONFIG] = run_id

    experiment_id = _resolve_experiment_id(
        experiment_name=experiment_name, experiment_id=experiment_id
    )

    submitted_run_obj = _run(
        uri=uri,
        experiment_id=experiment_id,
        entry_point=entry_point,
        version=version,
        parameters=parameters,
        docker_args=docker_args,
        backend_name=backend,
        backend_config=backend_config_dict,
        env_manager=env_manager,
        storage_dir=storage_dir,
        synchronous=synchronous,
        run_name=run_name,
        build_image=build_image,
        docker_auth=docker_auth,
    )
    if synchronous:
        _wait_for(submitted_run_obj)
    return submitted_run_obj


def _wait_for(submitted_run_obj):
    """Wait on the passed-in submitted run, reporting its status to the tracking server."""
    run_id = submitted_run_obj.run_id
    active_run = None
    # Note: there's a small chance we fail to report the run's status to the tracking server if
    # we're interrupted before we reach the try block below
    try:
        active_run = tracking.MlflowClient().get_run(run_id) if run_id is not None else None
        if submitted_run_obj.wait():
            _logger.info("=== Run (ID '%s') succeeded ===", run_id)
            _maybe_set_run_terminated(active_run, "FINISHED")
        else:
            _maybe_set_run_terminated(active_run, "FAILED")
            raise ExecutionException(f"Run (ID '{run_id}') failed")
    except KeyboardInterrupt:
        _logger.error("=== Run (ID '%s') interrupted, cancelling run ===", run_id)
        submitted_run_obj.cancel()
        _maybe_set_run_terminated(active_run, "FAILED")
        raise


def _maybe_set_run_terminated(active_run, status):
    """
    If the passed-in active run is defined and still running (i.e. hasn't already been terminated
    within user code), mark it as terminated with the passed-in status.
    """
    if active_run is None:
        return
    run_id = active_run.info.run_id
    cur_status = tracking.MlflowClient().get_run(run_id).info.status
    if RunStatus.is_terminated(cur_status):
        return
    tracking.MlflowClient().set_terminated(run_id, status)


def _validate_execution_environment(project, backend):
    """Reject backend/environment combinations that are not supported."""
    if project.docker_env and backend == "databricks":
        raise ExecutionException(
            "Running docker-based projects on Databricks is not yet supported."
        )


def _parse_kubernetes_config(backend_config):
    """
    Validate the Kubernetes backend config and return a copy with the job template loaded.

    Requires 'kube-job-template-path' (pointing at an existing YAML file, parsed into
    'kube-job-template') and 'repository-uri'; 'kube-context' is optional.
    """
    if not backend_config:
        raise ExecutionException("Backend_config file not found.")
    kube_config = backend_config.copy()
    if "kube-job-template-path" not in backend_config.keys():
        raise ExecutionException(
            "'kube-job-template-path' attribute must be specified in backend_config."
        )
    kube_job_template = backend_config["kube-job-template-path"]
    if os.path.exists(kube_job_template):
        with open(kube_job_template) as job_template:
            yaml_obj = yaml.safe_load(job_template.read())
        kube_job_template = yaml_obj
        kube_config["kube-job-template"] = kube_job_template
    else:
        raise ExecutionException(f"Could not find 'kube-job-template-path': {kube_job_template}")
    if "kube-context" not in backend_config.keys():
        _logger.debug(
            "Could not find kube-context in backend_config."
            " Using current context or in-cluster config."
        )
    if "repository-uri" not in backend_config.keys():
        raise ExecutionException("Could not find 'repository-uri' in backend_config.")
    return kube_config


__all__ = ["run", "SubmittedRun"]