# tests/data/test_tensorflow_dataset.py
  1  import json
  2  
  3  import numpy as np
  4  import pytest
  5  import tensorflow as tf
  6  
  7  import mlflow.data
  8  from mlflow.data.code_dataset_source import CodeDatasetSource
  9  from mlflow.data.evaluation_dataset import EvaluationDataset
 10  from mlflow.data.pyfunc_dataset_mixin import PyFuncInputsOutputs
 11  from mlflow.data.schema import TensorDatasetSchema
 12  from mlflow.data.tensorflow_dataset import TensorFlowDataset
 13  from mlflow.exceptions import MlflowException
 14  from mlflow.types.utils import _infer_schema
 15  
 16  from tests.resources.data.dataset_source import SampleDatasetSource
 17  
 18  
 19  def test_dataset_construction_validates_features_and_targets():
 20      x = np.random.sample((100, 2))
 21      tf_dataset = tf.data.Dataset.from_tensors(x)
 22      tf_tensor = tf.convert_to_tensor(x)
 23  
 24      with pytest.raises(
 25          MlflowException,
 26          match="features' must be an instance of tf.data.Dataset or a TensorFlow Tensor.*NoneType",
 27      ):
 28          mlflow.data.from_tensorflow(features=None)
 29      with pytest.raises(
 30          MlflowException,
 31          match="features' must be an instance of tf.data.Dataset or a TensorFlow Tensor.*str",
 32      ):
 33          mlflow.data.from_tensorflow(features="foo")
 34      with pytest.raises(
 35          MlflowException,
 36          match="features' must be an instance of tf.data.Dataset or a TensorFlow Tensor.*str",
 37      ):
 38          mlflow.data.from_tensorflow(features="foo", targets=tf_tensor)
 39  
 40      mlflow.data.from_tensorflow(features=tf_tensor, targets=tf_tensor)
 41      mlflow.data.from_tensorflow(features=tf_tensor, targets=None)
 42      with pytest.raises(
 43          MlflowException,
 44          match=(
 45              "If 'features' is a TensorFlow Tensor, then 'targets' must also be a TensorFlow"
 46              " Tensor.*str"
 47          ),
 48      ):
 49          mlflow.data.from_tensorflow(features=tf_tensor, targets="foo")
 50      with pytest.raises(
 51          MlflowException,
 52          match=(
 53              "If 'features' is a TensorFlow Tensor, then 'targets' must also be a TensorFlow"
 54              " Tensor.*Dataset"
 55          ),
 56      ):
 57          mlflow.data.from_tensorflow(features=tf_tensor, targets=tf_dataset)
 58  
 59      mlflow.data.from_tensorflow(features=tf_dataset, targets=tf_dataset)
 60      mlflow.data.from_tensorflow(features=tf_dataset, targets=None)
 61      with pytest.raises(
 62          MlflowException,
 63          match=(
 64              "If 'features' is an instance of tf.data.Dataset, then 'targets' must also be an"
 65              " instance of tf.data.Dataset.*str"
 66          ),
 67      ):
 68          mlflow.data.from_tensorflow(features=tf_dataset, targets="foo")
 69      with pytest.raises(
 70          MlflowException,
 71          match=(
 72              "If 'features' is an instance of tf.data.Dataset, then 'targets' must also be an"
 73              " instance of tf.data.Dataset.*Tensor"
 74          ),
 75      ):
 76          mlflow.data.from_tensorflow(features=tf_dataset, targets=tf_tensor)
 77  
 78  
 79  def test_conversion_to_json():
 80      source_uri = "test:/my/test/uri"
 81      x = np.random.sample((100, 2))
 82      tf_dataset = tf.data.Dataset.from_tensors(x)
 83      source = SampleDatasetSource._resolve(source_uri)
 84      dataset = TensorFlowDataset(features=tf_dataset, source=source, name="testname")
 85  
 86      dataset_json = dataset.to_json()
 87      parsed_json = json.loads(dataset_json)
 88      assert parsed_json.keys() <= {"name", "digest", "source", "source_type", "schema", "profile"}
 89      assert parsed_json["name"] == dataset.name
 90      assert parsed_json["digest"] == dataset.digest
 91      assert parsed_json["source"] == dataset.source.to_json()
 92      assert parsed_json["source_type"] == dataset.source._get_source_type()
 93      assert parsed_json["profile"] == json.dumps(dataset.profile)
 94  
 95      parsed_schema = json.loads(parsed_json["schema"])
 96      assert TensorDatasetSchema.from_dict(parsed_schema) == dataset.schema
 97  
 98  
 99  @pytest.mark.parametrize(
100      ("features", "targets"),
101      [
102          (
103              tf.data.Dataset.from_tensors({
104                  "a": np.random.sample((100, 2)),
105                  "b": np.random.sample((100, 4)),
106              }),
107              tf.data.Dataset.from_tensors({
108                  "c": np.random.sample((100, 1)),
109                  "d": np.random.sample((100,)),
110              }),
111          ),
112          (
113              tf.data.Dataset.from_tensors((
114                  np.random.sample((100, 2)),
115                  np.random.sample((100, 4)),
116              )),
117              tf.data.Dataset.from_tensors((
118                  np.random.sample((100, 1)),
119                  np.random.sample((100,)),
120              )),
121          ),
122          (
123              tf.data.Dataset.from_tensors((
124                  np.random.sample((100, 2)),
125                  np.random.sample((100, 4)),
126              )),
127              tf.data.Dataset.from_tensors({
128                  "c": np.random.sample((100, 1)),
129                  "d": np.random.sample((100,)),
130              }),
131          ),
132          (
133              tf.data.Dataset.from_tensors((
134                  np.random.sample((100, 2)),
135                  np.random.sample((100, 4)),
136              )),
137              None,
138          ),
139      ],
140  )
141  def test_conversion_to_json_with_multi_tensor_datasets(features, targets):
142      source_uri = "test:/my/test/uri"
143      source = SampleDatasetSource._resolve(source_uri)
144      dataset = TensorFlowDataset(features=features, targets=targets, source=source, name="testname")
145  
146      dataset_json = dataset.to_json()
147      parsed_json = json.loads(dataset_json)
148      assert parsed_json.keys() <= {"name", "digest", "source", "source_type", "schema", "profile"}
149      assert parsed_json["name"] == dataset.name
150      assert parsed_json["digest"] == dataset.digest
151      assert parsed_json["source"] == dataset.source.to_json()
152      assert parsed_json["source_type"] == dataset.source._get_source_type()
153      assert parsed_json["profile"] == json.dumps(dataset.profile)
154  
155      parsed_schema = json.loads(parsed_json["schema"])
156      assert TensorDatasetSchema.from_dict(parsed_schema) == dataset.schema
157  
158  
def test_schema_and_profile_with_multi_tensor_tuple_datasets():
    """Tuple-element datasets expose positions as stringified indices in the schema."""
    features_ds = tf.data.Dataset.from_tensors(
        (np.random.sample((100, 2)), np.random.sample((100, 4)))
    )
    targets_ds = tf.data.Dataset.from_tensors(
        (np.random.sample((100, 1)), np.random.sample((100,)))
    )
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    dataset = TensorFlowDataset(
        features=features_ds, targets=targets_ds, source=source, name="testname"
    )

    # Schema comparison only depends on shape/dtype, so fresh random arrays suffice
    assert dataset.schema.features == _infer_schema(
        {"0": np.random.sample((100, 2)), "1": np.random.sample((100, 4))}
    )
    assert dataset.schema.targets == _infer_schema(
        {"0": np.random.sample((100, 1)), "1": np.random.sample((100,))}
    )

    # from_tensors produces a single-element dataset, so cardinality is 1
    assert dataset.profile == {"features_cardinality": 1, "targets_cardinality": 1}
    assert dataset.profile == {
        "features_cardinality": features_ds.cardinality().numpy(),
        "targets_cardinality": targets_ds.cardinality().numpy(),
    }
189  
190  
def test_schema_and_profile_with_multi_tensor_dict_datasets():
    """Dict-element datasets keep their keys as schema entry names."""
    features_ds = tf.data.Dataset.from_tensors(
        {"a": np.random.sample((100, 2)), "b": np.random.sample((100, 4))}
    )
    targets_ds = tf.data.Dataset.from_tensors(
        {"c": np.random.sample((100, 1)), "d": np.random.sample((100,))}
    )
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    dataset = TensorFlowDataset(
        features=features_ds, targets=targets_ds, source=source, name="testname"
    )

    # Schema comparison only depends on shape/dtype, so fresh random arrays suffice
    assert dataset.schema.features == _infer_schema(
        {"a": np.random.sample((100, 2)), "b": np.random.sample((100, 4))}
    )
    assert dataset.schema.targets == _infer_schema(
        {"c": np.random.sample((100, 1)), "d": np.random.sample((100,))}
    )

    # from_tensors produces a single-element dataset, so cardinality is 1
    assert dataset.profile == {"features_cardinality": 1, "targets_cardinality": 1}
    assert dataset.profile == {
        "features_cardinality": features_ds.cardinality().numpy(),
        "targets_cardinality": targets_ds.cardinality().numpy(),
    }
221  
222  
def test_digest_property_has_expected_value():
    """Digest is deterministic for fixed data and matches _compute_digest()."""
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    tf_dataset = tf.data.Dataset.from_tensors([[1, 2, 3], [4, 5, 6]])
    dataset = TensorFlowDataset(features=tf_dataset, source=source, name="testname")
    assert dataset.digest == dataset._compute_digest()
    # Known-good digest for this fixed input; changes indicate a hashing regression
    assert dataset.digest == "666a9820"
231  
232  
def test_data_property_has_expected_value():
    """The data property returns the wrapped tf.data.Dataset unchanged."""
    tf_dataset = tf.data.Dataset.from_tensors([[1, 2, 3], [4, 5, 6]])
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    wrapped = TensorFlowDataset(features=tf_dataset, source=source, name="testname")
    assert wrapped.data == tf_dataset
240  
241  
def test_source_property_has_expected_value():
    """The source property returns the resolved dataset source unchanged."""
    tf_dataset = tf.data.Dataset.from_tensors([[1, 2, 3], [4, 5, 6]])
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    wrapped = TensorFlowDataset(features=tf_dataset, source=source, name="testname")
    assert wrapped.source == source
249  
250  
def test_profile_property_has_expected_value_dataset():
    """For tf.data.Dataset features the profile reports the dataset's cardinality."""
    tf_dataset = tf.data.Dataset.from_tensors([[1, 2, 3], [4, 5, 6]])
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    dataset = TensorFlowDataset(features=tf_dataset, source=source, name="testname")
    expected_profile = {"features_cardinality": tf_dataset.cardinality().numpy()}
    assert dataset.profile == expected_profile
260  
261  
def test_profile_property_has_expected_value_tensors():
    """For raw Tensor features the profile reports tf.size of the tensor."""
    tf_tensor = tf.convert_to_tensor([[1, 2, 3], [4, 5, 6]])
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    dataset = TensorFlowDataset(features=tf_tensor, source=source, name="testname")
    expected_profile = {"features_cardinality": tf.size(tf_tensor).numpy()}
    assert dataset.profile == expected_profile
271  
272  
def test_to_pyfunc():
    """to_pyfunc() yields a PyFuncInputsOutputs wrapper."""
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    tf_dataset = tf.data.Dataset.from_tensors(np.random.sample((100, 2)))
    dataset = TensorFlowDataset(features=tf_dataset, source=source, name="testname")
    assert isinstance(dataset.to_pyfunc(), PyFuncInputsOutputs)
280  
281  
def test_to_evaluation_dataset():
    """to_evaluation_dataset() exposes tensor features/targets as numpy arrays."""
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    feature_tensor = tf.convert_to_tensor(np.random.sample((2, 2)))
    target_tensor = tf.convert_to_tensor(np.random.sample((2, 1)))
    dataset = TensorFlowDataset(
        features=feature_tensor, source=source, targets=target_tensor, name="testname"
    )

    evaluation_dataset = dataset.to_evaluation_dataset()
    assert isinstance(evaluation_dataset, EvaluationDataset)
    assert np.array_equal(evaluation_dataset.features_data, dataset.data.numpy())
    assert np.array_equal(evaluation_dataset.labels_data, dataset.targets.numpy())
296  
297  
def test_to_evaluation_dataset_with_tensorflow_dataset_data():
    """Dataset-backed (non-Tensor) data cannot become an EvaluationDataset."""
    source = SampleDatasetSource._resolve("test:/my/test/uri")
    features_ds = tf.data.Dataset.from_tensors(np.random.sample((2, 2)))
    targets_ds = tf.data.Dataset.from_tensors(np.random.sample((2, 1)))
    dataset = TensorFlowDataset(
        features=features_ds, source=source, targets=targets_ds, name="testname"
    )
    with pytest.raises(
        MlflowException, match="Data must be a Tensor to convert to an EvaluationDataset"
    ):
        dataset.to_evaluation_dataset()
312  
313  
def test_from_tensorflow_dataset_constructs_expected_dataset():
    """from_tensorflow wraps a tf.data.Dataset and infers schema and profile."""
    tf_dataset = tf.data.Dataset.from_tensors(np.random.sample((100, 2)))
    mlflow_ds = mlflow.data.from_tensorflow(tf_dataset, source="my_source")

    assert isinstance(mlflow_ds, TensorFlowDataset)
    assert mlflow_ds.data == tf_dataset
    # Schema is inferred from the first element of the dataset
    first_element = next(tf_dataset.as_numpy_iterator())
    assert mlflow_ds.schema == TensorDatasetSchema(features=_infer_schema(first_element))
    assert mlflow_ds.profile == {"features_cardinality": tf_dataset.cardinality().numpy()}
326  
327  
def test_from_tensorflow_dataset_with_targets_constructs_expected_dataset():
    """from_tensorflow records both feature and target datasets in schema/profile."""
    tf_dataset_x = tf.data.Dataset.from_tensors(np.random.sample((100, 2)))
    tf_dataset_y = tf.data.Dataset.from_tensors(np.random.sample((100, 1)))
    mlflow_ds = mlflow.data.from_tensorflow(tf_dataset_x, source="my_source", targets=tf_dataset_y)

    assert isinstance(mlflow_ds, TensorFlowDataset)
    assert mlflow_ds.data == tf_dataset_x
    assert mlflow_ds.targets == tf_dataset_y
    # Schemas are inferred from the first element of each dataset
    assert mlflow_ds.schema == TensorDatasetSchema(
        features=_infer_schema(next(tf_dataset_x.as_numpy_iterator())),
        targets=_infer_schema(next(tf_dataset_y.as_numpy_iterator())),
    )
    assert mlflow_ds.profile == {
        "features_cardinality": tf_dataset_x.cardinality().numpy(),
        "targets_cardinality": tf_dataset_y.cardinality().numpy(),
    }
345  
346  
def test_from_tensorflow_tensor_constructs_expected_dataset():
    """from_tensorflow wraps a raw Tensor; profile uses tf.size as cardinality."""
    tf_tensor = tf.convert_to_tensor(np.random.sample((100, 2)))
    mlflow_ds = mlflow.data.from_tensorflow(tf_tensor, source="my_source")

    assert isinstance(mlflow_ds, TensorFlowDataset)
    # element-wise tensor equality, reduced to a single boolean
    assert tf.reduce_all(tf.math.equal(mlflow_ds.data, tf_tensor))
    assert mlflow_ds.schema == TensorDatasetSchema(features=_infer_schema(tf_tensor.numpy()))
    assert mlflow_ds.profile == {"features_cardinality": tf.size(tf_tensor).numpy()}
358  
359  
def test_from_tensorflow_tensor_with_targets_constructs_expected_dataset():
    """from_tensorflow records Tensor features and targets in schema/profile."""
    tf_tensor_x = tf.convert_to_tensor(np.random.sample((100, 2)))
    tf_tensor_y = tf.convert_to_tensor(np.random.sample((100, 1)))
    mlflow_ds = mlflow.data.from_tensorflow(tf_tensor_x, source="my_source", targets=tf_tensor_y)

    assert isinstance(mlflow_ds, TensorFlowDataset)
    # element-wise tensor equality, reduced to a single boolean
    assert tf.reduce_all(tf.math.equal(mlflow_ds.data, tf_tensor_x))
    assert tf.reduce_all(tf.math.equal(mlflow_ds.targets, tf_tensor_y))
    assert mlflow_ds.schema == TensorDatasetSchema(
        features=_infer_schema(tf_tensor_x.numpy()),
        targets=_infer_schema(tf_tensor_y.numpy()),
    )
    assert mlflow_ds.profile == {
        "features_cardinality": tf.size(tf_tensor_x).numpy(),
        "targets_cardinality": tf.size(tf_tensor_y).numpy(),
    }
377  
378  
def test_from_tensorflow_no_source_specified():
    """Omitting 'source' falls back to a CodeDatasetSource with caller metadata."""
    tf_dataset = tf.data.Dataset.from_tensors(np.random.sample((100, 2)))
    mlflow_ds = mlflow.data.from_tensorflow(tf_dataset)

    assert isinstance(mlflow_ds, TensorFlowDataset)
    assert isinstance(mlflow_ds.source, CodeDatasetSource)
    # The inferred code source records where the dataset was created
    assert "mlflow.source.name" in mlflow_ds.source.to_json()
388  
389  
def test_digest_computation_succeeds_with_none_element_in_numpy_iterator():
    """A None yielded by as_numpy_iterator() must not break digest computation."""
    arr = np.array([[0, 1], [1, 2]])
    tf_dataset = tf.data.Dataset.from_tensors(arr)
    # Simulate an iterator that yields a None element before the real data
    tf_dataset.as_numpy_iterator = lambda: [None, arr]
    mlflow_ds = mlflow.data.from_tensorflow(tf_dataset)
    assert mlflow_ds.digest == "bc8ef018"