train.py
# tensorflow 2.x core api
import tensorflow as tf
from sklearn.datasets import load_diabetes

import mlflow
from mlflow.models import infer_signature


class Normalize(tf.Module):
    """Data Normalization class"""

    def __init__(self, x):
        # Initialize the mean and standard deviation for normalization
        self.mean = tf.math.reduce_mean(x, axis=0)
        self.std = tf.math.reduce_std(x, axis=0)

    def norm(self, x):
        return (x - self.mean) / self.std

    def unnorm(self, x):
        return (x * self.std) + self.mean


class LinearRegression(tf.Module):
    """Linear Regression model class"""

    def __init__(self):
        self.built = False

    @tf.function
    def __call__(self, x):
        # Initialize the model parameters on the first call
        if not self.built:
            # Randomly generate the weight vector and bias term
            rand_w = tf.random.uniform(shape=[x.shape[-1], 1])
            rand_b = tf.random.uniform(shape=[])
            self.w = tf.Variable(rand_w)
            self.b = tf.Variable(rand_b)
            self.built = True
        y = tf.add(tf.matmul(x, self.w), self.b)
        return tf.squeeze(y, axis=1)


class ExportModule(tf.Module):
    """Exporting TF model"""

    def __init__(self, model, norm_x, norm_y):
        # Initialize pre and postprocessing functions
        self.model = model
        self.norm_x = norm_x
        self.norm_y = norm_y

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, None], dtype=tf.float32)])
    def __call__(self, x):
        # Run the ExportModule for new data points
        x = self.norm_x.norm(x)
        y = self.model(x)
        y = self.norm_y.unnorm(y)
        return y


def mse_loss(y_pred, y):
    """Calculating Mean Square Error Loss function"""
    return tf.reduce_mean(tf.square(y_pred - y))


if __name__ == "__main__":
    # Set a random seed for reproducible results
    tf.random.set_seed(42)

    # Load dataset
    dataset = load_diabetes(as_frame=True)["frame"]
    # Drop missing values
    dataset = dataset.dropna()
    # using only 1500 rows
    dataset = dataset[:1500]
    dataset_tf = tf.convert_to_tensor(dataset, dtype=tf.float32)

    # Split dataset into train and test
    dataset_shuffled = tf.random.shuffle(dataset_tf, seed=42)
    train_data = dataset_shuffled[100:]
    test_data = dataset_shuffled[:100]
    x_train = train_data[:, :-1]
    y_train = train_data[:, -1]
    x_test = test_data[:, :-1]
    y_test = test_data[:, -1]
    # Data normalization
    norm_x = Normalize(x_train)
    norm_y = Normalize(y_train)
    x_train_norm = norm_x.norm(x_train)
    y_train_norm = norm_y.norm(y_train)
    x_test_norm = norm_x.norm(x_test)
    y_test_norm = norm_y.norm(y_test)

    with mlflow.start_run():
        # Initialize linear regression model
        lin_reg = LinearRegression()

        # Use mini batches for memory efficiency and faster convergence
        batch_size = 32
        train_dataset = tf.data.Dataset.from_tensor_slices((x_train_norm, y_train_norm))
        train_dataset = train_dataset.shuffle(buffer_size=x_train.shape[0]).batch(batch_size)
        test_dataset = tf.data.Dataset.from_tensor_slices((x_test_norm, y_test_norm))
        test_dataset = test_dataset.shuffle(buffer_size=x_test.shape[0]).batch(batch_size)

        # Set training parameters
        epochs = 100
        learning_rate = 0.01
        train_losses = []
        test_losses = []

        # Format training loop
        for epoch in range(epochs):
            batch_losses_train = []
            batch_losses_test = []

            # Iterate through the training data
            for x_batch, y_batch in train_dataset:
                with tf.GradientTape() as tape:
                    y_pred_batch = lin_reg(x_batch)
                    batch_loss = mse_loss(y_pred_batch, y_batch)
                # Update parameters with respect to the gradient calculations
                grads = tape.gradient(batch_loss, lin_reg.variables)
                for g, v in zip(grads, lin_reg.variables):
                    v.assign_sub(learning_rate * g)
                # Keep track of batch-level training performance
                batch_losses_train.append(batch_loss)

            # Iterate through the testing data
            for x_batch, y_batch in test_dataset:
                y_pred_batch = lin_reg(x_batch)
                batch_loss = mse_loss(y_pred_batch, y_batch)
                # Keep track of batch-level testing performance
                batch_losses_test.append(batch_loss)

            # Keep track of epoch-level model performance
            train_loss = tf.reduce_mean(batch_losses_train)
            test_loss = tf.reduce_mean(batch_losses_test)
            train_losses.append(train_loss)
            test_losses.append(test_loss)
            if epoch % 10 == 0:
                mlflow.log_metric(key="train_losses", value=train_loss, step=epoch)
                mlflow.log_metric(key="test_losses", value=test_loss, step=epoch)
                print(f"Mean squared error for step {epoch}: {train_loss.numpy():0.3f}")

        # Log the parameters
        mlflow.log_params({
            "epochs": epochs,
            "learning_rate": learning_rate,
            "batch_size": batch_size,
        })
        # Log the final metrics
        mlflow.log_metrics({
            "final_train_loss": train_loss.numpy(),
            "final_test_loss": test_loss.numpy(),
        })
        print(f"\nFinal train loss: {train_loss:0.3f}")
        print(f"Final test loss: {test_loss:0.3f}")

        # Export the tensorflow model
        lin_reg_export = ExportModule(model=lin_reg, norm_x=norm_x, norm_y=norm_y)

        # Infer model signature
        predictions = lin_reg_export(x_test)
        signature = infer_signature(x_test.numpy(), predictions.numpy())

        mlflow.tensorflow.log_model(lin_reg_export, name="model", signature=signature)
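
# A minimal usage sketch of scoring the logged model, kept here as comments so the
# script above runs unchanged. It assumes the artifact path "model" used above and
# that it is executed in the same process right after the run completes (so that
# mlflow.last_active_run() still points at that run):
#
#   run_id = mlflow.last_active_run().info.run_id
#   loaded = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
#   preds = loaded.predict(x_test.numpy())  # raw features; ExportModule normalizes internally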