# tests/pyfunc/test_scoring_server.py
   1  import json
   2  import math
   3  import os
   4  import random
   5  import signal
   6  from io import BytesIO, StringIO
   7  from typing import Any, NamedTuple
   8  
   9  import keras
  10  import numpy as np
  11  import pandas as pd
  12  import pydantic
  13  import pytest
  14  import sklearn.neighbors as knn
  15  from packaging.version import Version
  16  from sklearn import datasets
  17  
  18  import mlflow.pyfunc.scoring_server as pyfunc_scoring_server
  19  import mlflow.sklearn
  20  from mlflow.environment_variables import MLFLOW_SCORING_SERVER_REQUEST_TIMEOUT
  21  from mlflow.models import ModelSignature, infer_signature
  22  from mlflow.protos.databricks_pb2 import BAD_REQUEST, ErrorCode
  23  from mlflow.pyfunc import PythonModel
  24  from mlflow.pyfunc.scoring_server import _get_jsonable_obj, get_cmd
  25  from mlflow.types import ColSpec, DataType, ParamSchema, ParamSpec, Schema
  26  from mlflow.types.schema import Array, Object, Property
  27  from mlflow.utils import env_manager as _EnvManager
  28  from mlflow.utils.file_utils import TempDir
  29  from mlflow.utils.proto_json_utils import NumpyEncoder
  30  from mlflow.version import VERSION
  31  
  32  from tests.helper_functions import (
  33      expect_status_code,
  34      random_int,
  35      random_str,
  36  )
  37  from tests.pyfunc.utils import score_model_in_process
  38  
  39  if Version(keras.__version__) >= Version("2.6.0"):
  40      from tensorflow.keras.layers import Concatenate, Dense, Input
  41      from tensorflow.keras.models import Model
  42      from tensorflow.keras.optimizers import SGD
  43  else:
  44      from keras.layers import Concatenate, Dense, Input
  45      from keras.models import Model
  46      from keras.optimizers import SGD
  47  
  48  
class ModelWithData(NamedTuple):
    """Pair of a trained model and sample data used to query it in tests."""

    # The fitted model object (e.g. an sklearn estimator or a Keras model).
    model: Any
    # Example inputs suitable for scoring requests against the model.
    inference_data: Any
  52  
  53  
  54  def build_and_save_sklearn_model(model_path):
  55      from sklearn.datasets import load_iris
  56      from sklearn.linear_model import LogisticRegression
  57  
  58      X, y = load_iris(return_X_y=True)
  59      model = LogisticRegression().fit(X, y)
  60  
  61      mlflow.sklearn.save_model(model, path=model_path)
  62  
  63  
  64  class MyChatLLM(PythonModel):
  65      def predict(self, context, model_input, params=None):
  66          # If (and only-if) we define model signature, input is converted
  67          # to pandas DataFrame in _enforce_schema applied in Pyfunc.predict.
  68          # TODO: Confirm if this is ok, for me it sounds confusing.
  69          if isinstance(model_input, pd.DataFrame):
  70              model_input = model_input.to_dict(orient="records")[0]
  71  
  72          messages = model_input["messages"]
  73          ret = " ".join([m["content"] for m in messages])
  74  
  75          return {
  76              "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  77              "object": "chat.completion",
  78              "created": 1698916461,
  79              "model": "llama-2-70b-chat-hf",
  80              "choices": [
  81                  {
  82                      "index": 0,
  83                      "message": {
  84                          "role": "assistant",
  85                          "content": ret,
  86                      },
  87                      "finish_reason": "stop",
  88                  }
  89              ],
  90              "usage": {"prompt_tokens": 47, "completion_tokens": 49, "total_tokens": 96},
  91              # Echo model input and params for testing purposes
  92              "model_input": model_input,
  93              "params": params,
  94          }
  95  
  96  
  97  class MyCompletionsLLM(PythonModel):
  98      # Example model that takes "prompt" as model input
  99      def predict(self, context, model_input, params=None):
 100          if isinstance(model_input, pd.DataFrame):
 101              model_input = model_input.to_dict(orient="records")[0]
 102  
 103          ret = model_input["prompt"]
 104  
 105          return {
 106              "choices": [
 107                  {
 108                      "index": 0,
 109                      "text": ret,
 110                      "finish_reason": "stop",
 111                  }
 112              ],
 113              # Echo model input and params for testing purposes
 114              "model_input": model_input,
 115              "params": params,
 116          }
 117  
 118  
 119  class MyEmbeddingsLLM(PythonModel):
 120      # Example model that takes "input" as model input
 121      def predict(self, context, model_input, params=None):
 122          if isinstance(model_input, pd.DataFrame):
 123              model_input = model_input.to_dict(orient="records")[0]
 124  
 125          return {
 126              "data": [
 127                  {
 128                      "index": 0,
 129                      "embedding": [0.1, 0.2, 0.3],
 130                  }
 131              ],
 132              # Echo model input and params for testing purposes
 133              "model_input": model_input,
 134              "params": params,
 135          }
 136  
 137  
 138  @pytest.fixture
 139  def pandas_df_with_all_types():
 140      pdf = pd.DataFrame({
 141          "boolean": [True, False, True],
 142          "integer": np.array([1, 2, 3], np.int32),
 143          "long": np.array([1, 2, 3], np.int64),
 144          "float": np.array([math.pi, 2 * math.pi, 3 * math.pi], np.float32),
 145          "double": [math.pi, 2 * math.pi, 3 * math.pi],
 146          "binary": [bytearray([1, 2, 3]), bytearray([4, 5, 6]), bytearray([7, 8, 9])],
 147          "datetime": [
 148              np.datetime64("2021-01-01 00:00:00"),
 149              np.datetime64("2021-02-02 00:00:00"),
 150              np.datetime64("2021-03-03 12:00:00"),
 151          ],
 152      })
 153      pdf["string"] = pd.Series(["a", "b", "c"], dtype=DataType.string.to_pandas())
 154      return pdf
 155  
 156  
 157  @pytest.fixture
 158  def pandas_df_with_csv_types():
 159      pdf = pd.DataFrame({
 160          "boolean": [True, False, True],
 161          "integer": np.array([1, 2, 3], np.int32),
 162          "long": np.array([1, 2, 3], np.int64),
 163          "float": np.array([math.pi, 2 * math.pi, 3 * math.pi], np.float32),
 164          "double": [math.pi, 2 * math.pi, 3 * math.pi],
 165      })
 166      pdf["string"] = pd.Series(["a", "b", "c"], dtype=DataType.string.to_pandas())
 167      return pdf
 168  
 169  
 170  @pytest.fixture(scope="module")
 171  def sklearn_model():
 172      iris = datasets.load_iris()
 173      X = iris.data[:, :2]  # we only take the first two features.
 174      y = iris.target
 175      knn_model = knn.KNeighborsClassifier()
 176      knn_model.fit(X, y)
 177      return ModelWithData(model=knn_model, inference_data=X)
 178  
 179  
@pytest.fixture(scope="module")
def keras_model():
    """Small two-input Keras regression model fit on iris, plus its inference data.

    The model exposes two named 2-feature inputs ("a" and "b") that together cover
    the four iris features, concatenates them, and regresses onto the target index.
    The names let TF-serving payload tests address each input branch by key.
    """
    iris = datasets.load_iris()
    data = pd.DataFrame(
        data=np.c_[iris["data"], iris["target"]], columns=iris["feature_names"] + ["target"]
    )
    y = data["target"]
    X = data.drop("target", axis=1).values
    input_a = Input(shape=(2,), name="a")
    input_b = Input(shape=(2,), name="b")
    output = Dense(1)(Dense(3, input_dim=4)(Concatenate()([input_a, input_b])))
    model = Model(inputs=[input_a, input_b], outputs=output)
    model.compile(loss="mean_squared_error", optimizer=SGD())
    # First two feature columns feed input "a", last two feed input "b".
    model.fit([X[:, :2], X[:, -2:]], y)
    return ModelWithData(model=model, inference_data=X)
 195  
 196  
 197  @pytest.fixture
 198  def model_path(tmp_path):
 199      return os.path.join(tmp_path, "model")
 200  
 201  
 202  def test_scoring_server_responds_to_malformed_json_input_with_error_code_and_message(
 203      sklearn_model, model_path
 204  ):
 205      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 206  
 207      malformed_json_content = "this is,,,, not valid json"
 208      response = score_model_in_process(
 209          model_uri=os.path.abspath(model_path),
 210          data=malformed_json_content,
 211          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 212      )
 213      response_json = json.loads(response.content)
 214      assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST)
 215      message = response_json.get("message")
 216      expected_message = "Invalid input. Ensure that input is a valid JSON formatted string."
 217      assert expected_message in message
 218  
 219  
 220  def test_scoring_server_responds_to_invalid_json_format_with_error_code_and_message(
 221      sklearn_model, model_path
 222  ):
 223      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 224      for not_a_dict_content in [1, "1", [1]]:
 225          incorrect_json_content = json.dumps(not_a_dict_content)
 226          response = score_model_in_process(
 227              model_uri=os.path.abspath(model_path),
 228              data=incorrect_json_content,
 229              content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 230          )
 231          response_json = json.loads(response.content)
 232          assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST)
 233          assert "message" in response_json
 234          message = response_json.get("message")
 235          assert "The input must be a JSON dictionary with exactly one of the input fields" in message
 236  
 237      for incorrect_format in [
 238          {"not": "a serialized dataframe"},
 239          {"dataframe_records": [], "dataframe_split": {"data": []}},
 240      ]:
 241          incorrect_json_content = json.dumps(incorrect_format)
 242          response = score_model_in_process(
 243              model_uri=os.path.abspath(model_path),
 244              data=incorrect_json_content,
 245              content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 246          )
 247          response_json = json.loads(response.content)
 248          assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST)
 249          message = response_json.get("message")
 250          assert "The input must be a JSON dictionary with exactly one of the input fields" in message
 251  
 252  
 253  def test_scoring_server_responds_to_invalid_pandas_input_format_with_stacktrace_and_error_code(
 254      sklearn_model, model_path
 255  ):
 256      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 257  
 258      pdf = pd.DataFrame(sklearn_model.inference_data)
 259      wrong_records_content = json.dumps({"dataframe_records": pdf.to_dict(orient="split")})
 260      wrong_split_content = json.dumps({"dataframe_split": pdf.to_dict(orient="records")})
 261  
 262      response = score_model_in_process(
 263          model_uri=os.path.abspath(model_path),
 264          data=wrong_split_content,
 265          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 266      )
 267      response_json = json.loads(response.content)
 268      assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST)
 269      message = response_json.get("message")
 270      assert "Dataframe split format must be a dictionary. Got list" in message
 271  
 272      response = score_model_in_process(
 273          model_uri=os.path.abspath(model_path),
 274          data=wrong_records_content,
 275          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 276      )
 277      response_json = json.loads(response.content)
 278      assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST)
 279      message = response_json.get("message")
 280      assert "Dataframe records format must be a list of records. Got dictionary." in message
 281  
 282  
 283  def test_scoring_server_responds_to_invalid_dataframe_with_stacktrace_and_error_code(
 284      sklearn_model, model_path
 285  ):
 286      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 287  
 288      invalid_dataframe_content = json.dumps({
 289          "dataframe_split": {"index": [1, 2], "data": [[1], [2], [3]]}
 290      })
 291  
 292      response = score_model_in_process(
 293          model_uri=os.path.abspath(model_path),
 294          data=invalid_dataframe_content,
 295          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 296      )
 297      response_json = json.loads(response.content)
 298      assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST)
 299      message = response_json.get("message")
 300      assert "Provided dataframe_split field is not a valid dataframe representation" in message
 301  
 302  
 303  def test_scoring_server_responds_to_incompatible_inference_dataframe_with_stacktrace_and_error_code(
 304      sklearn_model, model_path
 305  ):
 306      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 307      incompatible_df = pd.DataFrame(np.array(range(10)))
 308  
 309      response = score_model_in_process(
 310          model_uri=os.path.abspath(model_path),
 311          data=incompatible_df,
 312          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 313      )
 314      response_json = json.loads(response.content)
 315      assert "error_code" in response_json
 316      assert response_json["error_code"] == ErrorCode.Name(BAD_REQUEST)
 317      assert "message" in response_json
 318      assert "stack_trace" in response_json
 319  
 320  
 321  def test_scoring_server_responds_to_invalid_csv_input_with_stacktrace_and_error_code(
 322      sklearn_model, model_path
 323  ):
 324      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 325  
 326      # Any empty string is not valid pandas CSV
 327      incorrect_csv_content = ""
 328      response = score_model_in_process(
 329          model_uri=os.path.abspath(model_path),
 330          data=incorrect_csv_content,
 331          content_type=pyfunc_scoring_server.CONTENT_TYPE_CSV,
 332      )
 333      response_json = json.loads(response.content)
 334      assert "error_code" in response_json
 335      assert response_json["error_code"] == ErrorCode.Name(BAD_REQUEST)
 336      assert "message" in response_json
 337      assert "stack_trace" in response_json
 338  
 339  
 340  def test_scoring_server_responds_to_invalid_parquet_input_with_stacktrace_and_error_code(
 341      sklearn_model, model_path
 342  ):
 343      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 344  
 345      # Any empty string is not valid pandas parquet input
 346      incorrect_parquet_content = ""
 347      response = score_model_in_process(
 348          model_uri=os.path.abspath(model_path),
 349          data=incorrect_parquet_content,
 350          content_type=pyfunc_scoring_server.CONTENT_TYPE_PARQUET,
 351      )
 352      response_json = json.loads(response.content)
 353      assert "error_code" in response_json
 354      assert response_json["error_code"] == ErrorCode.Name(BAD_REQUEST)
 355      assert "message" in response_json
 356      assert "stack_trace" in response_json
 357  
 358  
 359  def test_scoring_server_successfully_evaluates_correct_dataframes_with_pandas_records_orientation(
 360      sklearn_model, model_path
 361  ):
 362      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 363  
 364      pandas_record_content = json.dumps({
 365          "dataframe_records": pd.DataFrame(sklearn_model.inference_data).to_dict(orient="records")
 366      })
 367  
 368      response_records_content_type = score_model_in_process(
 369          model_uri=os.path.abspath(model_path),
 370          data=pandas_record_content,
 371          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 372      )
 373      expect_status_code(response_records_content_type, 200)
 374  
 375      # Testing the charset parameter
 376      response_records_content_type = score_model_in_process(
 377          model_uri=os.path.abspath(model_path),
 378          data=pandas_record_content,
 379          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8",
 380      )
 381      expect_status_code(response_records_content_type, 200)
 382  
 383  
 384  def test_scoring_server_successfully_evaluates_correct_dataframes_with_pandas_split_orientation(
 385      sklearn_model, model_path
 386  ):
 387      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 388  
 389      pandas_split_content = json.dumps({
 390          "dataframe_split": pd.DataFrame(sklearn_model.inference_data).to_dict(orient="split")
 391      })
 392  
 393      # Testing the charset parameter
 394      response = score_model_in_process(
 395          model_uri=os.path.abspath(model_path),
 396          data=pandas_split_content,
 397          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8",
 398      )
 399  
 400      expect_status_code(response, 200)
 401  
 402      response = score_model_in_process(
 403          model_uri=os.path.abspath(model_path),
 404          data=pandas_split_content,
 405          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 406      )
 407      expect_status_code(response, 200)
 408  
 409  
 410  def test_scoring_server_responds_to_invalid_content_type_request_with_unsupported_content_type_code(
 411      sklearn_model, model_path
 412  ):
 413      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 414  
 415      pandas_split_content = pd.DataFrame(sklearn_model.inference_data).to_json(orient="split")
 416      response = score_model_in_process(
 417          model_uri=os.path.abspath(model_path),
 418          data=pandas_split_content,
 419          content_type="not_a_supported_content_type",
 420      )
 421      expect_status_code(response, 415)
 422  
 423  
 424  def test_scoring_server_responds_to_invalid_content_type_request_with_unrecognized_content_param(
 425      sklearn_model, model_path
 426  ):
 427      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 428      pandas_split_content = pd.DataFrame(sklearn_model.inference_data).to_json(orient="split")
 429      response = score_model_in_process(
 430          model_uri=os.path.abspath(model_path),
 431          data=pandas_split_content,
 432          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; something=something",
 433      )
 434      expect_status_code(response, 415)
 435  
 436  
 437  def test_scoring_server_successfully_evaluates_correct_tf_serving_sklearn(
 438      sklearn_model, model_path
 439  ):
 440      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path)
 441  
 442      inp_dict = {"instances": sklearn_model.inference_data.tolist()}
 443      response_records_content_type = score_model_in_process(
 444          model_uri=os.path.abspath(model_path),
 445          data=json.dumps(inp_dict),
 446          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 447      )
 448      expect_status_code(response_records_content_type, 200)
 449  
 450  
 451  def test_scoring_server_successfully_evaluates_correct_tf_serving_keras_instances(
 452      keras_model, model_path
 453  ):
 454      mlflow.tensorflow.save_model(keras_model.model, path=model_path)
 455  
 456      inp_dict = {
 457          "instances": [
 458              {"a": a.tolist(), "b": b.tolist()}
 459              for (a, b) in zip(keras_model.inference_data[:, :2], keras_model.inference_data[:, -2:])
 460          ]
 461      }
 462      response_records_content_type = score_model_in_process(
 463          model_uri=os.path.abspath(model_path),
 464          data=json.dumps(inp_dict),
 465          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 466      )
 467      expect_status_code(response_records_content_type, 200)
 468  
 469  
 470  def test_scoring_server_successfully_evaluates_correct_tf_serving_keras_inputs(
 471      keras_model, model_path
 472  ):
 473      mlflow.tensorflow.save_model(keras_model.model, path=model_path)
 474  
 475      inp_dict = {
 476          "inputs": {
 477              "a": keras_model.inference_data[:, :2].tolist(),
 478              "b": keras_model.inference_data[:, -2:].tolist(),
 479          }
 480      }
 481      response_records_content_type = score_model_in_process(
 482          model_uri=os.path.abspath(model_path),
 483          data=json.dumps(inp_dict),
 484          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 485      )
 486      expect_status_code(response_records_content_type, 200)
 487  
 488  
 489  def test_parse_json_input_records_oriented():
 490      size = 2
 491      data = {
 492          "col_m": [random_int(0, 1000) for _ in range(size)],
 493          "col_z": [random_str() for _ in range(size)],
 494          "col_a": [random_int() for _ in range(size)],
 495      }
 496      p1 = pd.DataFrame.from_dict(data)
 497      records_content = json.dumps({"dataframe_records": p1.to_dict(orient="records")})
 498      records_content, _ = pyfunc_scoring_server._split_data_and_params(records_content)
 499      p2 = pyfunc_scoring_server.infer_and_parse_data(records_content)
 500      # "records" orient may shuffle column ordering. Hence comparing each column Series
 501      for col in data:
 502          assert all(p1[col] == p2[col])
 503  
 504  
 505  def test_parse_json_input_split_oriented():
 506      size = 200
 507      data = {
 508          "col_m": [random_int(0, 1000) for _ in range(size)],
 509          "col_z": [random_str() for _ in range(size)],
 510          "col_a": [random_int() for _ in range(size)],
 511      }
 512      p1 = pd.DataFrame.from_dict(data)
 513      split_content = json.dumps({"dataframe_split": p1.to_dict(orient="split")})
 514      split_content, _ = pyfunc_scoring_server._split_data_and_params(split_content)
 515      p2 = pyfunc_scoring_server.infer_and_parse_data(split_content)
 516      assert all(p1 == p2)
 517  
 518  
 519  def test_records_oriented_json_to_df():
 520      # test that datatype for "zip" column is not converted to "int64"
 521      jstr = """
 522        {
 523          "dataframe_records": [
 524            {"zip":"95120","cost":10.45,"score":8},
 525            {"zip":"95128","cost":23.0,"score":0},
 526            {"zip":"95128","cost":12.1,"score":10}
 527          ]
 528        }
 529      """
 530      jstr, _ = pyfunc_scoring_server._split_data_and_params(jstr)
 531      df = pyfunc_scoring_server.infer_and_parse_data(jstr)
 532      assert set(df.columns) == {"zip", "cost", "score"}
 533      assert {str(dt) for dt in df.dtypes} == {"object", "float64", "int64"}
 534  
 535  
 536  def _shuffle_pdf(pdf):
 537      cols = list(pdf.columns)
 538      random.shuffle(cols)
 539      return pdf[cols]
 540  
 541  
 542  def test_split_oriented_json_to_df():
 543      # test that datatype for "zip" column is not converted to "int64"
 544      jstr = """
 545        {
 546          "dataframe_split": {
 547            "columns":["zip","cost","count"],
 548            "index":[0,1,2],
 549            "data":[["95120",10.45,-8],["95128",23.0,-1],["95128",12.1,1000]]
 550          }
 551        }
 552      """
 553      jstr, _ = pyfunc_scoring_server._split_data_and_params(jstr)
 554      df = pyfunc_scoring_server.infer_and_parse_data(jstr)
 555  
 556      assert set(df.columns) == {"zip", "cost", "count"}
 557      assert {str(dt) for dt in df.dtypes} == {"object", "float64", "int64"}
 558  
 559  
 560  def test_parse_with_schema_csv(pandas_df_with_csv_types):
 561      schema = Schema([ColSpec(c, c) for c in pandas_df_with_csv_types.columns])
 562      df = _shuffle_pdf(pandas_df_with_csv_types)
 563      csv_str = df.to_csv(index=False)
 564      df = pyfunc_scoring_server.parse_csv_input(StringIO(csv_str), schema=schema)
 565      assert schema == infer_signature(df[schema.input_names()]).inputs
 566  
 567  
 568  def test_parse_parquet_schema(pandas_df_with_all_types):
 569      schema = Schema([ColSpec(c, c) for c in pandas_df_with_all_types.columns])
 570      df = _shuffle_pdf(pandas_df_with_all_types)
 571      parquet_stream = df.to_parquet()
 572      df = pyfunc_scoring_server.parse_parquet_input(BytesIO(parquet_stream))
 573      assert schema == infer_signature(df[schema.input_names()]).inputs
 574  
 575  
def test_parse_with_schema(pandas_df_with_all_types):
    """Schema-directed JSON parsing round-trips types, then documents coercion quirks.

    The first half checks that split- and records-oriented payloads parsed with an
    explicit schema reproduce the original signature. The second half pins down the
    (questionable) pandas behavior of forcing values into the hinted dtype even
    when that overflows or loses precision.
    """
    schema = Schema([ColSpec(c, c) for c in pandas_df_with_all_types.columns])
    df = _shuffle_pdf(pandas_df_with_all_types)
    json_str = json.dumps({"dataframe_split": df.to_dict(orient="split")}, cls=NumpyEncoder)
    json_str, _ = pyfunc_scoring_server._split_data_and_params(json_str)
    df = pyfunc_scoring_server.infer_and_parse_data(json_str, schema=schema)
    json_str = json.dumps({"dataframe_records": df.to_dict(orient="records")}, cls=NumpyEncoder)
    json_str, _ = pyfunc_scoring_server._split_data_and_params(json_str)
    df = pyfunc_scoring_server.infer_and_parse_data(json_str, schema=schema)
    # After both round-trips the inferred signature must match the declared schema.
    assert schema == infer_signature(df[schema.input_names()]).inputs

    # The current behavior with pandas json parse with type hints is weird. In some cases, the
    # types are forced ignoring overflow and loss of precision:

    bad_df = """
    {
      "dataframe_split": {
        "columns":["bad_integer", "bad_float", "bad_string", "bad_boolean"],
        "data":[
          [9007199254740991.0, 1.1,                1, 1.5],
          [9007199254740992.0, 9007199254740992.0, 2, 0],
          [9007199254740994.0, 3.3,                3, "some arbitrary string"]
        ]
      }
    }
    """
    schema = Schema([
        ColSpec("integer", "bad_integer"),
        ColSpec("float", "bad_float"),
        ColSpec("string", "bad_string"),
        ColSpec("boolean", "bad_boolean"),
    ])
    bad_df, _ = pyfunc_scoring_server._split_data_and_params(bad_df)
    df = pyfunc_scoring_server.infer_and_parse_data(bad_df, schema=schema)
    # Unfortunately, the current behavior of pandas parse is to force numbers to int32 even if
    # they don't fit:
    assert df["bad_integer"].dtype == np.int32
    assert all(df["bad_integer"] == [-2147483648, -2147483648, -2147483648])

    # The same goes for floats:
    assert df["bad_float"].dtype == np.float32
    assert all(df["bad_float"] == np.array([1.1, 9007199254740992, 3.3], dtype=np.float32))
    # However bad string is recognized as int64:
    assert all(df["bad_string"] == np.array([1, 2, 3], dtype=object))

    # Boolean is forced - zero and empty string is false, everything else is true:
    assert df["bad_boolean"].dtype == bool
    assert all(df["bad_boolean"] == [True, False, True])
 624  
 625  
def test_serving_model_with_schema(pandas_df_with_all_types):
    """The scoring server preserves schema-enforced dtypes for every payload format.

    Logs a model whose predict() simply reports the (column, dtype) pairs it
    received, then checks those dtypes via "dataframe_split", "dataframe_records",
    and "inputs" payloads.
    """

    class TestModel(PythonModel):
        def predict(self, context, model_input, params=None):
            # Echo the dtypes of the DataFrame the model actually received.
            return [[k, str(v)] for k, v in model_input.dtypes.items()]

    schema = Schema([ColSpec(c, c) for c in pandas_df_with_all_types.columns])
    df = _shuffle_pdf(pandas_df_with_all_types)
    with TempDir(chdr=True):
        with mlflow.start_run():
            model_info = mlflow.pyfunc.log_model(
                name="model", python_model=TestModel(), signature=ModelSignature(schema)
            )
        response = score_model_in_process(
            model_uri=model_info.model_uri,
            data=json.dumps({"dataframe_split": df.to_dict(orient="split")}, cls=NumpyEncoder),
            content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
        )
        response_json = json.loads(response.content)["predictions"]

        # objects are not converted to pandas Strings at the moment
        expected_types = {**pandas_df_with_all_types.dtypes, "string": np.dtype(object)}
        assert response_json == [[k, str(v)] for k, v in expected_types.items()]
        response = score_model_in_process(
            model_uri=model_info.model_uri,
            data=json.dumps(
                {"dataframe_records": pandas_df_with_all_types.to_dict(orient="records")},
                cls=NumpyEncoder,
            ),
            content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
        )
        response_json = json.loads(response.content)["predictions"]
        assert response_json == [[k, str(v)] for k, v in expected_types.items()]

        # Test 'inputs' format
        response = score_model_in_process(
            model_uri=model_info.model_uri,
            data=json.dumps({"inputs": df.to_dict(orient="list")}, cls=NumpyEncoder),
            content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
        )
        response_json = json.loads(response.content)["predictions"]
        assert response_json == [[k, str(v)] for k, v in expected_types.items()]
 667  
 668  
 669  def test_serving_model_with_param_schema(sklearn_model, model_path):
 670      dataframe = {
 671          "dataframe_split": pd.DataFrame(sklearn_model.inference_data).to_dict(orient="split")
 672      }
 673      signature = infer_signature(sklearn_model.inference_data)
 674      param_schema = ParamSchema([
 675          ParamSpec("param1", DataType.datetime, np.datetime64("2023-07-01"))
 676      ])
 677      signature.params = param_schema
 678      mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path, signature=signature)
 679  
 680      # Success if passing no parameters
 681      response = score_model_in_process(
 682          model_uri=os.path.abspath(model_path),
 683          data=json.dumps(dataframe),
 684          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8",
 685      )
 686      expect_status_code(response, 200)
 687  
 688      # Raise error if invalid value is passed
 689      payload = dataframe.copy()
 690      payload.update({"params": {"param1": "invalid_value1"}})
 691      response = score_model_in_process(
 692          model_uri=os.path.abspath(model_path),
 693          data=json.dumps(payload),
 694          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8",
 695      )
 696      expect_status_code(response, 400)
 697      assert (
 698          " Failed to convert value `invalid_value1` from type `<class 'str'>` "
 699          "to `DataType.datetime`" in json.loads(response.content.decode("utf-8"))["message"]
 700      )
 701  
 702      # Ignore parameters specified in payload if it is not defined in ParamSchema
 703      payload = dataframe.copy()
 704      payload.update({"params": {"invalid_param": "value"}})
 705      response = score_model_in_process(
 706          model_uri=os.path.abspath(model_path),
 707          data=json.dumps(payload),
 708          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8",
 709      )
 710      expect_status_code(response, 200)
 711  
 712  
 713  def test_get_jsonnable_obj():
 714      py_ary = [["a", "b", "c"], ["e", "f", "g"]]
 715      np_ary = _get_jsonable_obj(np.array(py_ary))
 716      assert json.dumps(py_ary, cls=NumpyEncoder) == json.dumps(np_ary, cls=NumpyEncoder)
 717      np_ary = _get_jsonable_obj(np.array(py_ary, dtype=type(str)))
 718      assert json.dumps(py_ary, cls=NumpyEncoder) == json.dumps(np_ary, cls=NumpyEncoder)
 719  
 720  
 721  def test_numpy_encoder_for_pydantic():
 722      class Message(pydantic.BaseModel):
 723          role: str
 724          content: str
 725  
 726      class Messages(pydantic.BaseModel):
 727          messages: list[Message]
 728  
 729      messages = Messages(
 730          messages=[Message(role="user", content="hello!"), Message(role="assistant", content="hi!")]
 731      )
 732      msg_dict = messages.model_dump()
 733      assert json.dumps(_get_jsonable_obj(messages), cls=NumpyEncoder) == json.dumps(
 734          msg_dict, cls=NumpyEncoder
 735      )
 736  
 737  
 738  def test_parse_parquet_input():
 739      class TestModel(PythonModel):
 740          def predict(self, context, model_input, params=None):
 741              return 1
 742  
 743      with mlflow.start_run() as run:
 744          mlflow.pyfunc.log_model(name="model", python_model=TestModel())
 745  
 746      pandas_df = pd.DataFrame({
 747          "foo": [3.0, 4.0],
 748          "bar": [1.0, 2.0],
 749      })
 750  
 751      response_records_content_type = score_model_in_process(
 752          model_uri=f"runs:/{run.info.run_id}/model",
 753          data=pandas_df,
 754          content_type=pyfunc_scoring_server.CONTENT_TYPE_PARQUET,
 755      )
 756      expect_status_code(response_records_content_type, 200)
 757  
 758  
 759  def test_parse_json_input_including_path():
 760      class TestModel(PythonModel):
 761          def predict(self, context, model_input, params=None):
 762              return 1
 763  
 764      with mlflow.start_run() as run:
 765          mlflow.pyfunc.log_model(name="model", python_model=TestModel())
 766  
 767      pandas_split_content = pd.DataFrame({
 768          "url": ["http://foo.com", "https://bar.com"],
 769          "bad_protocol": ["aaa://bbb", "address:/path"],
 770      })
 771  
 772      response_records_content_type = score_model_in_process(
 773          model_uri=f"runs:/{run.info.run_id}/model",
 774          data=pandas_split_content,
 775          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 776      )
 777      expect_status_code(response_records_content_type, 200)
 778  
 779  
 780  @pytest.mark.parametrize(
 781      ("args", "expected", "timeout"),
 782      [
 783          (
 784              {"port": 5000, "host": "0.0.0.0", "nworkers": 4, "timeout": 60},
 785              "--host 0.0.0.0 --port 5000 --workers 4",
 786              "60",
 787          ),
 788          (
 789              {"host": "0.0.0.0", "nworkers": 4, "timeout": 60},
 790              "--host 0.0.0.0 --workers 4",
 791              "60",
 792          ),
 793          (
 794              {"port": 5000, "nworkers": 4, "timeout": 60},
 795              "--port 5000 --workers 4",
 796              "60",
 797          ),
 798          ({"nworkers": 4, "timeout": 60}, "--workers 4", "60"),
 799          ({"timeout": 30}, "", "30"),
 800      ],
 801  )
 802  def test_get_cmd(args: dict[str, Any], expected: str, timeout: str):
 803      cmd, env = get_cmd(model_uri="foo", **args)
 804  
 805      assert cmd == (f"uvicorn {expected} mlflow.pyfunc.scoring_server.app:app")
 806      assert env[MLFLOW_SCORING_SERVER_REQUEST_TIMEOUT.name] == timeout
 807  
 808  
def test_scoring_server_client(sklearn_model, model_path):
    """End-to-end check of ScoringServerClient against a real server subprocess.

    Saves an sklearn model, serves it in a conda-managed subprocess on a free
    port, then verifies predictions and the reported server version. The server
    process is always terminated in the ``finally`` block.
    """
    # Imported lazily so importing this test module does not pull in the
    # flavor-backend / client machinery.
    from mlflow.models.flavor_backend_registry import get_flavor_backend
    from mlflow.pyfunc.scoring_server.client import ScoringServerClient
    from mlflow.utils import find_free_port

    mlflow.sklearn.save_model(
        sk_model=sklearn_model.model, path=model_path, metadata={"metadata_key": "value"}
    )
    # Ground truth computed in-process, compared against the served predictions below.
    expected_result = sklearn_model.model.predict(sklearn_model.inference_data)

    port = find_free_port()
    timeout = 60
    # Initialized before the try so the finally block can safely test it even if
    # serve() itself raises.
    server_proc = None
    try:
        # synchronous=False returns immediately with the server subprocess handle;
        # readiness is awaited separately via wait_server_ready().
        server_proc = get_flavor_backend(
            model_path, env_manager=_EnvManager.CONDA, workers=1, install_mlflow=False
        ).serve(
            model_uri=model_path,
            port=port,
            host="127.0.0.1",
            timeout=timeout,
            enable_mlserver=False,
            synchronous=False,
        )

        client = ScoringServerClient(host="127.0.0.1", port=port)
        client.wait_server_ready()

        data = pd.DataFrame(sklearn_model.inference_data)
        # Predictions come back as a DataFrame; take the first column as a 1-D array.
        result = client.invoke(data).get_predictions().to_numpy()[:, 0]
        np.testing.assert_allclose(result, expected_result, rtol=1e-5)

        # The /version endpoint should report the mlflow version of this process.
        version = client.get_version()
        assert version == VERSION
    finally:
        # Always shut the server down, even on assertion failure.
        if server_proc is not None:
            os.kill(server_proc.pid, signal.SIGTERM)
 846  
 847  
# Input schema for the chat-style LLM tests below: a single "messages" column
# holding an array of {role: string, content: string} objects.
_LLM_CHAT_INPUT_SCHEMA = Schema([
    ColSpec(
        Array(
            Object([
                Property("role", DataType.string),
                Property("content", DataType.string),
            ]),
        ),
        name="messages",
    )
])
 859  
 860  
 861  @pytest.mark.parametrize(
 862      ("signature", "expected_model_input", "expected_params"),
 863      [
 864          # Test case: no signature, everything should go to data
 865          (
 866              None,
 867              {
 868                  "messages": [{"role": "user", "content": "hello!"}],
 869                  "max_tokens": 20,
 870                  "temperature": 0.5,
 871              },
 872              {},
 873          ),
 874          # Test case: signature with params, split params and data
 875          (
 876              ModelSignature(
 877                  inputs=_LLM_CHAT_INPUT_SCHEMA,
 878                  params=ParamSchema([
 879                      ParamSpec("temperature", DataType.double, default=0.5),
 880                      ParamSpec("max_tokens", DataType.integer, default=20),
 881                      ParamSpec("top_p", DataType.double, default=0.9),
 882                  ]),
 883              ),
 884              {
 885                  "messages": [{"role": "user", "content": "hello!"}],
 886              },
 887              {
 888                  "temperature": 0.5,
 889                  "max_tokens": 20,
 890                  "top_p": 0.9,  # filled with the default value
 891              },
 892          ),
 893          # Test case: if some params are not defined in either input and params schema,
 894          # they will be dropped
 895          (
 896              ModelSignature(
 897                  inputs=_LLM_CHAT_INPUT_SCHEMA,
 898                  params=ParamSchema([
 899                      ParamSpec("temperature", DataType.double, default=0.5),
 900                  ]),
 901              ),
 902              {
 903                  "messages": [{"role": "user", "content": "hello!"}],
 904              },
 905              {
 906                  # only params defined in the schema are passed
 907                  "temperature": 0.5,
 908              },
 909          ),
 910          # Test case: params can be defined in the input schema
 911          (
 912              ModelSignature(
 913                  inputs=Schema([
 914                      *_LLM_CHAT_INPUT_SCHEMA.inputs,
 915                      ColSpec(DataType.long, "max_tokens", required=False),
 916                      ColSpec(DataType.double, "temperature", required=False),
 917                  ]),
 918              ),
 919              {
 920                  "messages": [{"role": "user", "content": "hello!"}],
 921                  "temperature": 0.5,
 922                  "max_tokens": 20,
 923              },
 924              {},
 925          ),
 926      ],
 927  )
 928  def test_scoring_server_allows_payloads_with_llm_chat_keys_for_pyfunc(
 929      model_path, signature, expected_model_input, expected_params
 930  ):
 931      mlflow.pyfunc.save_model(model_path, python_model=MyChatLLM(), signature=signature)
 932  
 933      payload = json.dumps({
 934          "messages": [{"role": "user", "content": "hello!"}],
 935          "temperature": 0.5,
 936          "max_tokens": 20,
 937      })
 938      response = score_model_in_process(
 939          model_uri=model_path,
 940          data=payload,
 941          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
 942      )
 943      expect_status_code(response, 200)
 944      assert json.loads(response.content)["choices"][0]["message"]["content"] == "hello!"
 945      assert json.loads(response.content)["model_input"] == expected_model_input
 946      assert json.loads(response.content)["params"] == expected_params
 947  
 948  
# Input schema for the completions-style LLM tests below: a single string
# "prompt" column.
_LLM_COMPLETIONS_INPUT_SCHEMA = Schema([
    ColSpec(
        DataType.string,
        name="prompt",
    )
])
 955  
 956  
 957  @pytest.mark.parametrize(
 958      ("signature", "expected_model_input", "expected_params"),
 959      [
 960          # Test case: no signature, everything should go to data
 961          (
 962              None,
 963              {
 964                  "prompt": "hello!",
 965                  "max_tokens": 20,
 966                  "temperature": 0.5,
 967              },
 968              {},
 969          ),
 970          # Test case: signature with params, split params and data
 971          (
 972              ModelSignature(
 973                  inputs=_LLM_COMPLETIONS_INPUT_SCHEMA,
 974                  params=ParamSchema([
 975                      ParamSpec("temperature", DataType.double, default=0.5),
 976                      ParamSpec("max_tokens", DataType.integer, default=20),
 977                      ParamSpec("top_p", DataType.double, default=0.9),
 978                  ]),
 979              ),
 980              {
 981                  "prompt": "hello!",
 982              },
 983              {
 984                  "temperature": 0.5,
 985                  "max_tokens": 20,
 986                  "top_p": 0.9,  # filled with the default value
 987              },
 988          ),
 989      ],
 990  )
 991  def test_scoring_server_allows_payloads_with_llm_completions_keys_for_pyfunc(
 992      model_path, signature, expected_model_input, expected_params
 993  ):
 994      mlflow.pyfunc.save_model(model_path, python_model=MyCompletionsLLM(), signature=signature)
 995  
 996      payload = json.dumps({
 997          "prompt": "hello!",
 998          "temperature": 0.5,
 999          "max_tokens": 20,
1000      })
1001      response = score_model_in_process(
1002          model_uri=model_path,
1003          data=payload,
1004          content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
1005      )
1006      expect_status_code(response, 200)
1007      assert json.loads(response.content)["choices"][0]["text"] == "hello!"
1008      assert json.loads(response.content)["model_input"] == expected_model_input
1009      assert json.loads(response.content)["params"] == expected_params
1010  
1011  
# Input schema for the embeddings-style LLM tests below: a single string
# "input" column.
_LLM_EMBEDDINGS_INPUT_SCHEMA = Schema([
    ColSpec(
        DataType.string,
        name="input",
    )
])
1018  
1019  
@pytest.mark.parametrize(
    ("signature", "expected_model_input", "expected_params"),
    [
        # Test case: no signature, everything should go to data
        (
            None,
            {
                "input": "hello!",
                "random": "test",
            },
            {},
        ),
        # Test case: signature with no params accepted, ignores params
        (
            ModelSignature(
                inputs=_LLM_EMBEDDINGS_INPUT_SCHEMA,
            ),
            {
                "input": "hello!",
            },
            {},
        ),
    ],
)
def test_scoring_server_allows_payloads_with_llm_embeddings_keys_for_pyfunc(
    model_path, signature, expected_model_input, expected_params
):
    """Embeddings-style payloads should be split into model input and params per the signature."""
    mlflow.pyfunc.save_model(model_path, python_model=MyEmbeddingsLLM(), signature=signature)

    payload = json.dumps({
        "input": "hello!",
        "random": "test",
    })
    response = score_model_in_process(
        model_uri=model_path,
        data=payload,
        content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
    )
    expect_status_code(response, 200)
    # Parse the response body once rather than re-decoding it for every assertion.
    prediction = json.loads(response.content)
    assert prediction["data"][0]["embedding"] == [0.1, 0.2, 0.3]
    assert prediction["model_input"] == expected_model_input
    assert prediction["params"] == expected_params
1062  
1063  
def test_scoring_server_allows_payloads_with_messages_for_pyfunc_wrapped(model_path):
    """A pyfunc wrapping an sklearn artifact should still accept chat-keyed payloads."""
    sklearn_path = model_path + "-sklearn"
    build_and_save_sklearn_model(sklearn_path)

    # wrapped pyfuncs count as pyfuncs (sklearn is not present in model.metadata.flavors)
    class WrappedSklearn(PythonModel):
        def load_context(self, context):
            self.model = mlflow.pyfunc.load_model(context.artifacts["model_path"])

        # note: model_input is the value of "messages", not a dict
        def predict(self, context, model_input):
            # The message content is itself a JSON-encoded feature row.
            feature_row = json.loads(model_input["messages"][0]["content"])
            return self.model.predict([feature_row])

    mlflow.pyfunc.save_model(
        model_path, python_model=WrappedSklearn(), artifacts={"model_path": sklearn_path}
    )

    payload = json.dumps({
        "messages": [{"role": "user", "content": "[2,2,2,2]"}],
        "max_tokens": 20,
    })
    response = score_model_in_process(
        model_uri=model_path,
        data=payload,
        content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON,
    )
    expect_status_code(response, 200)
1092  
1093  
@pytest.mark.parametrize(
    ("dict_input", "param_schema", "expected"),
    [
        (
            # no param signature, everything should go
            # to data no params should get dropped
            {"messages": ["test"], "max_tokens": 20, "random": "test"},
            None,
            ({"messages": ["test"], "max_tokens": 20, "random": "test"}, {}),
        ),
        (
            # params defined in the param schema should go to params
            # rest should go to data
            {"messages": ["test"], "max_tokens": 20, "random": "test"},
            ParamSchema([
                ParamSpec("max_tokens", DataType.integer, default=20),
            ]),
            ({"messages": ["test"], "random": "test"}, {"max_tokens": 20}),
        ),
    ],
)
def test_split_data_and_params_for_llm_input(dict_input, param_schema, expected):
    """_split_data_and_params_for_llm_input should route keys by the param schema."""
    expected_data, expected_params = expected
    actual_data, actual_params = pyfunc_scoring_server._split_data_and_params_for_llm_input(
        dict_input, param_schema
    )
    assert actual_data == expected_data
    assert actual_params == expected_params