# deep_learning.py
# # MLflow 3 Deep Learning Example
# In this example, we will first run a model training job, which is tracked as an MLflow Run.
# Every 10 epochs, we will store model checkpoints, which are tracked as MLflow Logged Models.
# We will then select the best checkpoint for production deployment.
import pandas as pd
import torch
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from torch import nn

import mlflow
import mlflow.pytorch
from mlflow.entities import Dataset


# Helper function to prepare data
def prepare_data(df: pd.DataFrame) -> tuple[torch.Tensor, torch.Tensor]:
    """Split a DataFrame into a feature tensor and a label tensor.

    Every column except the last is converted to a float32 feature tensor;
    the last column is converted to an int64 label tensor.

    Args:
        df: Table whose last column holds the labels.

    Returns:
        A ``(X, y)`` tuple of feature and label tensors.
    """
    X = torch.tensor(df.iloc[:, :-1].values, dtype=torch.float32)
    y = torch.tensor(df.iloc[:, -1].values, dtype=torch.long)
    return X, y


# Helper function to compute accuracy
def compute_accuracy(model, X: torch.Tensor, y: torch.Tensor) -> float:
    """Return the fraction of rows in ``X`` whose predicted class equals ``y``.

    The predicted class is the index of the largest model output along
    dimension 1. Gradients are disabled while the model is evaluated.

    Args:
        model: Callable mapping ``X`` to per-class scores.
        X: Input features passed to ``model``.
        y: Expected class indices, compared element-wise with the predictions.

    Returns:
        Number of correct predictions divided by ``y.size(0)``.
    """
    with torch.no_grad():
        outputs = model(X)
        # torch.max over dim 1 returns (values, indices); only the indices
        # (the predicted class per row) are needed.
        _, predicted = torch.max(outputs, 1)
        accuracy = (predicted == y).sum().item() / y.size(0)
    return accuracy


# Define a basic PyTorch classifier
class IrisClassifier(nn.Module):
    """Feed-forward classifier: Linear -> ReLU -> Linear."""

    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        """Create the two linear layers and the activation between them.

        Args:
            input_size: Number of input features.
            hidden_size: Width of the hidden layer.
            output_size: Number of output scores (one per class).
        """
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the raw (un-normalized) output scores for ``x``."""
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x


# Load Iris dataset and prepare the DataFrame
iris = load_iris()
iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
iris_df["target"] = iris.target

# Split into training and testing datasets
train_df, test_df = train_test_split(iris_df, test_size=0.2, random_state=42)

# Prepare training data
train_dataset = mlflow.data.from_pandas(train_df, name="train")
X_train, y_train = prepare_data(train_dataset.df)

# Define the PyTorch model and move it to the device
input_size = X_train.shape[1]
hidden_size = 16
output_size = len(iris.target_names)
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
scripted_model = IrisClassifier(input_size, hidden_size, output_size).to(device)
scripted_model = torch.jit.script(scripted_model)

# Start a run to represent the training job
with mlflow.start_run():
    # Load the training dataset with MLflow. We will link training metrics to this dataset.
    train_dataset: Dataset = mlflow.data.from_pandas(train_df, name="train")
    X_train, y_train = prepare_data(train_dataset.df)

    # Keep a host-side NumPy copy for the model input example: Tensor.numpy()
    # only works on CPU tensors, so it must not be called on a CUDA tensor.
    input_example = X_train.cpu().numpy()

    # Move the training data to the model's device once, before the loop;
    # the tensors do not change between epochs.
    X_train = X_train.to(device)
    y_train = y_train.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(scripted_model.parameters(), lr=0.01)

    for epoch in range(51):
        # One full-batch optimization step per epoch.
        out = scripted_model(X_train)
        loss = criterion(out, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log a checkpoint with metrics every 10 epochs
        if epoch % 10 == 0:
            # Each newly created LoggedModel checkpoint is linked with its
            # name, params, and step
            model_info = mlflow.pytorch.log_model(
                pytorch_model=scripted_model,
                name=f"torch-iris-{epoch}",
                params={
                    "n_layers": 3,
                    "activation": "ReLU",
                    "criterion": "CrossEntropyLoss",
                    "optimizer": "Adam",
                },
                step=epoch,
                input_example=input_example,
            )
            # Log metric on training dataset at step and link to LoggedModel
            mlflow.log_metric(
                key="accuracy",
                value=compute_accuracy(scripted_model, X_train, y_train),
                step=epoch,
                model_id=model_info.model_id,
                dataset=train_dataset,
            )

# This example produced one MLflow Run (training_run) and 6 MLflow Logged Models,
# one for each checkpoint (at steps 0, 10, …, 50). Using MLflow's UI or search API,
# we can get the checkpoints and rank them by their accuracy.
# Fetch logged models sorted by their "accuracy" metric, highest first.
# NOTE(review): no filter is passed, so the result presumably also includes
# logged models from earlier runs in the same experiment — confirm.
ranked_checkpoints = mlflow.search_logged_models(
    output_format="list", order_by=[{"field_name": "metrics.accuracy", "ascending": False}]
)

# Descending order puts the most accurate checkpoint first ...
best_checkpoint: mlflow.entities.LoggedModel = ranked_checkpoints[0]
print(best_checkpoint.metrics[0])
print(best_checkpoint)

# ... and the least accurate one last.
worst_checkpoint: mlflow.entities.LoggedModel = ranked_checkpoints[-1]
print(worst_checkpoint.metrics)

# Once the best checkpoint is selected, that model can be registered to the model registry.
mlflow.register_model(f"models:/{best_checkpoint.model_id}", name="my_dl_model")