/ src / python / txtai / embeddings / index / documents.py
documents.py
 1  """
 2  Documents module
 3  """
 4  
 5  import os
 6  import tempfile
 7  
 8  from ...serialize import SerializeFactory
 9  
10  
class Documents:
    """
    Streams documents to temporary storage. Allows queuing large volumes of content for later indexing.

    Each batch passed to add is serialized, one after another, into a single temporary file.
    Iterating over this instance reads the batches back in the order they were added. Call close
    to delete the temporary file and reset this instance for reuse.
    """

    def __init__(self):
        """
        Creates a new documents stream.
        """

        # Temporary file handle, created lazily on the first call to add
        self.documents = None

        # Number of batches written and total number of documents across all batches
        self.batch = 0
        self.size = 0

        # Pickle serialization - local temporary data
        self.serializer = SerializeFactory.create("pickle", allowpickle=True)

    def __len__(self):
        """
        Returns total number of queued documents.
        """

        return self.size

    def __iter__(self):
        """
        Streams all queued documents.

        Yields nothing when no documents have been added. The write handle is closed before reading,
        so this instance should be reset with close before adding more documents.
        """

        # Nothing queued, nothing to stream
        if self.documents is None:
            return

        # Close streaming file, this flushes pending writes. Closing an already closed file is a no-op.
        self.documents.close()

        # Open stream file
        with open(self.documents.name, "rb") as queue:
            # Read each batch
            for _ in range(self.batch):
                documents = self.serializer.loadstream(queue)

                # Yield each document
                yield from documents

    def add(self, documents):
        """
        Adds a batch of documents for indexing.

        Args:
            documents: list of (id, data, tag) tuples

        Returns:
            documents
        """

        # Create documents file if not already open
        # delete=False keeps the file on disk after the handle is closed, it's removed in close
        # pylint: disable=R1732
        if self.documents is None:
            self.documents = tempfile.NamedTemporaryFile(mode="wb", suffix=".docs", delete=False)

        # Add batch
        self.serializer.savestream(documents, self.documents)
        self.batch += 1
        self.size += len(documents)

        return documents

    def close(self):
        """
        Closes and resets this instance. New sets of documents can be added with additional calls to add.

        This method is safe to call when no documents have been added and safe to call multiple times.
        """

        if self.documents is not None:
            # Release the file handle first, it's still open when this instance was never iterated.
            # An open handle leaks a descriptor and blocks file removal on some platforms.
            self.documents.close()

            # Cleanup stream file
            os.remove(self.documents.name)

        # Reset document parameters
        self.documents = None
        self.batch = 0
        self.size = 0