numpy_dataset.py
import json
import logging
from functools import cached_property
from typing import Any

import numpy as np

from mlflow.data.dataset import Dataset
from mlflow.data.dataset_source import DatasetSource
from mlflow.data.digest_utils import compute_numpy_digest
from mlflow.data.evaluation_dataset import EvaluationDataset
from mlflow.data.pyfunc_dataset_mixin import PyFuncConvertibleDatasetMixin, PyFuncInputsOutputs
from mlflow.data.schema import TensorDatasetSchema
from mlflow.types.utils import _infer_schema

_logger = logging.getLogger(__name__)


class NumpyDataset(Dataset, PyFuncConvertibleDatasetMixin):
    """
    Represents a NumPy dataset for use with MLflow Tracking.
    """

    def __init__(
        self,
        features: np.ndarray | dict[str, np.ndarray],
        source: DatasetSource,
        # FIX: annotation now includes `| None` to match the None default
        # (implicit Optional is rejected by modern type checkers).
        targets: np.ndarray | dict[str, np.ndarray] | None = None,
        name: str | None = None,
        digest: str | None = None,
    ):
        """
        Args:
            features: A numpy array or dictionary of numpy arrays containing dataset features.
            source: The source of the numpy dataset.
            targets: A numpy array or dictionary of numpy arrays containing dataset targets.
                Optional.
            name: The name of the dataset. E.g. "wiki_train". If unspecified, a name is
                automatically generated.
            digest: The digest (hash, fingerprint) of the dataset. If unspecified, a digest
                is automatically computed.
        """
        self._features = features
        self._targets = targets
        # Base-class __init__ runs last: it may call _compute_digest(), which
        # reads self._features / self._targets, so those must be set first.
        super().__init__(source=source, name=name, digest=digest)

    def _compute_digest(self) -> str:
        """
        Computes a digest for the dataset. Called if the user doesn't supply
        a digest when constructing the dataset.
        """
        return compute_numpy_digest(self._features, self._targets)

    def to_dict(self) -> dict[str, str]:
        """Create config dictionary for the dataset.

        Returns a string dictionary containing the following fields: name, digest, source, source
        type, schema, and profile.
        """
        # schema may be None when inference failed; serialize only when present.
        schema = json.dumps(self.schema.to_dict()) if self.schema else None
        config = super().to_dict()
        config.update({
            "schema": schema,
            "profile": json.dumps(self.profile),
        })
        return config

    @property
    def source(self) -> DatasetSource:
        """
        The source of the dataset.
        """
        return self._source

    @property
    def features(self) -> np.ndarray | dict[str, np.ndarray]:
        """
        The features of the dataset.
        """
        return self._features

    @property
    def targets(self) -> np.ndarray | dict[str, np.ndarray] | None:
        """
        The targets of the dataset. May be ``None`` if no targets are available.
        """
        return self._targets

    @property
    def profile(self) -> Any | None:
        """
        A profile of the dataset. May be ``None`` if a profile cannot be computed.
        """

        def get_profile_attribute(numpy_data, attr_name):
            # For a dict of named arrays, report the attribute per key;
            # otherwise read it straight off the single ndarray.
            if isinstance(numpy_data, dict):
                return {key: getattr(array, attr_name) for key, array in numpy_data.items()}
            else:
                return getattr(numpy_data, attr_name)

        profile = {
            "features_shape": get_profile_attribute(self._features, "shape"),
            "features_size": get_profile_attribute(self._features, "size"),
            "features_nbytes": get_profile_attribute(self._features, "nbytes"),
        }
        # Target stats are only included when targets were supplied.
        if self._targets is not None:
            profile.update({
                "targets_shape": get_profile_attribute(self._targets, "shape"),
                "targets_size": get_profile_attribute(self._targets, "size"),
                "targets_nbytes": get_profile_attribute(self._targets, "nbytes"),
            })

        return profile

    @cached_property
    def schema(self) -> TensorDatasetSchema | None:
        """
        MLflow TensorSpec schema representing the dataset features and targets (optional).
        """
        try:
            features_schema = _infer_schema(self._features)
            targets_schema = None
            if self._targets is not None:
                targets_schema = _infer_schema(self._targets)
            return TensorDatasetSchema(features=features_schema, targets=targets_schema)
        except Exception as e:
            # Schema inference is best-effort; warn and fall back to None
            # rather than failing dataset construction/logging.
            _logger.warning("Failed to infer schema for NumPy dataset. Exception: %s", e)
            return None

    def to_pyfunc(self) -> PyFuncInputsOutputs:
        """
        Converts the dataset to a collection of pyfunc inputs and outputs for model
        evaluation. Required for use with mlflow.evaluate().
        """
        return PyFuncInputsOutputs(self._features, self._targets)

    def to_evaluation_dataset(self, path=None, feature_names=None) -> EvaluationDataset:
        """
        Converts the dataset to an EvaluationDataset for model evaluation. Required
        for use with mlflow.sklearn.evaluate().
        """
        return EvaluationDataset(
            data=self._features,
            targets=self._targets,
            path=path,
            feature_names=feature_names,
            name=self.name,
            digest=self.digest,
        )


def from_numpy(
    features: np.ndarray | dict[str, np.ndarray],
    # FIX: both optional parameters now carry `| None` in their annotations
    # to match their None defaults (implicit Optional is deprecated).
    source: str | DatasetSource | None = None,
    targets: np.ndarray | dict[str, np.ndarray] | None = None,
    name: str | None = None,
    digest: str | None = None,
) -> NumpyDataset:
    """
    Constructs a :py:class:`NumpyDataset <mlflow.data.numpy_dataset.NumpyDataset>` object from
    NumPy features, optional targets, and source. If the source is path like, then this will
    construct a DatasetSource object from the source path. Otherwise, the source is assumed to
    be a DatasetSource object.

    Args:
        features: NumPy features, represented as an np.ndarray or dictionary of named np.ndarrays.
        source: The source from which the numpy data was derived, e.g. a filesystem path, an S3 URI,
            an HTTPS URL, a delta table name with version, or spark table etc. ``source`` may be
            specified as a URI, a path-like string, or an instance of
            :py:class:`DatasetSource <mlflow.data.dataset_source.DatasetSource>`. If unspecified,
            the source is assumed to be the code location (e.g. notebook cell, script, etc.) where
            :py:func:`from_numpy <mlflow.data.from_numpy>` is being called.
        targets: Optional NumPy targets, represented as an np.ndarray or dictionary of named
            np.ndarrays.
        name: The name of the dataset. If unspecified, a name is generated.
        digest: The dataset digest (hash). If unspecified, a digest is computed automatically.

    .. code-block:: python
        :test:
        :caption: Basic Example

        import mlflow
        import numpy as np

        x = np.random.uniform(size=[2, 5, 4])
        y = np.random.randint(2, size=[2])
        dataset = mlflow.data.from_numpy(x, targets=y)

    .. code-block:: python
        :test:
        :caption: Dict Example

        import mlflow
        import numpy as np

        x = {
            "feature_1": np.random.uniform(size=[2, 5, 4]),
            "feature_2": np.random.uniform(size=[2, 5, 4]),
        }
        y = np.random.randint(2, size=[2])
        dataset = mlflow.data.from_numpy(x, targets=y)
    """
    # Imported lazily to avoid import cycles at module load time.
    from mlflow.data.code_dataset_source import CodeDatasetSource
    from mlflow.data.dataset_source_registry import resolve_dataset_source
    from mlflow.tracking.context import registry

    if source is not None:
        if isinstance(source, DatasetSource):
            resolved_source = source
        else:
            # Path-like / URI strings are resolved through the source registry.
            resolved_source = resolve_dataset_source(source)
    else:
        # No explicit source: attribute the dataset to the calling code context.
        context_tags = registry.resolve_tags()
        resolved_source = CodeDatasetSource(tags=context_tags)
    return NumpyDataset(
        features=features, source=resolved_source, targets=targets, name=name, digest=digest
    )