# mlflow/data/numpy_dataset.py
  1  import json
  2  import logging
  3  from functools import cached_property
  4  from typing import Any
  5  
  6  import numpy as np
  7  
  8  from mlflow.data.dataset import Dataset
  9  from mlflow.data.dataset_source import DatasetSource
 10  from mlflow.data.digest_utils import compute_numpy_digest
 11  from mlflow.data.evaluation_dataset import EvaluationDataset
 12  from mlflow.data.pyfunc_dataset_mixin import PyFuncConvertibleDatasetMixin, PyFuncInputsOutputs
 13  from mlflow.data.schema import TensorDatasetSchema
 14  from mlflow.types.utils import _infer_schema
 15  
 16  _logger = logging.getLogger(__name__)
 17  
 18  
 19  class NumpyDataset(Dataset, PyFuncConvertibleDatasetMixin):
 20      """
 21      Represents a NumPy dataset for use with MLflow Tracking.
 22      """
 23  
 24      def __init__(
 25          self,
 26          features: np.ndarray | dict[str, np.ndarray],
 27          source: DatasetSource,
 28          targets: np.ndarray | dict[str, np.ndarray] = None,
 29          name: str | None = None,
 30          digest: str | None = None,
 31      ):
 32          """
 33          Args:
 34              features: A numpy array or dictionary of numpy arrays containing dataset features.
 35              source: The source of the numpy dataset.
 36              targets: A numpy array or dictionary of numpy arrays containing dataset targets.
 37                  Optional.
 38              name: The name of the dataset. E.g. "wiki_train". If unspecified, a name is
 39                  automatically generated.
 40              digest: The digest (hash, fingerprint) of the dataset. If unspecified, a digest
 41                  is automatically computed.
 42          """
 43          self._features = features
 44          self._targets = targets
 45          super().__init__(source=source, name=name, digest=digest)
 46  
 47      def _compute_digest(self) -> str:
 48          """
 49          Computes a digest for the dataset. Called if the user doesn't supply
 50          a digest when constructing the dataset.
 51          """
 52          return compute_numpy_digest(self._features, self._targets)
 53  
 54      def to_dict(self) -> dict[str, str]:
 55          """Create config dictionary for the dataset.
 56  
 57          Returns a string dictionary containing the following fields: name, digest, source, source
 58          type, schema, and profile.
 59          """
 60          schema = json.dumps(self.schema.to_dict()) if self.schema else None
 61          config = super().to_dict()
 62          config.update({
 63              "schema": schema,
 64              "profile": json.dumps(self.profile),
 65          })
 66          return config
 67  
 68      @property
 69      def source(self) -> DatasetSource:
 70          """
 71          The source of the dataset.
 72          """
 73          return self._source
 74  
 75      @property
 76      def features(self) -> np.ndarray | dict[str, np.ndarray]:
 77          """
 78          The features of the dataset.
 79          """
 80          return self._features
 81  
 82      @property
 83      def targets(self) -> np.ndarray | dict[str, np.ndarray] | None:
 84          """
 85          The targets of the dataset. May be ``None`` if no targets are available.
 86          """
 87          return self._targets
 88  
 89      @property
 90      def profile(self) -> Any | None:
 91          """
 92          A profile of the dataset. May be ``None`` if a profile cannot be computed.
 93          """
 94  
 95          def get_profile_attribute(numpy_data, attr_name):
 96              if isinstance(numpy_data, dict):
 97                  return {key: getattr(array, attr_name) for key, array in numpy_data.items()}
 98              else:
 99                  return getattr(numpy_data, attr_name)
100  
101          profile = {
102              "features_shape": get_profile_attribute(self._features, "shape"),
103              "features_size": get_profile_attribute(self._features, "size"),
104              "features_nbytes": get_profile_attribute(self._features, "nbytes"),
105          }
106          if self._targets is not None:
107              profile.update({
108                  "targets_shape": get_profile_attribute(self._targets, "shape"),
109                  "targets_size": get_profile_attribute(self._targets, "size"),
110                  "targets_nbytes": get_profile_attribute(self._targets, "nbytes"),
111              })
112  
113          return profile
114  
115      @cached_property
116      def schema(self) -> TensorDatasetSchema | None:
117          """
118          MLflow TensorSpec schema representing the dataset features and targets (optional).
119          """
120          try:
121              features_schema = _infer_schema(self._features)
122              targets_schema = None
123              if self._targets is not None:
124                  targets_schema = _infer_schema(self._targets)
125              return TensorDatasetSchema(features=features_schema, targets=targets_schema)
126          except Exception as e:
127              _logger.warning("Failed to infer schema for NumPy dataset. Exception: %s", e)
128              return None
129  
130      def to_pyfunc(self) -> PyFuncInputsOutputs:
131          """
132          Converts the dataset to a collection of pyfunc inputs and outputs for model
133          evaluation. Required for use with mlflow.evaluate().
134          """
135          return PyFuncInputsOutputs(self._features, self._targets)
136  
137      def to_evaluation_dataset(self, path=None, feature_names=None) -> EvaluationDataset:
138          """
139          Converts the dataset to an EvaluationDataset for model evaluation. Required
140          for use with mlflow.sklearn.evaluate().
141          """
142          return EvaluationDataset(
143              data=self._features,
144              targets=self._targets,
145              path=path,
146              feature_names=feature_names,
147              name=self.name,
148              digest=self.digest,
149          )
150  
151  
def from_numpy(
    features: np.ndarray | dict[str, np.ndarray],
    # NOTE: both optional parameters default to None, so their annotations
    # must include `| None` (implicit Optional is deprecated by PEP 484).
    source: str | DatasetSource | None = None,
    targets: np.ndarray | dict[str, np.ndarray] | None = None,
    name: str | None = None,
    digest: str | None = None,
) -> NumpyDataset:
    """
    Constructs a :py:class:`NumpyDataset <mlflow.data.numpy_dataset.NumpyDataset>` object from
    NumPy features, optional targets, and source. If the source is path like, then this will
    construct a DatasetSource object from the source path. Otherwise, the source is assumed to
    be a DatasetSource object.

    Args:
        features: NumPy features, represented as an np.ndarray or dictionary of named np.ndarrays.
        source: The source from which the numpy data was derived, e.g. a filesystem path, an S3 URI,
            an HTTPS URL, a delta table name with version, or spark table etc. ``source`` may be
            specified as a URI, a path-like string, or an instance of
            :py:class:`DatasetSource <mlflow.data.dataset_source.DatasetSource>`. If unspecified,
            the source is assumed to be the code location (e.g. notebook cell, script, etc.) where
            :py:func:`from_numpy <mlflow.data.from_numpy>` is being called.
        targets: Optional NumPy targets, represented as an np.ndarray or dictionary of named
            np.ndarrays.
        name: The name of the dataset. If unspecified, a name is generated.
        digest: The dataset digest (hash). If unspecified, a digest is computed automatically.

    .. code-block:: python
        :test:
        :caption: Basic Example

        import mlflow
        import numpy as np

        x = np.random.uniform(size=[2, 5, 4])
        y = np.random.randint(2, size=[2])
        dataset = mlflow.data.from_numpy(x, targets=y)

    .. code-block:: python
        :test:
        :caption: Dict Example

        import mlflow
        import numpy as np

        x = {
            "feature_1": np.random.uniform(size=[2, 5, 4]),
            "feature_2": np.random.uniform(size=[2, 5, 4]),
        }
        y = np.random.randint(2, size=[2])
        dataset = mlflow.data.from_numpy(x, targets=y)
    """
    # Imported lazily to avoid circular imports at module load time.
    from mlflow.data.code_dataset_source import CodeDatasetSource
    from mlflow.data.dataset_source_registry import resolve_dataset_source
    from mlflow.tracking.context import registry

    if source is not None:
        if isinstance(source, DatasetSource):
            resolved_source = source
        else:
            # Path-like / URI sources are resolved through the source registry.
            resolved_source = resolve_dataset_source(source)
    else:
        # No source supplied: attribute the dataset to the calling code location.
        context_tags = registry.resolve_tags()
        resolved_source = CodeDatasetSource(tags=context_tags)
    return NumpyDataset(
        features=features, source=resolved_source, targets=targets, name=name, digest=digest
    )