/ src / evidently / legacy / calculation_engine / python_engine.py
python_engine.py
  1  import abc
  2  from typing import Dict
  3  from typing import Generic
  4  from typing import List
  5  from typing import Optional
  6  from typing import TypeVar
  7  
  8  import pandas as pd
  9  
 10  from evidently.legacy.base_metric import GenericInputData
 11  from evidently.legacy.base_metric import InputData
 12  from evidently.legacy.base_metric import Metric
 13  from evidently.legacy.calculation_engine.engine import Engine
 14  from evidently.legacy.calculation_engine.engine import EngineDatasets
 15  from evidently.legacy.calculation_engine.engine import TInputData
 16  from evidently.legacy.calculation_engine.metric_implementation import MetricImplementation
 17  from evidently.legacy.features.generated_features import FeatureResult
 18  from evidently.legacy.features.generated_features import GeneratedFeatures
 19  from evidently.legacy.options.base import Options
 20  from evidently.legacy.pipeline.column_mapping import ColumnMapping
 21  from evidently.legacy.utils.data_preprocessing import DataDefinition
 22  from evidently.legacy.utils.data_preprocessing import create_data_definition
 23  
# Type variable for the metric class handled by a ``PythonMetricImplementation``;
# bound to ``Metric`` so only ``Metric`` subclasses are accepted by type checkers.
TMetric = TypeVar("TMetric", bound=Metric)
 25  
 26  
 27  class PythonEngine(Engine["PythonMetricImplementation", InputData, pd.DataFrame]):
 28      def convert_input_data(self, data: GenericInputData[pd.DataFrame]) -> InputData:
 29          if not isinstance(data.current_data, pd.DataFrame) or (
 30              data.reference_data is not None and not isinstance(data.reference_data, pd.DataFrame)
 31          ):
 32              raise ValueError("PandasEngine works only with pd.DataFrame input data")
 33          return InputData(
 34              data.reference_data,
 35              data.current_data,
 36              current_additional_features=None,
 37              reference_additional_features=None,
 38              column_mapping=data.column_mapping,
 39              data_definition=data.data_definition,
 40              additional_data=data.additional_data,
 41          )
 42  
 43      def get_data_definition(
 44          self,
 45          current_data,
 46          reference_data,
 47          column_mapping: ColumnMapping,
 48          categorical_features_cardinality: Optional[int] = None,
 49      ):
 50          if not isinstance(current_data, pd.DataFrame) or (
 51              reference_data is not None and not isinstance(reference_data, pd.DataFrame)
 52          ):
 53              raise ValueError("PandasEngine works only with pd.DataFrame input data")
 54          return create_data_definition(reference_data, current_data, column_mapping, categorical_features_cardinality)
 55  
 56      def calculate_additional_features(
 57          self, data: TInputData, features: List[GeneratedFeatures], options: Options
 58      ) -> Dict[GeneratedFeatures, FeatureResult[pd.DataFrame]]:
 59          result: Dict[GeneratedFeatures, FeatureResult[pd.DataFrame]] = {}
 60          for feature in features:
 61              current = feature.generate_features_renamed(data.current_data, data.data_definition, options)
 62              reference = (
 63                  feature.generate_features_renamed(data.reference_data, data.data_definition, options)
 64                  if data.reference_data is not None
 65                  else None
 66              )
 67  
 68              result[feature] = FeatureResult(current, reference)
 69          return result
 70  
 71      def merge_additional_features(
 72          self, features: Dict[GeneratedFeatures, FeatureResult[pd.DataFrame]]
 73      ) -> EngineDatasets[pd.DataFrame]:
 74          currents = []
 75          references = []
 76  
 77          for feature, result in features.items():
 78              currents.append(result.current)
 79              if result.reference is not None:
 80                  references.append(result.reference)
 81  
 82          if len(currents) == 0:
 83              current = None
 84          elif len(currents) == 1:
 85              current = currents[0]
 86          else:
 87              current = currents[0].join(currents[1:])  # type: ignore[arg-type]
 88  
 89          if len(references) == 0:
 90              return EngineDatasets(current=current, reference=None)
 91          if len(references) == 1:
 92              return EngineDatasets(current=current, reference=references[0])
 93          return EngineDatasets(current=current, reference=references[0].join(references[1:]))  # type: ignore[arg-type]
 94  
 95      def get_metric_implementation(self, metric):
 96          impl = super().get_metric_implementation(metric)
 97          if impl is None and isinstance(metric, Metric):
 98  
 99              class _Wrapper(PythonMetricImplementation):
100                  def calculate(self, context, data: InputData):
101                      return self.metric.calculate(data)
102  
103              return _Wrapper(self, metric)
104          return impl
105  
106      def form_datasets(
107          self,
108          data: Optional[InputData],
109          features: List[GeneratedFeatures],
110          data_definition: DataDefinition,
111      ) -> EngineDatasets[pd.DataFrame]:
112          if data is None:
113              return EngineDatasets(current=None, reference=None)
114          rename = {column.name: column.display_name for feature in features for column in feature.list_columns()}
115  
116          current = data.current_data
117          if data.current_additional_features is not None:
118              current = data.current_data.join(data.current_additional_features)
119  
120          current = current.rename(columns=rename)
121          reference = data.reference_data
122          if data.reference_data is not None and data.reference_additional_features is not None:
123              reference = data.reference_data.join(data.reference_additional_features)
124  
125          if reference is not None:
126              reference = reference.rename(columns=rename)
127  
128          return EngineDatasets(reference=reference, current=current)
129  
130  
class PythonMetricImplementation(Generic[TMetric], MetricImplementation):
    """Metric implementation tied to a ``PythonEngine``.

    Stores the engine and the metric it was created for; concrete subclasses
    supply the actual computation in ``calculate``.
    """

    def __init__(self, engine: PythonEngine, metric: TMetric):
        self.engine, self.metric = engine, metric

    @classmethod
    def supported_engines(cls):
        """Return the tuple of supported engine classes, which is only ``PythonEngine``."""
        engines = (PythonEngine,)
        return engines

    @abc.abstractmethod
    def calculate(self, context, data: InputData):
        """Abstract calculation hook; this base version only raises ``NotImplementedError``."""
        raise NotImplementedError()