# workspace_helpers.py
from __future__ import annotations

import logging
import os

from flask import Response, request

from mlflow.entities import Workspace
from mlflow.environment_variables import (
    MLFLOW_ENABLE_WORKSPACES,
    MLFLOW_WORKSPACE_STORE_URI,
)
from mlflow.exceptions import MlflowException
from mlflow.protos import databricks_pb2
from mlflow.store.workspace.abstract_store import WorkspaceNameValidator
from mlflow.store.workspace.utils import get_default_workspace_optional
from mlflow.tracking._workspace.registry import get_workspace_store
from mlflow.utils import workspace_context, workspace_utils
from mlflow.utils.workspace_utils import (
    DEFAULT_WORKSPACE_NAME,
    WORKSPACE_HEADER_NAME,
    _normalize_workspace,
)

_logger = logging.getLogger(__name__)

# Process-wide cache for the workspace store; populated lazily by ``_get_workspace_store``.
_workspace_store = None


def resolve_workspace_from_header(header_workspace: str | None) -> Workspace | None:
    """
    Determine the workspace for a request from its (optional) workspace header value.

    The header value is first passed through ``_normalize_workspace``. If the normalized value
    is non-empty it is validated with ``WorkspaceNameValidator`` (skipped when it equals
    ``DEFAULT_WORKSPACE_NAME``) and then fetched from the workspace store. Otherwise the
    workspace returned by ``get_default_workspace_optional`` is used, which may be None.

    Any exception raised by ``_get_workspace_store``, the validator, or the store propagates
    to the caller unchanged.
    """
    # The store is obtained before inspecting the header, so a disabled/misconfigured
    # workspace setup fails here regardless of what the header contains.
    store = _get_workspace_store()
    requested_name = _normalize_workspace(header_workspace)

    if not requested_name:
        default_workspace, _ = get_default_workspace_optional(store)
        return default_workspace

    if requested_name != DEFAULT_WORKSPACE_NAME:
        WorkspaceNameValidator.validate(requested_name)
    return store.get_workspace(requested_name)


def _get_workspace_store(workspace_uri: str | None = None, tracking_uri: str | None = None):
    """
    Return the workspace store for this server process, creating it on first use.

    The first successful call builds the store from the given arguments (falling back to the
    backend store URI environment variable for ``tracking_uri``) and caches it in the
    module-level ``_workspace_store``. Every later call returns that cached instance and
    ignores whatever ``workspace_uri`` / ``tracking_uri`` it is given.

    Raises ``MlflowException`` with ``FEATURE_DISABLED`` when workspaces are not enabled (this
    check runs on every call, even when a store is already cached), and an invalid-parameter
    ``MlflowException`` when no workspace store URI can be resolved.
    """
    global _workspace_store

    if not MLFLOW_ENABLE_WORKSPACES.get():
        raise MlflowException(
            "Workspace APIs are not available: workspaces are not enabled on this server",
            databricks_pb2.FEATURE_DISABLED,
        )

    if _workspace_store is None:
        # Function-scope import; presumably avoids a circular import with ``mlflow.server``
        # (TODO confirm).
        from mlflow.server import BACKEND_STORE_URI_ENV_VAR

        backend_uri = tracking_uri or os.environ.get(BACKEND_STORE_URI_ENV_VAR)
        store_uri = workspace_utils.resolve_workspace_store_uri(
            workspace_uri, tracking_uri=backend_uri
        )
        if store_uri is None:
            raise MlflowException.invalid_parameter_value(
                "Workspace URI could not be resolved. Provide --workspace-store-uri or set "
                f"{MLFLOW_WORKSPACE_STORE_URI.name}."
            )
        _workspace_store = get_workspace_store(workspace_uri=store_uri)

    return _workspace_store


def _workspace_error_response(exc: Exception) -> Response:
    """
    Build a JSON Flask ``Response`` describing ``exc``.

    An ``MlflowException`` is serialized as-is. Any other exception is wrapped in an
    ``MlflowException`` with ``INTERNAL_ERROR`` whose message is ``str(exc)``. The response
    body and HTTP status code both come from the (possibly wrapped) ``MlflowException``.
    """
    error = exc
    if not isinstance(error, MlflowException):
        error = MlflowException(str(exc), error_code=databricks_pb2.INTERNAL_ERROR)
        # Chain the original exception so its stack stays available for debugging.
        error.__cause__ = exc

    reply = Response(mimetype="application/json")
    reply.set_data(error.serialize_as_json())
    reply.status_code = error.get_http_status_code()
    return reply


def resolve_workspace_for_request_if_enabled(
    path: str,
    header_value: str | None,
) -> Workspace | None:
    """
    Resolve the workspace for a request path / header pair, honoring the workspaces flag.

    Returns None without resolving anything when ``path`` (ignoring trailing slashes) ends
    with ``/mlflow/server-info``, or when workspaces are disabled and no non-blank header was
    sent. When workspaces are disabled but a non-blank header is present, raises
    ``MlflowException`` with ``FEATURE_DISABLED``. When workspaces are enabled, delegates to
    ``resolve_workspace_from_header``; ``MlflowException`` propagates unchanged and any other
    exception is logged and re-raised as an ``INTERNAL_ERROR`` ``MlflowException``.
    """
    # The server-info endpoint has to stay reachable even when the workspace header names a
    # workspace that does not exist, so this route bypasses workspace resolution altogether.
    normalized_path = path.rstrip("/")
    if normalized_path.endswith("/mlflow/server-info"):
        return None

    if MLFLOW_ENABLE_WORKSPACES.get():
        try:
            return resolve_workspace_from_header(header_value)
        except MlflowException:
            raise
        except Exception as exc:
            _logger.exception("Unexpected error while resolving workspace")
            raise MlflowException(
                str(exc),
                error_code=databricks_pb2.INTERNAL_ERROR,
            ) from exc

    # Workspaces are disabled: a request that explicitly asks for one is rejected, while a
    # request without a (non-blank) header proceeds with no workspace.
    header_present = bool((header_value or "").strip())
    if header_present:
        raise MlflowException(
            "Workspace APIs are not available: workspaces are not enabled on this server",
            error_code=databricks_pb2.FEATURE_DISABLED,
        )
    return None


def workspace_before_request_handler():
    """
    Flask before-request hook that records the workspace for the current request.

    Returns None to let the request continue, or an error ``Response`` (built by
    ``_workspace_error_response``) when workspace resolution raises ``MlflowException``.
    """
    # FastAPI middleware may already have resolved the workspace for this request. The server
    # does not set the env var, so this check reflects request-scoped state.
    if workspace_context.is_request_workspace_resolved():
        return None

    workspace_header = request.headers.get(WORKSPACE_HEADER_NAME)
    try:
        resolved = resolve_workspace_for_request_if_enabled(request.path, workspace_header)
    except MlflowException as exc:
        return _workspace_error_response(exc)
    else:
        active_name = resolved.name if resolved else None
        workspace_context.set_server_request_workspace(active_name)
        return None


def workspace_teardown_request_handler(_exc):
    """
    Flask teardown hook that clears the request's workspace when workspaces are enabled.

    ``_exc`` is accepted for the teardown-handler signature and is not used.
    """
    # NOTE(review): the before-request handler records a workspace (possibly None) even when
    # workspaces are disabled, whereas this hook only clears when they are enabled — confirm
    # the asymmetry is intended.
    if not MLFLOW_ENABLE_WORKSPACES.get():
        return
    workspace_context.clear_server_request_workspace()


__all__ = [
    "WORKSPACE_HEADER_NAME",
    "resolve_workspace_from_header",
    "resolve_workspace_for_request_if_enabled",
    "_get_workspace_store",
    "workspace_before_request_handler",
    "workspace_teardown_request_handler",
]