# src/python/txtai/vectors/base.py
  1  """
  2  Vectors module
  3  """
  4  
  5  import json
  6  import os
  7  import tempfile
  8  import uuid
  9  
 10  import numpy as np
 11  
 12  from ..pipeline import Tokenizer
 13  
 14  from .recovery import Recovery
 15  
 16  
 17  class Vectors:
 18      """
 19      Base class for vector models. Vector models transform input content into numeric vectors.
 20      """
 21  
 22      def __init__(self, config, scoring, models):
 23          """
 24          Creates a new vectors instance.
 25  
 26          Args:
 27              config: vector configuration
 28              scoring: optional scoring instance for term weighting
 29              models: models cache
 30          """
 31  
 32          # Store parameters
 33          self.config = config
 34          self.scoring = scoring
 35          self.models = models
 36  
 37          if config:
 38              # Detect if this is an initialized configuration
 39              self.initialized = "dimensions" in config
 40  
 41              # Enables optional string tokenization
 42              self.tokenize = config.get("tokenize")
 43  
 44              # Load model
 45              self.model = self.load(config.get("path"))
 46  
 47              # Encode batch size - controls underlying model batch size when encoding vectors
 48              self.encodebatch = config.get("encodebatch", 32)
 49  
 50              # Embeddings instructions
 51              self.instructions = config.get("instructions")
 52  
 53              # Truncate embeddings to this dimensionality
 54              self.dimensionality = config.get("dimensionality")
 55  
 56              # Scalar quantization - supports 1-bit through 8-bit quantization
 57              quantize = config.get("quantize")
 58              self.qbits = max(min(quantize, 8), 1) if isinstance(quantize, int) and not isinstance(quantize, bool) else None
 59  
 60      def loadmodel(self, path):
 61          """
 62          Loads vector model at path.
 63  
 64          Args:
 65              path: path to vector model
 66  
 67          Returns:
 68              vector model
 69          """
 70  
 71          raise NotImplementedError
 72  
 73      def encode(self, data, category=None):
 74          """
 75          Encodes a batch of data using vector model.
 76  
 77          Args:
 78              data: batch of data
 79              category: optional category for instruction-based embeddings
 80  
 81          Return:
 82              transformed data
 83          """
 84  
 85          raise NotImplementedError
 86  
 87      def load(self, path):
 88          """
 89          Loads a model using the current configuration. This method will return previously cached models
 90          if available.
 91  
 92          Returns:
 93              model
 94          """
 95  
 96          # Check if model is cached
 97          if self.models and path in self.models:
 98              return self.models[path]
 99  
100          # Create new model
101          model = self.loadmodel(path)
102  
103          # Store model in cache
104          if self.models is not None and path:
105              self.models[path] = model
106  
107          return model
108  
109      def index(self, documents, batchsize=500, checkpoint=None):
110          """
111          Converts a list of documents to a temporary file with embeddings arrays. Returns a tuple of document ids,
112          number of dimensions and temporary file with embeddings.
113  
114          Args:
115              documents: list of (id, data, tags)
116              batchsize: index batch size
117              checkpoint: optional checkpoint directory, enables indexing restart
118  
119          Returns:
120              (ids, dimensions, batches, stream)
121          """
122  
123          ids, dimensions, batches, stream = [], None, 0, None
124  
125          # Generate recovery config if checkpoint is set
126          vectorsid = self.vectorsid() if checkpoint else None
127          recovery = Recovery(checkpoint, vectorsid, self.loadembeddings) if checkpoint else None
128  
129          # Convert all documents to embedding arrays, stream embeddings to disk to control memory usage
130          with self.spool(checkpoint, vectorsid) as output:
131              stream = output.name
132              batch = []
133              for document in documents:
134                  batch.append(document)
135  
136                  if len(batch) == batchsize:
137                      # Convert batch to embeddings
138                      uids, dimensions = self.batch(batch, output, recovery)
139                      ids.extend(uids)
140                      batches += 1
141  
142                      batch = []
143  
144              # Final batch
145              if batch:
146                  uids, dimensions = self.batch(batch, output, recovery)
147                  ids.extend(uids)
148                  batches += 1
149  
150          return (ids, dimensions, batches, stream)
151  
152      def vectors(self, documents, batchsize=500, checkpoint=None, buffer=None, dtype=None):
153          """
154          Bulk encodes documents into vectors using index(). Return the data as a mmap-ed array.
155  
156          Args:
157              documents: list of (id, data, tags)
158              batchsize: index batch size
159              checkpoint: optional checkpoint directory, enables indexing restart
160              buffer: file path used for memmap buffer
161              dtype: dtype for buffer
162  
163          Returns:
164              (ids, dimensions, embeddings)
165          """
166  
167          # Consume stream and transform documents to vectors
168          ids, dimensions, batches, stream = self.index(documents, batchsize, checkpoint)
169  
170          # Check that embeddings are available and load as a memmap
171          embeddings = None
172          if ids:
173              # Write batches
174              embeddings = np.memmap(buffer, dtype=dtype, shape=(len(ids), dimensions), mode="w+")
175              with open(stream, "rb") as queue:
176                  x = 0
177                  for _ in range(batches):
178                      batch = self.loadembeddings(queue)
179                      embeddings[x : x + batch.shape[0]] = batch
180                      x += batch.shape[0]
181  
182          # Remove temporary file (if checkpointing is disabled)
183          if not checkpoint:
184              os.remove(stream)
185  
186          return (ids, dimensions, embeddings)
187  
188      def close(self):
189          """
190          Closes this vectors instance.
191          """
192  
193          self.model = None
194  
195      def transform(self, document):
196          """
197          Transforms document into an embeddings vector.
198  
199          Args:
200              document: (id, data, tags)
201  
202          Returns:
203              embeddings vector
204          """
205  
206          # Prepare input document for vectors model and build embeddings
207          return self.batchtransform([document])[0]
208  
209      def batchtransform(self, documents, category=None):
210          """
211          Transforms batch of documents into embeddings vectors.
212  
213          Args:
214              documents: list of documents used to build embeddings
215              category: category for instruction-based embeddings
216  
217          Returns:
218              embeddings vectors
219          """
220  
221          # Prepare input documents for vectors model
222          documents = [self.prepare(data, category) for _, data, _ in documents]
223  
224          # Skip encoding data if it's already an array
225          if documents and isinstance(documents[0], np.ndarray):
226              return np.array(documents, dtype=np.float32)
227  
228          return self.vectorize(documents, category)
229  
230      def dot(self, queries, data):
231          """
232          Calculates the dot product similarity between queries and documents. This method
233          assumes each of the inputs are normalized.
234  
235          Args:
236              queries: queries
237              data: search data
238  
239          Returns:
240              dot product scores
241          """
242  
243          return np.dot(queries, data.T).tolist()
244  
245      def vectorsid(self):
246          """
247          Generates vectors uid for this vectors instance.
248  
249          Returns:
250              vectors uid
251          """
252  
253          # Select config options that determine uniqueness
254          select = ["path", "method", "tokenizer", "maxlength", "tokenize", "instructions", "dimensionality", "quantize"]
255          config = {k: v for k, v in self.config.items() if k in select}
256          config.update(self.config.get("vectors", {}))
257  
258          # Generate a deterministic UUID
259          return str(uuid.uuid5(uuid.NAMESPACE_DNS, json.dumps(config, sort_keys=True)))
260  
261      def spool(self, checkpoint, vectorsid):
262          """
263          Opens a spool file for queuing generated vectors.
264  
265          Args:
266              checkpoint: optional checkpoint directory, enables indexing restart
267              vectorsid: vectors uid for current configuration
268  
269          Returns:
270              vectors spool file
271          """
272  
273          # Spool to vectors checkpoint file
274          if checkpoint:
275              os.makedirs(checkpoint, exist_ok=True)
276              return open(f"{checkpoint}/{vectorsid}", "wb")
277  
278          # Spool to temporary file
279          return tempfile.NamedTemporaryFile(mode="wb", suffix=".npy", delete=False)
280  
281      def batch(self, documents, output, recovery):
282          """
283          Builds a batch of embeddings.
284  
285          Args:
286              documents: list of documents used to build embeddings
287              output: output temp file to store embeddings
288              recovery: optional recovery instance
289  
290          Returns:
291              (ids, dimensions) list of ids and number of dimensions in embeddings
292          """
293  
294          # Extract ids and prepare input documents for vectors model
295          ids = [uid for uid, _, _ in documents]
296          documents = [self.prepare(data, "data") for _, data, _ in documents]
297          dimensions = None
298  
299          # Attempt to read embeddings from a recovery file
300          embeddings = recovery() if recovery else None
301          embeddings = self.vectorize(documents, "data") if embeddings is None else embeddings
302          if embeddings is not None:
303              dimensions = embeddings.shape[1]
304              self.saveembeddings(output, embeddings)
305  
306          return (ids, dimensions)
307  
308      def prepare(self, data, category=None):
309          """
310          Prepares input data for vector model.
311  
312          Args:
313              data: input data
314              category: category for instruction-based embeddings
315  
316          Returns:
317              data formatted for vector model
318          """
319  
320          # Prepares tokens for the model
321          data = self.tokens(data)
322  
323          # Default instruction category
324          category = category if category else "query"
325  
326          # Prepend instructions, if applicable
327          if self.instructions and category in self.instructions and isinstance(data, str):
328              # Prepend category instruction
329              data = f"{self.instructions[category]}{data}"
330  
331          return data
332  
333      def tokens(self, data):
334          """
335          Prepare data as tokens model can accept.
336  
337          Args:
338              data: input data
339  
340          Returns:
341              tokens formatted for model
342          """
343  
344          # Optional string tokenization
345          if self.tokenize and isinstance(data, str):
346              data = Tokenizer.tokenize(data)
347  
348          # Convert token list to string
349          if isinstance(data, list):
350              data = " ".join(data)
351  
352          return data
353  
354      def vectorize(self, data, category=None):
355          """
356          Runs data vectorization, which consists of the following steps.
357  
358            1. Encode data into vectors using underlying model
359            2. Truncate vectors, if necessary
360            3. Normalize vectors
361            4. Quantize vectors, if necessary
362  
363          Args:
364              data: input data
365              category: category for instruction-based embeddings
366  
367          Returns:
368              embeddings vectors
369          """
370  
371          # Default instruction category
372          category = category if category else "query"
373  
374          # Transform data into vectors
375          embeddings = self.encode(data, category)
376  
377          if embeddings is not None:
378              # Truncate embeddings, if necessary
379              if self.dimensionality and self.dimensionality < embeddings.shape[1]:
380                  embeddings = self.truncate(embeddings)
381  
382              # Normalize data
383              embeddings = self.normalize(embeddings)
384  
385              # Apply quantization, if necessary
386              if self.qbits:
387                  embeddings = self.quantize(embeddings)
388  
389          return embeddings
390  
391      def loadembeddings(self, f):
392          """
393          Loads embeddings from file.
394  
395          Args:
396              f: file to load from
397  
398          Returns:
399              embeddings
400          """
401  
402          return np.load(f, allow_pickle=False)
403  
404      def saveembeddings(self, f, embeddings):
405          """
406          Saves embeddings to output.
407  
408          Args:
409              f: output file
410              embeddings: embeddings to save
411          """
412  
413          np.save(f, embeddings, allow_pickle=False)
414  
415      def truncate(self, embeddings):
416          """
417          Truncates embeddings to the configured dimensionality.
418  
419          This is only useful for models trained to store more important information in
420          earlier dimensions such as Matryoshka Representation Learning (MRL).
421  
422          Args:
423              embeddings: input embeddings
424  
425          Returns:
426              truncated embeddings
427          """
428  
429          return embeddings[:, : self.dimensionality]
430  
431      def normalize(self, embeddings):
432          """
433          Normalizes embeddings using L2 normalization. Operation applied directly on array.
434  
435          Args:
436              embeddings: input embeddings
437  
438          Returns:
439              embeddings
440          """
441  
442          # Calculation is different for matrices vs vectors
443          if len(embeddings.shape) > 1:
444              embeddings /= np.linalg.norm(embeddings, axis=1)[:, np.newaxis]
445          else:
446              embeddings /= np.linalg.norm(embeddings)
447  
448          return embeddings
449  
450      def quantize(self, embeddings):
451          """
452          Quantizes embeddings using scalar quantization.
453  
454          Args:
455              embeddings: input embeddings
456  
457          Returns:
458              quantized embeddings
459          """
460  
461          # Scale factor is midpoint in range
462          factor = 2 ** (self.qbits - 1)
463  
464          # Quantize to uint8
465          scalars = embeddings * factor
466          scalars = scalars.clip(-factor, factor - 1) + factor
467          scalars = scalars.astype(np.uint8)
468  
469          # Transform uint8 to bits
470          bits = np.unpackbits(scalars.reshape(-1, 1), axis=1)
471  
472          # Remove unused bits (i.e. for 3-bit quantization, the leading 5 bits are removed)
473          bits = bits[:, -self.qbits :]
474  
475          # Reshape using original data dimensions and pack bits into uint8 array
476          return np.packbits(bits.reshape(embeddings.shape[0], embeddings.shape[1] * self.qbits), axis=1)