/ src / python / txtai / embeddings / index / indexes.py
indexes.py
  1  """
  2  Indexes module
  3  """
  4  
  5  import os
  6  
  7  from .documents import Documents
  8  
  9  
 10  class Indexes:
 11      """
 12      Manages a collection of subindexes for an embeddings instance.
 13      """
 14  
 15      def __init__(self, embeddings, indexes):
 16          """
 17          Creates a new indexes instance.
 18  
 19          Args:
 20              embeddings: embeddings instance
 21              indexes: dict of subindexes to add
 22          """
 23  
 24          self.embeddings = embeddings
 25          self.indexes = indexes
 26  
 27          self.documents = None
 28          self.checkpoint = None
 29  
 30          # Transform columns
 31          columns = embeddings.config.get("columns", {})
 32          self.text = columns.get("text", "text")
 33          self.object = columns.get("object", "object")
 34  
 35          # Check if top-level indexing is enabled for this embeddings instance
 36          self.indexing = embeddings.model or embeddings.scoring
 37  
 38      def __contains__(self, name):
 39          """
 40          Returns True if name is in this instance, False otherwise.
 41  
 42          Returns:
 43              True if name is in this instance, False otherwise
 44          """
 45  
 46          return name in self.indexes
 47  
 48      def __getitem__(self, name):
 49          """
 50          Looks up an index by name.
 51  
 52          Args:
 53              name: index name
 54  
 55          Returns:
 56              index
 57          """
 58  
 59          return self.indexes[name]
 60  
 61      def __getattr__(self, name):
 62          """
 63          Looks up an index by attribute name.
 64  
 65          Args:
 66              name: index name
 67  
 68          Returns:
 69              index
 70          """
 71  
 72          try:
 73              return self.indexes[name]
 74          except Exception as e:
 75              raise AttributeError(e) from e
 76  
 77      def default(self):
 78          """
 79          Gets the default/first index.
 80  
 81          Returns:
 82              default index
 83          """
 84  
 85          return list(self.indexes.keys())[0]
 86  
 87      def findmodel(self, index=None):
 88          """
 89          Finds a vector model. If index is empty, the first vector model is returned.
 90  
 91          Args:
 92              index: index name to match
 93  
 94          Returns:
 95              Vectors
 96          """
 97  
 98          # Find vector model
 99          matches = [self.indexes[index].findmodel()] if index else [index.findmodel() for index in self.indexes.values() if index.findmodel()]
100          return matches[0] if matches else None
101  
102      def insert(self, documents, index=None, checkpoint=None):
103          """
104          Inserts a batch of documents into each subindex.
105  
106          Args:
107              documents: list of (id, data, tags)
108              index: indexid offset
109              checkpoint: optional checkpoint directory, enables indexing restart
110          """
111  
112          if not self.documents:
113              self.documents = Documents()
114              self.checkpoint = checkpoint
115  
116          # Create batch containing documents added to parent index
117          batch = []
118          for _, document, _ in documents:
119              # Add to documents collection if text or object field is set
120              parent = document
121              if isinstance(parent, dict):
122                  parent = parent.get(self.text, document.get(self.object))
123  
124              # Add if field is available or top-level indexing is disabled
125              if parent is not None or not self.indexing:
126                  batch.append((index, document, None))
127                  index += 1
128  
129          # Add filtered documents batch
130          self.documents.add(batch)
131  
132      def delete(self, ids):
133          """
134          Deletes ids from each subindex.
135  
136          Args:
137              ids: list of ids to delete
138          """
139  
140          for index in self.indexes.values():
141              index.delete(ids)
142  
143      def index(self):
144          """
145          Builds each subindex.
146          """
147  
148          for name, index in self.indexes.items():
149              index.index(self.documents, checkpoint=f"{self.checkpoint}/{name}" if self.checkpoint else None)
150  
151          # Reset document stream
152          self.documents.close()
153          self.documents = None
154          self.checkpoint = None
155  
156      def upsert(self):
157          """
158          Runs upsert for each subindex.
159          """
160  
161          for index in self.indexes.values():
162              index.upsert(self.documents)
163  
164          # Reset document stream
165          self.documents.close()
166          self.documents = None
167  
168      def load(self, path):
169          """
170          Loads each subindex from path.
171  
172          Args:
173              path: directory path to load subindexes
174          """
175  
176          for name, index in self.indexes.items():
177              # Load subindex if it exists, subindexes aren't required to have data
178              directory = os.path.join(path, name)
179              if index.exists(directory):
180                  index.load(directory)
181  
182      def save(self, path):
183          """
184          Saves each subindex to path.
185  
186          Args:
187              path: directory path to save subindexes
188          """
189  
190          for name, index in self.indexes.items():
191              index.save(os.path.join(path, name))
192  
193      def close(self):
194          """
195          Close and free resources used by this instance.
196          """
197  
198          for index in self.indexes.values():
199              index.close()