/ functions.py
functions.py
  1  import os
  2  import sys
  3  import hashlib
  4  import settings
  5  import magic
  6  import pathlib
  7  import subprocess
  8  import constants
  9  
# Currently going with os.system() to run a shell script
 11  # import filetype
 12  # import subprocess
 13  
 14  def listDirs(path):
 15      fileList = []
 16      dirList = []
 17  
 18      if isinstance(path, list):
 19          path = path[0]
 20  
 21      files = os.listdir(path)
 22  
 23      for file in files:
 24          currentDir = pathlib.Path.cwd()
 25          fullPath = currentDir / path
 26          fullPath = os.path.join(fullPath, file)
 27          if os.path.isfile(fullPath):
 28              fileList.append(fullPath)
 29          elif os.path.isdir(fullPath):
 30              dirList.append(fullPath)
 31  
 32      yield fileList
 33  
 34      if len(dirList) > 0:
 35          for dir in listDirs(dirList):
 36              yield dir
 37  
 38  def buildHashedFileList(list):
 39      hashedList = []
 40  
 41      for line in list:
 42          with open(line, 'rb', buffering=0) as file:
 43              bytes = file.read()
 44              hash = hashlib.sha256(bytes).hexdigest()
 45              string = hash + '\t' + line + '\n'
 46              hashedList.append(string)
 47  
 48      return hashedList
 49  
 50  def checkAndResolveType(argOptions, sourceFile):
 51      """Match file with exclusive or inclusive file type list definitions"""
 52      buff = settings.fileBuffer
 53  
 54      with open(sourceFile, "r", buffering=buff) as sourceFileList:
 55          for sourceLine in sourceFileList:
 56              sourceFileHash, sourceFileFullPath = sourceLine.split('\t')
 57  
 58      sourceFileFullPath = sourceFileFullPath.strip('\n')
 59  
 60      # subprocess way / <- this way did not work, why?
 61      # type = subprocess.run([f'file {sourceFileFullPath}'], capture_output=True, text=True, check=True)
 62  
 63      # pythonmagic way
 64      pmagic = magic.Magic()
 65      type = pmagic.from_file(sourceFileFullPath)
 66      extension = constants.typeToExtensionMap[type]
 67  
 68      if argOptions.file_types:
 69          if extension in argOptions.file_types:
 70              return True
 71      elif argOptions.exclude_file_types:
 72          if extension not in argOptions.file_types:
 73              return True
 74  
 75  def isFileSynced(sourceFileHash, destinationFileList):
 76      match = False
 77      buff = settings.fileBuffer
 78  
 79      with open(destinationFileList, "r", buffering=buff) as destinationFileList:
 80          for destinationLine in destinationFileList:
 81              destinationFileHash, destinationFilePath = destinationLine.split('\t')
 82  
 83              if sourceFileHash == destinationFileHash:
 84                  match = True
 85                  break
 86  
 87      return match
 88  
 89  def sync():
 90      buff = settings.fileBuffer
 91      sourceFileStart = 0
 92      destinationFileStart = 0
 93  
 94      with open(settings.sourceOutput, "r", buffering=buff) as sourceFileList:
 95          for sourceLine in sourceFileList:
 96              sourceFileHash, sourceFileFullPath = sourceLine.split('\t')
 97  
 98      # Debuggin
 99      print(__name__)
100  
def log():
    """Placeholder for logging support; does nothing yet."""
    # TODO
    return None
103  
# Support module: running it directly only prints a notice.
if "__main__" == __name__:
    print("This is a support file, not intended to be called directly")