# tests/data/test_numpy_dataset.py
  1  import json
  2  
  3  import numpy as np
  4  import pandas as pd
  5  import pytest
  6  
  7  import mlflow.data
  8  from mlflow.data.code_dataset_source import CodeDatasetSource
  9  from mlflow.data.evaluation_dataset import EvaluationDataset
 10  from mlflow.data.filesystem_dataset_source import FileSystemDatasetSource
 11  from mlflow.data.numpy_dataset import NumpyDataset
 12  from mlflow.data.pyfunc_dataset_mixin import PyFuncInputsOutputs
 13  from mlflow.data.schema import TensorDatasetSchema
 14  from mlflow.types.utils import _infer_schema
 15  
 16  from tests.resources.data.dataset_source import SampleDatasetSource
 17  
 18  
 19  def test_conversion_to_json():
 20      source_uri = "test:/my/test/uri"
 21      source = SampleDatasetSource._resolve(source_uri)
 22      dataset = NumpyDataset(features=np.array([1, 2, 3]), source=source, name="testname")
 23  
 24      dataset_json = dataset.to_json()
 25      parsed_json = json.loads(dataset_json)
 26      assert parsed_json.keys() <= {"name", "digest", "source", "source_type", "schema", "profile"}
 27      assert parsed_json["name"] == dataset.name
 28      assert parsed_json["digest"] == dataset.digest
 29      assert parsed_json["source"] == dataset.source.to_json()
 30      assert parsed_json["source_type"] == dataset.source._get_source_type()
 31      assert parsed_json["profile"] == json.dumps(dataset.profile)
 32  
 33      parsed_schema = json.loads(parsed_json["schema"])
 34      assert TensorDatasetSchema.from_dict(parsed_schema) == dataset.schema
 35  
 36  
 37  @pytest.mark.parametrize(
 38      ("features", "targets"),
 39      [
 40          (
 41              {
 42                  "a": np.array([1, 2, 3]),
 43                  "b": np.array([[4, 5]]),
 44              },
 45              {
 46                  "c": np.array([1]),
 47                  "d": np.array([[[2]]]),
 48              },
 49          ),
 50          (
 51              np.array([1, 2, 3]),
 52              {
 53                  "c": np.array([1]),
 54                  "d": np.array([[[2]]]),
 55              },
 56          ),
 57          (
 58              {
 59                  "a": np.array([1, 2, 3]),
 60                  "b": np.array([[4, 5]]),
 61              },
 62              np.array([1, 2, 3]),
 63          ),
 64      ],
 65  )
 66  def test_conversion_to_json_with_multi_tensor_features_and_targets(features, targets):
 67      source_uri = "test:/my/test/uri"
 68      source = SampleDatasetSource._resolve(source_uri)
 69      dataset = NumpyDataset(features=features, targets=targets, source=source)
 70  
 71      dataset_json = dataset.to_json()
 72      parsed_json = json.loads(dataset_json)
 73      assert parsed_json.keys() <= {"name", "digest", "source", "source_type", "schema", "profile"}
 74      assert parsed_json["name"] == dataset.name
 75      assert parsed_json["digest"] == dataset.digest
 76      assert parsed_json["source"] == dataset.source.to_json()
 77      assert parsed_json["source_type"] == dataset.source._get_source_type()
 78      assert parsed_json["profile"] == json.dumps(dataset.profile)
 79      parsed_schema = json.loads(parsed_json["schema"])
 80      assert TensorDatasetSchema.from_dict(parsed_schema) == dataset.schema
 81  
 82  
 83  @pytest.mark.parametrize(
 84      ("features", "targets"),
 85      [
 86          (
 87              {
 88                  "a": np.array([1, 2, 3]),
 89                  "b": np.array([[4, 5]]),
 90              },
 91              {
 92                  "c": np.array([1]),
 93                  "d": np.array([[[2]]]),
 94              },
 95          ),
 96          (
 97              np.array([1, 2, 3]),
 98              {
 99                  "c": np.array([1]),
100                  "d": np.array([[[2]]]),
101              },
102          ),
103          (
104              {
105                  "a": np.array([1, 2, 3]),
106                  "b": np.array([[4, 5]]),
107              },
108              np.array([1, 2, 3]),
109          ),
110      ],
111  )
112  def test_schema_and_profile_with_multi_tensor_features_and_targets(features, targets):
113      source_uri = "test:/my/test/uri"
114      source = SampleDatasetSource._resolve(source_uri)
115      dataset = NumpyDataset(features=features, targets=targets, source=source)
116  
117      assert isinstance(dataset.schema, TensorDatasetSchema)
118      assert dataset.schema.features == _infer_schema(features)
119      assert dataset.schema.targets == _infer_schema(targets)
120  
121      if isinstance(features, dict):
122          assert {
123              "features_shape": {key: array.shape for key, array in features.items()},
124              "features_size": {key: array.size for key, array in features.items()},
125              "features_nbytes": {key: array.nbytes for key, array in features.items()},
126          }.items() <= dataset.profile.items()
127      else:
128          assert {
129              "features_shape": features.shape,
130              "features_size": features.size,
131              "features_nbytes": features.nbytes,
132          }.items() <= dataset.profile.items()
133  
134      if isinstance(targets, dict):
135          assert {
136              "targets_shape": {key: array.shape for key, array in targets.items()},
137              "targets_size": {key: array.size for key, array in targets.items()},
138              "targets_nbytes": {key: array.nbytes for key, array in targets.items()},
139          }.items() <= dataset.profile.items()
140      else:
141          assert {
142              "targets_shape": targets.shape,
143              "targets_size": targets.size,
144              "targets_nbytes": targets.nbytes,
145          }.items() <= dataset.profile.items()
146  
147  
def test_digest_property_has_expected_value():
    """Digests are deterministic: pinned hex values guard against silent hash-scheme changes."""
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    features = np.array([1, 2, 3])
    targets = np.array([4, 5, 6])

    features_only = NumpyDataset(features=features, source=source, name="testname")
    assert features_only.digest == features_only._compute_digest()
    assert features_only.digest == "fdf1765f"

    with_targets = NumpyDataset(features=features, targets=targets, source=source, name="testname")
    assert with_targets.digest == with_targets._compute_digest()
    # Adding targets must change the digest from the features-only value.
    assert with_targets.digest == "1387de76"
164  
165  
def test_features_property():
    """The ``features`` property returns the array supplied at construction time."""
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    original_features = np.array([1, 2, 3])
    dataset = NumpyDataset(features=original_features, source=source, name="testname")
    assert np.array_equal(dataset.features, original_features)
172  
173  
def test_targets_property():
    """``targets`` echoes the constructor argument, and is ``None`` when omitted."""
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    feature_array = np.array([1, 2, 3])
    target_array = np.array([4, 5, 6])

    with_targets = NumpyDataset(
        features=feature_array, targets=target_array, source=source, name="testname"
    )
    assert np.array_equal(with_targets.targets, target_array)

    without_targets = NumpyDataset(features=feature_array, source=source, name="testname")
    assert without_targets.targets is None
185  
186  
def test_to_pyfunc():
    """``to_pyfunc`` must produce a ``PyFuncInputsOutputs`` wrapper."""
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    dataset = NumpyDataset(features=np.array([1, 2, 3]), source=source, name="testname")
    assert isinstance(dataset.to_pyfunc(), PyFuncInputsOutputs)
193  
194  
def test_to_evaluation_dataset():
    """``to_evaluation_dataset`` preserves features and targets in an EvaluationDataset."""
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    feature_matrix = np.array([[1, 2], [3, 4]])
    labels = np.array([0, 1])
    dataset = NumpyDataset(features=feature_matrix, targets=labels, source=source, name="testname")

    converted = dataset.to_evaluation_dataset()
    assert isinstance(converted, EvaluationDataset)
    assert np.array_equal(converted.features_data, feature_matrix)
    assert np.array_equal(converted.labels_data, labels)
205  
206  
def test_from_numpy_features_only(tmp_path):
    """``from_numpy`` with a file path source yields a dataset with inferred schema/profile."""
    features = np.array([1, 2, 3])
    csv_path = tmp_path / "temp.csv"
    pd.DataFrame(features).to_csv(csv_path)

    dataset = mlflow.data.from_numpy(features, source=csv_path)

    assert isinstance(dataset, NumpyDataset)
    assert np.array_equal(dataset.features, features)
    assert dataset.schema == TensorDatasetSchema(features=_infer_schema(features))
    expected_profile = {
        "features_shape": features.shape,
        "features_size": features.size,
        "features_nbytes": features.nbytes,
    }
    assert dataset.profile == expected_profile
    # A path-like source must resolve to a filesystem-backed source.
    assert isinstance(dataset.source, FileSystemDatasetSource)
223  
224  
def test_from_numpy_features_and_targets(tmp_path):
    """``from_numpy`` with targets records target stats and schema alongside features."""
    features = np.array([[1, 2, 3], [3, 2, 1], [2, 3, 1]])
    targets = np.array([4, 5, 6])
    csv_path = tmp_path / "temp.csv"
    pd.DataFrame(features).to_csv(csv_path)

    dataset = mlflow.data.from_numpy(features, targets=targets, source=csv_path)

    assert isinstance(dataset, NumpyDataset)
    assert np.array_equal(dataset.features, features)
    assert np.array_equal(dataset.targets, targets)
    assert dataset.schema == TensorDatasetSchema(
        features=_infer_schema(features), targets=_infer_schema(targets)
    )
    expected_profile = {
        "features_shape": features.shape,
        "features_size": features.size,
        "features_nbytes": features.nbytes,
        "targets_shape": targets.shape,
        "targets_size": targets.size,
        "targets_nbytes": targets.nbytes,
    }
    assert dataset.profile == expected_profile
    # A path-like source must resolve to a filesystem-backed source.
    assert isinstance(dataset.source, FileSystemDatasetSource)
248  
249  
def test_from_numpy_no_source_specified():
    """Without an explicit source, ``from_numpy`` falls back to a code-based source."""
    features = np.array([1, 2, 3])
    dataset = mlflow.data.from_numpy(features)

    assert isinstance(dataset, NumpyDataset)
    assert isinstance(dataset.source, CodeDatasetSource)
    # The code source records where the dataset was created.
    assert "mlflow.source.name" in dataset.source.to_json()