proto_json_utils.py
1 import base64 2 import datetime 3 import importlib 4 import json 5 import os 6 from collections import defaultdict 7 from copy import deepcopy 8 from functools import partial 9 from json import JSONEncoder 10 from typing import Any 11 12 import pydantic 13 from google.protobuf.descriptor import FieldDescriptor 14 from google.protobuf.duration_pb2 import Duration 15 from google.protobuf.json_format import MessageToJson, ParseDict 16 from google.protobuf.struct_pb2 import NULL_VALUE, Value 17 from google.protobuf.timestamp_pb2 import Timestamp 18 19 from mlflow.exceptions import MlflowException 20 21 _PROTOBUF_INT64_FIELDS = [ 22 FieldDescriptor.TYPE_INT64, 23 FieldDescriptor.TYPE_UINT64, 24 FieldDescriptor.TYPE_FIXED64, 25 FieldDescriptor.TYPE_SFIXED64, 26 FieldDescriptor.TYPE_SINT64, 27 ] 28 29 from mlflow.protos.databricks_pb2 import BAD_REQUEST 30 31 32 def _mark_int64_fields_for_proto_maps(proto_map, value_field_type): 33 """Converts a proto map to JSON, preserving only int64-related fields.""" 34 json_dict = {} 35 for key, value in proto_map.items(): 36 # The value of a protobuf map can only be a scalar or a message (not a map or repeated 37 # field). 38 if value_field_type == FieldDescriptor.TYPE_MESSAGE: 39 json_dict[key] = _mark_int64_fields(value) 40 elif value_field_type in _PROTOBUF_INT64_FIELDS: 41 json_dict[key] = int(value) 42 elif isinstance(key, int): 43 json_dict[key] = value 44 return json_dict 45 46 47 def _mark_int64_fields(proto_message): 48 """Converts a proto message to JSON, preserving only int64-related fields.""" 49 json_dict = {} 50 for field, value in proto_message.ListFields(): 51 if ( 52 # These three conditions check if this field is a protobuf map. 53 # See the official implementation: https://bit.ly/3EMx1rl 54 field.type == FieldDescriptor.TYPE_MESSAGE 55 and field.message_type.has_options 56 and field.message_type.GetOptions().map_entry 57 ): 58 # Deal with proto map fields separately in another function. 59 json_dict[field.name] = _mark_int64_fields_for_proto_maps( 60 value, field.message_type.fields_by_name["value"].type 61 ) 62 continue 63 64 if field.type == FieldDescriptor.TYPE_MESSAGE: 65 ftype = partial(_mark_int64_fields) 66 elif field.type in _PROTOBUF_INT64_FIELDS: 67 ftype = int 68 else: 69 # Skip all non-int64 fields. 70 continue 71 72 # Use is_repeated property (preferred) with fallback to deprecated label 73 try: 74 is_repeated = field.is_repeated 75 except AttributeError: 76 is_repeated = field.label == FieldDescriptor.LABEL_REPEATED 77 78 json_dict[field.name] = [ftype(v) for v in value] if is_repeated else ftype(value) 79 return json_dict 80 81 82 def _merge_json_dicts(from_dict, to_dict): 83 """Merges the json elements of from_dict into to_dict. Only works for json dicts 84 converted from proto messages 85 """ 86 for key, value in from_dict.items(): 87 if isinstance(key, int) and str(key) in to_dict: 88 # When the key (i.e. the proto field name) is an integer, it must be a proto map field 89 # with integer as the key. For example: 90 # from_dict is {'field_map': {1: '2', 3: '4'}} 91 # to_dict is {'field_map': {'1': '2', '3': '4'}} 92 # So we need to replace the str keys with int keys in to_dict. 93 to_dict[key] = to_dict[str(key)] 94 del to_dict[str(key)] 95 96 if key not in to_dict: 97 continue 98 99 if isinstance(value, dict): 100 _merge_json_dicts(from_dict[key], to_dict[key]) 101 elif isinstance(value, list): 102 for i, v in enumerate(value): 103 if isinstance(v, dict): 104 _merge_json_dicts(v, to_dict[key][i]) 105 else: 106 to_dict[key][i] = v 107 else: 108 to_dict[key] = from_dict[key] 109 return to_dict 110 111 112 def message_to_json(message): 113 """Converts a message to JSON, using snake_case for field names.""" 114 115 # Google's MessageToJson API converts int64 proto fields to JSON strings. 116 # For more info, see https://github.com/protocolbuffers/protobuf/issues/2954 117 json_dict_with_int64_as_str = json.loads( 118 MessageToJson(message, preserving_proto_field_name=True) 119 ) 120 # We convert this proto message into a JSON dict where only int64 proto fields 121 # are preserved, and they are treated as JSON numbers, not strings. 122 json_dict_with_int64_fields_only = _mark_int64_fields(message) 123 # By merging these two JSON dicts, we end up with a JSON dict where int64 proto fields are not 124 # converted to JSON strings. Int64 keys in proto maps will always be converted to JSON strings 125 # because JSON doesn't support non-string keys. 126 json_dict_with_int64_as_numbers = _merge_json_dicts( 127 json_dict_with_int64_fields_only, json_dict_with_int64_as_str 128 ) 129 return json.dumps(json_dict_with_int64_as_numbers, indent=2) 130 131 132 def proto_timestamp_to_milliseconds(timestamp: str) -> int: 133 """ 134 Converts a timestamp string (e.g. "2025-04-15T08:49:18.699Z") to milliseconds. 135 """ 136 t = Timestamp() 137 t.FromJsonString(timestamp) 138 return t.ToMilliseconds() 139 140 141 def milliseconds_to_proto_timestamp(milliseconds: int) -> str: 142 """ 143 Converts milliseconds to a timestamp string (e.g. "2025-04-15T08:49:18.699Z"). 144 """ 145 t = Timestamp() 146 t.FromMilliseconds(milliseconds) 147 return t.ToJsonString() 148 149 150 def proto_duration_to_milliseconds(duration: str) -> int: 151 """ 152 Converts a duration string (e.g. "1.5s") to milliseconds. 153 """ 154 d = Duration() 155 d.FromJsonString(duration) 156 return d.ToMilliseconds() 157 158 159 def milliseconds_to_proto_duration(milliseconds: int) -> str: 160 """ 161 Converts milliseconds to a duration string (e.g. "1.5s"). 162 """ 163 d = Duration() 164 d.FromMilliseconds(milliseconds) 165 return d.ToJsonString() 166 167 168 def parse_dict(js_dict, message): 169 """Parses a JSON dictionary into a message proto, ignoring unknown fields in the JSON.""" 170 ParseDict(js_dict=js_dict, message=message, ignore_unknown_fields=True) 171 172 173 def set_pb_value(proto: Value, value: Any): 174 """ 175 DO NOT USE THIS FUNCTION. Preserved for backwards compatibility. 176 177 Set a value to the google.protobuf.Value object. 178 """ 179 if isinstance(value, dict): 180 for key, val in value.items(): 181 set_pb_value(proto.struct_value.fields[key], val) 182 elif isinstance(value, list): 183 for val in value: 184 pb = Value() 185 set_pb_value(pb, val) 186 proto.list_value.values.append(pb) 187 elif isinstance(value, bool): 188 proto.bool_value = value 189 elif isinstance(value, (int, float)): 190 proto.number_value = value 191 elif isinstance(value, str): 192 proto.string_value = value 193 elif value is None: 194 proto.null_value = NULL_VALUE 195 196 else: 197 raise ValueError(f"Unsupported value type: {type(value)}") 198 199 200 def parse_pb_value(proto: Value) -> Any | None: 201 """ 202 DO NOT USE THIS FUNCTION. Preserved for backwards compatibility. 203 204 Extract a value from the google.protobuf.Value object. 205 """ 206 if proto.HasField("struct_value"): 207 return {key: parse_pb_value(val) for key, val in proto.struct_value.fields.items()} 208 elif proto.HasField("list_value"): 209 return [parse_pb_value(val) for val in proto.list_value.values] 210 elif proto.HasField("bool_value"): 211 return proto.bool_value 212 elif proto.HasField("number_value"): 213 return proto.number_value 214 elif proto.HasField("string_value"): 215 return proto.string_value 216 217 return None 218 219 220 class NumpyEncoder(JSONEncoder): 221 """Special json encoder for numpy types. 222 Note that some numpy types doesn't have native python equivalence, 223 hence json.dumps will raise TypeError. 224 In this case, you'll need to convert your numpy types into its closest python equivalence. 225 """ 226 227 def try_convert(self, o): 228 import numpy as np 229 import pandas as pd 230 231 def encode_binary(x): 232 return base64.encodebytes(x).decode("ascii") 233 234 if isinstance(o, np.ndarray): 235 if o.dtype == object: 236 return [self.try_convert(x)[0] for x in o.tolist()], True 237 elif o.dtype == np.bytes_: 238 return np.vectorize(encode_binary)(o), True 239 else: 240 return o.tolist(), True 241 242 if isinstance(o, np.generic): 243 return o.item(), True 244 if isinstance(o, (bytes, bytearray)): 245 return encode_binary(o), True 246 if isinstance(o, np.datetime64): 247 return np.datetime_as_string(o), True 248 if isinstance(o, (pd.Timestamp, datetime.date, datetime.datetime, datetime.time)): 249 return o.isoformat(), True 250 if isinstance(o, pydantic.BaseModel): 251 return o.model_dump(), True 252 return o, False 253 254 def default(self, o): 255 res, converted = self.try_convert(o) 256 if converted: 257 return res 258 else: 259 return super().default(o) 260 261 262 class MlflowInvalidInputException(MlflowException): 263 def __init__(self, message): 264 super().__init__(f"Invalid input. {message}", error_code=BAD_REQUEST) 265 266 267 class MlflowFailedTypeConversion(MlflowInvalidInputException): 268 def __init__(self, col_name, col_type, ex): 269 super().__init__( 270 message=f"Data is not compatible with model signature. " 271 f"Failed to convert column {col_name} to type '{col_type}'. Error: '{ex!r}'" 272 ) 273 274 275 def cast_df_types_according_to_schema(pdf, schema): 276 import numpy as np 277 import pandas as pd 278 279 from mlflow.models.utils import _enforce_array, _enforce_map, _enforce_object 280 from mlflow.types.schema import AnyType, Array, DataType, Map, Object 281 282 actual_cols = set(pdf.columns) 283 if schema.has_input_names(): 284 dtype_list = zip(schema.input_names(), schema.input_types()) 285 elif schema.is_tensor_spec() and len(schema.input_types()) == 1: 286 dtype_list = zip(actual_cols, [schema.input_types()[0] for _ in actual_cols]) 287 else: 288 n = min(len(schema.input_types()), len(pdf.columns)) 289 dtype_list = zip(pdf.columns[:n], schema.input_types()[:n]) 290 required_input_names = set(schema.required_input_names()) 291 292 for col_name, col_type_spec in dtype_list: 293 if isinstance(col_type_spec, DataType): 294 col_type = col_type_spec.to_pandas() 295 else: 296 col_type = col_type_spec 297 if col_name in actual_cols: 298 required = col_name in required_input_names 299 try: 300 if isinstance(col_type_spec, DataType) and col_type_spec == DataType.binary: 301 # NB: We expect binary data to be passed base64 encoded 302 pdf[col_name] = pdf[col_name].map( 303 lambda x: base64.decodebytes(bytes(x, "utf8")) 304 ) 305 elif col_type == np.dtype(bytes): 306 pdf[col_name] = pdf[col_name].map(lambda x: bytes(x, "utf8")) 307 elif schema.is_tensor_spec() and isinstance(pdf[col_name].iloc[0], list): 308 # For dataframe with multidimensional column, it contains 309 # list type values, we cannot convert 310 # its type by `astype`, skip conversion. 311 # The conversion will be done in `_enforce_schema` while 312 # `PyFuncModel.predict` being called. 313 pass 314 elif isinstance(col_type_spec, Array): 315 pdf[col_name] = pdf[col_name].map( 316 lambda x: _enforce_array(x, col_type_spec, required=required) 317 ) 318 elif isinstance(col_type_spec, Object): 319 pdf[col_name] = pdf[col_name].map( 320 lambda x: _enforce_object(x, col_type_spec, required=required) 321 ) 322 elif isinstance(col_type_spec, Map): 323 pdf[col_name] = pdf[col_name].map( 324 lambda x: _enforce_map(x, col_type_spec, required=required) 325 ) 326 elif isinstance(col_type_spec, AnyType): 327 pass 328 elif isinstance(col_type_spec, DataType) and col_type_spec == DataType.datetime: 329 pdf[col_name] = pd.to_datetime(pdf[col_name]) 330 else: 331 # In pandas 3.0+, string columns with NaN are inferred as StringDtype 332 # instead of object. Skip casting StringDtype to object/numpy str as they 333 # are compatible; casting would downgrade StringDtype back to object. 334 if ( 335 col_type == object 336 or (isinstance(col_type, np.dtype) and col_type.kind == "U") 337 ) and isinstance(pdf[col_name].dtype, pd.StringDtype): 338 continue 339 pdf[col_name] = pdf[col_name].astype(col_type) 340 except Exception as ex: 341 raise MlflowFailedTypeConversion(col_name, col_type, ex) 342 return pdf 343 344 345 def dataframe_from_parsed_json(decoded_input, pandas_orient, schema=None): 346 """Convert parsed json into pandas.DataFrame. If schema is provided this methods will attempt to 347 cast data types according to the schema. This include base64 decoding for binary columns. 348 349 Args: 350 decoded_input: Parsed json - either a list or a dictionary. 351 pandas_orient: pandas data frame convention used to store the data. 352 schema: MLflow schema used when parsing the data. 353 354 Returns: 355 pandas.DataFrame. 356 """ 357 import pandas as pd 358 359 if pandas_orient == "records": 360 if not isinstance(decoded_input, list): 361 if isinstance(decoded_input, dict): 362 typemessage = "dictionary" 363 else: 364 typemessage = f"type {type(decoded_input)}" 365 raise MlflowInvalidInputException( 366 f"Dataframe records format must be a list of records. Got {typemessage}." 367 ) 368 try: 369 pdf = pd.DataFrame(data=decoded_input) 370 except Exception as ex: 371 raise MlflowInvalidInputException( 372 f"Provided dataframe_records field is not a valid dataframe representation in " 373 f"'records' format. Error: '{ex}'" 374 ) 375 elif pandas_orient == "split": 376 if not isinstance(decoded_input, dict): 377 if isinstance(decoded_input, list): 378 typemessage = "list" 379 else: 380 typemessage = f"type {type(decoded_input)}" 381 raise MlflowInvalidInputException( 382 f"Dataframe split format must be a dictionary. Got {typemessage}." 383 ) 384 keys = set(decoded_input.keys()) 385 missing_data = "data" not in keys 386 extra_keys = keys.difference({"columns", "data", "index"}) 387 if missing_data or extra_keys: 388 raise MlflowInvalidInputException( 389 f"Dataframe split format must have 'data' field and optionally 'columns' " 390 f"and 'index' fields. Got {keys}.'" 391 ) 392 try: 393 pdf = pd.DataFrame( 394 index=decoded_input.get("index"), 395 columns=decoded_input.get("columns"), 396 data=decoded_input["data"], 397 ) 398 except Exception as ex: 399 raise MlflowInvalidInputException( 400 f"Provided dataframe_split field is not a valid dataframe representation in " 401 f"'split' format. Error: '{ex}'" 402 ) 403 if schema is not None: 404 pdf = cast_df_types_according_to_schema(pdf, schema) 405 return pdf 406 407 408 def dataframe_from_raw_json(path_or_str, schema=None, pandas_orient: str = "split"): 409 """Parse raw json into a pandas.Dataframe. 410 411 If schema is provided this methods will attempt to cast data types according to the schema. This 412 include base64 decoding for binary columns. 413 414 Args: 415 path_or_str: Path to a json file or a json string. 416 schema: MLflow schema used when parsing the data. 417 pandas_orient: pandas data frame convention used to store the data. 418 419 Returns: 420 pandas.DataFrame. 421 """ 422 if os.path.exists(path_or_str): 423 with open(path_or_str) as f: 424 parsed_json = json.load(f) 425 else: 426 parsed_json = json.loads(path_or_str) 427 428 return dataframe_from_parsed_json(parsed_json, pandas_orient, schema) 429 430 431 def _get_jsonable_obj(data, pandas_orient="records"): 432 """Attempt to make the data json-able via standard library. 433 434 Look for some commonly used types that are not jsonable and convert them into json-able ones. 435 Unknown data types are returned as is. 436 437 Args: 438 data: Data to be converted, works with pandas and numpy, rest will be returned as is. 439 pandas_orient: If `data` is a Pandas DataFrame, it will be converted to a JSON 440 dictionary using this Pandas serialization orientation. 441 """ 442 import numpy as np 443 import pandas as pd 444 445 if isinstance(data, np.ndarray): 446 return data.tolist() 447 if isinstance(data, pd.DataFrame): 448 return data.to_dict(orient=pandas_orient) 449 if isinstance(data, pd.Series): 450 return pd.DataFrame(data).to_dict(orient=pandas_orient) 451 else: # by default just return whatever this is and hope for the best 452 return data 453 454 455 def convert_data_type(data, spec): 456 """ 457 Convert input data to the type specified in the spec. 458 459 Args: 460 data: Input data. 461 spec: ColSpec or TensorSpec. 462 """ 463 import numpy as np 464 465 from mlflow.models.utils import _enforce_array, _enforce_map, _enforce_object 466 from mlflow.types.schema import AnyType, Array, ColSpec, DataType, Map, Object, TensorSpec 467 468 try: 469 if spec is None: 470 return np.array(data) 471 if isinstance(spec, TensorSpec): 472 return np.array(data, dtype=spec.type) 473 if isinstance(spec, ColSpec): 474 if isinstance(spec.type, DataType): 475 return ( 476 np.array(data, spec.type.to_numpy()) 477 if isinstance(data, (list, np.ndarray)) 478 else np.array([data], spec.type.to_numpy())[0] 479 ) 480 elif isinstance(spec.type, Array): 481 # convert to numpy array for backwards compatibility 482 return np.array(_enforce_array(data, spec.type, required=spec.required)) 483 elif isinstance(spec.type, Object): 484 return _enforce_object(data, spec.type, required=spec.required) 485 elif isinstance(spec.type, Map): 486 return _enforce_map(data, spec.type, required=spec.required) 487 elif isinstance(spec.type, AnyType): 488 return data 489 except MlflowException as e: 490 raise MlflowInvalidInputException(e.message) 491 except Exception as ex: 492 raise MlflowInvalidInputException(f"{ex}") 493 494 raise MlflowInvalidInputException( 495 f"Failed to convert data type for data `{data}` with spec `{spec}`." 496 ) 497 498 499 def _cast_schema_type(input_data, schema=None): 500 import numpy as np 501 502 input_data = deepcopy(input_data) 503 # spec_name -> spec mapping 504 types_dict = schema.input_dict() if schema and schema.has_input_names() else {} 505 if schema is not None: 506 if ( 507 len(types_dict) == 1 508 and isinstance(input_data, list) 509 and not any(isinstance(x, dict) for x in input_data) 510 ): 511 # for data with a single column (not List[Dict]), match input with column 512 input_data = {next(iter(types_dict)): input_data} 513 # Un-named schema should only contain a single column or a single value 514 elif not schema.has_input_names() and not ( 515 isinstance(input_data, list) or np.isscalar(input_data) 516 ): 517 raise MlflowInvalidInputException( 518 "Failed to parse input data. This model contains an un-named " 519 " model signature which expects a single n-dimensional array or " 520 "a single value as input, however, an input of type " 521 f"{type(input_data)} was found." 522 ) 523 if isinstance(input_data, dict): 524 # each key corresponds to a column, values should be 525 # checked against the schema 526 input_data = { 527 col: convert_data_type(data, types_dict.get(col)) for col, data in input_data.items() 528 } 529 elif isinstance(input_data, list): 530 # List of dictionaries of column_name -> value mapping 531 # List[Dict] must correspond to a schema with named columns 532 if all(isinstance(x, dict) for x in input_data): 533 input_data = [ 534 {col: convert_data_type(value, types_dict.get(col)) for col, value in data.items()} 535 for data in input_data 536 ] 537 # List of values 538 else: 539 spec = schema.inputs[0] if schema else None 540 input_data = convert_data_type(input_data, spec) 541 else: 542 spec = schema.inputs[0] if schema else None 543 try: 544 input_data = convert_data_type(input_data, spec) 545 except Exception as e: 546 raise MlflowInvalidInputException( 547 f"Failed to convert data `{input_data}` to type `{spec}` defined " 548 "in the model signature." 549 ) from e 550 return input_data 551 552 553 def parse_instances_data(data, schema=None): 554 import numpy as np 555 556 from mlflow.types.schema import Array 557 558 if "instances" not in data: 559 raise MlflowInvalidInputException("Expecting data to have `instances` as key.") 560 data = data["instances"] 561 # List[Dict] 562 if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict): 563 # convert items to column format (map column/input name to tensor) 564 data_dict = defaultdict(list) 565 types_dict = schema.input_dict() if schema and schema.has_input_names() else {} 566 for item in data: 567 for col, v in item.items(): 568 data_dict[col].append(convert_data_type(v, types_dict.get(col))) 569 # convert to numpy array for backwards compatibility 570 data = {col: np.array(v) for col, v in data_dict.items()} 571 else: 572 data = _cast_schema_type(data, schema) 573 574 # Sanity check inputted data. This check will only be applied 575 # when the row-format `instances` is used since it requires 576 # same 0-th dimension for all items. 577 if isinstance(data, dict): 578 # ensure all columns have the same number of items 579 # Only check the data when it's a list or numpy array 580 check_data = {k: v for k, v in data.items() if isinstance(v, (list, np.ndarray))} 581 if schema and schema.has_input_names(): 582 # Only check required columns 583 required_cols = schema.required_input_names() 584 # For Array schema we should not check the length of the data matching 585 check_cols = { 586 col for col, spec in schema.input_dict().items() if not isinstance(spec.type, Array) 587 } 588 check_cols = list(set(required_cols) & check_cols & set(check_data.keys())) 589 else: 590 check_cols = list(check_data.keys()) 591 592 if check_cols: 593 expected_len = len(check_data[check_cols[0]]) 594 if not all(len(check_data[col]) == expected_len for col in check_cols[1:]): 595 raise MlflowInvalidInputException( 596 "The length of values for each input/column name are not the same" 597 ) 598 return data 599 600 601 # TODO: Reuse this function for `inputs` key data parsing in serving, and 602 # add `convert_to_numpy` param to avoid converting data to numpy arrays for 603 # genAI flavors. 604 def parse_inputs_data(inputs_data_or_path, schema=None): 605 """ 606 Helper function to cast inputs_data based on the schema. 607 Inputs data must be able to pass to the model for pyfunc predict directly. 608 609 Args: 610 inputs_data_or_path: A json-serializable object or path to a json file 611 schema: data schema to cast to. Be of type `mlflow.types.Schema`. 612 """ 613 if isinstance(inputs_data_or_path, str) and os.path.exists(inputs_data_or_path): 614 with open(inputs_data_or_path) as handle: 615 inputs_data = json.load(handle) 616 else: 617 inputs_data = inputs_data_or_path 618 return _cast_schema_type(inputs_data, schema) 619 620 621 def parse_tf_serving_input(inp_dict, schema=None): 622 """ 623 Args: 624 inp_dict: A dict deserialized from a JSON string formatted as described in TF's 625 serving API doc 626 (https://www.tensorflow.org/tfx/serving/api_rest#request_format_2) 627 schema: MLflow schema used when parsing the data. 628 """ 629 630 if "signature_name" in inp_dict: 631 raise MlflowInvalidInputException('"signature_name" parameter is currently not supported') 632 633 if not (list(inp_dict.keys()) == ["instances"] or list(inp_dict.keys()) == ["inputs"]): 634 raise MlflowInvalidInputException( 635 'One of "instances" and "inputs" must be specified (not both or any other keys).' 636 f"Received: {list(inp_dict.keys())}" 637 ) 638 639 # Read the JSON 640 try: 641 # objects & arrays schema for List[Dict] and Dict[List] are different 642 # so the conversion for `instances` below changes the schema. 643 # e.g. 644 # [{"col1": 1, "col2": 2}, {"col1": 3, "col2": 4}] -> {"col1": [1, 3], "col2": [2, 4]} 645 # Schema([ColSpec(long, "col1"), ColSpec(long, "col2")]) -> 646 # Schema([ColSpec(Array(long), "col1"), ColSpec(Array(long), "col2")]) 647 # To avoid this, we shouldn't use `instances` for such data. 648 if "instances" in inp_dict: 649 return parse_instances_data(inp_dict, schema) 650 else: 651 # items already in column format, convert values to tensor 652 return _cast_schema_type(inp_dict["inputs"], schema) 653 except MlflowException as e: 654 raise e 655 except Exception as e: 656 # Add error into message to provide details for serving usage 657 raise MlflowInvalidInputException( 658 f"Ensure that the input is a valid JSON-formatted string.\nError: {e!r}" 659 ) from e 660 661 662 # Reference: https://stackoverflow.com/a/12126976 663 class _CustomJsonEncoder(json.JSONEncoder): 664 def default(self, o): 665 import numpy as np 666 import pandas as pd 667 668 if isinstance(o, (datetime.datetime, datetime.date, datetime.time, pd.Timestamp)): 669 return o.isoformat() 670 671 if isinstance(o, np.ndarray): 672 return o.tolist() 673 674 return super().default(o) 675 676 677 def get_jsonable_input(name, data): 678 import numpy as np 679 680 if isinstance(data, np.ndarray): 681 return data.tolist() 682 else: 683 raise MlflowException(f"Incompatible input type:{type(data)} for input {name}.") 684 685 686 def dump_input_data(data, inputs_key="inputs", params: dict[str, Any] | None = None): 687 """ 688 Args: 689 data: Input data. 690 inputs_key: Key to represent data in the request payload. 691 params: Additional parameters to pass to the model for inference. 692 """ 693 import numpy as np 694 import pandas as pd 695 696 # Convert scipy data to numpy array 697 if importlib.util.find_spec("scipy.sparse"): 698 from scipy.sparse import csc_matrix, csr_matrix 699 700 if isinstance(data, (csc_matrix, csr_matrix)): 701 data = data.toarray() 702 703 if isinstance(data, pd.DataFrame): 704 post_data = {"dataframe_split": data.to_dict(orient="split")} 705 elif isinstance(data, dict): 706 post_data = {inputs_key: {k: get_jsonable_input(k, v) for k, v in data}} 707 elif isinstance(data, np.ndarray): 708 post_data = {inputs_key: data.tolist()} 709 elif isinstance(data, list): 710 post_data = {inputs_key: data} 711 else: 712 post_data = data 713 714 if params is not None: 715 if not isinstance(params, dict): 716 raise MlflowException( 717 f"Params must be a dictionary. Got type '{type(params).__name__}'." 718 ) 719 # if post_data is not dictionary, params should be included in post_data directly 720 if isinstance(post_data, dict): 721 post_data["params"] = params 722 723 if not isinstance(post_data, str): 724 post_data = json.dumps(post_data, cls=_CustomJsonEncoder) 725 726 return post_data