# test_numpy_dataset.py
"""Tests for ``mlflow.data.numpy_dataset.NumpyDataset`` and ``mlflow.data.from_numpy``."""

import json

import numpy as np
import pandas as pd
import pytest

import mlflow.data
from mlflow.data.code_dataset_source import CodeDatasetSource
from mlflow.data.evaluation_dataset import EvaluationDataset
from mlflow.data.filesystem_dataset_source import FileSystemDatasetSource
from mlflow.data.numpy_dataset import NumpyDataset
from mlflow.data.pyfunc_dataset_mixin import PyFuncInputsOutputs
from mlflow.data.schema import TensorDatasetSchema
from mlflow.types.utils import _infer_schema

from tests.resources.data.dataset_source import SampleDatasetSource

# Shared (features, targets) cases covering every combination of plain ndarray
# and dict-of-ndarrays inputs. Used by both multi-tensor parametrized tests
# below (previously the same literal list was duplicated in each decorator).
_MULTI_TENSOR_CASES = [
    (
        {
            "a": np.array([1, 2, 3]),
            "b": np.array([[4, 5]]),
        },
        {
            "c": np.array([1]),
            "d": np.array([[[2]]]),
        },
    ),
    (
        np.array([1, 2, 3]),
        {
            "c": np.array([1]),
            "d": np.array([[[2]]]),
        },
    ),
    (
        {
            "a": np.array([1, 2, 3]),
            "b": np.array([[4, 5]]),
        },
        np.array([1, 2, 3]),
    ),
]


def _expected_tensor_profile(prefix, data):
    """Return the expected profile entries (shape / size / nbytes) for one
    tensor or dict of tensors, keyed under ``prefix`` ("features" or "targets").
    """
    if isinstance(data, dict):
        return {
            f"{prefix}_shape": {key: array.shape for key, array in data.items()},
            f"{prefix}_size": {key: array.size for key, array in data.items()},
            f"{prefix}_nbytes": {key: array.nbytes for key, array in data.items()},
        }
    return {
        f"{prefix}_shape": data.shape,
        f"{prefix}_size": data.size,
        f"{prefix}_nbytes": data.nbytes,
    }


def test_conversion_to_json():
    """to_json() emits name, digest, source, source_type, profile, and a schema
    that round-trips through TensorDatasetSchema.from_dict."""
    source_uri = "test:/my/test/uri"
    source = SampleDatasetSource._resolve(source_uri)
    dataset = NumpyDataset(features=np.array([1, 2, 3]), source=source, name="testname")

    dataset_json = dataset.to_json()
    parsed_json = json.loads(dataset_json)
    # `<=` allows the implementation to omit optional keys (e.g. schema when absent).
    assert parsed_json.keys() <= {"name", "digest", "source", "source_type", "schema", "profile"}
    assert parsed_json["name"] == dataset.name
    assert parsed_json["digest"] == dataset.digest
    assert parsed_json["source"] == dataset.source.to_json()
    assert parsed_json["source_type"] == dataset.source._get_source_type()
    assert parsed_json["profile"] == json.dumps(dataset.profile)

    parsed_schema = json.loads(parsed_json["schema"])
    assert TensorDatasetSchema.from_dict(parsed_schema) == dataset.schema


@pytest.mark.parametrize(("features", "targets"), _MULTI_TENSOR_CASES)
def test_conversion_to_json_with_multi_tensor_features_and_targets(features, targets):
    """to_json() behaves the same when features/targets are dicts of tensors."""
    source_uri = "test:/my/test/uri"
    source = SampleDatasetSource._resolve(source_uri)
    dataset = NumpyDataset(features=features, targets=targets, source=source)

    dataset_json = dataset.to_json()
    parsed_json = json.loads(dataset_json)
    assert parsed_json.keys() <= {"name", "digest", "source", "source_type", "schema", "profile"}
    assert parsed_json["name"] == dataset.name
    assert parsed_json["digest"] == dataset.digest
    assert parsed_json["source"] == dataset.source.to_json()
    assert parsed_json["source_type"] == dataset.source._get_source_type()
    assert parsed_json["profile"] == json.dumps(dataset.profile)
    parsed_schema = json.loads(parsed_json["schema"])
    assert TensorDatasetSchema.from_dict(parsed_schema) == dataset.schema


@pytest.mark.parametrize(("features", "targets"), _MULTI_TENSOR_CASES)
def test_schema_and_profile_with_multi_tensor_features_and_targets(features, targets):
    """Schema is inferred per tensor and the profile records shape/size/nbytes
    for both single-tensor and dict-of-tensor inputs."""
    source_uri = "test:/my/test/uri"
    source = SampleDatasetSource._resolve(source_uri)
    dataset = NumpyDataset(features=features, targets=targets, source=source)

    assert isinstance(dataset.schema, TensorDatasetSchema)
    assert dataset.schema.features == _infer_schema(features)
    assert dataset.schema.targets == _infer_schema(targets)

    # `<=` allows the profile to carry additional keys beyond the expected ones.
    assert _expected_tensor_profile("features", features).items() <= dataset.profile.items()
    assert _expected_tensor_profile("targets", targets).items() <= dataset.profile.items()


def test_digest_property_has_expected_value():
    """Digest is deterministic and changes when targets are added."""
    source_uri = "test:/my/test/uri"
    source = SampleDatasetSource._resolve(source_uri)
    features = np.array([1, 2, 3])
    targets = np.array([4, 5, 6])
    dataset_with_features = NumpyDataset(features=features, source=source, name="testname")
    assert dataset_with_features.digest == dataset_with_features._compute_digest()
    # Pinned values guard against unintended changes to the digest algorithm.
    assert dataset_with_features.digest == "fdf1765f"
    dataset_with_features_and_targets = NumpyDataset(
        features=features, targets=targets, source=source, name="testname"
    )
    assert (
        dataset_with_features_and_targets.digest
        == dataset_with_features_and_targets._compute_digest()
    )
    assert dataset_with_features_and_targets.digest == "1387de76"


def test_features_property():
    """`features` returns the array passed at construction."""
    source_uri = "test:/my/test/uri"
    source = SampleDatasetSource._resolve(source_uri)
    features = np.array([1, 2, 3])
    dataset = NumpyDataset(features=features, source=source, name="testname")
    assert np.array_equal(dataset.features, features)


def test_targets_property():
    """`targets` returns the array passed at construction, or None if omitted."""
    source_uri = "test:/my/test/uri"
    source = SampleDatasetSource._resolve(source_uri)
    features = np.array([1, 2, 3])
    targets = np.array([4, 5, 6])
    dataset_with_targets = NumpyDataset(
        features=features, targets=targets, source=source, name="testname"
    )
    assert np.array_equal(dataset_with_targets.targets, targets)
    dataset_without_targets = NumpyDataset(features=features, source=source, name="testname")
    assert dataset_without_targets.targets is None


def test_to_pyfunc():
    """to_pyfunc() wraps the dataset in a PyFuncInputsOutputs container."""
    source_uri = "test:/my/test/uri"
    source = SampleDatasetSource._resolve(source_uri)
    features = np.array([1, 2, 3])
    dataset = NumpyDataset(features=features, source=source, name="testname")
    assert isinstance(dataset.to_pyfunc(), PyFuncInputsOutputs)


def test_to_evaluation_dataset():
    """to_evaluation_dataset() exposes features/targets as features_data/labels_data."""
    source_uri = "test:/my/test/uri"
    source = SampleDatasetSource._resolve(source_uri)
    features = np.array([[1, 2], [3, 4]])
    targets = np.array([0, 1])
    dataset = NumpyDataset(features=features, targets=targets, source=source, name="testname")
    evaluation_dataset = dataset.to_evaluation_dataset()
    assert isinstance(evaluation_dataset, EvaluationDataset)
    assert np.array_equal(evaluation_dataset.features_data, features)
    assert np.array_equal(evaluation_dataset.labels_data, targets)


def test_from_numpy_features_only(tmp_path):
    """from_numpy() with only features infers schema/profile and resolves a
    filesystem source from the given path."""
    features = np.array([1, 2, 3])
    path = tmp_path / "temp.csv"
    pd.DataFrame(features).to_csv(path)
    mlflow_features = mlflow.data.from_numpy(features, source=path)

    assert isinstance(mlflow_features, NumpyDataset)
    assert np.array_equal(mlflow_features.features, features)
    assert mlflow_features.schema == TensorDatasetSchema(features=_infer_schema(features))
    assert mlflow_features.profile == {
        "features_shape": features.shape,
        "features_size": features.size,
        "features_nbytes": features.nbytes,
    }

    assert isinstance(mlflow_features.source, FileSystemDatasetSource)


def test_from_numpy_features_and_targets(tmp_path):
    """from_numpy() with features and targets profiles both tensors."""
    features = np.array([[1, 2, 3], [3, 2, 1], [2, 3, 1]])
    targets = np.array([4, 5, 6])
    path = tmp_path / "temp.csv"
    pd.DataFrame(features).to_csv(path)
    mlflow_ds = mlflow.data.from_numpy(features, targets=targets, source=path)

    assert isinstance(mlflow_ds, NumpyDataset)
    assert np.array_equal(mlflow_ds.features, features)
    assert np.array_equal(mlflow_ds.targets, targets)
    assert mlflow_ds.schema == TensorDatasetSchema(
        features=_infer_schema(features), targets=_infer_schema(targets)
    )
    assert mlflow_ds.profile == {
        "features_shape": features.shape,
        "features_size": features.size,
        "features_nbytes": features.nbytes,
        "targets_shape": targets.shape,
        "targets_size": targets.size,
        "targets_nbytes": targets.nbytes,
    }

    assert isinstance(mlflow_ds.source, FileSystemDatasetSource)


def test_from_numpy_no_source_specified():
    """from_numpy() without a source falls back to a CodeDatasetSource that
    records the calling code location."""
    features = np.array([1, 2, 3])
    mlflow_features = mlflow.data.from_numpy(features)

    assert isinstance(mlflow_features, NumpyDataset)

    assert isinstance(mlflow_features.source, CodeDatasetSource)
    assert "mlflow.source.name" in mlflow_features.source.to_json()