# base.py
"""
Vectors module
"""

import json
import os
import tempfile
import uuid

import numpy as np

from ..pipeline import Tokenizer

from .recovery import Recovery


class Vectors:
    """
    Base class for vector models. Vector models transform input content into numeric vectors.

    Subclasses implement loadmodel() and encode(). This class provides model caching, batching,
    spooling of embeddings to disk, truncation, normalization and scalar quantization.
    """

    def __init__(self, config, scoring, models):
        """
        Creates a new vectors instance.

        Args:
            config: vector configuration
            scoring: optional scoring instance for term weighting
            models: models cache
        """

        # Store parameters
        self.config = config
        self.scoring = scoring
        self.models = models

        # Defaults for all derived attributes. Without these, an instance created without a
        # configuration raises AttributeError as soon as a method reads one of them.
        self.initialized = False
        self.tokenize = None
        self.model = None
        self.encodebatch = 32
        self.instructions = None
        self.dimensionality = None
        self.qbits = None

        if config:
            # Detect if this is an initialized configuration
            self.initialized = "dimensions" in config

            # Enables optional string tokenization
            self.tokenize = config.get("tokenize")

            # Load model
            self.model = self.load(config.get("path"))

            # Encode batch size - controls underlying model batch size when encoding vectors
            self.encodebatch = config.get("encodebatch", 32)

            # Embeddings instructions
            self.instructions = config.get("instructions")

            # Truncate embeddings to this dimensionality
            self.dimensionality = config.get("dimensionality")

            # Scalar quantization - supports 1-bit through 8-bit quantization
            # Values outside of 1-8 are clamped, bool and non-int values disable quantization
            quantize = config.get("quantize")
            self.qbits = max(min(quantize, 8), 1) if isinstance(quantize, int) and not isinstance(quantize, bool) else None

    def loadmodel(self, path):
        """
        Loads vector model at path. Must be implemented by subclasses.

        Args:
            path: path to vector model

        Returns:
            vector model

        Raises:
            NotImplementedError: always, in this base class
        """

        raise NotImplementedError

    def encode(self, data, category=None):
        """
        Encodes a batch of data using vector model. Must be implemented by subclasses.

        Args:
            data: batch of data
            category: optional category for instruction-based embeddings

        Returns:
            transformed data

        Raises:
            NotImplementedError: always, in this base class
        """

        raise NotImplementedError

    def load(self, path):
        """
        Loads a model using the current configuration. This method will return previously cached models
        if available.

        Args:
            path: path to vector model, also used as the models cache key

        Returns:
            model
        """

        # Check if model is cached
        if self.models and path in self.models:
            return self.models[path]

        # Create new model
        model = self.loadmodel(path)

        # Store model in cache
        if self.models is not None and path:
            self.models[path] = model

        return model

    def index(self, documents, batchsize=500, checkpoint=None):
        """
        Converts a list of documents to a temporary file with embeddings arrays. Returns a tuple of document ids,
        number of dimensions, number of batches written and path to the file with embeddings.

        Args:
            documents: list of (id, data, tags)
            batchsize: index batch size
            checkpoint: optional checkpoint directory, enables indexing restart

        Returns:
            (ids, dimensions, batches, stream)
        """

        ids, dimensions, batches, stream = [], None, 0, None

        # Generate recovery config if checkpoint is set
        # Recovery is created before the spool file is opened for writing
        vectorsid = self.vectorsid() if checkpoint else None
        recovery = Recovery(checkpoint, vectorsid, self.loadembeddings) if checkpoint else None

        # Convert all documents to embedding arrays, stream embeddings to disk to control memory usage
        with self.spool(checkpoint, vectorsid) as output:
            stream = output.name
            batch = []
            for document in documents:
                batch.append(document)

                if len(batch) == batchsize:
                    # Convert batch to embeddings
                    uids, dimensions = self.batch(batch, output, recovery)
                    ids.extend(uids)
                    batches += 1

                    batch = []

            # Final batch
            if batch:
                uids, dimensions = self.batch(batch, output, recovery)
                ids.extend(uids)
                batches += 1

        return (ids, dimensions, batches, stream)

    def vectors(self, documents, batchsize=500, checkpoint=None, buffer=None, dtype=None):
        """
        Bulk encodes documents into vectors using index(). Return the data as a mmap-ed array.

        Args:
            documents: list of (id, data, tags)
            batchsize: index batch size
            checkpoint: optional checkpoint directory, enables indexing restart
            buffer: file path used for memmap buffer
            dtype: dtype for buffer

        Returns:
            (ids, dimensions, embeddings)
        """

        # Consume stream and transform documents to vectors
        ids, dimensions, batches, stream = self.index(documents, batchsize, checkpoint)

        # Check that embeddings are available and load as a memmap
        embeddings = None
        try:
            if ids:
                # Write batches
                embeddings = np.memmap(buffer, dtype=dtype, shape=(len(ids), dimensions), mode="w+")
                with open(stream, "rb") as queue:
                    x = 0
                    for _ in range(batches):
                        batch = self.loadembeddings(queue)
                        embeddings[x : x + batch.shape[0]] = batch
                        x += batch.shape[0]
        finally:
            # Remove temporary file (if checkpointing is disabled)
            # Runs in a finally block so the spool file is not leaked when loading fails
            if not checkpoint:
                os.remove(stream)

        return (ids, dimensions, embeddings)

    def close(self):
        """
        Closes this vectors instance.
        """

        self.model = None

    def transform(self, document):
        """
        Transforms document into an embeddings vector.

        Args:
            document: (id, data, tags)

        Returns:
            embeddings vector
        """

        # Prepare input document for vectors model and build embeddings
        return self.batchtransform([document])[0]

    def batchtransform(self, documents, category=None):
        """
        Transforms batch of documents into embeddings vectors.

        Args:
            documents: list of documents used to build embeddings
            category: category for instruction-based embeddings

        Returns:
            embeddings vectors
        """

        # Prepare input documents for vectors model
        documents = [self.prepare(data, category) for _, data, _ in documents]

        # Skip encoding data if it's already an array
        # Only the first element is checked
        if documents and isinstance(documents[0], np.ndarray):
            return np.array(documents, dtype=np.float32)

        return self.vectorize(documents, category)

    def dot(self, queries, data):
        """
        Calculates the dot product similarity between queries and documents. This method
        assumes each of the inputs are normalized.

        Args:
            queries: queries
            data: search data

        Returns:
            dot product scores
        """

        return np.dot(queries, data.T).tolist()

    def vectorsid(self):
        """
        Generates vectors uid for this vectors instance.

        Returns:
            vectors uid
        """

        # Select config options that determine uniqueness
        select = ["path", "method", "tokenizer", "maxlength", "tokenize", "instructions", "dimensionality", "quantize"]
        config = {k: v for k, v in self.config.items() if k in select}
        config.update(self.config.get("vectors", {}))

        # Generate a deterministic UUID
        # Keys are sorted so equal configurations always serialize to the same string
        return str(uuid.uuid5(uuid.NAMESPACE_DNS, json.dumps(config, sort_keys=True)))

    def spool(self, checkpoint, vectorsid):
        """
        Opens a spool file for queuing generated vectors.

        Args:
            checkpoint: optional checkpoint directory, enables indexing restart
            vectorsid: vectors uid for current configuration

        Returns:
            vectors spool file
        """

        # Spool to vectors checkpoint file
        if checkpoint:
            os.makedirs(checkpoint, exist_ok=True)
            return open(f"{checkpoint}/{vectorsid}", "wb")

        # Spool to temporary file
        # delete=False keeps the file on disk after it is closed, callers remove it when done
        return tempfile.NamedTemporaryFile(mode="wb", suffix=".npy", delete=False)

    def batch(self, documents, output, recovery):
        """
        Builds a batch of embeddings.

        Args:
            documents: list of documents used to build embeddings
            output: output temp file to store embeddings
            recovery: optional recovery instance

        Returns:
            (ids, dimensions) list of ids and number of dimensions in embeddings
        """

        # Extract ids and prepare input documents for vectors model
        ids = [uid for uid, _, _ in documents]
        documents = [self.prepare(data, "data") for _, data, _ in documents]
        dimensions = None

        # Attempt to read embeddings from a recovery file
        embeddings = recovery() if recovery else None

        # Build embeddings when no recovery data is available
        embeddings = self.vectorize(documents, "data") if embeddings is None else embeddings
        if embeddings is not None:
            dimensions = embeddings.shape[1]
            self.saveembeddings(output, embeddings)

        return (ids, dimensions)

    def prepare(self, data, category=None):
        """
        Prepares input data for vector model.

        Args:
            data: input data
            category: category for instruction-based embeddings

        Returns:
            data formatted for vector model
        """

        # Prepares tokens for the model
        data = self.tokens(data)

        # Default instruction category
        category = category if category else "query"

        # Prepend instructions, if applicable
        if self.instructions and category in self.instructions and isinstance(data, str):
            # Prepend category instruction
            data = f"{self.instructions[category]}{data}"

        return data

    def tokens(self, data):
        """
        Prepare data as tokens model can accept.

        Args:
            data: input data

        Returns:
            tokens formatted for model
        """

        # Optional string tokenization
        if self.tokenize and isinstance(data, str):
            data = Tokenizer.tokenize(data)

        # Convert token list to string
        if isinstance(data, list):
            data = " ".join(data)

        return data

    def vectorize(self, data, category=None):
        """
        Runs data vectorization, which consists of the following steps.

          1. Encode data into vectors using underlying model
          2. Truncate vectors, if necessary
          3. Normalize vectors
          4. Quantize vectors, if necessary

        Args:
            data: input data
            category: category for instruction-based embeddings

        Returns:
            embeddings vectors
        """

        # Default instruction category
        category = category if category else "query"

        # Transform data into vectors
        embeddings = self.encode(data, category)

        if embeddings is not None:
            # Truncate embeddings, if necessary
            if self.dimensionality and self.dimensionality < embeddings.shape[1]:
                embeddings = self.truncate(embeddings)

            # Normalize data
            embeddings = self.normalize(embeddings)

            # Apply quantization, if necessary
            if self.qbits:
                embeddings = self.quantize(embeddings)

        return embeddings

    def loadembeddings(self, f):
        """
        Loads embeddings from file.

        Args:
            f: file to load from

        Returns:
            embeddings
        """

        return np.load(f, allow_pickle=False)

    def saveembeddings(self, f, embeddings):
        """
        Saves embeddings to output.

        Args:
            f: output file
            embeddings: embeddings to save
        """

        np.save(f, embeddings, allow_pickle=False)

    def truncate(self, embeddings):
        """
        Truncates embeddings to the configured dimensionality.

        This is only useful for models trained to store more important information in
        earlier dimensions such as Matryoshka Representation Learning (MRL).

        Args:
            embeddings: input embeddings

        Returns:
            truncated embeddings
        """

        return embeddings[:, : self.dimensionality]

    def normalize(self, embeddings):
        """
        Normalizes embeddings using L2 normalization. Operation applied directly on array.

        Rows (or a vector) with a zero norm are left unchanged rather than divided by zero.

        Args:
            embeddings: input embeddings

        Returns:
            embeddings
        """

        # Calculation is different for matrices vs vectors
        if len(embeddings.shape) > 1:
            norms = np.linalg.norm(embeddings, axis=1)[:, np.newaxis]

            # Dividing by 1 keeps all-zero rows as-is instead of turning them into NaN
            norms[norms == 0] = 1
            embeddings /= norms
        else:
            norm = np.linalg.norm(embeddings)

            # Skip division for an all-zero vector
            if norm != 0:
                embeddings /= norm

        return embeddings

    def quantize(self, embeddings):
        """
        Quantizes embeddings using scalar quantization.

        Args:
            embeddings: input embeddings

        Returns:
            quantized embeddings
        """

        # Scale factor is midpoint in range
        factor = 2 ** (self.qbits - 1)

        # Quantize to uint8
        scalars = embeddings * factor
        scalars = scalars.clip(-factor, factor - 1) + factor
        scalars = scalars.astype(np.uint8)

        # Transform uint8 to bits
        bits = np.unpackbits(scalars.reshape(-1, 1), axis=1)

        # Remove unused bits (i.e. for 3-bit quantization, the leading 5 bits are removed)
        bits = bits[:, -self.qbits :]

        # Reshape using original data dimensions and pack bits into uint8 array
        return np.packbits(bits.reshape(embeddings.shape[0], embeddings.shape[1] * self.qbits), axis=1)