# python_engine.py
import abc
from typing import Dict
from typing import Generic
from typing import List
from typing import Optional
from typing import TypeVar

import pandas as pd

from evidently.legacy.base_metric import GenericInputData
from evidently.legacy.base_metric import InputData
from evidently.legacy.base_metric import Metric
from evidently.legacy.calculation_engine.engine import Engine
from evidently.legacy.calculation_engine.engine import EngineDatasets
from evidently.legacy.calculation_engine.engine import TInputData
from evidently.legacy.calculation_engine.metric_implementation import MetricImplementation
from evidently.legacy.features.generated_features import FeatureResult
from evidently.legacy.features.generated_features import GeneratedFeatures
from evidently.legacy.options.base import Options
from evidently.legacy.pipeline.column_mapping import ColumnMapping
from evidently.legacy.utils.data_preprocessing import DataDefinition
from evidently.legacy.utils.data_preprocessing import create_data_definition

TMetric = TypeVar("TMetric", bound=Metric)


def _require_dataframes(current_data, reference_data) -> None:
    """Validate that the datasets are pandas DataFrames.

    ``current_data`` must be a ``pd.DataFrame``; ``reference_data`` may be
    ``None`` but otherwise must be a ``pd.DataFrame`` as well.

    Raises:
        ValueError: if either dataset has an unsupported type.
    """
    if not isinstance(current_data, pd.DataFrame) or (
        reference_data is not None and not isinstance(reference_data, pd.DataFrame)
    ):
        # NOTE(review): the message says "PandasEngine" although the class is
        # PythonEngine; kept verbatim because callers may match on the text.
        raise ValueError("PandasEngine works only with pd.DataFrame input data")


def _join_frames(frames: List[pd.DataFrame]) -> Optional[pd.DataFrame]:
    """Join ``frames`` into a single DataFrame using ``DataFrame.join``.

    Returns ``None`` for an empty list and the sole element itself (not a
    copy) for a single-element list; otherwise the first frame joined with
    all the remaining ones.
    """
    if not frames:
        return None
    first, *rest = frames
    if not rest:
        return first
    return first.join(rest)  # type: ignore[arg-type]


class PythonEngine(Engine["PythonMetricImplementation", InputData, pd.DataFrame]):
    """Calculation engine that operates on ``pd.DataFrame`` datasets."""

    def convert_input_data(self, data: GenericInputData[pd.DataFrame]) -> InputData:
        """Build an ``InputData`` from generic input data.

        The reference/current datasets, column mapping, data definition and
        additional data are copied from ``data``; both additional-feature
        slots are initialised to ``None``.

        Raises:
            ValueError: if the current dataset is not a ``pd.DataFrame`` or
                the reference dataset is neither ``None`` nor a ``pd.DataFrame``.
        """
        _require_dataframes(data.current_data, data.reference_data)
        return InputData(
            data.reference_data,
            data.current_data,
            current_additional_features=None,
            reference_additional_features=None,
            column_mapping=data.column_mapping,
            data_definition=data.data_definition,
            additional_data=data.additional_data,
        )

    def get_data_definition(
        self,
        current_data,
        reference_data,
        column_mapping: ColumnMapping,
        categorical_features_cardinality: Optional[int] = None,
    ):
        """Return the result of ``create_data_definition`` for the given datasets.

        Raises:
            ValueError: if ``current_data`` is not a ``pd.DataFrame`` or
                ``reference_data`` is neither ``None`` nor a ``pd.DataFrame``.
        """
        _require_dataframes(current_data, reference_data)
        return create_data_definition(reference_data, current_data, column_mapping, categorical_features_cardinality)

    def calculate_additional_features(
        self, data: TInputData, features: List[GeneratedFeatures], options: Options
    ) -> Dict[GeneratedFeatures, FeatureResult[pd.DataFrame]]:
        """Generate every feature in ``features`` for the current and reference data.

        Each feature's ``generate_features_renamed`` is called on the current
        dataset and, when a reference dataset is present, on the reference
        dataset; otherwise the reference part of the result is ``None``.

        Returns:
            A mapping from each feature to its ``FeatureResult``.
        """
        result: Dict[GeneratedFeatures, FeatureResult[pd.DataFrame]] = {}
        for feature in features:
            current = feature.generate_features_renamed(data.current_data, data.data_definition, options)
            reference = (
                feature.generate_features_renamed(data.reference_data, data.data_definition, options)
                if data.reference_data is not None
                else None
            )

            result[feature] = FeatureResult(current, reference)
        return result

    def merge_additional_features(
        self, features: Dict[GeneratedFeatures, FeatureResult[pd.DataFrame]]
    ) -> EngineDatasets[pd.DataFrame]:
        """Combine per-feature results into one current and one reference frame.

        All ``current`` results are joined together; ``reference`` results
        that are ``None`` are skipped. A side with no frames yields ``None``.
        """
        # Only the results are needed here, so iterate values rather than items.
        results = list(features.values())
        currents = [result.current for result in results]
        references = [result.reference for result in results if result.reference is not None]
        return EngineDatasets(current=_join_frames(currents), reference=_join_frames(references))

    def get_metric_implementation(self, metric):
        """Return the implementation for ``metric``, wrapping plain metrics if needed.

        When the parent lookup returns ``None`` and ``metric`` is a ``Metric``
        instance, a wrapper implementation is returned whose ``calculate``
        ignores the context and delegates to ``metric.calculate(data)``.
        Otherwise the parent's result is returned unchanged.
        """
        impl = super().get_metric_implementation(metric)
        if impl is None and isinstance(metric, Metric):

            class _Wrapper(PythonMetricImplementation):
                def calculate(self, context, data: InputData):
                    return self.metric.calculate(data)

            return _Wrapper(self, metric)
        return impl

    def form_datasets(
        self,
        data: Optional[InputData],
        features: List[GeneratedFeatures],
        data_definition: DataDefinition,
    ) -> EngineDatasets[pd.DataFrame]:
        """Assemble the final datasets: base data joined with additional features.

        Additional-feature frames, when present, are joined onto the matching
        base dataset, and the columns listed by ``features`` are renamed from
        ``column.name`` to ``column.display_name``. ``data_definition`` is not
        used by this implementation.

        Returns:
            ``EngineDatasets`` with both sides ``None`` when ``data`` is
            ``None``; otherwise the renamed current dataset and the renamed
            reference dataset (or ``None`` if there is no reference data).
        """
        if data is None:
            return EngineDatasets(current=None, reference=None)
        rename = {column.name: column.display_name for feature in features for column in feature.list_columns()}

        current = data.current_data
        if data.current_additional_features is not None:
            current = data.current_data.join(data.current_additional_features)

        current = current.rename(columns=rename)
        reference = data.reference_data
        if data.reference_data is not None and data.reference_additional_features is not None:
            reference = data.reference_data.join(data.reference_additional_features)

        if reference is not None:
            reference = reference.rename(columns=rename)

        return EngineDatasets(reference=reference, current=current)


class PythonMetricImplementation(Generic[TMetric], MetricImplementation):
    """Base class for metric implementations that run on ``PythonEngine``."""

    def __init__(self, engine: PythonEngine, metric: TMetric):
        """Store the owning engine and the metric this implementation computes."""
        self.engine = engine
        self.metric = metric

    @abc.abstractmethod
    def calculate(self, context, data: InputData):
        """Compute the metric result for ``data``; subclasses must override."""
        raise NotImplementedError

    @classmethod
    def supported_engines(cls):
        """Return the tuple of engine classes supported: ``(PythonEngine,)``."""
        return (PythonEngine,)