src/python/txtai/ann/dense/ggml.py
  1  """
  2  GGML module
  3  """
  4  
  5  import ctypes
  6  import os
  7  
  8  import numpy as np
  9  
 10  # Conditional import
 11  try:
 12      import ggml
 13      from ggml import utils
 14  
 15      LIBGGML = True
 16  except ImportError:
 17      LIBGGML = False
 18  
 19  from ..base import ANN
 20  
 21  
 22  class GGML(ANN):
 23      """
 24      Builds an ANN index backed by GGML.
 25      """

    def __init__(self, config):
        super().__init__(config)

        if not LIBGGML:
            raise ImportError('GGML is not available - install "ann" extra to enable')

    def load(self, path):
        # Create GGML Tensors
        self.backend = GGMLTensors(self.setting("gpu", True), self.setting("querysize", 64), self.setting("quantize"))

        # Load existing GGUF file
        self.backend.load(path)

    def index(self, embeddings):
        # Create GGML Tensors
        self.backend = GGMLTensors(self.setting("gpu", True), self.setting("querysize", 64), self.setting("quantize"))

        # Add embeddings data
        self.backend.index(embeddings)

        # Add id offset and index build metadata
        self.config["offset"] = embeddings.shape[0]
        self.metadata(self.settings())

    def append(self, embeddings):
        # Append embeddings to existing tensors
        self.backend.append(embeddings)

        # Update id offset and index metadata
        self.config["offset"] += embeddings.shape[0]
        self.metadata()

    def delete(self, ids):
        self.backend.delete(ids)

    def search(self, queries, limit):
        scores = self.backend.search(queries)

        # Get topn ids
        ids = np.argsort(-scores)[:, :limit]

        # Map results to [(id, score)]
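        # e.g. with limit=2, each query row maps to [(id, score), (id, score)]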
        results = []
        for x, score in enumerate(scores):
            # Add results
            results.append(list(zip(ids[x].tolist(), score[ids[x]].tolist())))

        return results

    def count(self):
        return self.backend.count()

    def save(self, path):
        self.backend.save(path)

    def close(self):
        # Cleanup resources before setting backend to None
        if self.backend:
            self.backend.close()

        # Parent logic
        super().close()

    def settings(self):
        """
        Returns settings for this instance.

        Returns:
            dict
        """

        return {"ggml": ggml.__version__}


class GGMLTensors:
    """
    Interface to read and write GGML tensor data.
    """

    def __init__(self, gpu, querysize, quantize):
        """
        Creates a new GGMLTensors.

        Args:
            gpu: if GPU should be used
            querysize: query buffer size
            quantize: data quantization setting
        """

        # Settings
        self.gpu, self.querysize, self.quantize = gpu, querysize, quantize

        # GGML parameters
        self.context, self.backend = None, None
        self.buffer, self.queries, self.data, self.deletes = None, None, None, []
        self.allocator, self.graph, self.output = None, None, None

    def __del__(self):
        """
        Ensure resources are cleaned up.
        """

        # Cleanup resources
        self.close()

    def load(self, path):
        """
        Loads GGML tensors from a GGUF file at path.

        Args:
            path: path to GGUF file
        """

        # Initialize GGML objects
        self.context = self.createcontext()
        self.backend = self.createbackend()
        self.allocator = self.createallocator(self.backend)

        # Temporary context for GGUF
        context = ctypes.c_void_p()

        # Cast as ggml_context**
        params = ggml.gguf_init_params(ctx=ctypes.pointer(context), no_alloc=False)

        # Load GGUF file
        gguf = ggml.gguf_init_from_file(path.encode("utf-8"), params)

        # Load tensors from GGUF file
        self.loadtensors(context)

        # Create graph operation
        self.graph, self.output = self.creategraph()

        # Cleanup temporary resources
        ggml.gguf_free(gguf)
        ggml.ggml_free(context)

    def index(self, embeddings):
        """
        Indexes embeddings as GGML tensors.

        Args:
            embeddings: embeddings array
        """

        # Initialize GGML objects
        self.context = self.createcontext()
        self.backend = self.createbackend()
        self.allocator = self.createallocator(self.backend)

        # Create query buffer and data tensors
        self.createtensors(embeddings)

        # Create graph operation
        self.graph, self.output = self.creategraph()

    def append(self, embeddings):
        """
        Appends embeddings to GGML tensors.

        Args:
            embeddings: embeddings array
        """

        # Initialize GGML objects
        context = self.createcontext()
        backend = self.createbackend()
        allocator = self.createallocator(backend)

        # Merge embeddings tensors
        buffer, queries, data = self.mergetensors(context, backend, embeddings)
        deletes = self.deletes

        # Free existing objects
        self.close()

        # Store new objects
        self.context, self.backend, self.allocator = (context, backend, allocator)
        self.buffer, self.queries, self.data, self.deletes = (buffer, queries, data, deletes)

        # Create graph operation
        self.graph, self.output = self.creategraph()

    def delete(self, ids):
        """
        Deletes ids from tensors.

        Args:
            ids: ids to delete
        """

        shape = utils.get_shape(self.data)

        # Filter out any ids outside the bounds of the data tensor
        ids = [x for x in ids if x < shape[1]]
        self.deletes.extend(ids)

    def search(self, queries):
        """
        Searches GGML tensors for the best query matches.

        Args:
            queries: queries array

        Returns:
            query results
        """

        # Process queries in batches up to the query buffer size
        batches = []
        for batch in self.chunk(queries):
            # Copy queries to buffer
            ggml.ggml_backend_tensor_set(
                self.queries,
                ctypes.cast(batch.ctypes.data, ctypes.c_void_p),
                0,
                batch.nbytes,
            )
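            # Note: the final batch may have fewer rows than querysize. Only the first
            # batch.shape[0] rows of the output are read back below, so stale values in
            # the remaining query slots are ignored.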

            # Run matrix multiplication operation
            ggml.ggml_backend_graph_compute(self.backend, self.graph)

            # Get size of embeddings data
            size = utils.get_shape(self.data)[1]

            # Read results from the output tensor
            results = np.zeros((batch.shape[0], size), dtype=np.float32)
            ggml.ggml_backend_tensor_get(self.output, ctypes.cast(results.ctypes.data, ctypes.c_void_p), 0, results.nbytes)

            # Zero out scores for deleted ids and add to batch results
            results[:, self.deletes] = 0
            batches.append(results)

        # Combine batches and return as single result
        return np.concatenate(batches, axis=0)

    def count(self):
        """
        Number of elements in the data tensor, excluding deleted ids.

        Returns:
            count
        """

        return utils.get_shape(self.data)[1] - len(self.deletes) if self.data else 0

    def save(self, path):
        """
        Saves GGML tensors as GGUF to path.

        Args:
            path: path to save
        """

        # Temporary buffer
        buffer = None

        # Init and save data tensor
        gguf = ggml.gguf_init_empty()

        # Add the data tensor
        ggml.ggml_set_name(self.data, b"data")
        ggml.gguf_add_tensor(gguf, self.data)

        # Optionally create and add the deletes tensor
        if self.deletes:
            deletes = np.array(self.deletes, dtype=np.int64)
            tensor = ggml.ggml_new_tensor_1d(self.context, ggml.GGML_TYPE_I64, deletes.shape[0])
            buffer = ggml.ggml_backend_alloc_ctx_tensors(self.context, self.backend)

            ggml.ggml_backend_tensor_set(
                tensor,
                ctypes.cast(deletes.ctypes.data, ctypes.c_void_p),
                0,
                deletes.nbytes,
            )
            ggml.ggml_set_name(tensor, b"deletes")
            ggml.gguf_add_tensor(gguf, tensor)

        # Write file and free resources
        ggml.gguf_write_to_file(gguf, path.encode("utf-8"), False)
        ggml.gguf_free(gguf)

        if buffer:
            ggml.ggml_backend_buffer_free(buffer)

    def close(self):
        """
        Closes this instance and frees resources.
        """

        if self.buffer:
            ggml.ggml_backend_buffer_free(self.buffer)
            self.buffer, self.queries, self.data, self.deletes = None, None, None, []

        if self.allocator:
            ggml.ggml_gallocr_free(self.allocator)
            self.allocator, self.graph = None, None

        if self.backend:
            ggml.ggml_backend_free(self.backend)
            self.backend = None

        if self.context:
            # Free quantization memory
            ggml.ggml_quantize_free()

            # Free context
            ggml.ggml_free(self.context)
            self.context = None

    def createcontext(self):
        """
        Creates a new GGML context.

        Returns:
            context
        """

        # Base tensor storage
        size = ggml.ggml_tensor_overhead() * 100

        # Graph storage
        size += ggml.ggml_tensor_overhead() * ggml.GGML_DEFAULT_GRAPH_SIZE + ggml.ggml_graph_overhead()

        # Create GGML context
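        # no_alloc=True means this context only holds tensor metadata; tensor data is
        # allocated later in backend buffers via ggml_backend_alloc_ctx_tensors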
        params = ggml.ggml_init_params(mem_size=size, no_alloc=True)
        context = ggml.ggml_init(params)

        return context

    def createbackend(self):
        """
        Creates a new GGML backend.

        Returns:
            backend
        """

        # Attempt to create an accelerated backend
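        # Selects a GPU device registered with ggml, when available (e.g. CUDA or Metal builds)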
        backend = ggml.ggml_backend_init_by_type(ggml.GGML_BACKEND_DEVICE_TYPE_GPU, None) if self.gpu else None

        # Fall back to CPU backend
        if not backend:
            backend = ggml.ggml_backend_cpu_init()
            ggml.ggml_backend_cpu_set_n_threads(backend, os.cpu_count())

        return backend

    def createallocator(self, backend):
        """
        Creates a new GGML allocator.

        Args:
            backend: backend device

        Returns:
            allocator
        """

        return ggml.ggml_gallocr_new(ggml.ggml_backend_get_default_buffer_type(backend))

    def createtensors(self, data):
        """
        Creates query and data tensors.

        Args:
            data: embeddings data
        """

        # Derive embeddings data tensor type
        tensortype = self.tensortype(data)

        # Queries
        self.queries = ggml.ggml_new_tensor_2d(self.context, ggml.GGML_TYPE_F32, data.shape[1], self.querysize)

        # Embeddings data
        self.data = ggml.ggml_new_tensor_2d(self.context, tensortype, data.shape[1], data.shape[0])

        # Create buffer
        self.buffer = ggml.ggml_backend_alloc_ctx_tensors(self.context, self.backend)

        # Copy embeddings data
        self.copy(data, self.data, tensortype, 0)

    def loadtensors(self, context):
        """
        Loads existing tensors from context.

        Args:
            context: ggml context
        """

        # Load data tensor
        data = ggml.ggml_get_tensor(context, b"data")
        if data:
            # Queries
            shape = utils.get_shape(data)
            self.queries = ggml.ggml_new_tensor_2d(self.context, ggml.GGML_TYPE_F32, shape[0], self.querysize)

            # Embeddings data
            self.data = ggml.ggml_dup_tensor(self.context, data)

            # Create buffer
            self.buffer = ggml.ggml_backend_alloc_ctx_tensors(self.context, self.backend)

            # Copy tensor data to backend
            ggml.ggml_backend_tensor_set(self.data, ggml.ggml_get_data(data), 0, ggml.ggml_nbytes(data))

        # Load deletes tensor
        data = ggml.ggml_get_tensor(context, b"deletes")
        if data:
            # Convert to a NumPy array
            shape = utils.get_shape(data)
            deletes = np.ctypeslib.as_array(ctypes.cast(ggml.ggml_get_data(data), ctypes.POINTER(ctypes.c_int64)), (shape[0],))
            self.deletes = deletes.tolist()

    def mergetensors(self, context, backend, data):
        """
        Merges new embeddings data.

        Args:
            context: new context
            backend: new backend
            data: embeddings data

        Returns:
            buffer, queries, data
        """

        # Derive embeddings data tensor type
        tensortype = self.tensortype(data)

        # Queries
        queries = ggml.ggml_new_tensor_2d(context, ggml.GGML_TYPE_F32, data.shape[1], self.querysize)

        # Embeddings data with space for both existing and new data
        shape = utils.get_shape(self.data)
        merge = ggml.ggml_new_tensor_2d(context, tensortype, data.shape[1], data.shape[0] + shape[1])

        # Create new buffer
        buffer = ggml.ggml_backend_alloc_ctx_tensors(context, backend)

        # Copy existing embeddings data
        self.copy(self.data, merge, tensortype, 0)

        # Copy new embeddings data
        self.copy(data, merge, tensortype, ggml.ggml_nbytes(self.data))

        return buffer, queries, merge

    def creategraph(self):
        """
        Creates a new GGML graph.

        Returns:
            graph, output
        """

        # Create matrix multiply graph operation
        graph = ggml.ggml_new_graph(self.context)

        # Graph operation
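        # ggml_mul_mat(data, queries) yields one score per (data row, query) pair, read back
        # later as a (query count, data rows) NumPy array. Assuming the vectors are normalized
        # upstream, these dot products are equivalent to cosine similarity.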
        output = ggml.ggml_mul_mat(self.context, self.data, self.queries)

        # Setup and allocate graph storage
        ggml.ggml_build_forward_expand(graph, output)
        ggml.ggml_gallocr_alloc_graph(self.allocator, graph)

        return graph, output

    def tensortype(self, data):
        """
        Gets the best matching tensor type for input data.

        Args:
            data: embeddings data

        Returns:
            best matching GGML data type
        """

        # Read tensor type
        tensortype = self.quantize
        tensortype = "Q8_0" if isinstance(tensortype, bool) else f"Q{int(tensortype)}_0" if isinstance(tensortype, int) else tensortype
        tensortype = tensortype.upper() if tensortype else "F32"
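        # e.g. quantize=True -> "Q8_0", quantize=4 -> "Q4_0", quantize="q5_1" -> "Q5_1", quantize=None -> "F32"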

        # Validate tensor type
        if not hasattr(ggml, f"GGML_TYPE_{tensortype}"):
            raise ValueError(f"Invalid tensor type {tensortype}")

        # Get tensor type
        tensortype = getattr(ggml, f"GGML_TYPE_{tensortype}")

        # Validate quantization block size
        blocksize = ggml.ggml_blck_size(tensortype)
        if data.shape[1] % blocksize != 0:
            raise ValueError(
                f'Invalid quantization configuration "{self.quantize}" with {data.shape[1]} dimensions. Must be a multiple of {blocksize}.'
            )

        return tensortype

    def copy(self, inputs, outputs, tensortype, offset):
        """
        Copies input data to backend. Quantizes to desired tensor type, if necessary.

        Args:
            inputs: input tensor
            outputs: output tensor
            tensortype: desired tensor type
            offset: data offset index for storage into outputs
        """

        if not isinstance(inputs, np.ndarray):
            # GGML tensor
            work, size = ggml.ggml_get_data(inputs), ggml.ggml_nbytes(inputs)
        elif tensortype == ggml.GGML_TYPE_F32:
            # No quantization needed
            work, size = inputs.ctypes.data, inputs.nbytes
        else:
            # Work array will be garbage collected by Python
            work = (ctypes.c_float * inputs.shape[0] * inputs.shape[1])()

            # Quantize vector data
            size = ggml.ggml_quantize_chunk(
                tensortype, ctypes.cast(inputs.ctypes.data, ctypes.POINTER(ctypes.c_float)), work, 0, inputs.shape[0], inputs.shape[1], None
            )

        # Copy data to tensor
        ggml.ggml_backend_tensor_set(outputs, work, offset, size)

    def chunk(self, queries):
        """
        Splits queries into batches of up to querysize elements.

        Args:
            queries: queries

        Returns:
            list of evenly sized batches with the last batch having the remaining elements
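
            For example, 150 queries with querysize=64 produce batches of 64, 64 and 22 queries.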
        """

        return [queries[x : x + self.querysize] for x in range(0, len(queries), self.querysize)]
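

# Illustrative standalone usage sketch (not part of the module). Assumes a ggml Python
# binding built with the functions used above and float32 embeddings:
#
#   import numpy as np
#
#   tensors = GGMLTensors(gpu=False, querysize=64, quantize=None)
#   tensors.index(np.random.rand(1000, 384).astype(np.float32))
#
#   scores = tensors.search(np.random.rand(2, 384).astype(np.float32))
#   print(scores.shape)  # (2, 1000)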