import os
import gradio as gr
import requests
import base64
from io import BytesIO
from PIL import Image
import hashlib
import concurrent.futures
import zipfile
import tempfile
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Endpoints and credentials come from the process environment; a variable
# that is not set yields None.
FREE_URL = os.getenv("SERVER_URL_FREE")
PREMIUM_URL = os.getenv("SERVER_URL_PREMIUM")
PREMIUM_URL2 = os.getenv("SERVER_URL_PREMIUM2")
TOKEN_SERVER_URL = os.getenv("TOKEN_URL")
API_KEY = os.getenv("API_KEY")

# Notices shown to the user, keyed by the status code of the search response.
STATUS_MESSAGES = {
    # 30x: the search ran but produced nothing usable
    301: "Too many faces in the photo.",
    302: "Face is not clear enough.",
    303: "No matches found. Try Deep Search with premium token.",
    304: "No face in the photo.",
    305: "Search blocked for privacy issue.",
    # 40x: the request itself was rejected or failed
    401: "Invalid image format.",
    402: "Wrong request.",
    403: "Please try again later.",
    404: "Timeout, try again.",
}

# Appended to every result URL when the search did not use the premium endpoint.
FREE_SUFFIX = "*********"
def image_to_base64(image):
    """Encode an image as a base64 string of its JPEG representation.

    Args:
        image: Image object exposing ``mode``, ``convert(mode)`` and
            ``save(fp, format=..., quality=...)`` (a PIL image in this file).

    Returns:
        The JPEG bytes (quality 90), base64-encoded and decoded to ``str``.
    """
    # JPEG cannot store alpha or palette data, so saving e.g. an RGBA or P
    # mode image raises. Convert anything the JPEG writer does not accept to
    # RGB first so PNG/GIF/WebP uploads work; JPEG-compatible modes are left
    # untouched.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    if image.mode not in jpeg_modes:
        image = image.convert("RGB")

    buffered = BytesIO()
    image.save(buffered, format="JPEG", quality=90)
    return base64.b64encode(buffered.getvalue()).decode('utf-8')
def base64_to_image(base64_str):
    """Decode a base64 string whose trailing ``=`` padding may be missing.

    Despite the name, the return value is the decoded raw ``bytes``, not an
    image object.
    """
    # Number of '=' characters needed to reach a multiple of four.
    missing_padding = -len(base64_str) % 4
    padded = base64_str + '=' * missing_padding
    return base64.b64decode(padded)
def check_db(img_array, suffix):
    """Decode search results and drop the ones flagged by the token server.

    Args:
        img_array: Iterable of dicts, each with an ``"image"`` entry (base64
            string) and a ``"url"`` entry (string).
        suffix: String appended to every result URL.

    Returns:
        List of ``(PIL image, url + suffix)`` tuples, excluding every entry
        whose position is listed under ``"res"`` in the ``/lookup-hashes``
        response.

    Raises:
        ValueError: An entry could not be decoded or opened as an image.
        gr.Error: The token server could not be reached or its reply was
            unusable (the function fails closed rather than returning
            unfiltered results).
    """
    import binascii  # local import: only needed to name the decode error

    hashes = []
    out_array = []
    for item in img_array:
        try:
            image_data = base64_to_image(item["image"])
            hashes.append(hashlib.sha256(image_data).hexdigest())
            out_array.append((Image.open(BytesIO(image_data)), item["url"] + suffix))
        except binascii.Error as e:
            raise ValueError(f"Invalid base64 string: {str(e)}") from e
        except Exception as e:
            raise ValueError(f"Error processing image: {str(e)}") from e

    try:
        # A timeout keeps a stalled token server from hanging the worker.
        response = requests.post(
            url=TOKEN_SERVER_URL + '/lookup-hashes',
            json={"hashes": hashes},
            timeout=30,
        )
        # Parse the reply once; a missing/invalid 'res' raises here and is
        # reported as a token-server failure.
        blocked = set(response.json().get('res'))
    except Exception as e:
        raise gr.Error("Token Server Error!") from e

    return [value for i, value in enumerate(out_array) if i not in blocked]
def verify_token(token):
    """Validate a premium token against the token server.

    Args:
        token: Token string entered by the user.

    Returns:
        ``PREMIUM_URL`` when the server replies with ``status == 'success'``.

    Raises:
        gr.Error: The server could not be queried, or it rejected the token.
    """
    try:
        # A timeout keeps a stalled token server from hanging the worker.
        response = requests.post(
            url=TOKEN_SERVER_URL + '/verify-token',
            json={"token": token},
            timeout=30,
        )
        status = response.json().get('status')
    except Exception as exc:
        raise gr.Error("Invalid token!") from exc

    # Checked outside the try block so the more helpful message below is not
    # swallowed and replaced by the generic one.
    if status != 'success':
        raise gr.Error("Invalid token! For free search, use empty string for token")
    return PREMIUM_URL
def activate_token(token):
    """Notify the token server that ``token`` has been used.

    The response body and status are not inspected; only a failure to
    perform the request is reported.

    Raises:
        gr.Error: The request to the token server failed.
    """
    try:
        # A timeout keeps a stalled token server from hanging the worker.
        requests.post(
            url=TOKEN_SERVER_URL + '/activate-token',
            json={"token": token},
            timeout=30,
        )
    except Exception as exc:  # not bare: let KeyboardInterrupt/SystemExit through
        raise gr.Error("Invalid token!") from exc
def get_image_base64(file):
    """Open ``file`` as an image and return it as a base64 JPEG string.

    Args:
        file: Anything ``Image.open`` accepts (a file path in this app).

    Raises:
        gr.Error: The file could not be opened or encoded as an image.
    """
    try:
        # The context manager closes the underlying file once encoding is done.
        with Image.open(file) as image:
            return image_to_base64(image)
    except Exception as exc:
        raise gr.Error("Please select image file!") from exc
def send_requests(url, file):
    """Send the image to the search backend(s).

    The image is always posted to ``url`` as base64 JSON. When ``url`` is the
    premium endpoint, the raw file is additionally uploaded to
    ``PREMIUM_URL2`` in parallel.

    Args:
        url: Search endpoint to query.
        file: Path of the image file to search for.

    Returns:
        ``(response1, response2)``; ``response2`` is ``None`` unless the
        premium endpoint was used.
    """
    payload = {"image": get_image_base64(file)}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future1 = executor.submit(
            requests.post, url, headers={"X-RapidAPI-Key": API_KEY}, json=payload
        )
        if url != PREMIUM_URL:
            return future1.result(), None

        # Keep the upload handle open only while the second request runs, so
        # it is always closed instead of being leaked.
        with open(file, 'rb') as image_file:
            future2 = executor.submit(
                requests.post, PREMIUM_URL2, files={"image": image_file}
            )
            # wait() never raises; errors surface from result() below, in the
            # same order as before.
            concurrent.futures.wait([future2])
        response1 = future1.result()
        response2 = future2.result()
        return response1, response2
def process_response(response, soc_res, url, token):
    """Turn the search response(s) into the list shown in the gallery.

    Args:
        response: Response of the main search request.
        soc_res: ``None`` or a ``(part1, part2)`` pair from the secondary
            premium search; either part may be ``None``.
        url: Endpoint that was queried; decides the URL suffix and whether
            the token gets activated.
        token: Premium token, activated after a successful premium search.

    Returns:
        List of ``(image, url)`` tuples as produced by ``check_db``; empty
        when there are no secondary results and the status code is above 300.

    Raises:
        gr.Error: Processing failed. Errors already raised as ``gr.Error`` by
            the helpers keep their own message; anything else becomes
            "Try again.".
    """
    status_code = response.status_code

    # NOTE(review): the early return is treated as applying only when there
    # are no secondary results, since the code below still merges them for
    # non-201/202 statuses - confirm against the original layout.
    if not soc_res:
        if status_code in STATUS_MESSAGES:
            gr.Info(STATUS_MESSAGES[status_code])
        if status_code > 300:
            return []

    try:
        res = response.json().get('img_array', []) if status_code in [201, 202] else []
        if soc_res:
            # Only one of the two parts may be present; treat a missing part
            # as empty instead of failing on ``None + list``.
            res = (soc_res[0] or []) + res + (soc_res[1] or [])
        suffix = "" if url == PREMIUM_URL else FREE_SUFFIX
        out_array = check_db(res, suffix)
        if url == PREMIUM_URL:
            activate_token(token)
        return out_array
    except gr.Error:
        # Already user-facing; do not replace it with the generic message.
        raise
    except Exception as exc:
        raise gr.Error("Try again.") from exc
def process_response2(response):
    """Extract the two result parts from the secondary search response.

    Returns:
        ``(part1, part2)`` when the status code is 200 and at least one part
        is non-empty; otherwise ``None``.
    """
    if response.status_code != 200:
        return None

    payload = response.json()
    part1, part2 = payload.get('part1'), payload.get('part2')
    if part1 or part2:
        return (part1, part2)
    return None
def search_face(file, token=None):
    """Run a face search for ``file`` and return the gallery items.

    With a non-empty ``token`` the endpoint returned by ``verify_token`` is
    used; otherwise the free endpoint is queried.
    """
    url = verify_token(token) if token else FREE_URL
    response1, response2 = send_requests(url, file)

    # Secondary results are only parsed when the second response is truthy.
    soc_res = None
    if response2:
        soc_res = process_response2(response2)

    return process_response(response1, soc_res, url, token)
def export_images(items):
    """Bundle gallery items into a zip archive on disk.

    Args:
        items: Sequence of ``(image_path, caption)`` pairs; may be empty or
            ``None``.

    Returns:
        Path of a temporary ``.zip`` file containing ``image_<i>.jpg`` for
        every item plus a ``urls.txt`` listing, or ``None`` when there is
        nothing to export. The caller owns the returned file.
    """
    if not items:
        return None

    url_lines = []
    # delete=False: the file must outlive this function so it can be served.
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
    try:
        # Write the archive straight to disk instead of building it in
        # memory and copying it out afterwards.
        with temp_file, zipfile.ZipFile(temp_file, 'w') as zip_file:
            for i, item in enumerate(items):
                entry_name = f'image_{i}.jpg'
                with open(item[0], 'rb') as img_file:
                    zip_file.writestr(entry_name, img_file.read())
                url_lines.append(f"{entry_name}: {item[1]}\n")
            zip_file.writestr("urls.txt", "".join(url_lines))
    except BaseException:
        # Do not leave a partial archive behind when an item cannot be read.
        try:
            os.unlink(temp_file.name)
        except OSError:
            pass
        raise
    return temp_file.name
# Style overrides passed to gr.Blocks(css=...): makes captions selectable as
# text, caps the height of one component and resizes an icon.
# NOTE(review): `#component-16` and `.svelte-snayfm` look like identifiers
# generated by the UI framework rather than names chosen here - presumably
# they depend on component creation order and the installed Gradio version;
# verify after changing the layout or upgrading.
custom_css = """
caption.caption {
user-select: text;
cursor: text;
}
div#component-16 {
max-height: 63.39px;
}
.svelte-snayfm {
height: auto;
}
.icon.svelte-snayfm {
width: 48px;
height: 48px;
}
"""
# JavaScript run in the browser on page load (passed to demo.load(js=...)).
# It copies the current page's query parameters onto every link that points
# at faceonlive.pocketsflow.com or faceonlive.com, then returns '' as the value
# for the output component.
# The parameters are iterated through URLSearchParams so keys and values are
# handled in decoded form: splitting the encoded query string by hand and
# passing the pieces to searchParams.set() would encode them a second time
# and cut off values that contain '='.
js = """
function aff() {
    const links = document.querySelectorAll('a');
    const currentUrl = new URL(window.location.href);
    links.forEach(link => {
        const href = link.getAttribute('href');
        if (href && (href.startsWith('https://faceonlive.pocketsflow.com') || href.startsWith('https://faceonlive.com'))) {
            const targetUrl = new URL(href);
            // Append current page parameters to the link
            currentUrl.searchParams.forEach((value, key) => {
                targetUrl.searchParams.set(key, value);
            });
            link.setAttribute('href', targetUrl.toString());
            console.log(`Updated Link: ${targetUrl.toString()}`);
        }
    });
    return ''
}
"""
# Extra markup passed to gr.Blocks(head=...); currently only a newline.
head = """
"""
# The results gallery and the results column are created here, before the
# layout is built, so they can be referenced as outputs (e.g. by gr.Examples)
# ahead of the point where they are placed on the page with .render().
output = gr.Gallery(label="Found Images", columns=[4], object_fit="contain", height="auto", interactive=False)
# Hidden at start; shown once a search is triggered.
col2 = gr.Column(scale=2, visible=False)
def init_ui():
    """Return updates that show the first output and hide the second.

    Wired to ``[col1, col2]``: brings back the search form and hides the
    results panel.
    """
    show_first = gr.update(visible=True)
    hide_second = gr.update(visible=False)
    return show_first, hide_second
def search_ui():
    """Return updates that hide the first output and show the second.

    Wired to ``[col1, col2]``: hides the search form and reveals the results
    panel.
    """
    hide_first = gr.update(visible=False)
    show_second = gr.update(visible=True)
    return hide_first, show_second
def search_face_examples(image):
    """Run a token-less search for an example image and switch to the results view.

    Returns:
        ``(results, update, update)`` where the two updates hide the first
        and show the second of the columns they are wired to.
    """
    results = search_face(image)
    hide_form = gr.update(visible=False)
    show_results = gr.update(visible=True)
    return results, hide_form, show_results
# Page layout and event wiring.
# NOTE(review): statement order is kept exactly as it was because the custom
# CSS targets a component by what appears to be a creation-order id. The block
# nesting below was reconstructed from the logic - confirm in particular that
# gr.Examples belongs inside the input column.
with gr.Blocks(css=custom_css, head=head, delete_cache=(3600, 3600)) as demo:
    gr.Markdown(
        """
# Free Face Search Online
#### [Discover more about our Face Search on our website.](https://faceonlive.com/face-search-online)
"""
    )
    with gr.Row():
        # Input column: image upload, optional premium token, search button.
        with gr.Column(scale=1) as col1:
            image = gr.Image(type='filepath', height=480)
            token = gr.Textbox(placeholder="(Optional) Get Premium Token Below.", label="Premium Token")
            gr.HTML("Get Premium Token: Deep Search Including Social Media & Full URL Reveal")
            gr.HTML("Protect Your Privacy – Start Your Takedown Now")
            search_face_button = gr.Button("Search Face")
            # Example images run through search_face_examples, which also
            # switches the page to the results view.
            gr.Examples(['examples/1.jpg', 'examples/2.jpg'], inputs=image, cache_examples=True, cache_mode='eager', fn=search_face_examples, outputs=[output, col1, col2])
        # Results column: created hidden at module level, placed here.
        with col2.render():
            gr.Markdown("> ### **⚠️ Reminder:** Export images before refreshing the page by clicking the 'Export Images' button.")
            output.render()
            with gr.Row():
                export_button = gr.Button("Export Images")
                export_file = gr.File(label="Download")
            gr.HTML("Get Premium Token: Deep Search Including Social Media & Full URL Reveal")
            gr.HTML("Protect Your Privacy – Start Your Takedown Now")
            new_search_button = gr.Button("🚀 Try New Search")
    # The search button has two listeners: one swaps the visible column, the
    # other runs the search and fills the gallery.
    search_face_button.click(search_ui, inputs=[], outputs=[col1, col2], api_name=False)
    search_face_button.click(search_face, inputs=[image, token], outputs=[output], api_name=False)
    export_button.click(export_images, inputs=[output], outputs=export_file, api_name=False)
    # Return to the search form.
    new_search_button.click(init_ui, inputs=[], outputs=[col1, col2], api_name=False)
    gr.HTML('')
    # Empty HTML component that receives the return value of the load-time script.
    html = gr.HTML()
    demo.load(None, inputs=None, outputs=html, js=js)
# Start the server on all interfaces, port 7860, with the API closed and hidden.
demo.queue(api_open=False, default_concurrency_limit=8).launch(server_name="0.0.0.0", server_port=7860, show_api=False)