import numpy as np
import torch
from datasets import load_dataset
from sklearn.metrics.pairwise import cosine_similarity
from open_clip import create_model_from_pretrained, get_tokenizer
import boto3
import streamlit as st
from PIL import Image, ImageDraw
from io import BytesIO
from typing import Union
# Initialize the SigLIP model (ViT-SO400M-14 at 384px) and its tokenizer once,
# globally, to avoid reloading them on every query
model, preprocess = create_model_from_pretrained('hf-hub:timm/ViT-SO400M-14-SigLIP-384')
tokenizer = get_tokenizer('hf-hub:timm/ViT-SO400M-14-SigLIP-384')
def encode_query(query: Union[str, Image.Image]) -> torch.Tensor:
    """
    Encode a query using the OpenCLIP SigLIP model.

    Parameters
    ----------
    query : Union[str, Image.Image]
        The query, which can be a text string or a PIL Image.

    Returns
    -------
    torch.Tensor
        The encoded query vector.
    """
    if isinstance(query, Image.Image):
        query = preprocess(query).unsqueeze(0)  # Preprocess the image and add a batch dimension
        with torch.no_grad():
            query_embedding = model.encode_image(query)  # Image embedding
    elif isinstance(query, str):
        text = tokenizer(query, context_length=model.context_length)
        with torch.no_grad():
            query_embedding = model.encode_text(text)  # Text embedding
    else:
        raise ValueError("Query must be either a string or a PIL Image.")
    return query_embedding
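
# Minimal usage sketch for encode_query ("sample.jpg" is illustrative only,
# not a file shipped with this repo):
#   text_vec = encode_query("a red car")              # shape: (1, embedding_dim)
#   image_vec = encode_query(Image.open("sample.jpg"))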
def load_dataset_with_limit(dataset_name, dataset_subset, search_in_small_objects, limit=1000):
    """
    Load a dataset from Hugging Face and optionally sample a limited number of rows.
    """
    if search_in_small_objects:
        split = f'Splits_{dataset_subset}'
    else:
        split = f'Main_{dataset_subset}'
    dataset_name = f"quasara-io/{dataset_name}"
    dataset = load_dataset(dataset_name, split=split)
    total_rows = dataset.num_rows
    # Convert to a DataFrame and sample if a limit is provided; clamp the limit
    # so sampling never requests more rows than the dataset contains
    if limit is not None:
        df = dataset.to_pandas().sample(n=min(limit, total_rows), random_state=42)
    else:
        df = dataset.to_pandas()
    return df, total_rows
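
# Usage sketch (the dataset and subset names are placeholders; substitute a
# dataset that actually exists under the quasara-io namespace):
#   df, total_rows = load_dataset_with_limit("SomeDataset", "train",
#                                            search_in_small_objects=False, limit=500)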
def get_image_vectors(df):
    # Stack the per-row 'Vector' arrays from the DataFrame into a single matrix
    image_vectors = np.vstack(df['Vector'].to_numpy())
    return torch.tensor(image_vectors, dtype=torch.float32)
def search(query, df, limit, search_in_images=True):
    if search_in_images:
        # Encode the query (text or image)
        query_vector = encode_query(query)
        # Get the image vectors from the DataFrame
        image_vectors = get_image_vectors(df)
        # Compute the cosine similarity between the query vector and each image vector
        query_vector = query_vector[0, :].detach().numpy()  # Detach and convert to a NumPy array
        image_vectors = image_vectors.detach().numpy()  # Convert the image vectors to a NumPy array
        cosine_similarities = cosine_similarity([query_vector], image_vectors)
        # Return the indices of the top K most similar image vectors
        top_k_indices = np.argsort(-cosine_similarities[0])[:limit]
        return top_k_indices
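
# Usage sketch (assumes `df` came from load_dataset_with_limit and therefore
# has a 'Vector' column):
#   top_k = search("a red car", df, limit=10)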
# Batched variant of search for large datasets: scores vectors in chunks to
# bound memory, then re-ranks the pooled candidates globally
def batch_search(query, df, batch_size=100000, limit=10):
    candidate_indices = []
    candidate_scores = []
    # Get the image vectors from the DataFrame as a NumPy array
    vectors = get_image_vectors(df).numpy()
    # Encode the query; the first row is the query embedding
    query_vector = encode_query(query)[0].detach().numpy()
    # Score each batch and keep its top-k candidates, offset to global indices
    for i in range(0, len(vectors), batch_size):
        batch_vectors = vectors[i:i + batch_size]  # Extract a batch of vectors
        batch_similarities = cosine_similarity([query_vector], batch_vectors)[0]
        batch_top_k = np.argsort(-batch_similarities)[:limit]
        candidate_indices.extend(i + batch_top_k)  # Convert batch-local to global indices
        candidate_scores.extend(batch_similarities[batch_top_k])
    # Re-rank the pooled candidates so the final top-k is global, not per-batch
    order = np.argsort(-np.array(candidate_scores))[:limit]
    return [candidate_indices[j] for j in order]
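
# batch_search trades a little bookkeeping for bounded memory: the full
# (1, N) similarity matrix is never materialized at once. Usage mirrors search():
#   top_k = batch_search("a red car", df, batch_size=50000, limit=10)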
def get_file_paths(df, top_k_indices, column_name='File_Path'):
    """
    Retrieve the file paths (or any specified column) from the DataFrame using the top K indices.

    Parameters:
    - df: pandas DataFrame containing the data
    - top_k_indices: numpy array or list of the top K indices
    - column_name: str, the name of the column to fetch (default 'File_Path')

    Returns:
    - top_k_paths: list of values from the specified column
    """
    # Fetch the specified column for the top K row positions
    top_k_paths = df.iloc[top_k_indices][column_name].tolist()
    return top_k_paths
def get_cordinates(df, top_k_indices, column_name='Coordinate'):
    """
    Retrieve the bounding-box coordinates from the DataFrame using the top K indices.

    Parameters:
    - df: pandas DataFrame containing the data
    - top_k_indices: numpy array or list of the top K indices
    - column_name: str, the name of the column to fetch (default 'Coordinate')

    Returns:
    - top_k_coords: list of coordinate values from the specified column
    """
    # Fetch the specified column for the top K row positions
    top_k_coords = df.iloc[top_k_indices][column_name].tolist()
    return top_k_coords
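
# The two helpers above are typically chained after a search, e.g.:
#   top_k = batch_search("a red car", df, limit=10)
#   paths = get_file_paths(df, top_k)
#   coords = get_cordinates(df, top_k)  # only when the split provides a 'Coordinate' column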
def get_images_from_s3_to_display(bucket_name, file_paths, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, folder_name):
    """
    Retrieve images from AWS S3 and display them in a Streamlit app.

    Parameters:
    - bucket_name: str, the name of the S3 bucket
    - file_paths: list, file paths to retrieve from S3
    - AWS_ACCESS_KEY_ID: str, AWS access key ID for authentication
    - AWS_SECRET_ACCESS_KEY: str, AWS secret access key for authentication
    - folder_name: str, the folder prefix in the S3 bucket where the images are stored

    Returns:
    - None (displays the images directly in the Streamlit app)
    """
    # Initialize the S3 client
    s3 = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    # Retrieve and display each image
    for file_path in file_paths:
        s3_object = s3.get_object(Bucket=bucket_name, Key=f"{folder_name}{file_path}")
        img_data = s3_object['Body'].read()
        # Open the image with PIL and render it in Streamlit
        img = Image.open(BytesIO(img_data))
        st.image(img, caption=file_path, use_column_width=True)
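
# Usage sketch (bucket, prefix, and credentials are placeholders; in a deployed
# Space they would normally come from st.secrets or environment variables
# rather than literals):
#   get_images_from_s3_to_display("my-bucket", paths, AWS_KEY, AWS_SECRET, "images/")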
def get_images_with_bounding_boxes_from_s3(bucket_name, file_paths, bounding_boxes, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, folder_name):
    """
    Retrieve images from AWS S3, draw their bounding boxes, and display them in a Streamlit app.

    Parameters:
    - bucket_name: str, the name of the S3 bucket
    - file_paths: list, file paths to retrieve from S3
    - bounding_boxes: list of numpy arrays or lists, each containing bounding-box coordinates in the form [x_min, y_min, x_max, y_max]
    - AWS_ACCESS_KEY_ID: str, AWS access key ID for authentication
    - AWS_SECRET_ACCESS_KEY: str, AWS secret access key for authentication
    - folder_name: str, the folder prefix in the S3 bucket where the images are stored

    Returns:
    - None (displays the images with bounding boxes directly in the Streamlit app)
    """
    # Initialize the S3 client
    s3 = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    # Iterate over file paths and their corresponding bounding boxes
    for file_path, box_coords in zip(file_paths, bounding_boxes):
        # Retrieve the image from S3
        s3_object = s3.get_object(Bucket=bucket_name, Key=f"{folder_name}{file_path}")
        img_data = s3_object['Body'].read()
        # Open the image with PIL and prepare to draw on it
        img = Image.open(BytesIO(img_data))
        draw = ImageDraw.Draw(img)
        # Ensure box_coords is iterable, in case it is a single numpy array or list
        if isinstance(box_coords, (np.ndarray, list)):
            # Distinguish between multiple bounding boxes and a single one
            if len(box_coords) > 0 and isinstance(box_coords[0], (np.ndarray, list)):
                # Multiple bounding boxes: draw each in turn
                for box in box_coords:
                    x_min, y_min, x_max, y_max = map(int, box)  # Convert to integers
                    draw.rectangle([x_min, y_min, x_max, y_max], outline="red", width=3)
            else:
                # Single bounding box
                x_min, y_min, x_max, y_max = map(int, box_coords)
                draw.rectangle([x_min, y_min, x_max, y_max], outline="red", width=3)
        else:
            raise ValueError(f"Bounding box data for {file_path} is not in an iterable format.")
        # Display the annotated image in Streamlit
        st.image(img, caption=file_path, use_column_width=True)
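
# End-to-end sketch of the search pipeline (the names in caps are placeholders
# for values supplied by the surrounding Streamlit app, e.g. via st.secrets):
#   df, total_rows = load_dataset_with_limit(DATASET_NAME, DATASET_SUBSET,
#                                            search_in_small_objects=True, limit=1000)
#   top_k = batch_search("a red car", df, limit=10)
#   paths = get_file_paths(df, top_k)
#   coords = get_cordinates(df, top_k)
#   get_images_with_bounding_boxes_from_s3(BUCKET_NAME, paths, coords,
#                                          AWS_KEY, AWS_SECRET, FOLDER_NAME)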