"""Gradio front end for an online face-search service.

The app posts an uploaded photo to a free or premium search endpoint,
filters the returned images through a hash lookup on the token server,
and shows the remaining images in a gallery that can be exported as a zip.
"""

import base64
import binascii
import concurrent.futures
import hashlib
import os
import tempfile
import zipfile
from io import BytesIO

import gradio as gr
import requests
from PIL import Image

# Constants
FREE_URL = os.environ.get("SERVER_URL_FREE")
PREMIUM_URL = os.environ.get("SERVER_URL_PREMIUM")
PREMIUM_URL2 = os.environ.get("SERVER_URL_PREMIUM2")
TOKEN_SERVER_URL = os.environ.get("TOKEN_URL")
API_KEY = os.environ.get("API_KEY")

# Messages shown to the user (via gr.Info) for known search status codes.
STATUS_MESSAGES = {
    301: "Too many faces in the photo.",
    302: "Face is not clear enough.",
    303: "No matches found. Try Deep Search with premium token.",
    304: "No face in the photo.",
    305: "Search blocked for privacy issue.",
    401: "Invalid image format.",
    402: "Wrong request.",
    403: "Please try again later.",
    404: "Timeout, try again.",
}

# Appended to result URLs for non-premium searches.
FREE_SUFFIX = "*********"

# Image modes Pillow's JPEG writer accepts; anything else is converted first.
_JPEG_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")


def image_to_base64(image):
    """Encode a PIL image as a base64 string of its JPEG (quality 90) bytes.

    Images in a mode JPEG cannot store (for example RGBA or P) are converted
    to RGB first so that PNG/WebP uploads do not fail during encoding.
    """
    if image.mode not in _JPEG_MODES:
        image = image.convert("RGB")
    buffered = BytesIO()
    image.save(buffered, format="JPEG", quality=90)
    return base64.b64encode(buffered.getvalue()).decode('utf-8')


def base64_to_image(base64_str):
    """Decode a base64 string to raw bytes, restoring any missing '=' padding."""
    return base64.b64decode(base64_str + '=' * (-len(base64_str) % 4))


def check_db(img_array, suffix):
    """Decode search results and drop the ones flagged by the token server.

    Args:
        img_array: list of dicts with a base64 ``"image"`` and a ``"url"``.
        suffix: string appended to every result URL.

    Returns:
        List of ``(PIL image, url + suffix)`` tuples whose index is not in the
        ``res`` list returned by the ``/lookup-hashes`` endpoint.

    Raises:
        ValueError: if an item cannot be decoded or opened as an image.
        gr.Error: if the token server request or its response fails.
    """
    hashes = []
    out_array = []
    for item in img_array:
        try:
            image_data = base64_to_image(item["image"])
            hashes.append(hashlib.sha256(image_data).hexdigest())
            out_array.append((Image.open(BytesIO(image_data)), item["url"] + suffix))
        except binascii.Error as e:
            raise ValueError(f"Invalid base64 string: {str(e)}") from e
        except Exception as e:
            raise ValueError(f"Error processing image: {str(e)}") from e

    try:
        response = requests.post(url=TOKEN_SERVER_URL + '/lookup-hashes', json={"hashes": hashes})
        if out_array:
            # Parse the response once instead of once per result.
            blocked = set(response.json().get('res'))
            out_array = [value for i, value in enumerate(out_array) if i not in blocked]
    except Exception as e:
        raise gr.Error("Token Server Error!") from e
    return out_array


def verify_token(token):
    """Return PREMIUM_URL when the token server reports the token as valid.

    Raises:
        gr.Error: if the token is rejected or the token server call fails.
    """
    try:
        response = requests.post(url=TOKEN_SERVER_URL + '/verify-token', json={"token": token})
        verified = response.json().get('status') == 'success'
    except Exception as e:
        raise gr.Error("Invalid token!") from e
    if not verified:
        # Raised outside the try block so the detailed message reaches the
        # user instead of being replaced by the generic handler above.
        raise gr.Error("Invalid token! For free search, use empty string for token")
    return PREMIUM_URL


def activate_token(token):
    """Notify the token server's ``/activate-token`` endpoint about *token*.

    Raises:
        gr.Error: if the request fails.
    """
    try:
        requests.post(url=TOKEN_SERVER_URL + '/activate-token', json={"token": token})
    except Exception as e:
        raise gr.Error("Invalid token!") from e


def get_image_base64(file):
    """Open *file* as an image and return its base64-encoded JPEG data.

    Raises:
        gr.Error: if the file cannot be opened or encoded as an image.
    """
    try:
        with Image.open(file) as image:
            return image_to_base64(image)
    except Exception as e:
        raise gr.Error("Please select image file!") from e


def send_requests(url, file):
    """Post the image to the search endpoint(s) concurrently.

    Args:
        url: search endpoint to use for the main request.
        file: path of the image file to search with.

    Returns:
        ``(response1, response2)``; ``response2`` is the PREMIUM_URL2 response
        when *url* is PREMIUM_URL, otherwise ``None``.
    """
    payload = {"image": get_image_base64(file)}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future1 = executor.submit(
            requests.post, url, headers={"X-RapidAPI-Key": API_KEY}, json=payload
        )
        if url != PREMIUM_URL:
            return future1.result(), None

        # Keep the upload handle open until the second request has finished,
        # then close it (the original never closed this file).
        with open(file, 'rb') as image_file:
            future2 = executor.submit(requests.post, PREMIUM_URL2, files={"image": image_file})
            response1 = future1.result()
            response2 = future2.result()
        return response1, response2


def process_response(response, soc_res, url, token):
    """Turn the search response(s) into gallery items.

    Args:
        response: response of the main search request.
        soc_res: ``(part1, part2)`` from ``process_response2`` or ``None``.
        url: endpoint that was queried; decides suffix and token activation.
        token: premium token, activated after a successful premium search.

    Returns:
        List of ``(image, url)`` tuples; empty when the main search failed and
        there are no secondary results.

    Raises:
        gr.Error: if processing the results fails.
    """
    status_code = response.status_code

    if not soc_res:
        if status_code in STATUS_MESSAGES:
            gr.Info(STATUS_MESSAGES[status_code])
        if status_code > 300:
            return []

    try:
        res = response.json().get('img_array', []) if status_code in (201, 202) else []
        if soc_res:
            # Either part may be missing; treat a missing part as empty so a
            # single available part does not abort the whole search.
            res = (soc_res[0] or []) + res + (soc_res[1] or [])

        suffix = "" if url == PREMIUM_URL else FREE_SUFFIX
        out_array = check_db(res, suffix)

        if url == PREMIUM_URL:
            activate_token(token)
        return out_array
    except gr.Error:
        # Keep the specific user-facing message from the helpers.
        raise
    except Exception as e:
        raise gr.Error("Try again.") from e


def process_response2(response):
    """Extract ``(part1, part2)`` from the secondary premium response.

    Returns ``None`` when the status is not 200 or both parts are empty.
    """
    if response.status_code != 200:
        return None
    data = response.json()
    part1 = data.get('part1')
    part2 = data.get('part2')
    if not part1 and not part2:
        return None
    return (part1, part2)


def search_face(file, token=None):
    """Run a face search for *file*, using the premium endpoint if *token* is given."""
    url = FREE_URL
    if token:
        url = verify_token(token)

    response1, response2 = send_requests(url, file)
    soc_res = process_response2(response2) if response2 else None
    return process_response(response1, soc_res, url, token)


def export_images(items):
    """Bundle gallery items into a zip file and return its path.

    Each item is indexed as ``item[0]`` (image file path) and ``item[1]``
    (caption). The archive holds ``image_<i>.jpg`` entries plus a ``urls.txt``
    listing the caption of each image. Returns ``None`` when *items* is empty.
    """
    if not items:
        return None

    # Create a zip file in memory
    zip_buffer = BytesIO()
    url_lines = []
    with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
        for i, item in enumerate(items):
            with open(item[0], 'rb') as img_file:
                zip_file.writestr(f'image_{i}.jpg', img_file.read())
            url_lines.append(f"image_{i}.jpg: {item[1]}\n")
        zip_file.writestr("urls.txt", "".join(url_lines))
    zip_buffer.seek(0)

    # delete=False: the file must outlive this function so it can be served.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_file:
        temp_file.write(zip_buffer.getvalue())
        temp_file_path = temp_file.name
    return temp_file_path


custom_css = """
caption.caption {
    user-select: text;
    cursor: text;
}
div#component-16 {
    max-height: 63.39px;
}
.svelte-snayfm {
    height: auto;
}
.icon.svelte-snayfm {
    width: 48px;
    height: 48px;
}
"""

# Run on page load: copies the current page's query parameters onto links
# pointing at the faceonlive domains.
js = """
function aff() {
    const links = document.querySelectorAll('a');
    const currentUrl = new URL(window.location.href);
    const currentParams = currentUrl.searchParams.toString();
    links.forEach(link => {
        const href = link.getAttribute('href');
        if (href && (href.startsWith('https://faceonlive.pocketsflow.com') || href.startsWith('https://faceonlive.com'))) {
            const targetUrl = new URL(href);
            // Append current page parameters to the link
            currentParams.split('&').forEach(param => {
                if (param) {
                    const [key, value] = param.split('=');
                    targetUrl.searchParams.set(key, value);
                }
            });
            link.setAttribute('href', targetUrl.toString());
            console.log(`Updated Link: ${targetUrl.toString()}`);
        }
    });
    return ''
}
"""

head = """ """

# Created outside the Blocks context so the gr.Examples call can reference
# them as outputs before they are rendered into the layout.
output = gr.Gallery(label="Found Images", columns=[4], object_fit="contain", height="auto", interactive=False)
col2 = gr.Column(scale=2, visible=False)


def init_ui():
    """Show the search column and hide the results column."""
    return gr.update(visible=True), gr.update(visible=False)


def search_ui():
    """Hide the search column and show the results column."""
    return gr.update(visible=False), gr.update(visible=True)


def search_face_examples(image):
    """Run a free search for an example image and switch to the results view."""
    return search_face(image), gr.update(visible=False), gr.update(visible=True)


with gr.Blocks(css=custom_css, head=head, delete_cache=(3600, 3600)) as demo:
    gr.Markdown(
        """
    # Free Face Search Online
    #### [Discover more about our Face Search on our website.](https://faceonlive.com/face-search-online)
    """
    )
    with gr.Row():
        with gr.Column(scale=1) as col1:
            image = gr.Image(type='filepath', height=480)
            token = gr.Textbox(placeholder="(Optional) Get Premium Token Below.", label="Premium Token")
            gr.HTML("Get Premium Token: Deep Search Including Social Media & Full URL Reveal")
            gr.HTML("Protect Your Privacy – Start Your Takedown Now")
            search_face_button = gr.Button("Search Face")
            gr.Examples(
                ['examples/1.jpg', 'examples/2.jpg'],
                inputs=image,
                cache_examples=True,
                cache_mode='eager',
                fn=search_face_examples,
                outputs=[output, col1, col2],
            )

        with col2.render():
            gr.Markdown("> ### **⚠️ Reminder:** Export images before refreshing the page by clicking the 'Export Images' button.")
            output.render()
            with gr.Row():
                export_button = gr.Button("Export Images")
                export_file = gr.File(label="Download")
            gr.HTML("Get Premium Token: Deep Search Including Social Media & Full URL Reveal")
            gr.HTML("Protect Your Privacy – Start Your Takedown Now")
            new_search_button = gr.Button("🚀 Try New Search")

    # Two handlers on the same button: one swaps the visible column, the
    # other performs the search and fills the gallery.
    search_face_button.click(search_ui, inputs=[], outputs=[col1, col2], api_name=False)
    search_face_button.click(search_face, inputs=[image, token], outputs=[output], api_name=False)
    export_button.click(export_images, inputs=[output], outputs=export_file, api_name=False)
    new_search_button.click(init_ui, inputs=[], outputs=[col1, col2], api_name=False)

    gr.HTML('')
    html = gr.HTML()
    demo.load(None, inputs=None, outputs=html, js=js)

demo.queue(api_open=False, default_concurrency_limit=8).launch(server_name="0.0.0.0", server_port=7860, show_api=False)