/ tests / pyfunc / utils.py
utils.py
 1  import json
 2  import os
 3  from typing import TYPE_CHECKING
 4  
 5  from fastapi.testclient import TestClient
 6  
 7  import mlflow
 8  from mlflow.pyfunc import scoring_server
 9  
10  if TYPE_CHECKING:
11      import httpx
12  
13  
14  def score_model_in_process(model_uri: str, data: str, content_type: str) -> "httpx.Response":
15      """Score a model using in-process FastAPI TestClient (faster than subprocess)."""
16      import pandas as pd
17  
18      env_snapshot = os.environ.copy()
19      try:
20          model = mlflow.pyfunc.load_model(model_uri)
21          app = scoring_server.init(model)
22          client = TestClient(app)
23  
24          # Convert DataFrame to JSON format if needed (matching RestEndpoint.invoke behavior)
25          if isinstance(data, pd.DataFrame):
26              if content_type == scoring_server.CONTENT_TYPE_CSV:
27                  data = data.to_csv(index=False)
28              elif content_type == scoring_server.CONTENT_TYPE_PARQUET:
29                  data = data.to_parquet()
30              else:
31                  assert content_type == scoring_server.CONTENT_TYPE_JSON
32                  data = json.dumps({"dataframe_split": data.to_dict(orient="split")})
33          elif not isinstance(data, (str, dict)):
34              data = json.dumps({"instances": data})
35  
36          return client.post("/invocations", content=data, headers={"Content-Type": content_type})
37      finally:
38          os.environ.clear()
39          os.environ.update(env_snapshot)