documents.py
1 """ 2 Documents module 3 """ 4 5 import os 6 import tempfile 7 8 from ...serialize import SerializeFactory 9 10 11 class Documents: 12 """ 13 Streams documents to temporary storage. Allows queuing large volumes of content for later indexing. 14 """ 15 16 def __init__(self): 17 """ 18 Creates a new documents stream. 19 """ 20 21 self.documents = None 22 self.batch = 0 23 self.size = 0 24 25 # Pickle serialization - local temporary data 26 self.serializer = SerializeFactory.create("pickle", allowpickle=True) 27 28 def __len__(self): 29 """ 30 Returns total number of queued documents. 31 """ 32 33 return self.size 34 35 def __iter__(self): 36 """ 37 Streams all queued documents. 38 """ 39 40 # Close streaming file 41 self.documents.close() 42 43 # Open stream file 44 with open(self.documents.name, "rb") as queue: 45 # Read each batch 46 for _ in range(self.batch): 47 documents = self.serializer.loadstream(queue) 48 49 # Yield each document 50 yield from documents 51 52 def add(self, documents): 53 """ 54 Adds a batch of documents for indexing. 55 56 Args: 57 documents: list of (id, data, tag) tuples 58 59 Returns: 60 documents 61 """ 62 63 # Create documents file if not already open 64 # pylint: disable=R1732 65 if not self.documents: 66 self.documents = tempfile.NamedTemporaryFile(mode="wb", suffix=".docs", delete=False) 67 68 # Add batch 69 self.serializer.savestream(documents, self.documents) 70 self.batch += 1 71 self.size += len(documents) 72 73 return documents 74 75 def close(self): 76 """ 77 Closes and resets this instance. New sets of documents can be added with additional calls to add. 78 """ 79 80 # Cleanup stream file 81 os.remove(self.documents.name) 82 83 # Reset document parameters 84 self.documents = None 85 self.batch = 0 86 self.size = 0