# mlflow/utils/proto_json_utils.py
  1  import base64
  2  import datetime
  3  import importlib
  4  import json
  5  import os
  6  from collections import defaultdict
  7  from copy import deepcopy
  8  from functools import partial
  9  from json import JSONEncoder
 10  from typing import Any
 11  
 12  import pydantic
 13  from google.protobuf.descriptor import FieldDescriptor
 14  from google.protobuf.duration_pb2 import Duration
 15  from google.protobuf.json_format import MessageToJson, ParseDict
 16  from google.protobuf.struct_pb2 import NULL_VALUE, Value
 17  from google.protobuf.timestamp_pb2 import Timestamp
 18  
 19  from mlflow.exceptions import MlflowException
 20  
# Protobuf scalar field types whose values are 64-bit integers. Google's
# MessageToJson serializes these as JSON strings (see the note in
# message_to_json below), so we track them to restore native JSON numbers.
_PROTOBUF_INT64_FIELDS = [
    FieldDescriptor.TYPE_INT64,
    FieldDescriptor.TYPE_UINT64,
    FieldDescriptor.TYPE_FIXED64,
    FieldDescriptor.TYPE_SFIXED64,
    FieldDescriptor.TYPE_SINT64,
]
 28  
 29  from mlflow.protos.databricks_pb2 import BAD_REQUEST
 30  
 31  
 32  def _mark_int64_fields_for_proto_maps(proto_map, value_field_type):
 33      """Converts a proto map to JSON, preserving only int64-related fields."""
 34      json_dict = {}
 35      for key, value in proto_map.items():
 36          # The value of a protobuf map can only be a scalar or a message (not a map or repeated
 37          # field).
 38          if value_field_type == FieldDescriptor.TYPE_MESSAGE:
 39              json_dict[key] = _mark_int64_fields(value)
 40          elif value_field_type in _PROTOBUF_INT64_FIELDS:
 41              json_dict[key] = int(value)
 42          elif isinstance(key, int):
 43              json_dict[key] = value
 44      return json_dict
 45  
 46  
def _mark_int64_fields(proto_message):
    """Converts a proto message to JSON, preserving only int64-related fields."""
    json_dict = {}
    for field, value in proto_message.ListFields():
        if (
            # These three conditions check if this field is a protobuf map.
            # See the official implementation: https://bit.ly/3EMx1rl
            field.type == FieldDescriptor.TYPE_MESSAGE
            and field.message_type.has_options
            and field.message_type.GetOptions().map_entry
        ):
            # Deal with proto map fields separately in another function.
            json_dict[field.name] = _mark_int64_fields_for_proto_maps(
                value, field.message_type.fields_by_name["value"].type
            )
            continue

        if field.type == FieldDescriptor.TYPE_MESSAGE:
            # Recurse into nested messages to pick out their int64 fields.
            ftype = partial(_mark_int64_fields)
        elif field.type in _PROTOBUF_INT64_FIELDS:
            ftype = int
        else:
            # Skip all non-int64 fields.
            continue

        # Use is_repeated property (preferred) with fallback to deprecated label
        # (older protobuf releases do not expose FieldDescriptor.is_repeated).
        try:
            is_repeated = field.is_repeated
        except AttributeError:
            is_repeated = field.label == FieldDescriptor.LABEL_REPEATED

        json_dict[field.name] = [ftype(v) for v in value] if is_repeated else ftype(value)
    return json_dict
 80  
 81  
 82  def _merge_json_dicts(from_dict, to_dict):
 83      """Merges the json elements of from_dict into to_dict. Only works for json dicts
 84      converted from proto messages
 85      """
 86      for key, value in from_dict.items():
 87          if isinstance(key, int) and str(key) in to_dict:
 88              # When the key (i.e. the proto field name) is an integer, it must be a proto map field
 89              # with integer as the key. For example:
 90              # from_dict is {'field_map': {1: '2', 3: '4'}}
 91              # to_dict is {'field_map': {'1': '2', '3': '4'}}
 92              # So we need to replace the str keys with int keys in to_dict.
 93              to_dict[key] = to_dict[str(key)]
 94              del to_dict[str(key)]
 95  
 96          if key not in to_dict:
 97              continue
 98  
 99          if isinstance(value, dict):
100              _merge_json_dicts(from_dict[key], to_dict[key])
101          elif isinstance(value, list):
102              for i, v in enumerate(value):
103                  if isinstance(v, dict):
104                      _merge_json_dicts(v, to_dict[key][i])
105                  else:
106                      to_dict[key][i] = v
107          else:
108              to_dict[key] = from_dict[key]
109      return to_dict
110  
111  
def message_to_json(message):
    """Converts a message to JSON, using snake_case for field names."""
    # MessageToJson emits int64 proto fields as JSON strings; see
    # https://github.com/protocolbuffers/protobuf/issues/2954
    str_int64_dict = json.loads(MessageToJson(message, preserving_proto_field_name=True))
    # A parallel dict containing only the int64-related fields, kept as numbers.
    int64_only_dict = _mark_int64_fields(message)
    # Overlaying the int64-only dict restores numeric int64 values. Int64 keys
    # in proto maps still end up as JSON strings, since JSON keys must be strings.
    merged = _merge_json_dicts(int64_only_dict, str_int64_dict)
    return json.dumps(merged, indent=2)
130  
131  
def proto_timestamp_to_milliseconds(timestamp: str) -> int:
    """Convert an RFC 3339 timestamp string (e.g. "2025-04-15T08:49:18.699Z")
    to epoch milliseconds.
    """
    ts = Timestamp()
    ts.FromJsonString(timestamp)
    return ts.ToMilliseconds()
139  
140  
def milliseconds_to_proto_timestamp(milliseconds: int) -> str:
    """Convert epoch milliseconds to an RFC 3339 timestamp string
    (e.g. "2025-04-15T08:49:18.699Z").
    """
    ts = Timestamp()
    ts.FromMilliseconds(milliseconds)
    return ts.ToJsonString()
148  
149  
def proto_duration_to_milliseconds(duration: str) -> int:
    """Convert a proto JSON duration string (e.g. "1.5s") to milliseconds."""
    dur = Duration()
    dur.FromJsonString(duration)
    return dur.ToMilliseconds()
157  
158  
def milliseconds_to_proto_duration(milliseconds: int) -> str:
    """Convert milliseconds to a proto JSON duration string (e.g. "1.5s")."""
    dur = Duration()
    dur.FromMilliseconds(milliseconds)
    return dur.ToJsonString()
166  
167  
def parse_dict(js_dict, message):
    """Parse a JSON dictionary into ``message``, silently dropping any JSON
    fields the proto does not declare.
    """
    ParseDict(message=message, js_dict=js_dict, ignore_unknown_fields=True)
171  
172  
def set_pb_value(proto: Value, value: Any):
    """
    DO NOT USE THIS FUNCTION. Preserved for backwards compatibility.

    Set a value to the google.protobuf.Value object.
    """
    # NB: bool must be tested before (int, float) because bool subclasses int.
    if isinstance(value, dict):
        for field_name, field_value in value.items():
            set_pb_value(proto.struct_value.fields[field_name], field_value)
    elif isinstance(value, list):
        for element in value:
            item = Value()
            set_pb_value(item, element)
            proto.list_value.values.append(item)
    elif isinstance(value, bool):
        proto.bool_value = value
    elif isinstance(value, (int, float)):
        proto.number_value = value
    elif isinstance(value, str):
        proto.string_value = value
    elif value is None:
        proto.null_value = NULL_VALUE
    else:
        raise ValueError(f"Unsupported value type: {type(value)}")
198  
199  
def parse_pb_value(proto: Value) -> Any | None:
    """
    DO NOT USE THIS FUNCTION. Preserved for backwards compatibility.

    Extract a value from the google.protobuf.Value object.
    """
    # Probe the oneof members in a fixed order; null_value (or unset) maps to None.
    if proto.HasField("struct_value"):
        return {k: parse_pb_value(v) for k, v in proto.struct_value.fields.items()}
    if proto.HasField("list_value"):
        return [parse_pb_value(item) for item in proto.list_value.values]
    if proto.HasField("bool_value"):
        return proto.bool_value
    if proto.HasField("number_value"):
        return proto.number_value
    if proto.HasField("string_value"):
        return proto.string_value
    return None
218  
219  
220  class NumpyEncoder(JSONEncoder):
221      """Special json encoder for numpy types.
222      Note that some numpy types doesn't have native python equivalence,
223      hence json.dumps will raise TypeError.
224      In this case, you'll need to convert your numpy types into its closest python equivalence.
225      """
226  
227      def try_convert(self, o):
228          import numpy as np
229          import pandas as pd
230  
231          def encode_binary(x):
232              return base64.encodebytes(x).decode("ascii")
233  
234          if isinstance(o, np.ndarray):
235              if o.dtype == object:
236                  return [self.try_convert(x)[0] for x in o.tolist()], True
237              elif o.dtype == np.bytes_:
238                  return np.vectorize(encode_binary)(o), True
239              else:
240                  return o.tolist(), True
241  
242          if isinstance(o, np.generic):
243              return o.item(), True
244          if isinstance(o, (bytes, bytearray)):
245              return encode_binary(o), True
246          if isinstance(o, np.datetime64):
247              return np.datetime_as_string(o), True
248          if isinstance(o, (pd.Timestamp, datetime.date, datetime.datetime, datetime.time)):
249              return o.isoformat(), True
250          if isinstance(o, pydantic.BaseModel):
251              return o.model_dump(), True
252          return o, False
253  
254      def default(self, o):
255          res, converted = self.try_convert(o)
256          if converted:
257              return res
258          else:
259              return super().default(o)
260  
261  
class MlflowInvalidInputException(MlflowException):
    # Raised when a request/input payload fails validation; carries the
    # BAD_REQUEST error code.
    def __init__(self, message):
        super().__init__(f"Invalid input. {message}", error_code=BAD_REQUEST)
265  
266  
class MlflowFailedTypeConversion(MlflowInvalidInputException):
    # Raised when a dataframe column cannot be cast to the type declared in
    # the model signature schema.
    def __init__(self, col_name, col_type, ex):
        super().__init__(
            message=f"Data is not compatible with model signature. "
            f"Failed to convert column {col_name} to type '{col_type}'. Error: '{ex!r}'"
        )
273  
274  
def cast_df_types_according_to_schema(pdf, schema):
    """Cast the columns of ``pdf`` to the column types declared in ``schema``.

    Binary columns are base64-decoded, datetime columns parsed with
    ``pd.to_datetime``, and Array/Object/Map columns enforced via the mlflow
    type-enforcement helpers. Columns absent from the dataframe are skipped.

    Args:
        pdf: pandas DataFrame to cast (columns are reassigned in place).
        schema: MLflow input schema describing expected column types.

    Returns:
        The dataframe with converted columns.

    Raises:
        MlflowFailedTypeConversion: if any column conversion fails.
    """
    import numpy as np
    import pandas as pd

    from mlflow.models.utils import _enforce_array, _enforce_map, _enforce_object
    from mlflow.types.schema import AnyType, Array, DataType, Map, Object

    actual_cols = set(pdf.columns)
    if schema.has_input_names():
        dtype_list = zip(schema.input_names(), schema.input_types())
    elif schema.is_tensor_spec() and len(schema.input_types()) == 1:
        # Single tensor spec: apply the one declared type to every column.
        dtype_list = zip(actual_cols, [schema.input_types()[0] for _ in actual_cols])
    else:
        # Un-named schema: pair columns with types positionally.
        n = min(len(schema.input_types()), len(pdf.columns))
        dtype_list = zip(pdf.columns[:n], schema.input_types()[:n])
    required_input_names = set(schema.required_input_names())

    for col_name, col_type_spec in dtype_list:
        if isinstance(col_type_spec, DataType):
            col_type = col_type_spec.to_pandas()
        else:
            col_type = col_type_spec
        if col_name in actual_cols:
            required = col_name in required_input_names
            try:
                if isinstance(col_type_spec, DataType) and col_type_spec == DataType.binary:
                    # NB: We expect binary data to be passed base64 encoded
                    pdf[col_name] = pdf[col_name].map(
                        lambda x: base64.decodebytes(bytes(x, "utf8"))
                    )
                elif col_type == np.dtype(bytes):
                    pdf[col_name] = pdf[col_name].map(lambda x: bytes(x, "utf8"))
                elif schema.is_tensor_spec() and isinstance(pdf[col_name].iloc[0], list):
                    # For dataframe with multidimensional column, it contains
                    # list type values, we cannot convert
                    # its type by `astype`, skip conversion.
                    # The conversion will be done in `_enforce_schema` while
                    # `PyFuncModel.predict` being called.
                    pass
                elif isinstance(col_type_spec, Array):
                    pdf[col_name] = pdf[col_name].map(
                        lambda x: _enforce_array(x, col_type_spec, required=required)
                    )
                elif isinstance(col_type_spec, Object):
                    pdf[col_name] = pdf[col_name].map(
                        lambda x: _enforce_object(x, col_type_spec, required=required)
                    )
                elif isinstance(col_type_spec, Map):
                    pdf[col_name] = pdf[col_name].map(
                        lambda x: _enforce_map(x, col_type_spec, required=required)
                    )
                elif isinstance(col_type_spec, AnyType):
                    # AnyType columns accept anything; no conversion needed.
                    pass
                elif isinstance(col_type_spec, DataType) and col_type_spec == DataType.datetime:
                    pdf[col_name] = pd.to_datetime(pdf[col_name])
                else:
                    # In pandas 3.0+, string columns with NaN are inferred as StringDtype
                    # instead of object. Skip casting StringDtype to object/numpy str as they
                    # are compatible; casting would downgrade StringDtype back to object.
                    if (
                        col_type == object
                        or (isinstance(col_type, np.dtype) and col_type.kind == "U")
                    ) and isinstance(pdf[col_name].dtype, pd.StringDtype):
                        continue
                    pdf[col_name] = pdf[col_name].astype(col_type)
            except Exception as ex:
                raise MlflowFailedTypeConversion(col_name, col_type, ex)
    return pdf
343  
344  
def dataframe_from_parsed_json(decoded_input, pandas_orient, schema=None):
    """Convert parsed json into pandas.DataFrame. If schema is provided this methods will attempt to
    cast data types according to the schema. This include base64 decoding for binary columns.

    Args:
        decoded_input: Parsed json - either a list or a dictionary.
        pandas_orient: pandas data frame convention used to store the data; one of
            "records" or "split".
        schema: MLflow schema used when parsing the data.

    Returns:
        pandas.DataFrame.

    Raises:
        MlflowInvalidInputException: if the input shape does not match the
            requested orient, or the orient itself is unsupported.
    """
    import pandas as pd

    if pandas_orient == "records":
        if not isinstance(decoded_input, list):
            typemessage = (
                "dictionary" if isinstance(decoded_input, dict) else f"type {type(decoded_input)}"
            )
            raise MlflowInvalidInputException(
                f"Dataframe records format must be a list of records. Got {typemessage}."
            )
        try:
            pdf = pd.DataFrame(data=decoded_input)
        except Exception as ex:
            raise MlflowInvalidInputException(
                f"Provided dataframe_records field is not a valid dataframe representation in "
                f"'records' format. Error: '{ex}'"
            )
    elif pandas_orient == "split":
        if not isinstance(decoded_input, dict):
            typemessage = (
                "list" if isinstance(decoded_input, list) else f"type {type(decoded_input)}"
            )
            raise MlflowInvalidInputException(
                f"Dataframe split format must be a dictionary. Got {typemessage}."
            )
        keys = set(decoded_input.keys())
        # "data" is mandatory; "columns" and "index" are the only optional keys.
        missing_data = "data" not in keys
        extra_keys = keys.difference({"columns", "data", "index"})
        if missing_data or extra_keys:
            raise MlflowInvalidInputException(
                f"Dataframe split format must have 'data' field and optionally 'columns' "
                f"and 'index' fields. Got {keys}.'"
            )
        try:
            pdf = pd.DataFrame(
                index=decoded_input.get("index"),
                columns=decoded_input.get("columns"),
                data=decoded_input["data"],
            )
        except Exception as ex:
            raise MlflowInvalidInputException(
                f"Provided dataframe_split field is not a valid dataframe representation in "
                f"'split' format. Error: '{ex}'"
            )
    else:
        # BUG FIX: any other orient previously fell through with `pdf` unbound,
        # surfacing as a confusing UnboundLocalError at the return statement.
        raise MlflowInvalidInputException(
            f"Unsupported pandas orient '{pandas_orient}'. Expected 'records' or 'split'."
        )
    if schema is not None:
        pdf = cast_df_types_according_to_schema(pdf, schema)
    return pdf
406  
407  
def dataframe_from_raw_json(path_or_str, schema=None, pandas_orient: str = "split"):
    """Parse raw json into a pandas.Dataframe.

    If schema is provided this methods will attempt to cast data types according to the schema. This
    include base64 decoding for binary columns.

    Args:
        path_or_str: Path to a json file or a json string.
        schema: MLflow schema used when parsing the data.
        pandas_orient: pandas data frame convention used to store the data.

    Returns:
        pandas.DataFrame.
    """
    # A string that names an existing file is treated as a path; anything else
    # is parsed directly as a JSON document.
    if os.path.exists(path_or_str):
        with open(path_or_str) as fp:
            payload = json.load(fp)
    else:
        payload = json.loads(path_or_str)
    return dataframe_from_parsed_json(payload, pandas_orient, schema)
429  
430  
431  def _get_jsonable_obj(data, pandas_orient="records"):
432      """Attempt to make the data json-able via standard library.
433  
434      Look for some commonly used types that are not jsonable and convert them into json-able ones.
435      Unknown data types are returned as is.
436  
437      Args:
438          data: Data to be converted, works with pandas and numpy, rest will be returned as is.
439          pandas_orient: If `data` is a Pandas DataFrame, it will be converted to a JSON
440              dictionary using this Pandas serialization orientation.
441      """
442      import numpy as np
443      import pandas as pd
444  
445      if isinstance(data, np.ndarray):
446          return data.tolist()
447      if isinstance(data, pd.DataFrame):
448          return data.to_dict(orient=pandas_orient)
449      if isinstance(data, pd.Series):
450          return pd.DataFrame(data).to_dict(orient=pandas_orient)
451      else:  # by default just return whatever this is and hope for the best
452          return data
453  
454  
def convert_data_type(data, spec):
    """
    Convert input data to the type specified in the spec.

    Args:
        data: Input data.
        spec: ColSpec or TensorSpec.

    Returns:
        Converted data (numpy arrays for DataType/Array/TensorSpec specs).

    Raises:
        MlflowInvalidInputException: if conversion fails or the spec type is
            not recognized.
    """
    import numpy as np

    from mlflow.models.utils import _enforce_array, _enforce_map, _enforce_object
    from mlflow.types.schema import AnyType, Array, ColSpec, DataType, Map, Object, TensorSpec

    try:
        # No spec: best-effort conversion into a numpy array.
        if spec is None:
            return np.array(data)
        if isinstance(spec, TensorSpec):
            return np.array(data, dtype=spec.type)
        if isinstance(spec, ColSpec):
            if isinstance(spec.type, DataType):
                # Scalars are wrapped in a length-1 array so the dtype cast
                # applies, then unwrapped back to a scalar.
                return (
                    np.array(data, spec.type.to_numpy())
                    if isinstance(data, (list, np.ndarray))
                    else np.array([data], spec.type.to_numpy())[0]
                )
            elif isinstance(spec.type, Array):
                # convert to numpy array for backwards compatibility
                return np.array(_enforce_array(data, spec.type, required=spec.required))
            elif isinstance(spec.type, Object):
                return _enforce_object(data, spec.type, required=spec.required)
            elif isinstance(spec.type, Map):
                return _enforce_map(data, spec.type, required=spec.required)
            elif isinstance(spec.type, AnyType):
                # AnyType imposes no constraints; pass data through unchanged.
                return data
    except MlflowException as e:
        raise MlflowInvalidInputException(e.message)
    except Exception as ex:
        raise MlflowInvalidInputException(f"{ex}")

    # Reached when spec is an unrecognized type (or ColSpec with an
    # unrecognized inner type) — nothing above returned.
    raise MlflowInvalidInputException(
        f"Failed to convert data type for data `{data}` with spec `{spec}`."
    )
497  
498  
def _cast_schema_type(input_data, schema=None):
    """Cast ``input_data`` (dict, list, or scalar) to the types in ``schema``.

    Operates on a deep copy, so the caller's object is never mutated.

    Raises:
        MlflowInvalidInputException: if the data cannot be matched to or
            converted against the schema.
    """
    import numpy as np

    input_data = deepcopy(input_data)
    # spec_name -> spec mapping
    types_dict = schema.input_dict() if schema and schema.has_input_names() else {}
    if schema is not None:
        if (
            len(types_dict) == 1
            and isinstance(input_data, list)
            and not any(isinstance(x, dict) for x in input_data)
        ):
            # for data with a single column (not List[Dict]), match input with column
            input_data = {next(iter(types_dict)): input_data}
        # Un-named schema should only contain a single column or a single value
        elif not schema.has_input_names() and not (
            isinstance(input_data, list) or np.isscalar(input_data)
        ):
            raise MlflowInvalidInputException(
                "Failed to parse input data. This model contains an un-named "
                " model signature which expects a single n-dimensional array or "
                "a single value as input, however, an input of type "
                f"{type(input_data)} was found."
            )
    if isinstance(input_data, dict):
        # each key corresponds to a column, values should be
        # checked against the schema
        input_data = {
            col: convert_data_type(data, types_dict.get(col)) for col, data in input_data.items()
        }
    elif isinstance(input_data, list):
        # List of dictionaries of column_name -> value mapping
        # List[Dict] must correspond to a schema with named columns
        if all(isinstance(x, dict) for x in input_data):
            input_data = [
                {col: convert_data_type(value, types_dict.get(col)) for col, value in data.items()}
                for data in input_data
            ]
        # List of values
        else:
            spec = schema.inputs[0] if schema else None
            input_data = convert_data_type(input_data, spec)
    else:
        # Scalar input: convert against the first (and only) spec.
        spec = schema.inputs[0] if schema else None
        try:
            input_data = convert_data_type(input_data, spec)
        except Exception as e:
            raise MlflowInvalidInputException(
                f"Failed to convert data `{input_data}` to type `{spec}` defined "
                "in the model signature."
            ) from e
    return input_data
551  
552  
def parse_instances_data(data, schema=None):
    """Parse TF-serving row-format data: ``{"instances": [...]}``.

    List[Dict] items are pivoted into a column -> tensor mapping; any other
    shape is converted through ``_cast_schema_type``. Also validates that all
    checked columns have the same number of rows, as the row format requires.

    Raises:
        MlflowInvalidInputException: if the "instances" key is missing or
            column lengths disagree.
    """
    import numpy as np

    from mlflow.types.schema import Array

    if "instances" not in data:
        raise MlflowInvalidInputException("Expecting data to have `instances` as key.")
    data = data["instances"]
    # List[Dict]
    if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
        # convert items to column format (map column/input name to tensor)
        data_dict = defaultdict(list)
        types_dict = schema.input_dict() if schema and schema.has_input_names() else {}
        for item in data:
            for col, v in item.items():
                data_dict[col].append(convert_data_type(v, types_dict.get(col)))
        # convert to numpy array for backwards compatibility
        data = {col: np.array(v) for col, v in data_dict.items()}
    else:
        data = _cast_schema_type(data, schema)

    # Sanity check inputted data. This check will only be applied
    # when the row-format `instances` is used since it requires
    # same 0-th dimension for all items.
    if isinstance(data, dict):
        # ensure all columns have the same number of items
        # Only check the data when it's a list or numpy array
        check_data = {k: v for k, v in data.items() if isinstance(v, (list, np.ndarray))}
        if schema and schema.has_input_names():
            # Only check required columns
            required_cols = schema.required_input_names()
            # For Array schema we should not check the length of the data matching
            check_cols = {
                col for col, spec in schema.input_dict().items() if not isinstance(spec.type, Array)
            }
            check_cols = list(set(required_cols) & check_cols & set(check_data.keys()))
        else:
            check_cols = list(check_data.keys())

        if check_cols:
            expected_len = len(check_data[check_cols[0]])
            if not all(len(check_data[col]) == expected_len for col in check_cols[1:]):
                raise MlflowInvalidInputException(
                    "The length of values for each input/column name are not the same"
                )
    return data
599  
600  
601  # TODO: Reuse this function for `inputs` key data parsing in serving, and
602  # add `convert_to_numpy` param to avoid converting data to numpy arrays for
603  # genAI flavors.
def parse_inputs_data(inputs_data_or_path, schema=None):
    """
    Helper function to cast inputs_data based on the schema.
    Inputs data must be able to pass to the model for pyfunc predict directly.

    Args:
        inputs_data_or_path: A json-serializable object or path to a json file
        schema: data schema to cast to. Be of type `mlflow.types.Schema`.
    """
    inputs_data = inputs_data_or_path
    # A string naming an existing file is loaded as JSON; otherwise the value
    # is used as the data itself.
    if isinstance(inputs_data, str) and os.path.exists(inputs_data):
        with open(inputs_data) as fp:
            inputs_data = json.load(fp)
    return _cast_schema_type(inputs_data, schema)
619  
620  
def parse_tf_serving_input(inp_dict, schema=None):
    """
    Args:
        inp_dict: A dict deserialized from a JSON string formatted as described in TF's
            serving API doc
            (https://www.tensorflow.org/tfx/serving/api_rest#request_format_2)
        schema: MLflow schema used when parsing the data.
    """
    if "signature_name" in inp_dict:
        raise MlflowInvalidInputException('"signature_name" parameter is currently not supported')

    # Exactly one of "instances" / "inputs" must be the sole key.
    keys = list(inp_dict.keys())
    if keys not in (["instances"], ["inputs"]):
        raise MlflowInvalidInputException(
            'One of "instances" and "inputs" must be specified (not both or any other keys).'
            f"Received: {keys}"
        )

    try:
        # NB: converting `instances` (row format) from List[Dict] to Dict[List]
        # changes object/array schemas, e.g.
        # [{"col1": 1, "col2": 2}, {"col1": 3, "col2": 4}] -> {"col1": [1, 3], "col2": [2, 4]}
        # Schema([ColSpec(long, "col1"), ColSpec(long, "col2")]) ->
        # Schema([ColSpec(Array(long), "col1"), ColSpec(Array(long), "col2")])
        # so `instances` should be avoided for such data.
        if "instances" in inp_dict:
            return parse_instances_data(inp_dict, schema)
        # `inputs` is already in column format; just cast values to tensors.
        return _cast_schema_type(inp_dict["inputs"], schema)
    except MlflowException as e:
        raise e
    except Exception as e:
        # Add error into message to provide details for serving usage
        raise MlflowInvalidInputException(
            f"Ensure that the input is a valid JSON-formatted string.\nError: {e!r}"
        ) from e
660  
661  
662  # Reference: https://stackoverflow.com/a/12126976
663  class _CustomJsonEncoder(json.JSONEncoder):
664      def default(self, o):
665          import numpy as np
666          import pandas as pd
667  
668          if isinstance(o, (datetime.datetime, datetime.date, datetime.time, pd.Timestamp)):
669              return o.isoformat()
670  
671          if isinstance(o, np.ndarray):
672              return o.tolist()
673  
674          return super().default(o)
675  
676  
def get_jsonable_input(name, data):
    """Convert a named model input to a JSON-serializable value.

    Only numpy arrays are currently supported; anything else is rejected.
    """
    import numpy as np

    if not isinstance(data, np.ndarray):
        raise MlflowException(f"Incompatible input type:{type(data)} for input {name}.")
    return data.tolist()
684  
685  
def dump_input_data(data, inputs_key="inputs", params: dict[str, Any] | None = None):
    """
    Serialize input data into the JSON payload expected by MLflow model serving.

    Args:
        data: Input data.
        inputs_key: Key to represent data in the request payload.
        params: Additional parameters to pass to the model for inference.

    Returns:
        A JSON string payload (or ``data`` unchanged when it is already a string).

    Raises:
        MlflowException: if ``params`` is provided but is not a dictionary.
    """
    # File-level code only does `import importlib`; importlib.util is not
    # guaranteed to be bound as an attribute without this explicit import.
    import importlib.util

    import numpy as np
    import pandas as pd

    # Convert scipy sparse matrices to dense numpy arrays. Probe the parent
    # package first: find_spec("scipy.sparse") raises ModuleNotFoundError
    # (rather than returning None) when scipy itself is not installed.
    if importlib.util.find_spec("scipy") and importlib.util.find_spec("scipy.sparse"):
        from scipy.sparse import csc_matrix, csr_matrix

        if isinstance(data, (csc_matrix, csr_matrix)):
            data = data.toarray()

    if isinstance(data, pd.DataFrame):
        post_data = {"dataframe_split": data.to_dict(orient="split")}
    elif isinstance(data, dict):
        # BUG FIX: iterate key/value pairs with .items(); iterating the dict
        # directly yields only keys and made this branch fail on unpacking.
        post_data = {inputs_key: {k: get_jsonable_input(k, v) for k, v in data.items()}}
    elif isinstance(data, np.ndarray):
        post_data = {inputs_key: data.tolist()}
    elif isinstance(data, list):
        post_data = {inputs_key: data}
    else:
        post_data = data

    if params is not None:
        if not isinstance(params, dict):
            raise MlflowException(
                f"Params must be a dictionary. Got type '{type(params).__name__}'."
            )
        # if post_data is not dictionary, params should be included in post_data directly
        if isinstance(post_data, dict):
            post_data["params"] = params

    if not isinstance(post_data, str):
        post_data = json.dumps(post_data, cls=_CustomJsonEncoder)

    return post_data