# src/python/txtai/ann/dense/numpy.py
  1  """
  2  NumPy module
  3  """
  4  
  5  import numpy as np
  6  
  7  from safetensors import safe_open
  8  from safetensors.numpy import save_file
  9  
 10  from ...serialize import SerializeFactory
 11  
 12  from ..base import ANN
 13  
 14  
 15  class NumPy(ANN):
 16      """
 17      Builds an ANN index backed by a NumPy array.
 18      """
 19  
 20      def __init__(self, config):
 21          super().__init__(config)
 22  
 23          # Array function definitions
 24          self.all, self.cat, self.dot, self.zeros = np.all, np.concatenate, np.dot, np.zeros
 25          self.argsort, self.xor, self.clip = np.argsort, np.bitwise_xor, np.clip
 26  
 27          # Scalar quantization
 28          quantize = self.config.get("quantize")
 29          self.qbits = quantize if quantize and isinstance(quantize, int) and not isinstance(quantize, bool) else None
 30  
 31      def load(self, path):
 32          # Load array from file
 33          try:
 34              if self.setting("safetensors"):
 35                  data = self.loadsafetensors(path).get("data")
 36              else:
 37                  data = np.load(path, allow_pickle=False)
 38  
 39              self.backend = self.tensor(data)
 40          except ValueError:
 41              # Backwards compatible support for previously pickled data
 42              self.backend = self.tensor(SerializeFactory.create("pickle").load(path))
 43  
 44      def index(self, embeddings):
 45          # Create index
 46          self.backend = self.tensor(embeddings)
 47  
 48          # Add id offset and index build metadata
 49          self.config["offset"] = embeddings.shape[0]
 50          self.metadata(self.settings())
 51  
 52      def append(self, embeddings):
 53          # Append new data to array
 54          self.backend = self.cat((self.backend, self.tensor(embeddings)), axis=0)
 55  
 56          # Update id offset and index metadata
 57          self.config["offset"] += embeddings.shape[0]
 58          self.metadata()
 59  
 60      def delete(self, ids):
 61          # Filter any index greater than size of array
 62          ids = [x for x in ids if x < self.backend.shape[0]]
 63  
 64          # Clear specified ids
 65          self.backend[ids] = self.tensor(self.zeros((len(ids), self.backend.shape[1])))
 66  
 67      def search(self, queries, limit):
 68          if self.qbits:
 69              # Calculate hamming score for integer vectors
 70              scores = self.hammingscore(queries)
 71          else:
 72              # Dot product on normalized vectors is equal to cosine similarity
 73              scores = self.dot(self.tensor(queries), self.backend.T)
 74  
 75          # Get topn ids
 76          ids = self.argsort(-scores)[:, :limit]
 77  
 78          # Map results to [(id, score)]
 79          results = []
 80          for x, score in enumerate(scores):
 81              # Add results
 82              results.append(list(zip(ids[x].tolist(), score[ids[x]].tolist())))
 83  
 84          return results
 85  
 86      def count(self):
 87          # Get count of non-zero rows (ignores deleted rows)
 88          return self.backend[~self.all(self.backend == 0, axis=1)].shape[0]
 89  
 90      def save(self, path):
 91          # Save array to file. Use stream to prevent ".npy" suffix being added.
 92          if self.setting("safetensors"):
 93              self.savesafetensors({"data": self.numpy(self.backend)}, path)
 94          else:
 95              with open(path, "wb") as handle:
 96                  np.save(handle, self.numpy(self.backend), allow_pickle=False)
 97  
 98      def tensor(self, array):
 99          """
100          Handles backend-specific code such as loading to a GPU device.
101  
102          Args:
103              array: data array
104  
105          Returns:
106              array with backend-specific logic applied
107          """
108  
109          return array
110  
111      def numpy(self, array):
112          """
113          Handles backend-specific code to convert an array to numpy
114  
115          Args:
116              array: data array
117  
118          Returns:
119              numpy array
120          """
121  
122          return array
123  
124      def totype(self, array, dtype):
125          """
126          Casts array to dtype.
127  
128          Args:
129              array: input array
130              dtype: dtype
131  
132          Returns:
133              array cast as dtype
134          """
135  
136          return np.int64(array) if dtype == np.int64 else array
137  
138      def settings(self):
139          """
140          Returns settings for this array.
141  
142          Returns:
143              dict
144          """
145  
146          return {"numpy": np.__version__}
147  
148      def loadsafetensors(self, path):
149          """
150          Loads data from a safetensors file.
151  
152          Args:
153              path: path to safetensors file
154  
155          Returns:
156              dict with metadata + tensors
157          """
158  
159          # Merge metadata and tensors into single dictionary
160          with safe_open(path, framework="np") as f:
161              return {**(f.metadata() if f.metadata() else {}), **{k: f.get_tensor(k) for k in f.keys()}}
162  
163      def savesafetensors(self, data, path, metadata=None):
164          """
165          Saves data and metadata to a safetensors file.
166  
167          Args:
168              data: tensors to save
169              path: output file
170              metadata: additional metadata to save
171          """
172  
173          save_file(data, path, metadata)
174  
175      def hammingscore(self, queries):
176          """
177          Calculates a hamming distance score.
178  
179          This is defined as:
180  
181              score = 1.0 - (hamming distance / total number of bits)
182  
183          Args:
184              queries: queries array
185  
186          Returns:
187              scores
188          """
189  
190          # Build table of number of bits for each distinct uint8 value
191          table = 1 << np.arange(8)
192          table = self.tensor(np.array([np.count_nonzero(x & table) for x in np.arange(256)]))
193  
194          # Number of different bits
195          delta = self.xor(self.tensor(queries[:, None]), self.backend)
196  
197          # Cast to long array
198          delta = self.totype(delta, np.int64)
199  
200          # Calculate score as 1.0 - percentage of different bits
201          # Bound score from 0 to 1
202          return self.clip(1.0 - (table[delta].sum(axis=2) / (self.config["dimensions"] * 8)), 0.0, 1.0)