"""
Trainer module tests
"""

import os
import unittest
import tempfile

from unittest.mock import patch

import torch

from transformers import AutoTokenizer, AutoModelForSequenceClassification

from txtai.data import Data
from txtai.pipeline import HFTrainer, Labels, Questions, Sequences


class TestTrainer(unittest.TestCase):
    """
    Trainer tests.
    """

    @classmethod
    def setUpClass(cls):
        """
        Create default datasets.
        """

        # 400 rows: two dog examples labeled 0, two cat examples labeled 1, repeated
        samples = [("Dogs", 0), ("dog", 0), ("Cats", 1), ("cat", 1)]
        cls.data = [{"text": text, "label": label} for text, label in samples] * 100

    def testBasic(self):
        """
        Test training a model with basic parameters
        """

        model, tokenizer = HFTrainer()("google/bert_uncased_L-2_H-128_A-2", self.data)

        # Trained classifier should assign label 1 to "cat"
        labels = Labels((model, tokenizer), dynamic=False)
        self.assertEqual(labels("cat")[0][0], 1)

    def testCLM(self):
        """
        Test training a model with causal language modeling
        """

        trainer = HFTrainer()

        # Train with default parameters, pack merging and merging disabled
        for merge in ("default", "pack", None):
            kwargs = {} if merge == "default" else {"merge": merge}
            model, _ = trainer("hf-internal-testing/tiny-random-gpt2", self.data, maxlength=16, task="language-generation", **kwargs)
            self.assertIsNotNone(model)

    def testCustom(self):
        """
        Test training a model with custom parameters
        """

        # pylint: disable=E1120
        path = "google/bert_uncased_L-2_H-128_A-2"
        model = AutoModelForSequenceClassification.from_pretrained(path)
        tokenizer = AutoTokenizer.from_pretrained(path)

        # Pass data twice: once as training data, once as evaluation data (do_eval=True)
        model, tokenizer = HFTrainer()(
            (model, tokenizer),
            self.data,
            self.data,
            columns=("text", "label"),
            do_eval=True,
            output_dir=os.path.join(tempfile.gettempdir(), "trainer"),
        )

        labels = Labels((model, tokenizer), dynamic=False)
        self.assertEqual(labels("cat")[0][0], 1)

    def testDataFrame(self):
        """
        Test training a model with a mock pandas DataFrame
        """

        class Values:
            """
            Mock columnar values list
            """

            def __init__(self, values):
                self.values = list(values)

            def __getitem__(self, index):
                return self.values[index]

            def unique(self):
                """
                Returns a list of unique values.

                Returns:
                    unique list of values
                """

                return set(self.values)

        class TestDataFrame:
            """
            Mock DataFrame with a columnar data view
            """

            def __init__(self, data):
                # Column names come from the first row's keys
                self.columns = list(data[0].keys())

                # Build columnar data view
                self.data = {column: Values([row[column] for row in data]) for column in self.columns}

            def __getitem__(self, column):
                return self.data[column]

        # Train using the mock DataFrame
        model, tokenizer = HFTrainer()("google/bert_uncased_L-2_H-128_A-2", TestDataFrame(self.data))

        labels = Labels((model, tokenizer), dynamic=False)
        self.assertEqual(labels("cat")[0][0], 1)

    def testDataset(self):
        """
        Test training a model with a mock Hugging Face Dataset
        """

        class TestDataset(torch.utils.data.Dataset):
            """
            Mock Dataset
            """

            def __init__(self, data):
                self.data = data

                # Fixed label universe expected by the trainer
                self.unique = lambda _: [0, 1]

            def __len__(self):
                return len(self.data)

            def __getitem__(self, index):
                return self.data[index]

            def column_names(self):
                """
                Returns column names for this dataset

                Returns:
                    list of columns
                """

                return ["text", "label"]

            # pylint: disable=W0613
            def map(self, fn, batched, num_proc, remove_columns):
                """
                Map each dataset row using fn.

                Args:
                    fn: function
                    batched: batch records

                Returns:
                    updated Dataset
                """

                self.data = list(map(fn, self.data))
                return self

        # Train using the mock Dataset
        model, tokenizer = HFTrainer()("google/bert_uncased_L-2_H-128_A-2", TestDataset(self.data))

        labels = Labels((model, tokenizer), dynamic=False)
        self.assertEqual(labels("cat")[0][0], 1)

    def testEmpty(self):
        """
        Test an empty training data object
        """

        self.assertIsNone(Data(None, None, None).process(None))

    def testMLM(self):
        """
        Test training a model with masked language modeling.
        """

        model, _ = HFTrainer()("hf-internal-testing/tiny-random-bert", self.data, task="language-modeling")

        # Test model completed successfully
        self.assertIsNotNone(model)

    def testMultiLabel(self):
        """
        Test training model with labels provided as a list
        """

        # Convert scalar labels to one-hot style float lists
        data = [{"text": row["text"], "label": [0.0, 1.0] if row["label"] else [1.0, 0.0]} for row in self.data]

        model, tokenizer = HFTrainer()("google/bert_uncased_L-2_H-128_A-2", data)

        labels = Labels((model, tokenizer), dynamic=False)
        self.assertEqual(labels("cat")[0][0], 1)

    @patch("importlib.util.find_spec")
    def testPEFT(self, spec):
        """
        Test training a model with causal language modeling and PEFT
        """

        # Disable triton
        spec.return_value = None

        model, _ = HFTrainer()(
            "hf-internal-testing/tiny-random-gpt2",
            self.data,
            maxlength=16,
            task="language-generation",
            quantize=True,
            lora=True,
        )

        # Test model completed successfully
        self.assertIsNotNone(model)

    def testQA(self):
        """
        Test training a QA model
        """

        # (context, answer) pairs, all sharing the same question
        contexts = [
            ("1 can whole tomatoes", "tomatoes"),
            ("Crush 1 tomato", "tomato"),
            ("1 yellow onion", "onion"),
            ("Unwrap 2 red onions", "onions"),
            ("1 red pepper", "pepper"),
            ("Clean 3 red peppers", "peppers"),
            ("1 clove garlic", "garlic"),
            ("Unwrap 3 cloves of garlic", "garlic"),
            ("3 pieces of ginger", "ginger"),
            ("Peel 1 orange", "orange"),
            ("1/2 lb beef", "beef"),
            ("Roast 3 lbs of beef", "beef"),
            ("1 pack of chicken", "chicken"),
            ("Forest through the trees", None),
        ]

        data = [{"question": "What ingredient?", "context": context, "answers": answers} for context, answers in contexts]

        # Train with the same records for training and evaluation
        model, tokenizer = HFTrainer()("google/bert_uncased_L-2_H-128_A-2", data, data, task="question-answering", num_train_epochs=40)

        questions = Questions((model, tokenizer), gpu=True)
        self.assertEqual(questions(["What ingredient?"], ["Peel 1 onion"])[0], "onion")

    def testRegression(self):
        """
        Test training a model with a regression (continuous) output
        """

        # Shift labels off integer values to force a regression task
        data = [{"text": row["text"], "label": row["label"] + 0.1} for row in self.data]

        model, tokenizer = HFTrainer()("google/bert_uncased_L-2_H-128_A-2", data)

        labels = Labels((model, tokenizer), dynamic=False)

        # Regression tasks return a single entry with the regression output
        self.assertGreater(labels("cat")[0][1], 0.5)

    def testRTD(self):
        """
        Test training a language model with replaced token detection
        """

        # Save directory
        output = os.path.join(tempfile.gettempdir(), "trainer.rtd")

        model, _ = HFTrainer()("hf-internal-testing/tiny-random-electra", self.data, task="token-detection", save_safetensors=False, output_dir=output)

        # Test model completed successfully
        self.assertIsNotNone(model)

        # Test output directories exist
        for subdirectory in ("generator", "discriminator"):
            self.assertTrue(os.path.exists(os.path.join(output, subdirectory)))

    def testSeqSeq(self):
        """
        Test training a sequence-sequence model
        """

        pairs = [("Running again", "Sleeping again"), ("Run", "Sleep"), ("running", "sleeping")]
        data = [{"source": source, "target": target} for source, target in pairs]

        model, tokenizer = HFTrainer()("t5-small", data, task="sequence-sequence", prefix="translate Run to Sleep: ", learning_rate=1e-3)

        # Run run-sleep translation
        sequences = Sequences((model, tokenizer))
        result = sequences("translate Run to Sleep: run")
        self.assertEqual(result.lower(), "sleep")