indexes.py
"""
Indexes module
"""

import os

from .documents import Documents


class Indexes:
    """
    Manages a collection of subindexes for an embeddings instance.
    """

    def __init__(self, embeddings, indexes):
        """
        Creates a new indexes instance.

        Args:
            embeddings: embeddings instance
            indexes: dict of subindexes to add
        """

        self.embeddings = embeddings
        self.indexes = indexes

        # Pending document stream and checkpoint directory, set by insert and cleared by index/upsert
        self.documents = None
        self.checkpoint = None

        # Transform columns
        columns = embeddings.config.get("columns", {})
        self.text = columns.get("text", "text")
        self.object = columns.get("object", "object")

        # Check if top-level indexing is enabled for this embeddings instance
        self.indexing = embeddings.model or embeddings.scoring

    def __contains__(self, name):
        """
        Returns True if name is in this instance, False otherwise.

        Args:
            name: index name

        Returns:
            True if name is in this instance, False otherwise
        """

        return name in self.indexes

    def __getitem__(self, name):
        """
        Looks up an index by name.

        Args:
            name: index name

        Returns:
            index

        Raises:
            KeyError: if name is not a registered subindex (for a dict of subindexes)
        """

        return self.indexes[name]

    def __getattr__(self, name):
        """
        Looks up an index by attribute name. Only called when regular attribute lookup fails.

        Args:
            name: index name

        Returns:
            index

        Raises:
            AttributeError: if name is not a registered subindex
        """

        # Read the mapping through __dict__. Going through self.indexes would re-enter __getattr__ and
        # recurse up to the recursion limit when the instance has no indexes attribute yet (i.e. __init__ hasn't run).
        indexes = self.__dict__.get("indexes")
        if indexes is None:
            raise AttributeError(name)

        try:
            return indexes[name]
        except (LookupError, TypeError) as e:
            raise AttributeError(e) from e

    def default(self):
        """
        Gets the name of the default/first index.

        Returns:
            name (key) of the first subindex

        Raises:
            IndexError: if there are no subindexes
        """

        return list(self.indexes)[0]

    def findmodel(self, index=None):
        """
        Finds a vector model. If index is empty, the first vector model is returned.

        Args:
            index: index name to match

        Returns:
            Vectors, None if no subindex has a vector model
        """

        # Named subindex: return whatever it reports, even when that is None
        if index:
            return self.indexes[index].findmodel()

        # Otherwise stop at the first subindex with a model, calling findmodel once per subindex
        for subindex in self.indexes.values():
            model = subindex.findmodel()
            if model:
                return model

        return None

    def insert(self, documents, index=None, checkpoint=None):
        """
        Inserts a batch of documents into the pending document stream, which is later passed
        to each subindex by index or upsert.

        Args:
            documents: list of (id, data, tags)
            index: indexid offset, a missing offset starts at 0. Callers inserting multiple batches
                   should pass the running offset to keep index ids unique.
            checkpoint: optional checkpoint directory, enables indexing restart
        """

        # Explicit None check. Truthiness of a Documents instance isn't visible from this module, if it's falsy
        # while empty, a truthiness test would replace an open stream without closing it.
        if self.documents is None:
            self.documents = Documents()
            self.checkpoint = checkpoint

        # Default offset, avoids a TypeError on the increment below
        index = 0 if index is None else index

        # Create batch containing documents added to parent index
        batch = []
        for _, document, _ in documents:
            # Add to documents collection if text or object field is set
            parent = document
            if isinstance(parent, dict):
                parent = parent.get(self.text, document.get(self.object))

            # Add if field is available or top-level indexing is disabled
            if parent is not None or not self.indexing:
                batch.append((index, document, None))
                index += 1

        # Add filtered documents batch
        self.documents.add(batch)

    def delete(self, ids):
        """
        Deletes ids from each subindex.

        Args:
            ids: list of ids to delete
        """

        for index in self.indexes.values():
            index.delete(ids)

    def index(self):
        """
        Builds each subindex from the pending document stream. The stream is always closed and
        cleared afterwards, including when a subindex raises.
        """

        try:
            for name, index in self.indexes.items():
                index.index(self.documents, checkpoint=f"{self.checkpoint}/{name}" if self.checkpoint else None)
        finally:
            # Reset document stream
            self._reset()

    def upsert(self):
        """
        Runs upsert for each subindex with the pending document stream. The stream is always closed
        and cleared afterwards, including when a subindex raises.
        """

        try:
            for index in self.indexes.values():
                index.upsert(self.documents)
        finally:
            # Reset document stream
            self._reset()

    def load(self, path):
        """
        Loads each subindex from path.

        Args:
            path: directory path to load subindexes
        """

        for name, index in self.indexes.items():
            # Load subindex if it exists, subindexes aren't required to have data
            directory = os.path.join(path, name)
            if index.exists(directory):
                index.load(directory)

    def save(self, path):
        """
        Saves each subindex to path.

        Args:
            path: directory path to save subindexes
        """

        for name, index in self.indexes.items():
            index.save(os.path.join(path, name))

    def close(self):
        """
        Close and free resources used by this instance.
        """

        for index in self.indexes.values():
            index.close()

    def _reset(self):
        """
        Closes the pending document stream, if any, and clears the insert state.
        """

        # Clear state first so this instance is reusable even if close raises
        documents, self.documents, self.checkpoint = self.documents, None, None
        if documents is not None:
            documents.close()