search.py
import datetime
import random
import time
import uuid

import firebase_admin
import requests
from firebase_admin import credentials, firestore, storage

TESTING_MODE = False
APITOKEN = 'YOUR_TOKEN_HERE'  # Your API Token for Facecheck.id

# Seconds to wait on each HTTP request to Facecheck before giving up, so a
# stalled connection raises instead of hanging the script forever.
REQUEST_TIMEOUT = 30

cred = credentials.Certificate('./service.json')

firebase_admin.initialize_app(cred, {
    # Bucket *name* only: Firebase Admin does not accept a "gs://" (or any
    # other protocol) prefix here.
    'storageBucket': 'STORAGEBUCKET_URL_HERE',
})

db = firestore.client()

# Use the local file path directly
image_file = '/home/captured_image.jpg'  # Updated to use a local file path


def upload_image(image_path):
    """Upload a local image to the default Firebase Storage bucket.

    The object is stored under ``images/<random uuid4>.jpg``.

    Args:
        image_path: Path of the local file to upload.

    Returns:
        A ``(public_url, blob_name)`` tuple. The URL is only reachable
        anonymously if the object has been made publicly readable; this
        function does not change the object's ACL.
    """
    bucket = storage.bucket()  # Default bucket configured in initialize_app
    blob_name = f"images/{uuid.uuid4()}.jpg"

    blob = bucket.blob(blob_name)
    blob.upload_from_filename(image_path)

    return blob.public_url, blob_name


def search_by_face(image_file):
    """Search the Internet for faces matching the given image via Facecheck.id.

    Uploads the image, then polls the search endpoint once per second,
    printing progress, until the API reports either an error or an output.

    Args:
        image_file: Path of the local image file to upload.

    Returns:
        ``(error, items)``: on failure ``error`` is a string of the form
        ``"<error> (<code>)"`` and ``items`` is ``None``; on success
        ``error`` is ``None`` and ``items`` is the ``output['items']`` list
        from the API response.

    Raises:
        OSError: If ``image_file`` cannot be opened.
        requests.RequestException: On network failure or when a request
            exceeds ``REQUEST_TIMEOUT`` seconds.
    """
    if TESTING_MODE:
        print('****** TESTING MODE search, results are inaccurate, and queue wait is long, but credits are NOT deducted ******')

    site = 'https://facecheck.id'
    headers = {'accept': 'application/json', 'Authorization': APITOKEN}

    # Keep the file open only for the duration of the upload so the handle
    # is always closed, even if the request raises.
    with open(image_file, 'rb') as image_handle:
        files = {'images': image_handle, 'id_search': None}
        response = requests.post(
            site + '/api/upload_pic',
            headers=headers,
            files=files,
            timeout=REQUEST_TIMEOUT,
        ).json()

    if response.get('error'):
        return f"{response['error']} ({response.get('code')})", None

    id_search = response['id_search']
    print(f"{response.get('message')} id_search={id_search}")
    json_data = {
        'id_search': id_search,
        'with_progress': True,
        'status_only': False,
        'demo': TESTING_MODE,
    }

    # Poll until the API returns a terminal state (error or output).
    while True:
        response = requests.post(
            site + '/api/search',
            headers=headers,
            json=json_data,
            timeout=REQUEST_TIMEOUT,
        ).json()
        if response.get('error'):
            return f"{response['error']} ({response.get('code')})", None
        output = response.get('output')
        if output:
            return None, output['items']
        print(f"{response.get('message')} progress: {response.get('progress')}%")
        time.sleep(1)


# Setup Firestore: create a new auto-ID document for this capture.
people_ref = db.collection('people').document()
# Record when the capture was processed. Use a timezone-aware UTC value:
# NOTE(review): the Firestore client appears to treat naive datetimes as UTC,
# so a naive local time would be stored with the wrong offset - confirm.
people_ref.set({
    'time': datetime.datetime.now(datetime.timezone.utc),
})

# Search the Internet by face
error, urls_images = search_by_face(image_file)

# public_url, generated_blob_name = upload_image(image_file)

if urls_images:
    # Store each match in a 'socials' subcollection under the current document.
    socials_ref = people_ref.collection('socials')

    for im in urls_images:
        document_data = {
            'score': im['score'],
            'url': im['url'],
            'image_base64': im['base64'],
        }
        print("Preparing to set document with data:", document_data)
        # Defensive check that every field name is a non-empty string.
        for key in document_data:
            if not isinstance(key, str) or not key:
                raise ValueError(f"Invalid key detected: '{key}'. Keys must be non-empty strings.")

        socials_ref.document().set(document_data)
elif error:
    print(error)
else:
    # The search succeeded but returned an empty item list.
    print('No matches found.')