utils.py
1 import json 2 import os 3 from typing import TYPE_CHECKING 4 5 from fastapi.testclient import TestClient 6 7 import mlflow 8 from mlflow.pyfunc import scoring_server 9 10 if TYPE_CHECKING: 11 import httpx 12 13 14 def score_model_in_process(model_uri: str, data: str, content_type: str) -> "httpx.Response": 15 """Score a model using in-process FastAPI TestClient (faster than subprocess).""" 16 import pandas as pd 17 18 env_snapshot = os.environ.copy() 19 try: 20 model = mlflow.pyfunc.load_model(model_uri) 21 app = scoring_server.init(model) 22 client = TestClient(app) 23 24 # Convert DataFrame to JSON format if needed (matching RestEndpoint.invoke behavior) 25 if isinstance(data, pd.DataFrame): 26 if content_type == scoring_server.CONTENT_TYPE_CSV: 27 data = data.to_csv(index=False) 28 elif content_type == scoring_server.CONTENT_TYPE_PARQUET: 29 data = data.to_parquet() 30 else: 31 assert content_type == scoring_server.CONTENT_TYPE_JSON 32 data = json.dumps({"dataframe_split": data.to_dict(orient="split")}) 33 elif not isinstance(data, (str, dict)): 34 data = json.dumps({"instances": data}) 35 36 return client.post("/invocations", content=data, headers={"Content-Type": content_type}) 37 finally: 38 os.environ.clear() 39 os.environ.update(env_snapshot)