utils.py
import json
import urllib.parse
from typing import Any
from typing import Dict
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union

import requests

from evidently._pydantic_compat import BaseModel
from evidently._pydantic_compat import parse_obj_as
from evidently.legacy.ui.storage.common import SECRET_HEADER_NAME
from evidently.legacy.utils import NumpyEncoder

T = TypeVar("T", bound=BaseModel)


class RemoteClientBase:
    """Base class for clients that send HTTP requests to a remote service.

    Stores the service base URL and an optional secret; ``_request`` sends
    the secret in the ``SECRET_HEADER_NAME`` header when one is configured.
    """

    def __init__(self, base_url: str, secret: Optional[str] = None):
        """Remember the connection settings.

        Args:
            base_url: URL that request paths are resolved against.
            secret: Optional value for the ``SECRET_HEADER_NAME`` header.
        """
        self.base_url = base_url
        self.secret = secret

    def _request(
        self,
        path: str,
        method: str,
        query_params: Optional[dict] = None,
        body: Optional[dict] = None,
        response_model: Optional[Type[T]] = None,
    ) -> Union[T, requests.Response]:
        """Send one HTTP request and return the (optionally parsed) response.

        Args:
            path: Path joined onto ``base_url`` with ``urllib.parse.urljoin``
                (so ``urljoin`` resolution rules apply to the pair).
            method: HTTP method name passed to ``requests.request``.
            query_params: Optional query-string parameters.
            body: Optional payload; when given it is JSON-encoded with
                ``NumpyEncoder`` and sent as ``application/json``.
            response_model: Optional model type; when given, the JSON response
                body is parsed into it with ``parse_obj_as``.

        Returns:
            The parsed ``response_model`` instance if a model was requested,
            otherwise the raw ``requests.Response``.

        Raises:
            requests.HTTPError: via ``raise_for_status`` for error responses.
        """
        # todo: better encoding
        headers: Dict[str, str] = {}
        # Only attach the secret header when a secret is configured, rather
        # than handing a ``None`` header value to ``requests``.
        if self.secret is not None:
            headers[SECRET_HEADER_NAME] = self.secret

        data = None
        if body is not None:
            headers["Content-Type"] = "application/json"
            data = json.dumps(body, allow_nan=True, cls=NumpyEncoder).encode("utf8")

        # NOTE(review): no ``timeout`` is passed, so this call can block
        # indefinitely on an unresponsive server -- consider adding one.
        response = requests.request(
            method,
            urllib.parse.urljoin(self.base_url, path),
            params=query_params,
            data=data,
            headers=headers,
        )
        response.raise_for_status()
        if response_model is not None:
            return parse_obj_as(response_model, response.json())
        return response


def parse_json(body: bytes) -> Any:
    """Decode a JSON document from ``body`` using ``json.loads``."""
    return json.loads(body)