# Spaces: Sleeping
import os | |
import uuid | |
import time | |
import glob | |
import shutil | |
import asyncio | |
import aiohttp | |
import aiofiles | |
import logging | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.firefox.options import Options | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from concurrent.futures import ThreadPoolExecutor | |
from selenium.webdriver.firefox.service import Service as FirefoxService | |
from webdriver_manager.firefox import GeckoDriverManager | |
# Root logging configuration: emit INFO and above, each record rendered as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-wide logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class GoogleLensPipeline:
    """Collect images from Google Lens results using headless Firefox.

    For one input image the pipeline opens ``link`` in a Selenium-driven
    Firefox, clicks the consent button, uploads the image through the page's
    file input, reads the ``src`` of every element with the class name
    ``"wETe9b jFVN1"`` and downloads those URLs.

    Directory layout (all below ``root_directory``):
        tmp_one -- downloads produced by the seed image
        tmp_two -- downloads produced by the images in ``tmp_one``
        images  -- final directory; ``max_images`` is checked against the
                   number of entries in it
    """

    def __init__(
        self,
        link=None,
        accept_button_xpath=None,
        file_input_xpath=None,
        root_directory=None,
        max_images=100000,
    ):
        """Store the configuration and create the working directories.

        Args:
            link: Page to open; defaults to ``https://lens.google.com/``.
            accept_button_xpath: XPath of the consent button.
            file_input_xpath: XPath of the ``<input type="file">`` element.
            root_directory: Base directory; defaults to ``<cwd>/images``.
            max_images: Stop once the final directory holds this many entries.
        """
        self.link = link or "https://lens.google.com/"
        self.accept_button_xpath = accept_button_xpath or '//span[text()="Accept all"]'
        self.file_input_xpath = file_input_xpath or '//input[@type="file"]'
        self.root_directory = root_directory or os.path.join(os.getcwd(), "images")
        self.tmp_directory_one = os.path.join(self.root_directory, "tmp_one")
        self.tmp_directory_two = os.path.join(self.root_directory, "tmp_two")
        self.final_directory = os.path.join(self.root_directory, "images")
        self.max_images = max_images
        # Always defined, so later steps can tell "no browser" apart from a bug.
        self.driver = None
        self.setup_directories()

    def start_driver(self):
        """Start a headless Firefox session and store it in ``self.driver``.

        Failures are logged, not raised; ``self.driver`` is ``None`` afterwards
        so callers can detect that no session is available.
        """
        self.driver = None
        try:
            firefox_options = Options()
            firefox_options.add_argument("--headless")
            firefox_options.add_argument("--disable-gpu")
            firefox_options.add_argument("--no-sandbox")
            self.driver = webdriver.Firefox(
                service=FirefoxService(GeckoDriverManager().install()),
                options=firefox_options,
            )
            logger.info("Firefox driver started successfully.")
        except Exception as e:
            logger.error(f"Failed to start Firefox driver: {str(e)}")

    def _quit_driver(self):
        """Close the current browser session, if any; safe to call repeatedly."""
        driver, self.driver = getattr(self, "driver", None), None
        if driver is None:
            return
        try:
            driver.quit()
        except Exception as e:
            logger.error(f"Failed to quit Firefox driver: {str(e)}")

    def setup_directories(self):
        """Create the root, temporary and final directories if they are missing."""
        try:
            for val in [
                self.root_directory,
                self.tmp_directory_one,
                self.tmp_directory_two,
                self.final_directory,
            ]:
                os.makedirs(val, exist_ok=True)
            logger.info("Directories set up successfully.")
        except Exception as e:
            logger.error(f"Error setting up directories: {str(e)}")

    def clean_up_directories(self):
        """Remove both temporary directories.

        Each directory is handled on its own so that a failure on the first
        one does not leave the second one behind. Already-missing directories
        are not an error.
        """
        failed = False
        for val in (self.tmp_directory_one, self.tmp_directory_two):
            try:
                shutil.rmtree(val)
            except FileNotFoundError:
                continue
            except Exception as e:
                failed = True
                logger.error(f"Error cleaning up directories: {str(e)}")
        if not failed:
            logger.info("Temporary directories cleaned up successfully.")

    def open_page(self):
        """Navigate the browser to ``self.link``; failures are logged."""
        try:
            self.driver.get(self.link)
            logger.info("Page opened successfully.")
        except Exception as e:
            logger.error(f"Failed to open page: {str(e)}")

    def accept_terms(self, timeout=20):
        """Click the consent button once it becomes clickable.

        Args:
            timeout: Seconds to wait for the button before giving up.
        """
        try:
            wait = WebDriverWait(self.driver, timeout)
            accept_button = wait.until(
                EC.element_to_be_clickable((By.XPATH, self.accept_button_xpath))
            )
            accept_button.click()
            logger.info("Accepted terms successfully.")
        except Exception as e:
            logger.error(f"Failed to accept terms: {str(e)}")

    def upload_image(self, image_path, timeout=20, sleep=10):
        """Send ``image_path`` to the page's file input and wait for results.

        Args:
            image_path: Image to upload; converted to an absolute path because
                the browser resolves file-input paths on its own.
            timeout: Seconds to wait for the file input to appear.
            sleep: Fixed pause, in seconds, after the upload so the result
                page can render before it is scraped.
        """
        try:
            wait = WebDriverWait(self.driver, timeout)
            file_input = wait.until(
                EC.presence_of_element_located((By.XPATH, self.file_input_xpath))
            )
            file_input.send_keys(os.path.abspath(image_path))
            time.sleep(sleep)
            logger.info(f"Image uploaded successfully: {image_path}")
        except Exception as e:
            logger.error(f"Failed to upload image: {str(e)}")

    def get_images(self):
        """Return the result image URLs and close the browser.

        Returns:
            A list of non-empty ``src`` values; an empty list when scraping
            fails. The browser session is closed in every case.
        """
        try:
            elements = self.driver.execute_script(
                """
                var elements = document.getElementsByClassName("wETe9b jFVN1");
                var srcList = [];
                for (let element of elements) {
                srcList.push(element.src);
                }
                return srcList;
                """
            )
            logger.info("Fetched image URLs successfully.")
            # Drop null/empty src values; they cannot be downloaded.
            return [src for src in (elements or []) if src]
        except Exception as e:
            logger.error(f"Failed to get images: {str(e)}")
            return []
        finally:
            self._quit_driver()

    async def download_image(self, session, url, path):
        """Download ``url`` into ``path`` under a random ``<uuid4>.jpg`` name.

        Non-200 responses are logged as warnings, exceptions as errors;
        nothing is raised.
        """
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    file_path = os.path.join(path, f"{uuid.uuid4()}.jpg")
                    async with aiofiles.open(file_path, "wb") as f:
                        await f.write(await response.read())
                    logger.info(f"Image downloaded successfully: {url}")
                else:
                    logger.warning(
                        f"Failed to download image: HTTP status code {response.status}"
                    )
        except Exception as e:
            logger.error(f"Error downloading image: {str(e)}")

    async def download_images(self, elements, path=None):
        """Download every URL in ``elements`` concurrently.

        Args:
            elements: Iterable of URLs; ``None`` or empty means nothing to do.
            path: Target directory; defaults to the final directory.
        """
        path = path or self.final_directory
        urls = [url for url in (elements or []) if url]
        if not urls:
            return
        # The target may have been removed by an earlier clean-up.
        os.makedirs(path, exist_ok=True)
        async with aiohttp.ClientSession() as session:
            tasks = [self.download_image(session, url, path) for url in urls]
            await asyncio.gather(*tasks)

    async def run_pipeline(self, image_path, save_path=None):
        """Run one upload -> scrape -> download cycle for ``image_path``.

        The blocking Selenium steps run on a single worker thread so the
        browser session is always driven from the same thread, and the
        session is closed even when a step is interrupted.

        Args:
            image_path: Image to upload.
            save_path: Directory for the downloads; defaults to the final
                directory.
        """
        loop = asyncio.get_running_loop()
        elements = []
        with ThreadPoolExecutor(max_workers=1) as executor:
            try:
                await loop.run_in_executor(executor, self.start_driver)
                if self.driver is None:
                    logger.error(
                        f"Skipping {image_path}: browser session unavailable."
                    )
                else:
                    await loop.run_in_executor(executor, self.open_page)
                    await loop.run_in_executor(executor, self.accept_terms)
                    await loop.run_in_executor(
                        executor, self.upload_image, image_path
                    )
                    elements = await loop.run_in_executor(
                        executor, self.get_images
                    )
            finally:
                # No-op after a normal get_images(); prevents a leaked browser
                # when the cycle is cut short.
                self._quit_driver()
        await self.download_images(elements, save_path)

    def _limit_reached(self):
        """Return True when the final directory holds ``max_images`` entries or more."""
        try:
            count = len(os.listdir(self.final_directory))
        except FileNotFoundError:
            count = 0
        return count >= self.max_images

    async def _expand_directory(self, source_directory, save_path, label):
        """Run the pipeline on every file of ``source_directory``.

        Each processed file is then moved into the final directory. ``label``
        is only used in error messages.

        Returns:
            True when the image limit was reached and processing should stop.
        """
        for val in os.listdir(source_directory):
            source = os.path.join(source_directory, val)
            try:
                await self.run_pipeline(source, save_path=save_path)
                shutil.move(source, os.path.join(self.final_directory, val))
            except Exception as e:
                logger.error(f"Error processing file {val} in {label}: {str(e)}")
                continue
            if self._limit_reached():
                logger.info(
                    f"Reached maximum number of images ({self.max_images}). Stopping."
                )
                return True
        return False

    async def loop_run_pipeline(self, image_path):
        """Expand one seed image over two rounds of Lens lookups.

        Round 0 downloads the seed's results into ``tmp_one``. Round 1 runs
        every ``tmp_one`` image (results go to ``tmp_two``) and moves it to
        the final directory. Round 2 does the same for ``tmp_two``, with
        results going straight to the final directory. Processing stops as
        soon as the final directory reaches ``max_images``. The temporary
        directories are removed on every exit path.
        """
        try:
            # An earlier call removed the temporary directories; recreate them.
            self.setup_directories()
            await self.run_pipeline(image_path, save_path=self.tmp_directory_one)
            if self._limit_reached():
                logger.info(
                    f"Reached maximum number of images ({self.max_images}). Stopping."
                )
                return
            if await self._expand_directory(
                self.tmp_directory_one, self.tmp_directory_two, "tmp_directory_one"
            ):
                return
            await self._expand_directory(
                self.tmp_directory_two, self.final_directory, "tmp_directory_two"
            )
        except Exception as e:
            logger.error(f"Error in loop_run_pipeline: {str(e)}")
        finally:
            self.clean_up_directories()
def get_image_paths(directory):
    """Return the absolute paths of all image files below ``directory``.

    The tree is searched recursively in a single pass. A file counts as an
    image when its extension, compared case-insensitively, is one of
    ``.jpg``, ``.jpeg``, ``.png``, ``.gif``, ``.bmp`` or ``.tiff``.
    Directories are never returned, even if their name looks like an image.

    Args:
        directory: Root of the tree to search.

    Returns:
        A sorted list of absolute file paths; empty when ``directory`` does
        not exist or contains no images.
    """
    image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff"}
    # One recursive traversal instead of one per extension.
    candidates = glob.glob(os.path.join(directory, "**", "*"), recursive=True)
    return sorted(
        os.path.abspath(candidate)
        for candidate in candidates
        if os.path.splitext(candidate)[1].lower() in image_extensions
        and os.path.isfile(candidate)
    )
if __name__ == "__main__":
    # Script entry point: feed every image found under the root directory
    # through the pipeline until the final directory is full.
    images_root = "/home/user/app/images/"
    lens = GoogleLensPipeline(root_directory=images_root, max_images=100000)

    # The seed list is computed once, before any new image is downloaded.
    for seed_image in get_image_paths(images_root):
        collected = len(os.listdir(lens.final_directory))
        if collected < lens.max_images:
            asyncio.run(lens.loop_run_pipeline(seed_image))
            continue
        logger.info(
            f"Reached maximum number of images ({lens.max_images}). Stopping."
        )
        break