import json

import numpy as np
import pytest
import tensorflow as tf

import mlflow.data
from mlflow.data.code_dataset_source import CodeDatasetSource
from mlflow.data.evaluation_dataset import EvaluationDataset
from mlflow.data.pyfunc_dataset_mixin import PyFuncInputsOutputs
from mlflow.data.schema import TensorDatasetSchema
from mlflow.data.tensorflow_dataset import TensorFlowDataset
from mlflow.exceptions import MlflowException
from mlflow.types.utils import _infer_schema

from tests.resources.data.dataset_source import SampleDatasetSource


def _sample_source():
    """Resolve the sample dataset source shared by the tests in this module."""
    return SampleDatasetSource._resolve("test:/my/test/uri")


def _check_json_round_trip(dataset):
    """Serialize ``dataset`` to JSON and assert every serialized field matches it."""
    parsed = json.loads(dataset.to_json())
    assert parsed.keys() <= {"name", "digest", "source", "source_type", "schema", "profile"}
    assert parsed["name"] == dataset.name
    assert parsed["digest"] == dataset.digest
    assert parsed["source"] == dataset.source.to_json()
    assert parsed["source_type"] == dataset.source._get_source_type()
    assert parsed["profile"] == json.dumps(dataset.profile)
    assert TensorDatasetSchema.from_dict(json.loads(parsed["schema"])) == dataset.schema


def test_dataset_construction_validates_features_and_targets():
    arr = np.random.sample((100, 2))
    ds = tf.data.Dataset.from_tensors(arr)
    tensor = tf.convert_to_tensor(arr)

    # Features that are neither a tf.data.Dataset nor a Tensor are rejected outright.
    with pytest.raises(
        MlflowException,
        match="features' must be an instance of tf.data.Dataset or a TensorFlow Tensor.*NoneType",
    ):
        mlflow.data.from_tensorflow(features=None)
    with pytest.raises(
        MlflowException,
        match="features' must be an instance of tf.data.Dataset or a TensorFlow Tensor.*str",
    ):
        mlflow.data.from_tensorflow(features="foo")
    with pytest.raises(
        MlflowException,
        match="features' must be an instance of tf.data.Dataset or a TensorFlow Tensor.*str",
    ):
        mlflow.data.from_tensorflow(features="foo", targets=tensor)

    # Tensor features pair only with Tensor (or absent) targets.
    mlflow.data.from_tensorflow(features=tensor, targets=tensor)
    mlflow.data.from_tensorflow(features=tensor, targets=None)
    with pytest.raises(
        MlflowException,
        match=(
            "If 'features' is a TensorFlow Tensor, then 'targets' must also be a TensorFlow"
            " Tensor.*str"
        ),
    ):
        mlflow.data.from_tensorflow(features=tensor, targets="foo")
    with pytest.raises(
        MlflowException,
        match=(
            "If 'features' is a TensorFlow Tensor, then 'targets' must also be a TensorFlow"
            " Tensor.*Dataset"
        ),
    ):
        mlflow.data.from_tensorflow(features=tensor, targets=ds)

    # Dataset features pair only with Dataset (or absent) targets.
    mlflow.data.from_tensorflow(features=ds, targets=ds)
    mlflow.data.from_tensorflow(features=ds, targets=None)
    with pytest.raises(
        MlflowException,
        match=(
            "If 'features' is an instance of tf.data.Dataset, then 'targets' must also be an"
            " instance of tf.data.Dataset.*str"
        ),
    ):
        mlflow.data.from_tensorflow(features=ds, targets="foo")
    with pytest.raises(
        MlflowException,
        match=(
            "If 'features' is an instance of tf.data.Dataset, then 'targets' must also be an"
            " instance of tf.data.Dataset.*Tensor"
        ),
    ):
        mlflow.data.from_tensorflow(features=ds, targets=tensor)


def test_conversion_to_json():
    ds = tf.data.Dataset.from_tensors(np.random.sample((100, 2)))
    dataset = TensorFlowDataset(features=ds, source=_sample_source(), name="testname")
    _check_json_round_trip(dataset)


@pytest.mark.parametrize(
    ("features", "targets"),
    [
        # dict features / dict targets
        (
            tf.data.Dataset.from_tensors(
                {"a": np.random.sample((100, 2)), "b": np.random.sample((100, 4))}
            ),
            tf.data.Dataset.from_tensors(
                {"c": np.random.sample((100, 1)), "d": np.random.sample((100,))}
            ),
        ),
        # tuple features / tuple targets
        (
            tf.data.Dataset.from_tensors(
                (np.random.sample((100, 2)), np.random.sample((100, 4)))
            ),
            tf.data.Dataset.from_tensors(
                (np.random.sample((100, 1)), np.random.sample((100,)))
            ),
        ),
        # tuple features / dict targets
        (
            tf.data.Dataset.from_tensors(
                (np.random.sample((100, 2)), np.random.sample((100, 4)))
            ),
            tf.data.Dataset.from_tensors(
                {"c": np.random.sample((100, 1)), "d": np.random.sample((100,))}
            ),
        ),
        # tuple features / no targets
        (
            tf.data.Dataset.from_tensors(
                (np.random.sample((100, 2)), np.random.sample((100, 4)))
            ),
            None,
        ),
    ],
)
def test_conversion_to_json_with_multi_tensor_datasets(features, targets):
    dataset = TensorFlowDataset(
        features=features, targets=targets, source=_sample_source(), name="testname"
    )
    _check_json_round_trip(dataset)


def test_schema_and_profile_with_multi_tensor_tuple_datasets():
    features_ds = tf.data.Dataset.from_tensors(
        (np.random.sample((100, 2)), np.random.sample((100, 4)))
    )
    targets_ds = tf.data.Dataset.from_tensors(
        (np.random.sample((100, 1)), np.random.sample((100,)))
    )
    dataset = TensorFlowDataset(
        features=features_ds, targets=targets_ds, source=_sample_source(), name="testname"
    )
    # Tuple elements are keyed by positional index in the inferred schema; only
    # shapes/dtypes matter, so fresh random arrays of the same shapes compare equal.
    assert dataset.schema.features == _infer_schema(
        {"0": np.random.sample((100, 2)), "1": np.random.sample((100, 4))}
    )
    assert dataset.schema.targets == _infer_schema(
        {"0": np.random.sample((100, 1)), "1": np.random.sample((100,))}
    )
    # `from_tensors` wraps its argument in a single element, hence cardinality 1.
    assert dataset.profile == {"features_cardinality": 1, "targets_cardinality": 1}
    assert dataset.profile == {
        "features_cardinality": features_ds.cardinality().numpy(),
        "targets_cardinality": targets_ds.cardinality().numpy(),
    }


def test_schema_and_profile_with_multi_tensor_dict_datasets():
    features_ds = tf.data.Dataset.from_tensors(
        {"a": np.random.sample((100, 2)), "b": np.random.sample((100, 4))}
    )
    targets_ds = tf.data.Dataset.from_tensors(
        {"c": np.random.sample((100, 1)), "d": np.random.sample((100,))}
    )
    dataset = TensorFlowDataset(
        features=features_ds, targets=targets_ds, source=_sample_source(), name="testname"
    )
    # Dict keys carry through to the inferred schema; only shapes/dtypes matter.
    assert dataset.schema.features == _infer_schema(
        {"a": np.random.sample((100, 2)), "b": np.random.sample((100, 4))}
    )
    assert dataset.schema.targets == _infer_schema(
        {"c": np.random.sample((100, 1)), "d": np.random.sample((100,))}
    )
    # `from_tensors` wraps its argument in a single element, hence cardinality 1.
    assert dataset.profile == {"features_cardinality": 1, "targets_cardinality": 1}
    assert dataset.profile == {
        "features_cardinality": features_ds.cardinality().numpy(),
        "targets_cardinality": targets_ds.cardinality().numpy(),
    }


def test_digest_property_has_expected_value():
    ds = tf.data.Dataset.from_tensors([[1, 2, 3], [4, 5, 6]])
    dataset = TensorFlowDataset(features=ds, source=_sample_source(), name="testname")
    assert dataset.digest == dataset._compute_digest()
    # Pinned digest for this fixed input; a change here means the digest algorithm changed.
    assert dataset.digest == "666a9820"


def test_data_property_has_expected_value():
    ds = tf.data.Dataset.from_tensors([[1, 2, 3], [4, 5, 6]])
    dataset = TensorFlowDataset(features=ds, source=_sample_source(), name="testname")
    assert dataset.data == ds


def test_source_property_has_expected_value():
    ds = tf.data.Dataset.from_tensors([[1, 2, 3], [4, 5, 6]])
    src = _sample_source()
    dataset = TensorFlowDataset(features=ds, source=src, name="testname")
    assert dataset.source == src


def test_profile_property_has_expected_value_dataset():
    ds = tf.data.Dataset.from_tensors([[1, 2, 3], [4, 5, 6]])
    dataset = TensorFlowDataset(features=ds, source=_sample_source(), name="testname")
    # Dataset features report element cardinality.
    assert dataset.profile == {"features_cardinality": ds.cardinality().numpy()}


def test_profile_property_has_expected_value_tensors():
    tensor = tf.convert_to_tensor([[1, 2, 3], [4, 5, 6]])
    dataset = TensorFlowDataset(features=tensor, source=_sample_source(), name="testname")
    # Tensor features report total element count (tf.size) instead of cardinality.
    assert dataset.profile == {"features_cardinality": tf.size(tensor).numpy()}


def test_to_pyfunc():
    ds = tf.data.Dataset.from_tensors(np.random.sample((100, 2)))
    dataset = TensorFlowDataset(features=ds, source=_sample_source(), name="testname")
    assert isinstance(dataset.to_pyfunc(), PyFuncInputsOutputs)


def test_to_evaluation_dataset():
    features = tf.convert_to_tensor(np.random.sample((2, 2)))
    labels = tf.convert_to_tensor(np.random.sample((2, 1)))
    dataset = TensorFlowDataset(
        features=features, source=_sample_source(), targets=labels, name="testname"
    )
    evaluation_dataset = dataset.to_evaluation_dataset()
    assert isinstance(evaluation_dataset, EvaluationDataset)
    assert np.array_equal(evaluation_dataset.features_data, dataset.data.numpy())
    assert np.array_equal(evaluation_dataset.labels_data, dataset.targets.numpy())


def test_to_evaluation_dataset_with_tensorflow_dataset_data():
    features_ds = tf.data.Dataset.from_tensors(np.random.sample((2, 2)))
    targets_ds = tf.data.Dataset.from_tensors(np.random.sample((2, 1)))
    dataset = TensorFlowDataset(
        features=features_ds, source=_sample_source(), targets=targets_ds, name="testname"
    )
    # Only Tensor-backed datasets can be converted; tf.data.Dataset inputs must raise.
    with pytest.raises(
        MlflowException, match="Data must be a Tensor to convert to an EvaluationDataset"
    ):
        dataset.to_evaluation_dataset()


def test_from_tensorflow_dataset_constructs_expected_dataset():
    ds = tf.data.Dataset.from_tensors(np.random.sample((100, 2)))
    mlflow_ds = mlflow.data.from_tensorflow(ds, source="my_source")
    assert isinstance(mlflow_ds, TensorFlowDataset)
    assert mlflow_ds.data == ds
    assert mlflow_ds.schema == TensorDatasetSchema(
        features=_infer_schema(next(ds.as_numpy_iterator()))
    )
    assert mlflow_ds.profile == {"features_cardinality": ds.cardinality().numpy()}


def test_from_tensorflow_dataset_with_targets_constructs_expected_dataset():
    ds_x = tf.data.Dataset.from_tensors(np.random.sample((100, 2)))
    ds_y = tf.data.Dataset.from_tensors(np.random.sample((100, 1)))
    mlflow_ds = mlflow.data.from_tensorflow(ds_x, source="my_source", targets=ds_y)
    assert isinstance(mlflow_ds, TensorFlowDataset)
    assert mlflow_ds.data == ds_x
    assert mlflow_ds.targets == ds_y
    assert mlflow_ds.schema == TensorDatasetSchema(
        features=_infer_schema(next(ds_x.as_numpy_iterator())),
        targets=_infer_schema(next(ds_y.as_numpy_iterator())),
    )
    assert mlflow_ds.profile == {
        "features_cardinality": ds_x.cardinality().numpy(),
        "targets_cardinality": ds_y.cardinality().numpy(),
    }


def test_from_tensorflow_tensor_constructs_expected_dataset():
    tensor = tf.convert_to_tensor(np.random.sample((100, 2)))
    mlflow_ds = mlflow.data.from_tensorflow(tensor, source="my_source")
    assert isinstance(mlflow_ds, TensorFlowDataset)
    # Tensor equality must go through TensorFlow ops, not Python `==`.
    assert tf.reduce_all(tf.math.equal(mlflow_ds.data, tensor))
    assert mlflow_ds.schema == TensorDatasetSchema(features=_infer_schema(tensor.numpy()))
    assert mlflow_ds.profile == {"features_cardinality": tf.size(tensor).numpy()}


def test_from_tensorflow_tensor_with_targets_constructs_expected_dataset():
    tensor_x = tf.convert_to_tensor(np.random.sample((100, 2)))
    tensor_y = tf.convert_to_tensor(np.random.sample((100, 1)))
    mlflow_ds = mlflow.data.from_tensorflow(tensor_x, source="my_source", targets=tensor_y)
    assert isinstance(mlflow_ds, TensorFlowDataset)
    # Tensor equality must go through TensorFlow ops, not Python `==`.
    assert tf.reduce_all(tf.math.equal(mlflow_ds.data, tensor_x))
    assert tf.reduce_all(tf.math.equal(mlflow_ds.targets, tensor_y))
    assert mlflow_ds.schema == TensorDatasetSchema(
        features=_infer_schema(tensor_x.numpy()),
        targets=_infer_schema(tensor_y.numpy()),
    )
    assert mlflow_ds.profile == {
        "features_cardinality": tf.size(tensor_x).numpy(),
        "targets_cardinality": tf.size(tensor_y).numpy(),
    }


def test_from_tensorflow_no_source_specified():
    ds = tf.data.Dataset.from_tensors(np.random.sample((100, 2)))
    mlflow_ds = mlflow.data.from_tensorflow(ds)

    assert isinstance(mlflow_ds, TensorFlowDataset)

    # Without an explicit source, the calling code location is recorded as the source.
    assert isinstance(mlflow_ds.source, CodeDatasetSource)
    assert "mlflow.source.name" in mlflow_ds.source.to_json()


def test_digest_computation_succeeds_with_none_element_in_numpy_iterator():
    ds = tf.data.Dataset.from_tensors(np.array([[0, 1], [1, 2]]))
    # Force a None element into the iterator to confirm digesting tolerates it.
    ds.as_numpy_iterator = lambda: [None, np.array([[0, 1], [1, 2]])]
    mlflow_ds = mlflow.data.from_tensorflow(ds)
    assert mlflow_ds.digest == "bc8ef018"