fluent.py
"""Fluent (module-level) helpers for workspace operations backed by ``MlflowClient``."""

from __future__ import annotations

import threading
from typing import Callable, TypeVar

from mlflow.entities.workspace import Workspace, WorkspaceDeletionMode
from mlflow.exceptions import MlflowException, RestException
from mlflow.protos import databricks_pb2
from mlflow.protos.databricks_pb2 import FEATURE_DISABLED
from mlflow.store.workspace.abstract_store import WorkspaceNameValidator
from mlflow.tracking.client import MlflowClient
from mlflow.utils.annotations import experimental
from mlflow.utils.workspace_context import set_workspace as set_context_workspace
from mlflow.utils.workspace_utils import DEFAULT_WORKSPACE_NAME

T = TypeVar("T")

# Serializes updates to the active workspace made through ``set_workspace``.
_workspace_lock = threading.Lock()


def _validate_workspace_name(name: str) -> None:
    """Validate ``name`` unless it is the default workspace name.

    The default workspace name is exempt from ``WorkspaceNameValidator``; every
    other name is passed to the validator, which is expected to raise on
    invalid input.
    """
    if name != DEFAULT_WORKSPACE_NAME:
        WorkspaceNameValidator.validate(name)


def _workspace_client_call(func: Callable[[MlflowClient], T]) -> T:
    """Run ``func`` against a fresh ``MlflowClient`` and normalize endpoint errors.

    Args:
        func: Callable that receives a newly constructed ``MlflowClient`` and
            performs a workspace operation with it.

    Returns:
        Whatever ``func`` returns.

    Raises:
        MlflowException: With error code ``FEATURE_DISABLED`` when the server
            answers with ``ENDPOINT_NOT_FOUND``.
        RestException: Any other REST error is re-raised unchanged.
    """
    client = MlflowClient()
    try:
        return func(client)
    except RestException as exc:
        endpoint_not_found = databricks_pb2.ErrorCode.Name(databricks_pb2.ENDPOINT_NOT_FOUND)
        if exc.error_code == endpoint_not_found:
            raise MlflowException(
                "The configured tracking server does not expose workspace APIs. "
                "Ensure workspace is enabled.",
                error_code=FEATURE_DISABLED,
            ) from exc
        raise


@experimental(version="3.10.0")
def set_workspace(workspace: str | None) -> None:
    """Set the active workspace for subsequent MLflow operations.

    Args:
        workspace: Name of the workspace to activate. ``None`` is forwarded to
            the workspace context without validation; any name other than the
            default workspace name is validated first.
    """
    with _workspace_lock:
        if workspace is not None:
            _validate_workspace_name(workspace)
        set_context_workspace(workspace)


@experimental(version="3.10.0")
def list_workspaces() -> list[Workspace]:
    """Return the list of workspaces available to the current user."""
    return _workspace_client_call(lambda client: client.list_workspaces())


@experimental(version="3.10.0")
def get_workspace(name: str) -> Workspace:
    """Return metadata for the specified workspace.

    Args:
        name: Name of the workspace to look up. The name is sent to the client
            as-is, without local validation.
    """
    return _workspace_client_call(lambda client: client.get_workspace(name))


@experimental(version="3.10.0")
def create_workspace(
    name: str, description: str | None = None, default_artifact_root: str | None = None
) -> Workspace:
    """Create a new workspace.

    Args:
        name: The workspace name (lowercase alphanumeric with optional internal hyphens).
        description: Optional description of the workspace.
        default_artifact_root: Optional artifact root URI; falls back to server default.

    Returns:
        The newly created workspace.

    Raises:
        MlflowException: If the name is invalid, already exists, or no artifact root available.
    """
    # Creation always validates, including the default workspace name.
    WorkspaceNameValidator.validate(name)
    return _workspace_client_call(
        lambda client: client.create_workspace(
            name=name,
            description=description,
            default_artifact_root=default_artifact_root,
        )
    )


@experimental(version="3.10.0")
def update_workspace(
    name: str, description: str | None = None, default_artifact_root: str | None = None
) -> Workspace:
    """Update metadata for an existing workspace.

    Args:
        name: The name of the workspace to update.
        description: New description, or ``None`` to leave unchanged.
        default_artifact_root: New artifact root URI, empty string to clear, or ``None``.

    Returns:
        The updated workspace.

    Raises:
        MlflowException: If the workspace does not exist or no artifact root available.
    """
    _validate_workspace_name(name)
    return _workspace_client_call(
        lambda client: client.update_workspace(
            name=name,
            description=description,
            default_artifact_root=default_artifact_root,
        )
    )


@experimental(version="3.10.0")
def delete_workspace(name: str, *, mode: str = WorkspaceDeletionMode.RESTRICT) -> None:
    """Delete an existing workspace.

    Args:
        name: Name of the workspace to delete.
        mode: Deletion mode. One of SET_DEFAULT, CASCADE, or RESTRICT.

    Raises:
        MlflowException: If ``mode`` is not a valid ``WorkspaceDeletionMode``
            value. The mode is checked before the workspace name.
    """
    try:
        deletion_mode = WorkspaceDeletionMode(mode)
    except ValueError as exc:
        # Chain explicitly so the traceback shows the enum lookup failure as the cause.
        raise MlflowException.invalid_parameter_value(
            f"Invalid deletion mode '{mode}'. "
            f"Must be one of: {', '.join(m.value for m in WorkspaceDeletionMode)}"
        ) from exc
    _validate_workspace_name(name)
    _workspace_client_call(lambda client: client.delete_workspace(name=name, mode=deletion_mode))


__all__ = [
    "Workspace",
    "set_workspace",
    "list_workspaces",
    "get_workspace",
    "create_workspace",
    "update_workspace",
    "delete_workspace",
]