ggml.py
"""
GGML module

ANN index backend built on the ggml tensor library. Exact (brute-force)
similarity search is implemented as a single matrix multiplication between
a query buffer tensor and the stored embeddings tensor, optionally with
quantized embeddings storage. Indexes persist to GGUF files.
"""

import ctypes
import os

import numpy as np

# Conditional import - ggml is an optional dependency ("ann" extra)
try:
    import ggml
    from ggml import utils

    LIBGGML = True
except ImportError:
    LIBGGML = False

from ..base import ANN


class GGML(ANN):
    """
    Builds an ANN index backed by GGML.
    """

    def __init__(self, config):
        """
        Creates a new GGML ANN index.

        Args:
            config: index configuration

        Raises:
            ImportError: if the ggml library is not installed
        """

        super().__init__(config)

        if not LIBGGML:
            raise ImportError('GGML is not available - install "ann" extra to enable')

    def load(self, path):
        """
        Loads an existing index from a GGUF file at path.

        Args:
            path: path to GGUF file
        """

        # Create GGML Tensors
        self.backend = GGMLTensors(self.setting("gpu", True), self.setting("querysize", 64), self.setting("quantize"))

        # Load existing GGUF file
        self.backend.load(path)

    def index(self, embeddings):
        """
        Builds a new index from an embeddings array.

        Args:
            embeddings: embeddings array, rows are vectors
        """

        # Create GGML Tensors
        self.backend = GGMLTensors(self.setting("gpu", True), self.setting("querysize", 64), self.setting("quantize"))

        # Add embeddings data
        self.backend.index(embeddings)

        # Add id offset and index build metadata
        self.config["offset"] = embeddings.shape[0]
        self.metadata(self.settings())

    def append(self, embeddings):
        """
        Appends an embeddings array to the existing index.

        Args:
            embeddings: embeddings array to append
        """

        # Append embeddings to existing tensors
        self.backend.append(embeddings)

        # Update id offset and index metadata
        self.config["offset"] += embeddings.shape[0]
        self.metadata()

    def delete(self, ids):
        """
        Marks ids as deleted in the index.

        Args:
            ids: list of row ids to delete
        """

        self.backend.delete(ids)

    def search(self, queries, limit):
        """
        Searches the index for the top matches per query.

        Args:
            queries: queries array, rows are query vectors
            limit: maximum results per query

        Returns:
            list of [(id, score)] per query, best scores first
        """

        scores = self.backend.search(queries)

        # Get topn ids - negate scores so argsort yields descending score order
        ids = np.argsort(-scores)[:, :limit]

        # Map results to [(id, score)]
        results = []
        for x, score in enumerate(scores):
            # Add results
            results.append(list(zip(ids[x].tolist(), score[ids[x]].tolist())))

        return results

    def count(self):
        """
        Number of active (non-deleted) elements in the index.

        Returns:
            count
        """

        return self.backend.count()

    def save(self, path):
        """
        Saves the index as a GGUF file at path.

        Args:
            path: output path
        """

        self.backend.save(path)

    def close(self):
        """
        Closes this index and frees backend resources.
        """

        # Cleanup resources before setting backend to None
        # NOTE(review): assumes self.backend is initialized (presumably by the ANN base class) before close can run - confirm
        if self.backend:
            self.backend.close()

        # Parent logic
        super().close()

    def settings(self):
        """
        Returns settings for this instance.

        Returns:
            dict
        """

        return {"ggml": ggml.__version__}


class GGMLTensors:
    """
    Interface to read and write GGML tensor data.

    Owns the GGML context, compute backend, tensor buffers and the
    matrix-multiply compute graph used for searches. Resources are
    manually managed - close() must run (directly or via __del__) to
    free the underlying native memory.
    """

    def __init__(self, gpu, querysize, quantize):
        """
        Creates a new GGMLTensors.

        Args:
            gpu: if GPU should be used
            querysize: query buffer size
            quantize: data quantization setting
        """

        # Settings
        self.gpu, self.querysize, self.quantize = gpu, querysize, quantize

        # GGML parameters
        self.context, self.backend = None, None
        self.buffer, self.queries, self.data, self.deletes = None, None, None, []
        self.allocator, self.graph, self.output = None, None, None

    def __del__(self):
        """
        Ensure resources are cleaned up.
        """

        # Cleanup resources
        self.close()

    def load(self, path):
        """
        Loads GGML tensors from a GGUF file at path.

        Args:
            path: path to GGUF file
        """

        # Initialize GGML objects
        self.context = self.createcontext()
        self.backend = self.createbackend()
        self.allocator = self.createallocator(self.backend)

        # Temporary context for GGUF
        context = ctypes.c_void_p()

        # Cast as ggml_context** - no_alloc=False so tensor data is read into the temporary context
        params = ggml.gguf_init_params(ctx=ctypes.pointer(context), no_alloc=False)

        # Load GGUF file
        gguf = ggml.gguf_init_from_file(path.encode("utf-8"), params)

        # Load tensors from GGUF file
        self.loadtensors(context)

        # Create graph operation
        self.graph, self.output = self.creategraph()

        # Cleanup temporary resources - tensor data was copied to the backend in loadtensors
        ggml.gguf_free(gguf)
        ggml.ggml_free(context)

    def index(self, embeddings):
        """
        Indexes embeddings as GGML tensors.

        Args:
            embeddings: embeddings array
        """

        # Initialize GGML objects
        self.context = self.createcontext()
        self.backend = self.createbackend()
        self.allocator = self.createallocator(self.backend)

        # Create query buffer and data tensors
        self.createtensors(embeddings)

        # Create graph operation
        self.graph, self.output = self.creategraph()

    def append(self, embeddings):
        """
        Appends embeddings to GGML tensors.

        Builds a fresh context/backend holding old + new data, then swaps
        it in and frees the previous objects.

        Args:
            embeddings: embeddings array
        """

        # Initialize GGML objects - locals first, existing instance state is still needed by mergetensors
        context = self.createcontext()
        backend = self.createbackend()
        allocator = self.createallocator(backend)

        # Merge embeddings tensors
        buffer, queries, data = self.mergetensors(context, backend, embeddings)
        # Preserve deletes across the swap - close() resets them
        deletes = self.deletes

        # Free existing objects
        self.close()

        # Store new objects
        self.context, self.backend, self.allocator = (context, backend, allocator)
        self.buffer, self.queries, self.data, self.deletes = (buffer, queries, data, deletes)

        # Create graph operation
        self.graph, self.output = self.creategraph()

    def delete(self, ids):
        """
        Delete ids from tensors.

        Deletes are soft - ids are recorded and their scores zeroed at
        search time; the tensor data itself is not modified.

        Args:
            ids: ids to delete
        """

        shape = utils.get_shape(self.data)

        # Filter any index greater than size of array
        ids = [x for x in ids if x < shape[1]]
        self.deletes.extend(ids)

    def search(self, queries):
        """
        Searches GGML tensors for the best query matches.

        Args:
            queries: queries array

        Returns:
            query results
        """

        # Process queries up to the query buffer size batches
        batches = []
        for batch in self.chunk(queries):
            # Copy queries to buffer
            # NOTE(review): assumes batch is a contiguous float32 array matching the query tensor layout - confirm with callers
            ggml.ggml_backend_tensor_set(
                self.queries,
                ctypes.cast(batch.ctypes.data, ctypes.c_void_p),
                0,
                batch.nbytes,
            )

            # Run matrix multiplication operation
            ggml.ggml_backend_graph_compute(self.backend, self.graph)

            # Get size of embeddings data
            size = utils.get_shape(self.data)[1]

            # Get and return results - reads only the first batch rows even when batch is smaller than querysize
            results = np.zeros((batch.shape[0], size), dtype=np.float32)
            ggml.ggml_backend_tensor_get(self.output, ctypes.cast(results.ctypes.data, ctypes.c_void_p), 0, results.nbytes)

            # Clear deleted rows and add results
            results[:, self.deletes] = 0
            batches.append(results)

        # Combine batches and return as single result
        return np.concatenate(batches, axis=0)

    def count(self):
        """
        Number of elements in this GGML tensors.

        Returns:
            count
        """

        return utils.get_shape(self.data)[1] - len(self.deletes) if self.data else 0

    def save(self, path):
        """
        Saves GGML tensors as GGUF to path.

        Args:
            path: path to save
        """

        # Temporary buffer
        buffer = None

        # Init and save data tensor
        gguf = ggml.gguf_init_empty()

        # Add the data tensor
        ggml.ggml_set_name(self.data, b"data")
        ggml.gguf_add_tensor(gguf, self.data)

        # Optionally create and add the deletes tensor
        if self.deletes:
            deletes = np.array(self.deletes, dtype=np.int64)
            tensor = ggml.ggml_new_tensor_1d(self.context, ggml.GGML_TYPE_I64, deletes.shape[0])
            buffer = ggml.ggml_backend_alloc_ctx_tensors(self.context, self.backend)

            ggml.ggml_backend_tensor_set(
                tensor,
                ctypes.cast(deletes.ctypes.data, ctypes.c_void_p),
                0,
                deletes.nbytes,
            )
            ggml.ggml_set_name(tensor, b"deletes")
            ggml.gguf_add_tensor(gguf, tensor)

        # Write file and free resources
        ggml.gguf_write_to_file(gguf, path.encode("utf-8"), False)
        ggml.gguf_free(gguf)

        if buffer:
            ggml.ggml_backend_buffer_free(buffer)

    def close(self):
        """
        Closes this instance and frees resources.

        Frees buffer, allocator, backend and context in that order and
        resets all related fields so close is safe to call repeatedly.
        """

        if self.buffer:
            ggml.ggml_backend_buffer_free(self.buffer)
            self.buffer, self.queries, self.data, self.deletes = None, None, None, []

        if self.allocator:
            ggml.ggml_gallocr_free(self.allocator)
            self.allocator, self.graph = None, None

        if self.backend:
            ggml.ggml_backend_free(self.backend)
            self.backend = None

        if self.context:
            # Free quantization memory
            ggml.ggml_quantize_free()

            # Free context
            ggml.ggml_free(self.context)
            self.context = None

    def createcontext(self):
        """
        Creates a new GGML context.

        Returns:
            context
        """

        # Base tensor storage - overhead for up to 100 tensor descriptors
        size = ggml.ggml_tensor_overhead() * 100

        # Graph storage
        size += ggml.ggml_tensor_overhead() * ggml.GGML_DEFAULT_GRAPH_SIZE + ggml.ggml_graph_overhead()

        # Create GGML context - no_alloc=True, tensor data is allocated separately via backend buffers
        params = ggml.ggml_init_params(mem_size=size, no_alloc=True)
        context = ggml.ggml_init(params)

        return context

    def createbackend(self):
        """
        Creates a new GGML backend.

        Returns:
            backend
        """

        # Attempt to create an accelerated backend
        backend = ggml.ggml_backend_init_by_type(ggml.GGML_BACKEND_DEVICE_TYPE_GPU, None) if self.gpu else None

        # Fall back to CPU backend
        if not backend:
            backend = ggml.ggml_backend_cpu_init()
            ggml.ggml_backend_cpu_set_n_threads(backend, os.cpu_count())

        return backend

    def createallocator(self, backend):
        """
        Creates a new GGML allocator.

        Args:
            backend: backend device

        Returns:
            allocator
        """

        return ggml.ggml_gallocr_new(ggml.ggml_backend_get_default_buffer_type(backend))

    def createtensors(self, data):
        """
        Creates query and data tensors.

        Args:
            data: embeddings data
        """

        # Derive embeddings data tensor type
        tensortype = self.tensortype(data)

        # Queries - fixed-size float32 buffer, searches run in batches of querysize
        self.queries = ggml.ggml_new_tensor_2d(self.context, ggml.GGML_TYPE_F32, data.shape[1], self.querysize)

        # Embeddings data
        self.data = ggml.ggml_new_tensor_2d(self.context, tensortype, data.shape[1], data.shape[0])

        # Create buffer - allocates backend storage for all tensors in the context
        self.buffer = ggml.ggml_backend_alloc_ctx_tensors(self.context, self.backend)

        # Copy embeddings data
        self.copy(data, self.data, tensortype, 0)

    def loadtensors(self, context):
        """
        Loads existing tensors from context.

        Args:
            context: ggml context
        """

        # Load data tensor
        data = ggml.ggml_get_tensor(context, b"data")
        if data:
            # Queries - shape[0] is the vector dimension of the stored data
            shape = utils.get_shape(data)
            self.queries = ggml.ggml_new_tensor_2d(self.context, ggml.GGML_TYPE_F32, shape[0], self.querysize)

            # Embeddings data - duplicate the descriptor into this instance's context
            self.data = ggml.ggml_dup_tensor(self.context, data)

            # Create buffer
            self.buffer = ggml.ggml_backend_alloc_ctx_tensors(self.context, self.backend)

            # Copy tensor data to backend
            ggml.ggml_backend_tensor_set(self.data, ggml.ggml_get_data(data), 0, ggml.ggml_nbytes(data))

        # Load deletes tensor
        data = ggml.ggml_get_tensor(context, b"deletes")
        if data:
            # Convert to a NumPy array
            shape = utils.get_shape(data)
            deletes = np.ctypeslib.as_array(ctypes.cast(ggml.ggml_get_data(data), ctypes.POINTER(ctypes.c_int64)), (shape[0],))
            self.deletes = deletes.tolist()

    def mergetensors(self, context, backend, data):
        """
        Merges new embeddings data.

        Args:
            context: new context
            backend: new backend
            data: embeddings data

        Returns:
            buffer, queries, data
        """

        # Derive embeddings data tensor type
        tensortype = self.tensortype(data)

        # Queries
        queries = ggml.ggml_new_tensor_2d(context, ggml.GGML_TYPE_F32, data.shape[1], self.querysize)

        # Embeddings data with space for both existing and new data
        shape = utils.get_shape(self.data)
        merge = ggml.ggml_new_tensor_2d(context, tensortype, data.shape[1], data.shape[0] + shape[1])

        # Create new buffer
        buffer = ggml.ggml_backend_alloc_ctx_tensors(context, backend)

        # Copy existing embeddings data
        self.copy(self.data, merge, tensortype, 0)

        # Copy new embeddings data - offset past the existing tensor's bytes
        self.copy(data, merge, tensortype, ggml.ggml_nbytes(self.data))

        return buffer, queries, merge

    def creategraph(self):
        """
        Creates a new GGML graph.

        Returns:
            graph
        """

        # Create matrix multiply graph operation
        graph = ggml.ggml_new_graph(self.context)

        # Graph operation - scores for all (data row, query) pairs
        output = ggml.ggml_mul_mat(self.context, self.data, self.queries)

        # Setup and allocate graph storage
        ggml.ggml_build_forward_expand(graph, output)
        ggml.ggml_gallocr_alloc_graph(self.allocator, graph)

        return graph, output

    def tensortype(self, data):
        """
        Gets the best matching tensor type for input data.

        Args:
            data: embeddings data

        Returns:
            best matching GGML data type

        Raises:
            ValueError: if the tensor type is unknown or the vector
                dimension is not a multiple of the type's block size
        """

        # Read tensor type - True maps to Q8_0, an int N maps to QN_0, a string is used as-is, falsy means no quantization
        tensortype = self.quantize
        tensortype = "Q8_0" if isinstance(tensortype, bool) else f"Q{int(tensortype)}_0" if isinstance(tensortype, int) else tensortype
        tensortype = tensortype.upper() if tensortype else "F32"

        # Validate tensor type
        if not hasattr(ggml, f"GGML_TYPE_{tensortype}"):
            raise ValueError(f"Invalid tensor type {tensortype}")

        # Get tensor type
        tensortype = getattr(ggml, f"GGML_TYPE_{tensortype}")

        # Validate quantization block size
        blocksize = ggml.ggml_blck_size(tensortype)
        if data.shape[1] % blocksize != 0:
            raise ValueError(
                f'Invalid quantization configuration "{self.quantize}" with {data.shape[1]} dimensions. Must be a multiple of {blocksize}.'
            )

        return tensortype

    def copy(self, inputs, outputs, tensortype, offset):
        """
        Copies input data to backend. Quantizes to desired tensor type, if necessary.

        Args:
            inputs: input tensor
            outputs: output tensor
            tensortype: desired tensor type
            offset: data offset index for storage into outputs
        """

        if not isinstance(inputs, np.ndarray):
            # GGML tensor - copy raw bytes directly, already in the target type
            work, size = ggml.ggml_get_data(inputs), ggml.ggml_nbytes(inputs)
        elif tensortype == ggml.GGML_TYPE_F32:
            # No quantization needed
            work, size = inputs.ctypes.data, inputs.nbytes
        else:
            # Work array will be garbage collected by Python
            # NOTE(review): buffer is sized in c_float units, which should be >= the quantized output size - confirm
            work = (ctypes.c_float * inputs.shape[0] * inputs.shape[1])()

            # Quantize vector data - returns the quantized size in bytes
            size = ggml.ggml_quantize_chunk(
                tensortype, ctypes.cast(inputs.ctypes.data, ctypes.POINTER(ctypes.c_float)), work, 0, inputs.shape[0], inputs.shape[1], None
            )

        # Copy data to tensor
        ggml.ggml_backend_tensor_set(outputs, work, offset, size)

    def chunk(self, queries):
        """
        Splits queries into separate batch sizes specified by size.

        Args:
            queries: queries

        Returns:
            list of evenly sized batches with the last batch having the remaining elements
        """

        return [queries[x : x + self.querysize] for x in range(0, len(queries), self.querysize)]