ocr.py
  1  import os
  2  from io import BytesIO
  3  from pathlib import Path
  4  
  5  import pdf2image
  6  from tqdm import tqdm
  7  from loguru import logger
  8  from PIL.Image import Image
  9  from pypdf import PdfWriter
 10  from pytesseract import pytesseract 
 11  
 12  from src.data_preparation.sourcing import ViaHTTP, Author, get_sources 
 13  from src.setup.paths import OCR_IMAGES, PDFS_AFTER_OCR, TXT_AFTER_OCR, make_fundamental_paths 
 14  
 15  
 16  class OCRModule:
 17      def __init__(self, author: Author, keep_images: bool = True, image_format: str = "JPEG", output_format: str = ".pdf") -> None:
 18          self.author: Author = author 
 19          self.keep_images: bool = keep_images
 20          self.image_format: str = image_format
 21          self.output_format: str = output_format
 22          self.path_to_ocr_images: Path = OCR_IMAGES.joinpath(author.name) 
 23          self.path_to_text_after_ocr: Path = PDFS_AFTER_OCR.joinpath(author.name) if output_format == ".pdf" else TXT_AFTER_OCR.joinpath(author.name)
 24  
 25          make_fundamental_paths()
 26          self.create_ocr_paths_for_author()
 27          assert self.output_format in [".txt", ".pdf"], "Texts that have undergone OCR can only be output as .text and PDF files" 
 28  
 29      def create_ocr_paths_for_author(self):
 30          for path in [self.path_to_ocr_images, self.path_to_text_after_ocr]:
 31              if not Path(path).exists():
 32                  os.mkdir(path)
 33  
 34      def get_path_to_images_from_book(self, book: ViaHTTP) -> Path :
 35          path_to_ocr_images_of_book: Path = self.path_to_ocr_images.joinpath(book.file_name)
 36          if not path_to_ocr_images_of_book.exists():
 37              os.mkdir(path_to_ocr_images_of_book)
 38  
 39          return path_to_ocr_images_of_book 
 40  
 41      def is_book_already_processed(self, book: ViaHTTP) -> bool:
 42          if self.path_to_text_after_ocr.joinpath(f"{book.file_name}" + self.output_format).exists():
 43              logger.success(f'"{book.title}" by {self.author.name} has already been processed.')
 44              return True
 45          else:
 46              return False
 47              
 48  
 49      def extract_text_from_images(self) -> None:
 50          if self.author.books_via_http != None:  # At the moment, the only books that are confirmed to need OCR are among those I got through HTTP 
 51              for book in self.author.books_via_http:
 52  
 53                  if book.needs_ocr and not self.is_book_already_processed(book=book):
 54                      logger.warning(f'"{book.title}" by {self.author.name} requires OCR')
 55                      path_to_ocr_images_of_book: Path = self.get_path_to_images_from_book(book=book) 
 56                      book_file_path: Path = self.author.path_to_raw_data.joinpath(f"{book.file_name}" + ".pdf")
 57  
 58                      if not path_to_ocr_images_of_book.exists():
 59                          os.mkdir(path_to_ocr_images_of_book)
 60  
 61                      logger.info("Taking an image of each page")
 62                      images_from_books: list[Image] = pdf2image.convert_from_path(pdf_path=book_file_path)
 63  
 64                      full_text = ""
 65                      merger = PdfWriter()
 66  
 67                      for i, image in tqdm(
 68                          iterable=enumerate(images_from_books),
 69                          desc="Extracting text from each page..."
 70                      ):
 71                          page_image_path: Path = path_to_ocr_images_of_book.joinpath(f"Page {i+1}.jpg")
 72                          if not page_image_path.exists():
 73                              if (book.start_page != None) and (book.end_page != None):
 74                                  if (i < book.start_page) or (i > book.end_page):
 75                                     continue 
 76  
 77                              if self.keep_images:
 78                                  image.save(page_image_path, format=self.image_format)
 79  
 80                              if self.output_format == "pdf":
 81                                  pdf_after_ocr: bytes|str = pytesseract.image_to_pdf_or_hocr(image=image, lang="eng", extension="pdf") 
 82                                  if isinstance(pdf_after_ocr, bytes):
 83                                      merger.append(BytesIO(pdf_after_ocr)) 
 84                                  else:
 85                                      raise Exception(f'After OCR, the PDF of "{book.title}" is coming out as a string instead of a bytes object')
 86                              else: 
 87                                  raw_text_in_image: bytes|str|dict[str, bytes|str] = pytesseract.image_to_string(image=image, lang="eng") 
 88                                  if isinstance(raw_text_in_image, str):
 89                                      full_text += f"/n/n {raw_text_in_image}"
 90                                  else:
 91                                      raise Exception(f'After OCR, the string content of of "{book.title}" is coming out as a bytes object or a dictionary')
 92  
 93                      file_extension = ".pdf" if self.output_format == "pdf" else "txt"
 94                      path_to_write_into: Path = self.path_to_text_after_ocr.joinpath(book.file_name + file_extension)
 95  
 96                      if self.output_format == "pdf":
 97                          with open(path_to_write_into, mode="wb") as file:
 98                                  _ = merger.write(file)
 99                      else:
100                          with open(path_to_write_into, mode="w") as txt_file:
101                              _ = txt_file.write(full_text)
102  
103                      logger.success("Saved the processed book as a new PDF")
104  
105  
if __name__ == "__main__":
    # Script entry point: run OCR for each author returned by get_sources(), one after another.
    for source_author in get_sources():
        ocr_module = OCRModule(author=source_author)
        ocr_module.extract_text_from_images()
110