# mlflow/projects/__init__.py
  1  """
  2  The ``mlflow.projects`` module provides an API for running MLflow projects locally or remotely.
  3  """
  4  
  5  import json
  6  import logging
  7  import os
  8  
  9  import yaml
 10  
 11  import mlflow.projects.databricks
 12  from mlflow import tracking
 13  from mlflow.entities import RunStatus
 14  from mlflow.exceptions import ExecutionException, MlflowException
 15  from mlflow.projects.backend import loader
 16  from mlflow.projects.submitted_run import SubmittedRun
 17  from mlflow.projects.utils import (
 18      MLFLOW_LOCAL_BACKEND_RUN_ID_CONFIG,
 19      PROJECT_BUILD_IMAGE,
 20      PROJECT_DOCKER_ARGS,
 21      PROJECT_DOCKER_AUTH,
 22      PROJECT_ENV_MANAGER,
 23      PROJECT_STORAGE_DIR,
 24      PROJECT_SYNCHRONOUS,
 25      fetch_and_validate_project,
 26      get_entry_point_command,
 27      get_or_create_run,
 28      get_run_env_vars,
 29      load_project,
 30  )
 31  from mlflow.tracking.fluent import _get_experiment_id
 32  from mlflow.utils import env_manager as _EnvManager
 33  from mlflow.utils.mlflow_tags import (
 34      MLFLOW_DOCKER_IMAGE_ID,
 35      MLFLOW_PROJECT_BACKEND,
 36      MLFLOW_PROJECT_ENV,
 37      MLFLOW_RUN_NAME,
 38  )
 39  
 40  _logger = logging.getLogger(__name__)
 41  
 42  
 43  def _resolve_experiment_id(experiment_name=None, experiment_id=None):
 44      """
 45      Resolve experiment.
 46  
 47      Verifies either one or other is specified - cannot be both selected.
 48  
 49      If ``experiment_name`` is provided and does not exist, an experiment
 50      of that name is created and its id is returned.
 51  
 52      Args:
 53          experiment_name: Name of experiment under which to launch the run.
 54          experiment_id: ID of experiment under which to launch the run.
 55  
 56      Returns:
 57          str
 58      """
 59  
 60      if experiment_name and experiment_id:
 61          raise MlflowException("Specify only one of 'experiment_name' or 'experiment_id'.")
 62  
 63      if experiment_id:
 64          return str(experiment_id)
 65  
 66      if experiment_name:
 67          client = tracking.MlflowClient()
 68          if exp := client.get_experiment_by_name(experiment_name):
 69              return exp.experiment_id
 70          else:
 71              _logger.info("'%s' does not exist. Creating a new experiment", experiment_name)
 72              return client.create_experiment(experiment_name)
 73  
 74      return _get_experiment_id()
 75  
 76  
 77  def _run(
 78      uri,
 79      experiment_id,
 80      entry_point,
 81      version,
 82      parameters,
 83      docker_args,
 84      backend_name,
 85      backend_config,
 86      storage_dir,
 87      env_manager,
 88      synchronous,
 89      run_name,
 90      build_image,
 91      docker_auth,
 92  ):
 93      """
 94      Helper that delegates to the project-running method corresponding to the passed-in backend.
 95      Returns a ``SubmittedRun`` corresponding to the project run.
 96      """
 97      tracking_store_uri = tracking.get_tracking_uri()
 98      backend_config[PROJECT_ENV_MANAGER] = env_manager
 99      backend_config[PROJECT_SYNCHRONOUS] = synchronous
100      backend_config[PROJECT_DOCKER_ARGS] = docker_args
101      backend_config[PROJECT_STORAGE_DIR] = storage_dir
102      backend_config[PROJECT_BUILD_IMAGE] = build_image
103      backend_config[PROJECT_DOCKER_AUTH] = docker_auth
104      # TODO: remove this check once kubernetes execution has been refactored
105      if backend_name not in {"databricks", "kubernetes"}:
106          if backend := loader.load_backend(backend_name):
107              submitted_run = backend.run(
108                  uri,
109                  entry_point,
110                  parameters,
111                  version,
112                  backend_config,
113                  tracking_store_uri,
114                  experiment_id,
115              )
116              tracking.MlflowClient().set_tag(
117                  submitted_run.run_id, MLFLOW_PROJECT_BACKEND, backend_name
118              )
119              if run_name is not None:
120                  tracking.MlflowClient().set_tag(submitted_run.run_id, MLFLOW_RUN_NAME, run_name)
121              return submitted_run
122  
123      work_dir = fetch_and_validate_project(uri, version, entry_point, parameters)
124      project = load_project(work_dir)
125      _validate_execution_environment(project, backend_name)
126  
127      active_run = get_or_create_run(
128          None, uri, experiment_id, work_dir, version, entry_point, parameters
129      )
130  
131      if run_name is not None:
132          tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_RUN_NAME, run_name)
133  
134      if backend_name == "databricks":
135          tracking.MlflowClient().set_tag(
136              active_run.info.run_id, MLFLOW_PROJECT_BACKEND, "databricks"
137          )
138          from mlflow.projects.databricks import run_databricks, run_databricks_spark_job
139  
140          if project.databricks_spark_job_spec is not None:
141              return run_databricks_spark_job(
142                  remote_run=active_run,
143                  uri=uri,
144                  work_dir=work_dir,
145                  experiment_id=experiment_id,
146                  cluster_spec=backend_config,
147                  project_spec=project,
148                  entry_point=entry_point,
149                  parameters=parameters,
150              )
151  
152          return run_databricks(
153              remote_run=active_run,
154              uri=uri,
155              entry_point=entry_point,
156              work_dir=work_dir,
157              parameters=parameters,
158              experiment_id=experiment_id,
159              cluster_spec=backend_config,
160              env_manager=env_manager,
161          )
162  
163      elif backend_name == "kubernetes":
164          from mlflow.projects import kubernetes as kb
165          from mlflow.projects.docker import (
166              build_docker_image,
167              validate_docker_env,
168              validate_docker_installation,
169          )
170  
171          tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_ENV, "docker")
172          tracking.MlflowClient().set_tag(
173              active_run.info.run_id, MLFLOW_PROJECT_BACKEND, "kubernetes"
174          )
175          validate_docker_env(project)
176          validate_docker_installation()
177          kube_config = _parse_kubernetes_config(backend_config)
178          image = build_docker_image(
179              work_dir=work_dir,
180              repository_uri=kube_config["repository-uri"],
181              base_image=project.docker_env.get("image"),
182              run_id=active_run.info.run_id,
183              build_image=build_image,
184              docker_auth=docker_auth,
185          )
186          image_digest = kb.push_image_to_registry(image.tags[0])
187          tracking.MlflowClient().set_tag(
188              active_run.info.run_id, MLFLOW_DOCKER_IMAGE_ID, image_digest
189          )
190          return kb.run_kubernetes_job(
191              project.name,
192              active_run,
193              image.tags[0],
194              image_digest,
195              get_entry_point_command(project, entry_point, parameters, storage_dir),
196              get_run_env_vars(
197                  run_id=active_run.info.run_id, experiment_id=active_run.info.experiment_id
198              ),
199              kube_config.get("kube-context", None),
200              kube_config["kube-job-template"],
201          )
202  
203      supported_backends = ["databricks", "kubernetes"] + list(loader.MLFLOW_BACKENDS.keys())
204      raise ExecutionException(
205          f"Got unsupported execution mode {backend_name}. Supported values: {supported_backends}"
206      )
207  
208  
def run(
    uri,
    entry_point="main",
    version=None,
    parameters=None,
    docker_args=None,
    experiment_name=None,
    experiment_id=None,
    backend="local",
    backend_config=None,
    storage_dir=None,
    synchronous=True,
    run_id=None,
    run_name=None,
    env_manager=None,
    build_image=False,
    docker_auth=None,
):
    """
    Run an MLflow project. The project can be local or stored at a Git URI.

    MLflow provides built-in support for running projects locally or remotely on a Databricks or
    Kubernetes cluster. You can also run projects against other targets by installing an appropriate
    third-party plugin. See `Community Plugins <../plugins.html#community-plugins>`_ for more
    information.

    For information on using this method in chained workflows, see `Building Multistep Workflows
    <../projects.html#building-multistep-workflows>`_.

    Raises:
        :py:class:`mlflow.exceptions.ExecutionException` If a run launched in blocking mode
            is unsuccessful.

    Args:
        uri: URI of project to run. A local filesystem path
            or a Git repository URI (e.g. https://github.com/mlflow/mlflow-example)
            pointing to a project directory containing an MLproject file.
        entry_point: Entry point to run within the project. If no entry point with the specified
            name is found, runs the project file ``entry_point`` as a script,
            using "python" to run ``.py`` files and the default shell (specified by
            environment variable ``$SHELL``) to run ``.sh`` files.
        version: For Git-based projects, either a commit hash or a branch name.
        parameters: Parameters (dictionary) for the entry point command.
        docker_args: Arguments (dictionary) for the docker command.
        experiment_name: Name of experiment under which to launch the run.
        experiment_id: ID of experiment under which to launch the run.
        backend: Execution backend for the run: MLflow provides built-in support for "local",
            "databricks", and "kubernetes" (experimental) backends. If running against
            Databricks, will run against a Databricks workspace determined as follows:
            if a Databricks tracking URI of the form ``databricks://profile`` has been set
            (e.g. by setting the MLFLOW_TRACKING_URI environment variable), will run
            against the workspace specified by <profile>. Otherwise, runs against the
            workspace specified by the default Databricks CLI profile.
        backend_config: A dictionary, or a path to a JSON file (must end in '.json'), which will
            be passed as config to the backend. The exact content which should be
            provided is different for each execution backend and is documented
            at https://www.mlflow.org/docs/latest/projects.html.
        storage_dir: Used only if ``backend`` is "local". MLflow downloads artifacts from
            distributed URIs passed to parameters of type ``path`` to subdirectories of
            ``storage_dir``.
        synchronous: Whether to block while waiting for a run to complete. Defaults to True.
            Note that if ``synchronous`` is False and ``backend`` is "local", this
            method will return, but the current process will block when exiting until
            the local run completes. If the current process is interrupted, any
            asynchronous runs launched via this method will be terminated. If
            ``synchronous`` is True and the run fails, the current process will
            error out as well.
        run_id: Note: this argument is used internally by the MLflow project APIs and should
            not be specified. If specified, the run ID will be used instead of
            creating a new run.
        run_name: The name to give the MLflow Run associated with the project execution.
            If ``None``, the MLflow Run name is left unset.
        env_manager: Specify an environment manager to create a new environment for the run and
            install project dependencies within that environment. The following values
            are supported:

            - local: use the local environment
            - virtualenv: use virtualenv (and pyenv for Python version management)
            - uv: use uv
            - conda: use conda

            If unspecified, MLflow automatically determines the environment manager to
            use by inspecting files in the project directory. For example, if
            ``python_env.yaml`` is present, virtualenv will be used.
        build_image: Whether to build a new docker image of the project or to reuse an existing
            image. Default: False (reuse an existing image)
        docker_auth: A dictionary representing information to authenticate with a Docker
            registry. See `docker.client.DockerClient.login
            <https://docker-py.readthedocs.io/en/stable/client.html#docker.client.DockerClient.login>`_
            for available options.

    Returns:
        :py:class:`mlflow.projects.SubmittedRun` exposing information (e.g. run ID)
        about the launched run.

    .. code-block:: python
        :caption: Example

        import mlflow

        project_uri = "https://github.com/mlflow/mlflow-example"
        params = {"alpha": 0.5, "l1_ratio": 0.01}

        # Run MLflow project and create a reproducible conda environment
        # on a local host
        mlflow.run(project_uri, parameters=params)

    .. code-block:: text
        :caption: Output

        ...
        ...
        Elasticnet model (alpha=0.500000, l1_ratio=0.010000):
        RMSE: 0.788347345611717
        MAE: 0.6155576449938276
        R2: 0.19729662005412607
        ... mlflow.projects: === Run (ID '6a5109febe5e4a549461e149590d0a7c') succeeded ===
    """
    backend_config_dict = backend_config if backend_config is not None else {}
    # ``backend_config`` may be a path to a JSON file instead of an inline dict; load
    # it eagerly so downstream code always sees a dict. ``isinstance`` (rather than an
    # exact type comparison) correctly treats dict subclasses as inline configs.
    if (
        backend_config
        and not isinstance(backend_config, dict)
        and os.path.splitext(backend_config)[-1] == ".json"
    ):
        with open(backend_config) as handle:
            try:
                backend_config_dict = json.load(handle)
            except ValueError:
                _logger.error(
                    "Error when attempting to load and parse JSON cluster spec from file %s",
                    backend_config,
                )
                raise

    if env_manager is not None:
        _EnvManager.validate(env_manager)

    if backend == "databricks":
        mlflow.projects.databricks.before_run_validations(mlflow.get_tracking_uri(), backend_config)
    elif backend == "local" and run_id is not None:
        # Internal path: re-use an existing tracking run instead of creating one.
        backend_config_dict[MLFLOW_LOCAL_BACKEND_RUN_ID_CONFIG] = run_id

    experiment_id = _resolve_experiment_id(
        experiment_name=experiment_name, experiment_id=experiment_id
    )

    submitted_run_obj = _run(
        uri=uri,
        experiment_id=experiment_id,
        entry_point=entry_point,
        version=version,
        parameters=parameters,
        docker_args=docker_args,
        backend_name=backend,
        backend_config=backend_config_dict,
        env_manager=env_manager,
        storage_dir=storage_dir,
        synchronous=synchronous,
        run_name=run_name,
        build_image=build_image,
        docker_auth=docker_auth,
    )
    if synchronous:
        # Block until completion; raises ExecutionException if the run fails.
        _wait_for(submitted_run_obj)
    return submitted_run_obj
374  
375  
def _wait_for(submitted_run_obj):
    """Block on the given submitted run, reporting its terminal status to the tracking server."""
    run_id = submitted_run_obj.run_id
    active_run = None
    # Note: there's a small chance we fail to report the run's status to the tracking server if
    # we're interrupted before we reach the try block below
    try:
        if run_id is not None:
            active_run = tracking.MlflowClient().get_run(run_id)
        succeeded = submitted_run_obj.wait()
        if not succeeded:
            _maybe_set_run_terminated(active_run, "FAILED")
            raise ExecutionException(f"Run (ID '{run_id}') failed")
        _logger.info("=== Run (ID '%s') succeeded ===", run_id)
        _maybe_set_run_terminated(active_run, "FINISHED")
    except KeyboardInterrupt:
        _logger.error("=== Run (ID '%s') interrupted, cancelling run ===", run_id)
        submitted_run_obj.cancel()
        _maybe_set_run_terminated(active_run, "FAILED")
        raise
395  
396  
397  def _maybe_set_run_terminated(active_run, status):
398      """
399      If the passed-in active run is defined and still running (i.e. hasn't already been terminated
400      within user code), mark it as terminated with the passed-in status.
401      """
402      if active_run is None:
403          return
404      run_id = active_run.info.run_id
405      cur_status = tracking.MlflowClient().get_run(run_id).info.status
406      if RunStatus.is_terminated(cur_status):
407          return
408      tracking.MlflowClient().set_terminated(run_id, status)
409  
410  
411  def _validate_execution_environment(project, backend):
412      if project.docker_env and backend == "databricks":
413          raise ExecutionException(
414              "Running docker-based projects on Databricks is not yet supported."
415          )
416  
417  
def _parse_kubernetes_config(backend_config):
    """
    Validate the Kubernetes ``backend_config`` and return a copy with the job template loaded.

    Args:
        backend_config: Dict of Kubernetes backend options. Must contain
            'kube-job-template-path' (path to a YAML Job spec file) and
            'repository-uri'; 'kube-context' is optional.

    Returns:
        A copy of ``backend_config`` with the parsed YAML job spec stored under
        'kube-job-template'.

    Raises:
        ExecutionException: If the config is empty/missing, a required key is absent,
            or the job template file does not exist.
    """
    if not backend_config:
        raise ExecutionException("Backend_config file not found.")
    kube_config = backend_config.copy()
    if "kube-job-template-path" not in backend_config:
        raise ExecutionException(
            "'kube-job-template-path' attribute must be specified in backend_config."
        )
    kube_job_template = backend_config["kube-job-template-path"]
    if not os.path.exists(kube_job_template):
        raise ExecutionException(f"Could not find 'kube-job-template-path': {kube_job_template}")
    with open(kube_job_template) as job_template:
        # safe_load avoids arbitrary object construction from the template file.
        kube_config["kube-job-template"] = yaml.safe_load(job_template.read())
    if "kube-context" not in backend_config:
        _logger.debug(
            "Could not find kube-context in backend_config."
            " Using current context or in-cluster config."
        )
    if "repository-uri" not in backend_config:
        raise ExecutionException("Could not find 'repository-uri' in backend_config.")
    return kube_config
445  
446  
# Explicitly declare the public API of ``mlflow.projects``.
__all__ = ["run", "SubmittedRun"]