# examples/tensorflow/train.py
  1  # tensorflow 2.x core api
  2  import tensorflow as tf
  3  from sklearn.datasets import load_diabetes
  4  
  5  import mlflow
  6  from mlflow.models import infer_signature
  7  
  8  
  9  class Normalize(tf.Module):
 10      """Data Normalization class"""
 11  
 12      def __init__(self, x):
 13          # Initialize the mean and standard deviation for normalization
 14          self.mean = tf.math.reduce_mean(x, axis=0)
 15          self.std = tf.math.reduce_std(x, axis=0)
 16  
 17      def norm(self, x):
 18          return (x - self.mean) / self.std
 19  
 20      def unnorm(self, x):
 21          return (x * self.std) + self.mean
 22  
 23  
 24  class LinearRegression(tf.Module):
 25      """Linear Regression model class"""
 26  
 27      def __init__(self):
 28          self.built = False
 29  
 30      @tf.function
 31      def __call__(self, x):
 32          # Initialize the model parameters on the first call
 33          if not self.built:
 34              # Randomly generate the weight vector and bias term
 35              rand_w = tf.random.uniform(shape=[x.shape[-1], 1])
 36              rand_b = tf.random.uniform(shape=[])
 37              self.w = tf.Variable(rand_w)
 38              self.b = tf.Variable(rand_b)
 39              self.built = True
 40          y = tf.add(tf.matmul(x, self.w), self.b)
 41          return tf.squeeze(y, axis=1)
 42  
 43  
 44  class ExportModule(tf.Module):
 45      """Exporting TF model"""
 46  
 47      def __init__(self, model, norm_x, norm_y):
 48          # Initialize pre and postprocessing functions
 49          self.model = model
 50          self.norm_x = norm_x
 51          self.norm_y = norm_y
 52  
 53      @tf.function(input_signature=[tf.TensorSpec(shape=[None, None], dtype=tf.float32)])
 54      def __call__(self, x):
 55          # Run the ExportModule for new data points
 56          x = self.norm_x.norm(x)
 57          y = self.model(x)
 58          y = self.norm_y.unnorm(y)
 59          return y
 60  
 61  
 62  def mse_loss(y_pred, y):
 63      """Calculating Mean Square Error Loss function"""
 64      return tf.reduce_mean(tf.square(y_pred - y))
 65  
 66  
 67  if __name__ == "__main__":
 68      # Set a random seed for reproducible results
 69      tf.random.set_seed(42)
 70  
 71      # Load dataset
 72      dataset = load_diabetes(as_frame=True)["frame"]
 73      # Drop missing values
 74      dataset = dataset.dropna()
 75      # using only 1500
 76      dataset = dataset[:1500]
 77      dataset_tf = tf.convert_to_tensor(dataset, dtype=tf.float32)
 78  
 79      # Split dataset into train and test
 80      dataset_shuffled = tf.random.shuffle(dataset_tf, seed=42)
 81      train_data = dataset_shuffled[100:]
 82      test_data = dataset_shuffled[:100]
 83      x_train = train_data[:, :-1]
 84      y_train = train_data[:, -1]
 85      x_test = test_data[:, :-1]
 86      y_test = test_data[:, -1]
 87      # Data normalization
 88      norm_x = Normalize(x_train)
 89      norm_y = Normalize(y_train)
 90      x_train_norm = norm_x.norm(x_train)
 91      y_train_norm = norm_y.norm(y_train)
 92      x_test_norm = norm_x.norm(x_test)
 93      y_test_norm = norm_y.norm(y_test)
 94  
 95      with mlflow.start_run():
 96          # Initialize linear regression model
 97          lin_reg = LinearRegression()
 98  
 99          # Use mini batches for memory efficiency and faster convergence
100          batch_size = 32
101          train_dataset = tf.data.Dataset.from_tensor_slices((x_train_norm, y_train_norm))
102          train_dataset = train_dataset.shuffle(buffer_size=x_train.shape[0]).batch(batch_size)
103          test_dataset = tf.data.Dataset.from_tensor_slices((x_test_norm, y_test_norm))
104          test_dataset = test_dataset.shuffle(buffer_size=x_test.shape[0]).batch(batch_size)
105  
106          # Set training parameters
107          epochs = 100
108          learning_rate = 0.01
109          train_losses = []
110          test_losses = []
111  
112          # Format training loop
113          for epoch in range(epochs):
114              batch_losses_train = []
115              batch_losses_test = []
116  
117              # Iterate through the training data
118              for x_batch, y_batch in train_dataset:
119                  with tf.GradientTape() as tape:
120                      y_pred_batch = lin_reg(x_batch)
121                      batch_loss = mse_loss(y_pred_batch, y_batch)
122                  # Update parameters with respect to the gradient calculations
123                  grads = tape.gradient(batch_loss, lin_reg.variables)
124                  for g, v in zip(grads, lin_reg.variables):
125                      v.assign_sub(learning_rate * g)
126                  # Keep track of batch-level training performance
127                  batch_losses_train.append(batch_loss)
128  
129              # Iterate through the testing data
130              for x_batch, y_batch in test_dataset:
131                  y_pred_batch = lin_reg(x_batch)
132                  batch_loss = mse_loss(y_pred_batch, y_batch)
133                  # Keep track of batch-level testing performance
134                  batch_losses_test.append(batch_loss)
135  
136              # Keep track of epoch-level model performance
137              train_loss = tf.reduce_mean(batch_losses_train)
138              test_loss = tf.reduce_mean(batch_losses_test)
139              train_losses.append(train_loss)
140              test_losses.append(test_loss)
141              if epoch % 10 == 0:
142                  mlflow.log_metric(key="train_losses", value=train_loss, step=epoch)
143                  mlflow.log_metric(key="test_losses", value=test_loss, step=epoch)
144                  print(f"Mean squared error for step {epoch}: {train_loss.numpy():0.3f}")
145  
146          # Log the parameters
147          mlflow.log_params({
148              "epochs": epochs,
149              "learning_rate": learning_rate,
150              "batch_size": batch_size,
151          })
152          # Log the final metrics
153          mlflow.log_metrics({
154              "final_train_loss": train_loss.numpy(),
155              "final_test_loss": test_loss.numpy(),
156          })
157          print(f"\nFinal train loss: {train_loss:0.3f}")
158          print(f"Final test loss: {test_loss:0.3f}")
159  
160          # Export the tensorflow model
161          lin_reg_export = ExportModule(model=lin_reg, norm_x=norm_x, norm_y=norm_y)
162  
163          # Infer model signature
164          predictions = lin_reg_export(x_test)
165          signature = infer_signature(x_test.numpy(), predictions.numpy())
166  
167          mlflow.tensorflow.log_model(lin_reg_export, name="model", signature=signature)