# test_scoring_server.py
import json
import math
import os
import random
import signal
from io import BytesIO, StringIO
from typing import Any, NamedTuple

import keras
import numpy as np
import pandas as pd
import pydantic
import pytest
import sklearn.neighbors as knn
from packaging.version import Version
from sklearn import datasets

import mlflow.pyfunc.scoring_server as pyfunc_scoring_server
import mlflow.sklearn
from mlflow.environment_variables import MLFLOW_SCORING_SERVER_REQUEST_TIMEOUT
from mlflow.models import ModelSignature, infer_signature
from mlflow.protos.databricks_pb2 import BAD_REQUEST, ErrorCode
from mlflow.pyfunc import PythonModel
from mlflow.pyfunc.scoring_server import _get_jsonable_obj, get_cmd
from mlflow.types import ColSpec, DataType, ParamSchema, ParamSpec, Schema
from mlflow.types.schema import Array, Object, Property
from mlflow.utils import env_manager as _EnvManager
from mlflow.utils.file_utils import TempDir
from mlflow.utils.proto_json_utils import NumpyEncoder
from mlflow.version import VERSION

from tests.helper_functions import (
    expect_status_code,
    random_int,
    random_str,
)
from tests.pyfunc.utils import score_model_in_process

# Keras >= 2.6 ships as part of TensorFlow; older versions are standalone.
if Version(keras.__version__) >= Version("2.6.0"):
    from tensorflow.keras.layers import Concatenate, Dense, Input
    from tensorflow.keras.models import Model
    from tensorflow.keras.optimizers import SGD
else:
    from keras.layers import Concatenate, Dense, Input
    from keras.models import Model
    from keras.optimizers import SGD


class ModelWithData(NamedTuple):
    """A trained model bundled with the data used to exercise its inference path."""

    model: Any
    inference_data: Any


def build_and_save_sklearn_model(model_path):
    """Train a LogisticRegression on the iris dataset and save it to ``model_path``."""
    from sklearn.datasets import load_iris
    from sklearn.linear_model import LogisticRegression

    X, y = load_iris(return_X_y=True)
    model = LogisticRegression().fit(X, y)

    mlflow.sklearn.save_model(model, path=model_path)


class MyChatLLM(PythonModel):
    """Toy chat-completions model: echoes the concatenated message contents."""

    def predict(self, context, model_input, params=None):
        # If (and only-if) we define model signature, input is converted
        # to pandas DataFrame in _enforce_schema applied in Pyfunc.predict.
        # TODO: Confirm if this is ok, for me it sounds confusing.
        if isinstance(model_input, pd.DataFrame):
            model_input = model_input.to_dict(orient="records")[0]

        messages = model_input["messages"]
        ret = " ".join([m["content"] for m in messages])

        return {
            "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
            "object": "chat.completion",
            "created": 1698916461,
            "model": "llama-2-70b-chat-hf",
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": ret,
                    },
                    "finish_reason": "stop",
                }
            ],
            "usage": {"prompt_tokens": 47, "completion_tokens": 49, "total_tokens": 96},
            # Echo model input and params for testing purposes
            "model_input": model_input,
            "params": params,
        }


class MyCompletionsLLM(PythonModel):
    # Example model that takes "prompt" as model input
    def predict(self, context, model_input, params=None):
        if isinstance(model_input, pd.DataFrame):
            model_input = model_input.to_dict(orient="records")[0]

        ret = model_input["prompt"]

        return {
            "choices": [
                {
                    "index": 0,
                    "text": ret,
                    "finish_reason": "stop",
                }
            ],
            # Echo model input and params for testing purposes
            "model_input": model_input,
            "params": params,
        }


class MyEmbeddingsLLM(PythonModel):
    # Example model that takes "input" as model input
    def predict(self, context, model_input, params=None):
        if isinstance(model_input, pd.DataFrame):
            model_input = model_input.to_dict(orient="records")[0]

        return {
            "data": [
                {
                    "index": 0,
                    "embedding": [0.1, 0.2, 0.3],
                }
            ],
            # Echo model input and params for testing purposes
            "model_input": model_input,
            "params": params,
        }
@pytest.fixture 139 def pandas_df_with_all_types(): 140 pdf = pd.DataFrame({ 141 "boolean": [True, False, True], 142 "integer": np.array([1, 2, 3], np.int32), 143 "long": np.array([1, 2, 3], np.int64), 144 "float": np.array([math.pi, 2 * math.pi, 3 * math.pi], np.float32), 145 "double": [math.pi, 2 * math.pi, 3 * math.pi], 146 "binary": [bytearray([1, 2, 3]), bytearray([4, 5, 6]), bytearray([7, 8, 9])], 147 "datetime": [ 148 np.datetime64("2021-01-01 00:00:00"), 149 np.datetime64("2021-02-02 00:00:00"), 150 np.datetime64("2021-03-03 12:00:00"), 151 ], 152 }) 153 pdf["string"] = pd.Series(["a", "b", "c"], dtype=DataType.string.to_pandas()) 154 return pdf 155 156 157 @pytest.fixture 158 def pandas_df_with_csv_types(): 159 pdf = pd.DataFrame({ 160 "boolean": [True, False, True], 161 "integer": np.array([1, 2, 3], np.int32), 162 "long": np.array([1, 2, 3], np.int64), 163 "float": np.array([math.pi, 2 * math.pi, 3 * math.pi], np.float32), 164 "double": [math.pi, 2 * math.pi, 3 * math.pi], 165 }) 166 pdf["string"] = pd.Series(["a", "b", "c"], dtype=DataType.string.to_pandas()) 167 return pdf 168 169 170 @pytest.fixture(scope="module") 171 def sklearn_model(): 172 iris = datasets.load_iris() 173 X = iris.data[:, :2] # we only take the first two features. 
174 y = iris.target 175 knn_model = knn.KNeighborsClassifier() 176 knn_model.fit(X, y) 177 return ModelWithData(model=knn_model, inference_data=X) 178 179 180 @pytest.fixture(scope="module") 181 def keras_model(): 182 iris = datasets.load_iris() 183 data = pd.DataFrame( 184 data=np.c_[iris["data"], iris["target"]], columns=iris["feature_names"] + ["target"] 185 ) 186 y = data["target"] 187 X = data.drop("target", axis=1).values 188 input_a = Input(shape=(2,), name="a") 189 input_b = Input(shape=(2,), name="b") 190 output = Dense(1)(Dense(3, input_dim=4)(Concatenate()([input_a, input_b]))) 191 model = Model(inputs=[input_a, input_b], outputs=output) 192 model.compile(loss="mean_squared_error", optimizer=SGD()) 193 model.fit([X[:, :2], X[:, -2:]], y) 194 return ModelWithData(model=model, inference_data=X) 195 196 197 @pytest.fixture 198 def model_path(tmp_path): 199 return os.path.join(tmp_path, "model") 200 201 202 def test_scoring_server_responds_to_malformed_json_input_with_error_code_and_message( 203 sklearn_model, model_path 204 ): 205 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 206 207 malformed_json_content = "this is,,,, not valid json" 208 response = score_model_in_process( 209 model_uri=os.path.abspath(model_path), 210 data=malformed_json_content, 211 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 212 ) 213 response_json = json.loads(response.content) 214 assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST) 215 message = response_json.get("message") 216 expected_message = "Invalid input. Ensure that input is a valid JSON formatted string." 
217 assert expected_message in message 218 219 220 def test_scoring_server_responds_to_invalid_json_format_with_error_code_and_message( 221 sklearn_model, model_path 222 ): 223 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 224 for not_a_dict_content in [1, "1", [1]]: 225 incorrect_json_content = json.dumps(not_a_dict_content) 226 response = score_model_in_process( 227 model_uri=os.path.abspath(model_path), 228 data=incorrect_json_content, 229 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 230 ) 231 response_json = json.loads(response.content) 232 assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST) 233 assert "message" in response_json 234 message = response_json.get("message") 235 assert "The input must be a JSON dictionary with exactly one of the input fields" in message 236 237 for incorrect_format in [ 238 {"not": "a serialized dataframe"}, 239 {"dataframe_records": [], "dataframe_split": {"data": []}}, 240 ]: 241 incorrect_json_content = json.dumps(incorrect_format) 242 response = score_model_in_process( 243 model_uri=os.path.abspath(model_path), 244 data=incorrect_json_content, 245 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 246 ) 247 response_json = json.loads(response.content) 248 assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST) 249 message = response_json.get("message") 250 assert "The input must be a JSON dictionary with exactly one of the input fields" in message 251 252 253 def test_scoring_server_responds_to_invalid_pandas_input_format_with_stacktrace_and_error_code( 254 sklearn_model, model_path 255 ): 256 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 257 258 pdf = pd.DataFrame(sklearn_model.inference_data) 259 wrong_records_content = json.dumps({"dataframe_records": pdf.to_dict(orient="split")}) 260 wrong_split_content = json.dumps({"dataframe_split": pdf.to_dict(orient="records")}) 261 262 response = score_model_in_process( 263 
model_uri=os.path.abspath(model_path), 264 data=wrong_split_content, 265 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 266 ) 267 response_json = json.loads(response.content) 268 assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST) 269 message = response_json.get("message") 270 assert "Dataframe split format must be a dictionary. Got list" in message 271 272 response = score_model_in_process( 273 model_uri=os.path.abspath(model_path), 274 data=wrong_records_content, 275 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 276 ) 277 response_json = json.loads(response.content) 278 assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST) 279 message = response_json.get("message") 280 assert "Dataframe records format must be a list of records. Got dictionary." in message 281 282 283 def test_scoring_server_responds_to_invalid_dataframe_with_stacktrace_and_error_code( 284 sklearn_model, model_path 285 ): 286 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 287 288 invalid_dataframe_content = json.dumps({ 289 "dataframe_split": {"index": [1, 2], "data": [[1], [2], [3]]} 290 }) 291 292 response = score_model_in_process( 293 model_uri=os.path.abspath(model_path), 294 data=invalid_dataframe_content, 295 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 296 ) 297 response_json = json.loads(response.content) 298 assert response_json.get("error_code") == ErrorCode.Name(BAD_REQUEST) 299 message = response_json.get("message") 300 assert "Provided dataframe_split field is not a valid dataframe representation" in message 301 302 303 def test_scoring_server_responds_to_incompatible_inference_dataframe_with_stacktrace_and_error_code( 304 sklearn_model, model_path 305 ): 306 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 307 incompatible_df = pd.DataFrame(np.array(range(10))) 308 309 response = score_model_in_process( 310 model_uri=os.path.abspath(model_path), 311 data=incompatible_df, 312 
content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 313 ) 314 response_json = json.loads(response.content) 315 assert "error_code" in response_json 316 assert response_json["error_code"] == ErrorCode.Name(BAD_REQUEST) 317 assert "message" in response_json 318 assert "stack_trace" in response_json 319 320 321 def test_scoring_server_responds_to_invalid_csv_input_with_stacktrace_and_error_code( 322 sklearn_model, model_path 323 ): 324 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 325 326 # Any empty string is not valid pandas CSV 327 incorrect_csv_content = "" 328 response = score_model_in_process( 329 model_uri=os.path.abspath(model_path), 330 data=incorrect_csv_content, 331 content_type=pyfunc_scoring_server.CONTENT_TYPE_CSV, 332 ) 333 response_json = json.loads(response.content) 334 assert "error_code" in response_json 335 assert response_json["error_code"] == ErrorCode.Name(BAD_REQUEST) 336 assert "message" in response_json 337 assert "stack_trace" in response_json 338 339 340 def test_scoring_server_responds_to_invalid_parquet_input_with_stacktrace_and_error_code( 341 sklearn_model, model_path 342 ): 343 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 344 345 # Any empty string is not valid pandas parquet input 346 incorrect_parquet_content = "" 347 response = score_model_in_process( 348 model_uri=os.path.abspath(model_path), 349 data=incorrect_parquet_content, 350 content_type=pyfunc_scoring_server.CONTENT_TYPE_PARQUET, 351 ) 352 response_json = json.loads(response.content) 353 assert "error_code" in response_json 354 assert response_json["error_code"] == ErrorCode.Name(BAD_REQUEST) 355 assert "message" in response_json 356 assert "stack_trace" in response_json 357 358 359 def test_scoring_server_successfully_evaluates_correct_dataframes_with_pandas_records_orientation( 360 sklearn_model, model_path 361 ): 362 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 363 364 
pandas_record_content = json.dumps({ 365 "dataframe_records": pd.DataFrame(sklearn_model.inference_data).to_dict(orient="records") 366 }) 367 368 response_records_content_type = score_model_in_process( 369 model_uri=os.path.abspath(model_path), 370 data=pandas_record_content, 371 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 372 ) 373 expect_status_code(response_records_content_type, 200) 374 375 # Testing the charset parameter 376 response_records_content_type = score_model_in_process( 377 model_uri=os.path.abspath(model_path), 378 data=pandas_record_content, 379 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8", 380 ) 381 expect_status_code(response_records_content_type, 200) 382 383 384 def test_scoring_server_successfully_evaluates_correct_dataframes_with_pandas_split_orientation( 385 sklearn_model, model_path 386 ): 387 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 388 389 pandas_split_content = json.dumps({ 390 "dataframe_split": pd.DataFrame(sklearn_model.inference_data).to_dict(orient="split") 391 }) 392 393 # Testing the charset parameter 394 response = score_model_in_process( 395 model_uri=os.path.abspath(model_path), 396 data=pandas_split_content, 397 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8", 398 ) 399 400 expect_status_code(response, 200) 401 402 response = score_model_in_process( 403 model_uri=os.path.abspath(model_path), 404 data=pandas_split_content, 405 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 406 ) 407 expect_status_code(response, 200) 408 409 410 def test_scoring_server_responds_to_invalid_content_type_request_with_unsupported_content_type_code( 411 sklearn_model, model_path 412 ): 413 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 414 415 pandas_split_content = pd.DataFrame(sklearn_model.inference_data).to_json(orient="split") 416 response = score_model_in_process( 417 model_uri=os.path.abspath(model_path), 418 
data=pandas_split_content, 419 content_type="not_a_supported_content_type", 420 ) 421 expect_status_code(response, 415) 422 423 424 def test_scoring_server_responds_to_invalid_content_type_request_with_unrecognized_content_param( 425 sklearn_model, model_path 426 ): 427 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 428 pandas_split_content = pd.DataFrame(sklearn_model.inference_data).to_json(orient="split") 429 response = score_model_in_process( 430 model_uri=os.path.abspath(model_path), 431 data=pandas_split_content, 432 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; something=something", 433 ) 434 expect_status_code(response, 415) 435 436 437 def test_scoring_server_successfully_evaluates_correct_tf_serving_sklearn( 438 sklearn_model, model_path 439 ): 440 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path) 441 442 inp_dict = {"instances": sklearn_model.inference_data.tolist()} 443 response_records_content_type = score_model_in_process( 444 model_uri=os.path.abspath(model_path), 445 data=json.dumps(inp_dict), 446 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 447 ) 448 expect_status_code(response_records_content_type, 200) 449 450 451 def test_scoring_server_successfully_evaluates_correct_tf_serving_keras_instances( 452 keras_model, model_path 453 ): 454 mlflow.tensorflow.save_model(keras_model.model, path=model_path) 455 456 inp_dict = { 457 "instances": [ 458 {"a": a.tolist(), "b": b.tolist()} 459 for (a, b) in zip(keras_model.inference_data[:, :2], keras_model.inference_data[:, -2:]) 460 ] 461 } 462 response_records_content_type = score_model_in_process( 463 model_uri=os.path.abspath(model_path), 464 data=json.dumps(inp_dict), 465 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 466 ) 467 expect_status_code(response_records_content_type, 200) 468 469 470 def test_scoring_server_successfully_evaluates_correct_tf_serving_keras_inputs( 471 keras_model, model_path 472 ): 473 
mlflow.tensorflow.save_model(keras_model.model, path=model_path) 474 475 inp_dict = { 476 "inputs": { 477 "a": keras_model.inference_data[:, :2].tolist(), 478 "b": keras_model.inference_data[:, -2:].tolist(), 479 } 480 } 481 response_records_content_type = score_model_in_process( 482 model_uri=os.path.abspath(model_path), 483 data=json.dumps(inp_dict), 484 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 485 ) 486 expect_status_code(response_records_content_type, 200) 487 488 489 def test_parse_json_input_records_oriented(): 490 size = 2 491 data = { 492 "col_m": [random_int(0, 1000) for _ in range(size)], 493 "col_z": [random_str() for _ in range(size)], 494 "col_a": [random_int() for _ in range(size)], 495 } 496 p1 = pd.DataFrame.from_dict(data) 497 records_content = json.dumps({"dataframe_records": p1.to_dict(orient="records")}) 498 records_content, _ = pyfunc_scoring_server._split_data_and_params(records_content) 499 p2 = pyfunc_scoring_server.infer_and_parse_data(records_content) 500 # "records" orient may shuffle column ordering. 
Hence comparing each column Series 501 for col in data: 502 assert all(p1[col] == p2[col]) 503 504 505 def test_parse_json_input_split_oriented(): 506 size = 200 507 data = { 508 "col_m": [random_int(0, 1000) for _ in range(size)], 509 "col_z": [random_str() for _ in range(size)], 510 "col_a": [random_int() for _ in range(size)], 511 } 512 p1 = pd.DataFrame.from_dict(data) 513 split_content = json.dumps({"dataframe_split": p1.to_dict(orient="split")}) 514 split_content, _ = pyfunc_scoring_server._split_data_and_params(split_content) 515 p2 = pyfunc_scoring_server.infer_and_parse_data(split_content) 516 assert all(p1 == p2) 517 518 519 def test_records_oriented_json_to_df(): 520 # test that datatype for "zip" column is not converted to "int64" 521 jstr = """ 522 { 523 "dataframe_records": [ 524 {"zip":"95120","cost":10.45,"score":8}, 525 {"zip":"95128","cost":23.0,"score":0}, 526 {"zip":"95128","cost":12.1,"score":10} 527 ] 528 } 529 """ 530 jstr, _ = pyfunc_scoring_server._split_data_and_params(jstr) 531 df = pyfunc_scoring_server.infer_and_parse_data(jstr) 532 assert set(df.columns) == {"zip", "cost", "score"} 533 assert {str(dt) for dt in df.dtypes} == {"object", "float64", "int64"} 534 535 536 def _shuffle_pdf(pdf): 537 cols = list(pdf.columns) 538 random.shuffle(cols) 539 return pdf[cols] 540 541 542 def test_split_oriented_json_to_df(): 543 # test that datatype for "zip" column is not converted to "int64" 544 jstr = """ 545 { 546 "dataframe_split": { 547 "columns":["zip","cost","count"], 548 "index":[0,1,2], 549 "data":[["95120",10.45,-8],["95128",23.0,-1],["95128",12.1,1000]] 550 } 551 } 552 """ 553 jstr, _ = pyfunc_scoring_server._split_data_and_params(jstr) 554 df = pyfunc_scoring_server.infer_and_parse_data(jstr) 555 556 assert set(df.columns) == {"zip", "cost", "count"} 557 assert {str(dt) for dt in df.dtypes} == {"object", "float64", "int64"} 558 559 560 def test_parse_with_schema_csv(pandas_df_with_csv_types): 561 schema = Schema([ColSpec(c, c) for c in 
pandas_df_with_csv_types.columns]) 562 df = _shuffle_pdf(pandas_df_with_csv_types) 563 csv_str = df.to_csv(index=False) 564 df = pyfunc_scoring_server.parse_csv_input(StringIO(csv_str), schema=schema) 565 assert schema == infer_signature(df[schema.input_names()]).inputs 566 567 568 def test_parse_parquet_schema(pandas_df_with_all_types): 569 schema = Schema([ColSpec(c, c) for c in pandas_df_with_all_types.columns]) 570 df = _shuffle_pdf(pandas_df_with_all_types) 571 parquet_stream = df.to_parquet() 572 df = pyfunc_scoring_server.parse_parquet_input(BytesIO(parquet_stream)) 573 assert schema == infer_signature(df[schema.input_names()]).inputs 574 575 576 def test_parse_with_schema(pandas_df_with_all_types): 577 schema = Schema([ColSpec(c, c) for c in pandas_df_with_all_types.columns]) 578 df = _shuffle_pdf(pandas_df_with_all_types) 579 json_str = json.dumps({"dataframe_split": df.to_dict(orient="split")}, cls=NumpyEncoder) 580 json_str, _ = pyfunc_scoring_server._split_data_and_params(json_str) 581 df = pyfunc_scoring_server.infer_and_parse_data(json_str, schema=schema) 582 json_str = json.dumps({"dataframe_records": df.to_dict(orient="records")}, cls=NumpyEncoder) 583 json_str, _ = pyfunc_scoring_server._split_data_and_params(json_str) 584 df = pyfunc_scoring_server.infer_and_parse_data(json_str, schema=schema) 585 assert schema == infer_signature(df[schema.input_names()]).inputs 586 587 # The current behavior with pandas json parse with type hints is weird. 
In some cases, the 588 # types are forced ignoring overflow and loss of precision: 589 590 bad_df = """ 591 { 592 "dataframe_split": { 593 "columns":["bad_integer", "bad_float", "bad_string", "bad_boolean"], 594 "data":[ 595 [9007199254740991.0, 1.1, 1, 1.5], 596 [9007199254740992.0, 9007199254740992.0, 2, 0], 597 [9007199254740994.0, 3.3, 3, "some arbitrary string"] 598 ] 599 } 600 } 601 """ 602 schema = Schema([ 603 ColSpec("integer", "bad_integer"), 604 ColSpec("float", "bad_float"), 605 ColSpec("string", "bad_string"), 606 ColSpec("boolean", "bad_boolean"), 607 ]) 608 bad_df, _ = pyfunc_scoring_server._split_data_and_params(bad_df) 609 df = pyfunc_scoring_server.infer_and_parse_data(bad_df, schema=schema) 610 # Unfortunately, the current behavior of pandas parse is to force numbers to int32 even if 611 # they don't fit: 612 assert df["bad_integer"].dtype == np.int32 613 assert all(df["bad_integer"] == [-2147483648, -2147483648, -2147483648]) 614 615 # The same goes for floats: 616 assert df["bad_float"].dtype == np.float32 617 assert all(df["bad_float"] == np.array([1.1, 9007199254740992, 3.3], dtype=np.float32)) 618 # However bad string is recognized as int64: 619 assert all(df["bad_string"] == np.array([1, 2, 3], dtype=object)) 620 621 # Boolean is forced - zero and empty string is false, everything else is true: 622 assert df["bad_boolean"].dtype == bool 623 assert all(df["bad_boolean"] == [True, False, True]) 624 625 626 def test_serving_model_with_schema(pandas_df_with_all_types): 627 class TestModel(PythonModel): 628 def predict(self, context, model_input, params=None): 629 return [[k, str(v)] for k, v in model_input.dtypes.items()] 630 631 schema = Schema([ColSpec(c, c) for c in pandas_df_with_all_types.columns]) 632 df = _shuffle_pdf(pandas_df_with_all_types) 633 with TempDir(chdr=True): 634 with mlflow.start_run(): 635 model_info = mlflow.pyfunc.log_model( 636 name="model", python_model=TestModel(), signature=ModelSignature(schema) 637 ) 638 response = 
score_model_in_process( 639 model_uri=model_info.model_uri, 640 data=json.dumps({"dataframe_split": df.to_dict(orient="split")}, cls=NumpyEncoder), 641 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 642 ) 643 response_json = json.loads(response.content)["predictions"] 644 645 # objects are not converted to pandas Strings at the moment 646 expected_types = {**pandas_df_with_all_types.dtypes, "string": np.dtype(object)} 647 assert response_json == [[k, str(v)] for k, v in expected_types.items()] 648 response = score_model_in_process( 649 model_uri=model_info.model_uri, 650 data=json.dumps( 651 {"dataframe_records": pandas_df_with_all_types.to_dict(orient="records")}, 652 cls=NumpyEncoder, 653 ), 654 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 655 ) 656 response_json = json.loads(response.content)["predictions"] 657 assert response_json == [[k, str(v)] for k, v in expected_types.items()] 658 659 # Test 'inputs' format 660 response = score_model_in_process( 661 model_uri=model_info.model_uri, 662 data=json.dumps({"inputs": df.to_dict(orient="list")}, cls=NumpyEncoder), 663 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 664 ) 665 response_json = json.loads(response.content)["predictions"] 666 assert response_json == [[k, str(v)] for k, v in expected_types.items()] 667 668 669 def test_serving_model_with_param_schema(sklearn_model, model_path): 670 dataframe = { 671 "dataframe_split": pd.DataFrame(sklearn_model.inference_data).to_dict(orient="split") 672 } 673 signature = infer_signature(sklearn_model.inference_data) 674 param_schema = ParamSchema([ 675 ParamSpec("param1", DataType.datetime, np.datetime64("2023-07-01")) 676 ]) 677 signature.params = param_schema 678 mlflow.sklearn.save_model(sk_model=sklearn_model.model, path=model_path, signature=signature) 679 680 # Success if passing no parameters 681 response = score_model_in_process( 682 model_uri=os.path.abspath(model_path), 683 data=json.dumps(dataframe), 684 
content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8", 685 ) 686 expect_status_code(response, 200) 687 688 # Raise error if invalid value is passed 689 payload = dataframe.copy() 690 payload.update({"params": {"param1": "invalid_value1"}}) 691 response = score_model_in_process( 692 model_uri=os.path.abspath(model_path), 693 data=json.dumps(payload), 694 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8", 695 ) 696 expect_status_code(response, 400) 697 assert ( 698 " Failed to convert value `invalid_value1` from type `<class 'str'>` " 699 "to `DataType.datetime`" in json.loads(response.content.decode("utf-8"))["message"] 700 ) 701 702 # Ignore parameters specified in payload if it is not defined in ParamSchema 703 payload = dataframe.copy() 704 payload.update({"params": {"invalid_param": "value"}}) 705 response = score_model_in_process( 706 model_uri=os.path.abspath(model_path), 707 data=json.dumps(payload), 708 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON + "; charset=UTF-8", 709 ) 710 expect_status_code(response, 200) 711 712 713 def test_get_jsonnable_obj(): 714 py_ary = [["a", "b", "c"], ["e", "f", "g"]] 715 np_ary = _get_jsonable_obj(np.array(py_ary)) 716 assert json.dumps(py_ary, cls=NumpyEncoder) == json.dumps(np_ary, cls=NumpyEncoder) 717 np_ary = _get_jsonable_obj(np.array(py_ary, dtype=type(str))) 718 assert json.dumps(py_ary, cls=NumpyEncoder) == json.dumps(np_ary, cls=NumpyEncoder) 719 720 721 def test_numpy_encoder_for_pydantic(): 722 class Message(pydantic.BaseModel): 723 role: str 724 content: str 725 726 class Messages(pydantic.BaseModel): 727 messages: list[Message] 728 729 messages = Messages( 730 messages=[Message(role="user", content="hello!"), Message(role="assistant", content="hi!")] 731 ) 732 msg_dict = messages.model_dump() 733 assert json.dumps(_get_jsonable_obj(messages), cls=NumpyEncoder) == json.dumps( 734 msg_dict, cls=NumpyEncoder 735 ) 736 737 738 def test_parse_parquet_input(): 739 
class TestModel(PythonModel): 740 def predict(self, context, model_input, params=None): 741 return 1 742 743 with mlflow.start_run() as run: 744 mlflow.pyfunc.log_model(name="model", python_model=TestModel()) 745 746 pandas_df = pd.DataFrame({ 747 "foo": [3.0, 4.0], 748 "bar": [1.0, 2.0], 749 }) 750 751 response_records_content_type = score_model_in_process( 752 model_uri=f"runs:/{run.info.run_id}/model", 753 data=pandas_df, 754 content_type=pyfunc_scoring_server.CONTENT_TYPE_PARQUET, 755 ) 756 expect_status_code(response_records_content_type, 200) 757 758 759 def test_parse_json_input_including_path(): 760 class TestModel(PythonModel): 761 def predict(self, context, model_input, params=None): 762 return 1 763 764 with mlflow.start_run() as run: 765 mlflow.pyfunc.log_model(name="model", python_model=TestModel()) 766 767 pandas_split_content = pd.DataFrame({ 768 "url": ["http://foo.com", "https://bar.com"], 769 "bad_protocol": ["aaa://bbb", "address:/path"], 770 }) 771 772 response_records_content_type = score_model_in_process( 773 model_uri=f"runs:/{run.info.run_id}/model", 774 data=pandas_split_content, 775 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 776 ) 777 expect_status_code(response_records_content_type, 200) 778 779 780 @pytest.mark.parametrize( 781 ("args", "expected", "timeout"), 782 [ 783 ( 784 {"port": 5000, "host": "0.0.0.0", "nworkers": 4, "timeout": 60}, 785 "--host 0.0.0.0 --port 5000 --workers 4", 786 "60", 787 ), 788 ( 789 {"host": "0.0.0.0", "nworkers": 4, "timeout": 60}, 790 "--host 0.0.0.0 --workers 4", 791 "60", 792 ), 793 ( 794 {"port": 5000, "nworkers": 4, "timeout": 60}, 795 "--port 5000 --workers 4", 796 "60", 797 ), 798 ({"nworkers": 4, "timeout": 60}, "--workers 4", "60"), 799 ({"timeout": 30}, "", "30"), 800 ], 801 ) 802 def test_get_cmd(args: dict[str, Any], expected: str, timeout: str): 803 cmd, env = get_cmd(model_uri="foo", **args) 804 805 assert cmd == (f"uvicorn {expected} mlflow.pyfunc.scoring_server.app:app") 806 assert 
env[MLFLOW_SCORING_SERVER_REQUEST_TIMEOUT.name] == timeout 807 808 809 def test_scoring_server_client(sklearn_model, model_path): 810 from mlflow.models.flavor_backend_registry import get_flavor_backend 811 from mlflow.pyfunc.scoring_server.client import ScoringServerClient 812 from mlflow.utils import find_free_port 813 814 mlflow.sklearn.save_model( 815 sk_model=sklearn_model.model, path=model_path, metadata={"metadata_key": "value"} 816 ) 817 expected_result = sklearn_model.model.predict(sklearn_model.inference_data) 818 819 port = find_free_port() 820 timeout = 60 821 server_proc = None 822 try: 823 server_proc = get_flavor_backend( 824 model_path, env_manager=_EnvManager.CONDA, workers=1, install_mlflow=False 825 ).serve( 826 model_uri=model_path, 827 port=port, 828 host="127.0.0.1", 829 timeout=timeout, 830 enable_mlserver=False, 831 synchronous=False, 832 ) 833 834 client = ScoringServerClient(host="127.0.0.1", port=port) 835 client.wait_server_ready() 836 837 data = pd.DataFrame(sklearn_model.inference_data) 838 result = client.invoke(data).get_predictions().to_numpy()[:, 0] 839 np.testing.assert_allclose(result, expected_result, rtol=1e-5) 840 841 version = client.get_version() 842 assert version == VERSION 843 finally: 844 if server_proc is not None: 845 os.kill(server_proc.pid, signal.SIGTERM) 846 847 848 _LLM_CHAT_INPUT_SCHEMA = Schema([ 849 ColSpec( 850 Array( 851 Object([ 852 Property("role", DataType.string), 853 Property("content", DataType.string), 854 ]), 855 ), 856 name="messages", 857 ) 858 ]) 859 860 861 @pytest.mark.parametrize( 862 ("signature", "expected_model_input", "expected_params"), 863 [ 864 # Test case: no signature, everything should go to data 865 ( 866 None, 867 { 868 "messages": [{"role": "user", "content": "hello!"}], 869 "max_tokens": 20, 870 "temperature": 0.5, 871 }, 872 {}, 873 ), 874 # Test case: signature with params, split params and data 875 ( 876 ModelSignature( 877 inputs=_LLM_CHAT_INPUT_SCHEMA, 878 params=ParamSchema([ 
879 ParamSpec("temperature", DataType.double, default=0.5), 880 ParamSpec("max_tokens", DataType.integer, default=20), 881 ParamSpec("top_p", DataType.double, default=0.9), 882 ]), 883 ), 884 { 885 "messages": [{"role": "user", "content": "hello!"}], 886 }, 887 { 888 "temperature": 0.5, 889 "max_tokens": 20, 890 "top_p": 0.9, # filled with the default value 891 }, 892 ), 893 # Test case: if some params are not defined in either input and params schema, 894 # they will be dropped 895 ( 896 ModelSignature( 897 inputs=_LLM_CHAT_INPUT_SCHEMA, 898 params=ParamSchema([ 899 ParamSpec("temperature", DataType.double, default=0.5), 900 ]), 901 ), 902 { 903 "messages": [{"role": "user", "content": "hello!"}], 904 }, 905 { 906 # only params defined in the schema are passed 907 "temperature": 0.5, 908 }, 909 ), 910 # Test case: params can be defined in the input schema 911 ( 912 ModelSignature( 913 inputs=Schema([ 914 *_LLM_CHAT_INPUT_SCHEMA.inputs, 915 ColSpec(DataType.long, "max_tokens", required=False), 916 ColSpec(DataType.double, "temperature", required=False), 917 ]), 918 ), 919 { 920 "messages": [{"role": "user", "content": "hello!"}], 921 "temperature": 0.5, 922 "max_tokens": 20, 923 }, 924 {}, 925 ), 926 ], 927 ) 928 def test_scoring_server_allows_payloads_with_llm_chat_keys_for_pyfunc( 929 model_path, signature, expected_model_input, expected_params 930 ): 931 mlflow.pyfunc.save_model(model_path, python_model=MyChatLLM(), signature=signature) 932 933 payload = json.dumps({ 934 "messages": [{"role": "user", "content": "hello!"}], 935 "temperature": 0.5, 936 "max_tokens": 20, 937 }) 938 response = score_model_in_process( 939 model_uri=model_path, 940 data=payload, 941 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 942 ) 943 expect_status_code(response, 200) 944 assert json.loads(response.content)["choices"][0]["message"]["content"] == "hello!" 
945 assert json.loads(response.content)["model_input"] == expected_model_input 946 assert json.loads(response.content)["params"] == expected_params 947 948 949 _LLM_COMPLETIONS_INPUT_SCHEMA = Schema([ 950 ColSpec( 951 DataType.string, 952 name="prompt", 953 ) 954 ]) 955 956 957 @pytest.mark.parametrize( 958 ("signature", "expected_model_input", "expected_params"), 959 [ 960 # Test case: no signature, everything should go to data 961 ( 962 None, 963 { 964 "prompt": "hello!", 965 "max_tokens": 20, 966 "temperature": 0.5, 967 }, 968 {}, 969 ), 970 # Test case: signature with params, split params and data 971 ( 972 ModelSignature( 973 inputs=_LLM_COMPLETIONS_INPUT_SCHEMA, 974 params=ParamSchema([ 975 ParamSpec("temperature", DataType.double, default=0.5), 976 ParamSpec("max_tokens", DataType.integer, default=20), 977 ParamSpec("top_p", DataType.double, default=0.9), 978 ]), 979 ), 980 { 981 "prompt": "hello!", 982 }, 983 { 984 "temperature": 0.5, 985 "max_tokens": 20, 986 "top_p": 0.9, # filled with the default value 987 }, 988 ), 989 ], 990 ) 991 def test_scoring_server_allows_payloads_with_llm_completions_keys_for_pyfunc( 992 model_path, signature, expected_model_input, expected_params 993 ): 994 mlflow.pyfunc.save_model(model_path, python_model=MyCompletionsLLM(), signature=signature) 995 996 payload = json.dumps({ 997 "prompt": "hello!", 998 "temperature": 0.5, 999 "max_tokens": 20, 1000 }) 1001 response = score_model_in_process( 1002 model_uri=model_path, 1003 data=payload, 1004 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 1005 ) 1006 expect_status_code(response, 200) 1007 assert json.loads(response.content)["choices"][0]["text"] == "hello!" 
1008 assert json.loads(response.content)["model_input"] == expected_model_input 1009 assert json.loads(response.content)["params"] == expected_params 1010 1011 1012 _LLM_EMBEDDINGS_INPUT_SCHEMA = Schema([ 1013 ColSpec( 1014 DataType.string, 1015 name="input", 1016 ) 1017 ]) 1018 1019 1020 @pytest.mark.parametrize( 1021 ("signature", "expected_model_input", "expected_params"), 1022 [ 1023 # Test case: no signature, everything should go to data 1024 ( 1025 None, 1026 { 1027 "input": "hello!", 1028 "random": "test", 1029 }, 1030 {}, 1031 ), 1032 # Test case: signature with no params accepted, ignores params 1033 ( 1034 ModelSignature( 1035 inputs=_LLM_EMBEDDINGS_INPUT_SCHEMA, 1036 ), 1037 { 1038 "input": "hello!", 1039 }, 1040 {}, 1041 ), 1042 ], 1043 ) 1044 def test_scoring_server_allows_payloads_with_llm_embeddings_keys_for_pyfunc( 1045 model_path, signature, expected_model_input, expected_params 1046 ): 1047 mlflow.pyfunc.save_model(model_path, python_model=MyEmbeddingsLLM(), signature=signature) 1048 1049 payload = json.dumps({ 1050 "input": "hello!", 1051 "random": "test", 1052 }) 1053 response = score_model_in_process( 1054 model_uri=model_path, 1055 data=payload, 1056 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 1057 ) 1058 expect_status_code(response, 200) 1059 assert json.loads(response.content)["data"][0]["embedding"] == [0.1, 0.2, 0.3] 1060 assert json.loads(response.content)["model_input"] == expected_model_input 1061 assert json.loads(response.content)["params"] == expected_params 1062 1063 1064 def test_scoring_server_allows_payloads_with_messages_for_pyfunc_wrapped(model_path): 1065 sklearn_path = model_path + "-sklearn" 1066 build_and_save_sklearn_model(sklearn_path) 1067 1068 # wrapped pyfuncs count as pyfuncs (sklearn is not present in model.metadata.flavors) 1069 class WrappedSklearn(PythonModel): 1070 def load_context(self, context): 1071 self.model = mlflow.pyfunc.load_model(context.artifacts["model_path"]) 1072 1073 # note: model_input 
is the value of "messages", not a dict 1074 def predict(self, context, model_input): 1075 weird_but_valid_parse = [json.loads(model_input["messages"][0]["content"])] 1076 return self.model.predict(weird_but_valid_parse) 1077 1078 mlflow.pyfunc.save_model( 1079 model_path, python_model=WrappedSklearn(), artifacts={"model_path": sklearn_path} 1080 ) 1081 1082 payload = json.dumps({ 1083 "messages": [{"role": "user", "content": "[2,2,2,2]"}], 1084 "max_tokens": 20, 1085 }) 1086 response = score_model_in_process( 1087 model_uri=model_path, 1088 data=payload, 1089 content_type=pyfunc_scoring_server.CONTENT_TYPE_JSON, 1090 ) 1091 expect_status_code(response, 200) 1092 1093 1094 @pytest.mark.parametrize( 1095 ("dict_input", "param_schema", "expected"), 1096 [ 1097 ( 1098 # no param signature, everything should go 1099 # to data no params should get dropped 1100 {"messages": ["test"], "max_tokens": 20, "random": "test"}, 1101 None, 1102 ({"messages": ["test"], "max_tokens": 20, "random": "test"}, {}), 1103 ), 1104 ( 1105 # params defined in the param schema should go to params 1106 # rest should go to data 1107 {"messages": ["test"], "max_tokens": 20, "random": "test"}, 1108 ParamSchema([ 1109 ParamSpec("max_tokens", DataType.integer, default=20), 1110 ]), 1111 ({"messages": ["test"], "random": "test"}, {"max_tokens": 20}), 1112 ), 1113 ], 1114 ) 1115 def test_split_data_and_params_for_llm_input(dict_input, param_schema, expected): 1116 data, params = pyfunc_scoring_server._split_data_and_params_for_llm_input( 1117 dict_input, param_schema 1118 ) 1119 expected_data, expected_params = expected 1120 assert data == expected_data 1121 assert params == expected_params