/ mlflow / tracking / _workspace / fluent.py
fluent.py
  1  from __future__ import annotations
  2  
  3  import threading
  4  from typing import Callable, TypeVar
  5  
  6  from mlflow.entities.workspace import Workspace, WorkspaceDeletionMode
  7  from mlflow.exceptions import MlflowException, RestException
  8  from mlflow.protos import databricks_pb2
  9  from mlflow.protos.databricks_pb2 import FEATURE_DISABLED
 10  from mlflow.store.workspace.abstract_store import WorkspaceNameValidator
 11  from mlflow.tracking.client import MlflowClient
 12  from mlflow.utils.annotations import experimental
 13  from mlflow.utils.workspace_context import set_workspace as set_context_workspace
 14  from mlflow.utils.workspace_utils import DEFAULT_WORKSPACE_NAME
 15  
# Generic return type of the callable handed to ``_workspace_client_call``.
T = TypeVar("T")

# Held by ``set_workspace`` while it validates and applies the active workspace.
_workspace_lock = threading.Lock()
 19  
 20  
 21  def _workspace_client_call(func: Callable[[MlflowClient], T]) -> T:
 22      client = MlflowClient()
 23      try:
 24          return func(client)
 25      except RestException as exc:
 26          if exc.error_code == databricks_pb2.ErrorCode.Name(databricks_pb2.ENDPOINT_NOT_FOUND):
 27              raise MlflowException(
 28                  "The configured tracking server does not expose workspace APIs. "
 29                  "Ensure workspace is enabled.",
 30                  error_code=FEATURE_DISABLED,
 31              ) from exc
 32          raise
 33  
 34  
 35  @experimental(version="3.10.0")
 36  def set_workspace(workspace: str | None) -> None:
 37      """Set the active workspace for subsequent MLflow operations."""
 38  
 39      with _workspace_lock:
 40          if workspace is None:
 41              set_context_workspace(None)
 42              return
 43  
 44          if workspace != DEFAULT_WORKSPACE_NAME:
 45              WorkspaceNameValidator.validate(workspace)
 46  
 47          set_context_workspace(workspace)
 48  
 49  
 50  @experimental(version="3.10.0")
 51  def list_workspaces() -> list[Workspace]:
 52      """Return the list of workspaces available to the current user."""
 53  
 54      return _workspace_client_call(lambda client: client.list_workspaces())
 55  
 56  
 57  @experimental(version="3.10.0")
 58  def get_workspace(name: str) -> Workspace:
 59      """Return metadata for the specified workspace."""
 60  
 61      return _workspace_client_call(lambda client: client.get_workspace(name))
 62  
 63  
 64  @experimental(version="3.10.0")
 65  def create_workspace(
 66      name: str, description: str | None = None, default_artifact_root: str | None = None
 67  ) -> Workspace:
 68      """Create a new workspace.
 69  
 70      Args:
 71          name: The workspace name (lowercase alphanumeric with optional internal hyphens).
 72          description: Optional description of the workspace.
 73          default_artifact_root: Optional artifact root URI; falls back to server default.
 74  
 75      Returns:
 76          The newly created workspace.
 77  
 78      Raises:
 79          MlflowException: If the name is invalid, already exists, or no artifact root available.
 80      """
 81      WorkspaceNameValidator.validate(name)
 82      return _workspace_client_call(
 83          lambda client: client.create_workspace(
 84              name=name,
 85              description=description,
 86              default_artifact_root=default_artifact_root,
 87          )
 88      )
 89  
 90  
 91  @experimental(version="3.10.0")
 92  def update_workspace(
 93      name: str, description: str | None = None, default_artifact_root: str | None = None
 94  ) -> Workspace:
 95      """Update metadata for an existing workspace.
 96  
 97      Args:
 98          name: The name of the workspace to update.
 99          description: New description, or ``None`` to leave unchanged.
100          default_artifact_root: New artifact root URI, empty string to clear, or ``None``.
101  
102      Returns:
103          The updated workspace.
104  
105      Raises:
106          MlflowException: If the workspace does not exist or no artifact root available.
107      """
108      if name != DEFAULT_WORKSPACE_NAME:
109          WorkspaceNameValidator.validate(name)
110      return _workspace_client_call(
111          lambda client: client.update_workspace(
112              name=name,
113              description=description,
114              default_artifact_root=default_artifact_root,
115          )
116      )
117  
118  
@experimental(version="3.10.0")
def delete_workspace(name: str, *, mode: str = WorkspaceDeletionMode.RESTRICT) -> None:
    """Delete an existing workspace.

    Args:
        name: Name of the workspace to delete. Validated unless it equals the
            default workspace name.
        mode: Deletion mode. One of SET_DEFAULT, CASCADE, or RESTRICT.

    Raises:
        MlflowException: If ``mode`` is not a valid ``WorkspaceDeletionMode`` value
            (raised before the name is validated or any client call is made).
    """
    try:
        deletion_mode = WorkspaceDeletionMode(mode)
    except ValueError as exc:
        # Chain explicitly so the traceback marks the failed enum lookup as the
        # direct cause, matching the ``raise ... from`` style used elsewhere here.
        raise MlflowException.invalid_parameter_value(
            f"Invalid deletion mode '{mode}'. "
            f"Must be one of: {', '.join(m.value for m in WorkspaceDeletionMode)}"
        ) from exc
    if name != DEFAULT_WORKSPACE_NAME:
        WorkspaceNameValidator.validate(name)
    _workspace_client_call(lambda client: client.delete_workspace(name=name, mode=deletion_mode))
137  
138  
# Names exported by ``from <this module> import *``: the ``Workspace`` entity
# plus the fluent workspace functions defined above.
__all__ = [
    "Workspace",
    "set_workspace",
    "list_workspaces",
    "get_workspace",
    "create_workspace",
    "update_workspace",
    "delete_workspace",
]