|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
import base64 |
|
import glob |
|
import json |
|
import re |
|
import zipfile |
|
from io import BytesIO |
|
from datetime import datetime |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
|
|
|
import gradio as gr |
|
import openai |
|
from openai import OpenAI |
|
import pytz |
|
import cv2 |
|
from PIL import Image |
|
from PyPDF2 import PdfReader |
|
from dotenv import load_dotenv |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Load variables from a local .env file (when one exists) before reading them.
load_dotenv()

# Credentials are read once at import time; each is None when the variable is unset.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
OPENAI_ORG_ID = os.environ.get('OPENAI_ORG_ID')
HF_KEY = os.environ.get('HF_KEY')

# Base URL handed to the OpenAI client when the open-weights model is selected.
FIREWORKS_API_BASE = "https://api.fireworks.ai/inference/v1"

# Model identifiers offered by default in the UI.
DEFAULT_OSS_MODEL = "openai/gpt-oss-120b"
DEFAULT_OPENAI_MODEL = "gpt-4o-2024-05-13"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_llm_client(model_name: str) -> OpenAI:
    """
    Build the OpenAI-compatible client that serves the requested model.

    The default open-weights model is routed to ``FIREWORKS_API_BASE`` and
    authenticated with ``HF_KEY``. Every other model name uses the standard
    OpenAI endpoint with ``OPENAI_API_KEY`` and ``OPENAI_ORG_ID``.

    Args:
        model_name (str): The name of the model to use.

    Returns:
        OpenAI: A configured OpenAI client instance.

    Raises:
        gr.Error: If the API key required for the selected model is not set.
    """
    wants_oss_model = model_name == DEFAULT_OSS_MODEL

    # Guard clauses first: fail fast when the matching credential is absent.
    if wants_oss_model and not HF_KEY:
        raise gr.Error("Hugging Face API Key (HF_KEY) is missing! Add it to your Space secrets.")
    if wants_oss_model:
        return OpenAI(api_key=HF_KEY, base_url=FIREWORKS_API_BASE)

    if not OPENAI_API_KEY:
        raise gr.Error("OpenAI API Key is missing! Add it to your Space secrets.")
    return OpenAI(api_key=OPENAI_API_KEY, organization=OPENAI_ORG_ID)
|
|
|
def generate_filename(prompt: str, file_type: str, original_name: str = None) -> str:
    """
    Generate a filesystem-safe, timestamped, descriptive filename.

    The name has the form ``MMDD_HHMM_<prompt>[_<original stem>].<file_type>``
    where the timestamp is taken in the US/Central time zone. The prompt part
    is capped at 50 characters and the whole stem at 100 characters.

    Args:
        prompt (str): The user's prompt, used to make the name descriptive.
            ``None`` is treated as an empty prompt.
        file_type (str): The file extension without the dot (e.g., "md", "png").
        original_name (str, optional): The original name of an uploaded file;
            its stem (name without extension) is appended to the result.

    Returns:
        str: A clean, timestamped filename.
    """
    # Reserved path characters plus every ASCII control character (newline,
    # carriage return, tab, NUL, ...), none of which belong in a file name.
    unsafe_chars = r'[<>:"/\\|?*\x00-\x1f]'

    central = pytz.timezone('US/Central')
    safe_date_time = datetime.now(central).strftime("%m%d_%H%M")

    safe_prompt = re.sub(unsafe_chars, ' ', prompt or '').strip()[:50]

    if original_name:
        # The uploaded file's own stem gets the same sanitising as the prompt.
        base_name = os.path.splitext(original_name)[0]
        base_name = re.sub(unsafe_chars, ' ', base_name).strip()
        file_stem = f"{safe_date_time}_{safe_prompt}_{base_name}"[:100]
    else:
        file_stem = f"{safe_date_time}_{safe_prompt}"[:100]

    # Truncation can leave a trailing space or dot, which some filesystems
    # silently drop or reject; the timestamp prefix keeps the stem non-empty.
    file_stem = file_stem.rstrip(' .')

    return f"{file_stem}.{file_type}"
|
|
|
def create_and_save_file(content: str, prompt: str, should_save: bool, file_type: str = "md", original_name: str = None):
    """
    Write a prompt/response pair to a new file when saving is enabled.

    The file name comes from ``generate_filename``. Saving is best-effort:
    a failure to write is reported on stdout and does not propagate.

    Args:
        content (str): The generated content to save.
        prompt (str): The user prompt that generated the content.
        should_save (bool): A flag indicating whether to save the file.
        file_type (str, optional): The file extension. Defaults to "md".
        original_name (str, optional): The original name of an input file.

    Returns:
        None
    """
    if not should_save:
        print("πΎ Save checkbox is unchecked. Skipping file save.")
        return

    filename = generate_filename(prompt, file_type, original_name)
    full_content = f"π‘ PROMPT:\n{prompt}\n\n{'='*20}\n\nπ€ RESPONSE:\n{content}"

    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(full_content)
    except (OSError, UnicodeError) as e:
        # Filesystem problems and unencodable text are the expected failure
        # modes here; report them without interrupting the chat turn.
        print(f"π₯ Error saving file {filename}: {e}")
    else:
        print(f"✅ Successfully saved conversation to {filename}")
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_text(client: OpenAI, model_name: str, history: list, text_input: str, should_save: bool) -> list:
    """
    Send a text-only prompt to the chat model and record the exchange.

    The user turn and the assistant reply are both appended to ``history``
    in place, and the reply is passed to ``create_and_save_file``.

    Args:
        client (OpenAI): The configured OpenAI client.
        model_name (str): The name of the AI model.
        history (list): The conversation history in OpenAI format.
        text_input (str): The user's text prompt.
        should_save (bool): Flag to determine if the output should be saved.

    Returns:
        list: The updated conversation history.
    """
    user_turn = {"role": "user", "content": text_input}
    history.append(user_turn)

    # Non-streaming call: the full reply is available on the first choice.
    request = {"model": model_name, "messages": history, "stream": False}
    completion = client.chat.completions.create(**request)
    reply_text = completion.choices[0].message.content

    assistant_turn = {"role": "assistant", "content": reply_text}
    history.append(assistant_turn)

    create_and_save_file(reply_text, text_input, should_save)
    return history
|
|
|
def process_image(client: OpenAI, model_name: str, history: list, image_path: str, user_prompt: str, should_save: bool) -> list:
    """
    Send an image together with a text prompt to the chat model.

    The image is read from disk, base64-encoded and embedded in the request
    as a data URL whose MIME type matches the file extension. The user turn
    and the assistant reply are appended to ``history`` in place.

    Args:
        client (OpenAI): The configured OpenAI client.
        model_name (str): The name of the AI model.
        history (list): The conversation history.
        image_path (str): The local path to the uploaded image.
        user_prompt (str): The text prompt accompanying the image.
        should_save (bool): Flag to determine if the output should be saved.

    Returns:
        list: The updated conversation history.
    """
    # Label the payload with its real type; unknown extensions fall back to
    # JPEG, which was the previous unconditional label.
    mime_by_extension = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    extension = os.path.splitext(image_path)[1].lower()
    mime_type = mime_by_extension.get(extension, "image/jpeg")

    with open(image_path, "rb") as img_file:
        base64_image = base64.b64encode(img_file.read()).decode("utf-8")

    image_message = {
        "role": "user",
        "content": [
            {"type": "text", "text": user_prompt},
            {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
        ],
    }
    history.append(image_message)

    # temperature=0.0 requests the least random description of the image.
    response = client.chat.completions.create(
        model=model_name,
        messages=history,
        temperature=0.0,
    )
    image_response = response.choices[0].message.content
    history.append({"role": "assistant", "content": image_response})

    original_name = os.path.basename(image_path)
    create_and_save_file(image_response, user_prompt, should_save, original_name=original_name)
    return history
|
|
|
def process_audio(client: OpenAI, model_name: str, history: list, audio_path: str, user_prompt: str, should_save: bool) -> list:
    """
    Transcribe an audio file, then ask the chat model to respond to it.

    The transcript (from the "whisper-1" model) is appended to the user's
    prompt under a separator line, and that combined text becomes the user
    turn. Both turns are appended to ``history`` in place.

    Args:
        client (OpenAI): The configured OpenAI client.
        model_name (str): The name of the AI model.
        history (list): The conversation history.
        audio_path (str): Path to the uploaded audio file.
        user_prompt (str): The text prompt to guide the response to the transcript.
        should_save (bool): Flag to determine if the output should be saved.

    Returns:
        list: The updated conversation history.

    Raises:
        gr.Error: If transcription, the chat request or saving fails.
    """
    try:
        # Step 1: speech to text.
        with open(audio_path, "rb") as audio_file:
            transcript_result = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
            )
            transcription = transcript_result.text

        # Step 2: prompt plus transcript form a single user turn.
        full_prompt = f"{user_prompt}\n\n--- Audio Transcription ---\n{transcription}"
        history.append({"role": "user", "content": full_prompt})

        # Step 3: chat completion over the whole history.
        completion = client.chat.completions.create(model=model_name, messages=history)
        reply_text = completion.choices[0].message.content
        history.append({"role": "assistant", "content": reply_text})

        source_name = os.path.basename(audio_path)
        create_and_save_file(reply_text, full_prompt, should_save, original_name=source_name)

        return history
    except openai.BadRequestError as e:
        raise gr.Error(f"Audio processing error: {e}")
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred during audio processing: {e}")
|
|
|
def _extract_video_frames(video_path: str, seconds_between_frames: float = 2.0) -> list:
    """Sample one frame every ``seconds_between_frames`` and return them as base64 JPEG strings."""
    video = cv2.VideoCapture(video_path)
    try:
        fps = video.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            raise gr.Error("Could not read video file. Is it valid?")

        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        # A step of 0 (fps below 0.5) would make range() raise; sample at
        # least every frame in that case.
        frames_to_skip = max(1, int(fps * seconds_between_frames))

        encoded_frames = []
        for frame_index in range(0, total_frames, frames_to_skip):
            video.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
            success, frame = video.read()
            if not success:
                break
            encoded_ok, buffer = cv2.imencode(".jpg", frame)
            if not encoded_ok:
                # Skip a frame that could not be encoded rather than send garbage.
                continue
            encoded_frames.append(base64.b64encode(buffer).decode("utf-8"))
    finally:
        # Always free the capture handle, including on the error paths above.
        video.release()

    if not encoded_frames:
        raise gr.Error("Could not extract any frames from the video.")
    return encoded_frames


def _transcribe_video_audio(client: OpenAI, video_path: str):
    """Return the "whisper-1" transcript of the video's audio track, or None if it has none."""
    import tempfile

    # VideoFileClip lives at the package root in moviepy >= 2.0 and in
    # moviepy.editor in 1.x; an ImportError from both propagates to the caller.
    try:
        from moviepy import VideoFileClip
    except ImportError:
        from moviepy.editor import VideoFileClip

    audio_path = None
    try:
        with VideoFileClip(video_path) as clip:
            if clip.audio:
                # A unique temp file keeps concurrent requests from clobbering
                # each other's audio.
                handle, audio_path = tempfile.mkstemp(suffix=".mp3")
                os.close(handle)
                clip.audio.write_audiofile(audio_path, bitrate="32k", logger=None)

        if audio_path is None:
            return None

        with open(audio_path, "rb") as audio_file:
            return client.audio.transcriptions.create(model="whisper-1", file=audio_file).text
    finally:
        # Remove the temp file even when extraction or transcription failed.
        if audio_path and os.path.exists(audio_path):
            os.remove(audio_path)


def process_video(client: OpenAI, model_name: str, history: list, video_path: str, user_prompt: str, should_save: bool) -> list:
    """
    Summarise a video from sampled frames plus, when available, its audio transcript.

    One frame is sampled every two seconds and sent as a low-detail image.
    Audio transcription is best-effort: if it fails for any reason a warning
    is printed and the request proceeds with frames only. The user turn and
    the assistant reply are appended to ``history`` in place.

    Args:
        client (OpenAI): The configured OpenAI client.
        model_name (str): The name of the AI model.
        history (list): The conversation history.
        video_path (str): Path to the uploaded video file.
        user_prompt (str): The text prompt for the video summary.
        should_save (bool): Flag to determine if the output should be saved.

    Returns:
        list: The updated conversation history.

    Raises:
        gr.Error: If the video cannot be read, yields no frames, or the chat
            request fails. Every failure is reported through this one type.
    """
    try:
        base64_frames = _extract_video_frames(video_path)

        messages = [
            {"type": "text", "text": "These are frames from a video. Please analyze them."},
            *(
                {"type": "image_url", "image_url": {"url": f"data:image/jpg;base64,{frame}", "detail": "low"}}
                for frame in base64_frames
            ),
        ]

        try:
            transcript = _transcribe_video_audio(client, video_path)
            if transcript is not None:
                messages.append({"type": "text", "text": f"--- Video Transcription ---\n{transcript}"})
        except Exception as e:
            # Deliberately best-effort: frames alone are still worth sending.
            print(f"β οΈ Audio extraction/transcription failed or skipped: {e}")

        messages.append({"type": "text", "text": user_prompt})
        history.append({"role": "user", "content": messages})

        response = client.chat.completions.create(model=model_name, messages=history)
        result = response.choices[0].message.content
        history.append({"role": "assistant", "content": result})

        create_and_save_file(result, user_prompt, should_save, original_name=os.path.basename(video_path))
        return history

    except Exception as e:
        raise gr.Error(f"Video processing failed spectacularly: {str(e)}") from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_history_to_openai_format(gradio_history: list) -> list:
    """
    Convert Gradio's pair-based chat history to the OpenAI message format.

    Each ``[user_msg, bot_msg]`` pair becomes up to two role/content dicts.
    A message given as a tuple contributes only its first element; tuples of
    any length are accepted. Messages with no text are skipped.

    Args:
        gradio_history (list): History from a gr.Chatbot component, as a
            list of ``[user_msg, bot_msg]`` pairs. ``None`` is treated as empty.

    Returns:
        list: History formatted for the OpenAI API.
    """
    def _text_of(message):
        # Indexing instead of two-name unpacking keeps 1-tuples (and longer
        # ones) from raising ValueError.
        if isinstance(message, tuple):
            return message[0] if message else None
        return message

    openai_history = []
    for user_msg, bot_msg in gradio_history or []:
        user_text = _text_of(user_msg)
        if user_text:
            openai_history.append({"role": "user", "content": user_text})

        bot_text = _text_of(bot_msg)
        if bot_text:
            openai_history.append({"role": "assistant", "content": bot_text})
    return openai_history
|
|
|
def get_file_processor(file_path: str):
    """
    Pick the processing function for a file based on its extension.

    Matching is case-insensitive and uses only the final extension of
    ``file_path``.

    Args:
        file_path (str): The path to the file.

    Returns:
        function: ``process_image``, ``process_audio`` or ``process_video``,
        or None when the extension is not supported.
    """
    # os.path.splitext keeps the leading dot, so every entry must carry one.
    image_extensions = {'.png', '.jpg', '.jpeg', '.gif', '.webp'}
    audio_extensions = {'.mp3', '.wav', '.m4a', '.flac', '.ogg'}
    video_extensions = {'.mp4', '.mov', '.avi'}

    ext = os.path.splitext(file_path)[1].lower()

    if ext in image_extensions:
        return process_image
    if ext in audio_extensions:
        return process_audio
    if ext in video_extensions:
        return process_video
    return None
|
|
|
def _uploaded_file_path(uploaded) -> str:
    """Return the local path of one uploaded file given as a path string, a dict, or an object."""
    if isinstance(uploaded, str):
        return uploaded
    if isinstance(uploaded, dict):
        return uploaded.get("path") or uploaded.get("name")
    # File-like objects: ``.name`` is tried first, then ``.path``.
    return getattr(uploaded, "name", None) or getattr(uploaded, "path", None)


def handle_multimodal_submit(message: dict, history: list, model_name: str, should_save: bool):
    """
    Handle one submission from the chat input and drive the UI through it.

    The user's turn is shown immediately with the input box locked. The
    request is then routed to ``process_text`` (no files) or to the processor
    chosen by ``get_file_processor`` for the first attached file; any further
    attachments are listed in the chat but not processed. On failure the
    error text is shown as the bot reply and the user's input is restored.

    Args:
        message (dict): The output from the gr.MultimodalTextbox, with
            "text" and "files" entries.
        history (list): The current chat history in Gradio format.
        model_name (str): The selected AI model.
        should_save (bool): The state of the save checkbox.

    Yields:
        tuple: The updated chat history and a gr.MultimodalTextbox update.
    """
    text_prompt = message.get("text") or ""
    # NOTE(review): the element type of "files" depends on the installed
    # Gradio version (plain paths vs. file objects) - confirm; both are accepted.
    file_paths = [_uploaded_file_path(item) for item in (message.get("files") or [])]
    if history is None:
        history = []

    user_turn_content = text_prompt
    if file_paths:
        file_names = ", ".join(os.path.basename(path) for path in file_paths)
        user_turn_content += f"\n\n*π Attached: {file_names}*"

    # Show the user's turn right away and lock the input while working.
    history.append([user_turn_content, None])
    yield history, gr.MultimodalTextbox(value=None, interactive=False)

    try:
        client = get_llm_client(model_name)

        # The turn just appended is excluded: each processor adds its own
        # user message in the shape the API needs.
        openai_history = convert_history_to_openai_format(history[:-1])

        if not file_paths:
            updated_openai_history = process_text(client, model_name, openai_history, text_prompt, should_save)
        else:
            file_path = file_paths[0]
            processor = get_file_processor(file_path)
            if processor is None:
                raise gr.Error(f"Unsupported file type: {os.path.splitext(file_path)[1]}")
            updated_openai_history = processor(client, model_name, openai_history, file_path, text_prompt, should_save)

        history[-1][1] = updated_openai_history[-1]['content']
        yield history, gr.MultimodalTextbox(value=None, interactive=True)

    except Exception as e:
        # Surface the failure in the chat and give the user their input back.
        history[-1][1] = f"**π₯ An Error Occurred:** {str(e)}"
        yield history, gr.MultimodalTextbox(value=message, interactive=True)
|
|
|
def update_file_list_display(file_types: list):
    """
    Rebuild the sidebar list of generated files for the selected extensions.

    Only files in the current working directory are considered. A file is
    listed when its lowercased extension is in ``file_types`` and its stem is
    at least 10 characters long. Results are ordered newest first by
    modification time.

    Args:
        file_types (list): Extensions (with leading dot) to include.

    Returns:
        A gr.update carrying the new choices and an empty selection.
    """
    if not file_types:
        return gr.update(choices=[], value=[])

    matching_files = []
    for candidate in glob.glob("*.*"):
        stem, extension = os.path.splitext(candidate)
        if extension.lower() in file_types and len(stem) >= 10:
            matching_files.append(candidate)

    matching_files.sort(key=os.path.getmtime, reverse=True)
    return gr.update(choices=matching_files, value=[])
|
|
|
def delete_selected_files(files_to_delete: list, current_filter: list):
    """
    Delete the files selected in the sidebar and refresh the list.

    Each file is removed independently; a failure produces a warning and
    does not stop the remaining deletions. The final notice reports only the
    files that were actually removed.

    Args:
        files_to_delete (list): File names selected for deletion.
        current_filter (list): The active extension filter, used to rebuild
            the file list afterwards.

    Returns:
        The gr.update produced by ``update_file_list_display``.
    """
    if not files_to_delete:
        gr.Warning("No files selected to delete. Are you just testing the button?")
        return update_file_list_display(current_filter)

    deleted_count = 0
    for file_path in files_to_delete:
        # The list only ever offers bare file names from the working
        # directory; refuse anything carrying a directory component.
        if os.path.basename(file_path) != file_path:
            gr.Warning(f"Refusing to delete {file_path}: not a file in the working directory.")
            continue
        try:
            os.remove(file_path)
        except OSError as e:
            gr.Warning(f"Could not delete {file_path}. It's probably hiding. Error: {e}")
        else:
            deleted_count += 1

    if deleted_count:
        gr.Info(f"Deleted {deleted_count} files. They're gone. Reduced to atoms.")
    return update_file_list_display(current_filter)
|
|
|
|
|
|
|
|
|
|
|
|
|
# Page layout: a narrow control column on the left and the chat on the right.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="teal", secondary_hue="orange"), title="ScienceBrain.AI") as demo:
    gr.Markdown("# οΏ½π§ ScienceBrain.AI\n*A Multi-Modal Interface for Advanced AI Models*")

    with gr.Row():
        with gr.Column(scale=1, min_width=300):
            gr.Markdown("### βοΈ Controls")
            model_selector = gr.Dropdown(
                label="Select Model",
                choices=[DEFAULT_OSS_MODEL, DEFAULT_OPENAI_MODEL, "gpt-4o-mini", "gpt-3.5-turbo"],
                value=DEFAULT_OSS_MODEL,
            )
            save_checkbox = gr.Checkbox(label="πΎ Save Session Output", value=True)
            clear_btn = gr.Button("ποΈ Clear Session", variant="stop")

            with gr.Accordion("π File Management", open=False):
                file_filter = gr.CheckboxGroup(
                    label="Filter by Type",
                    choices=[".md", ".png", ".jpg", ".pdf", ".wav", ".mp3", ".mp4"],
                    value=[".md", ".png"]
                )
                file_list = gr.CheckboxGroup(label="Generated Files (Select to Delete)", choices=[], value=[])
                with gr.Row():
                    refresh_files_btn = gr.Button("π Refresh")
                    delete_files_btn = gr.Button("ποΈ Delete", variant="primary")

        with gr.Column(scale=4):
            chatbot = gr.Chatbot(
                label="Conversation",
                bubble_full_width=False,
                height=650,
                avatar_images=(None, "https://openmoji.org/data/color/svg/1F916.svg")
            )
            multimodal_input = gr.MultimodalTextbox(
                file_types=["image", "audio", "video"],
                placeholder="Type a message or upload a file...",
                label="Your Input"
            )

    # Chat submission: the handler is a generator, so the chat and the input
    # box are updated once before and once after the model call.
    multimodal_input.submit(
        fn=handle_multimodal_submit,
        inputs=[multimodal_input, chatbot, model_selector, save_checkbox],
        outputs=[chatbot, multimodal_input]
    )

    # Clearing the session empties the chat; one output, one returned value.
    clear_btn.click(fn=lambda: [], inputs=None, outputs=[chatbot])

    # File management: every action ends by rebuilding the file list.
    refresh_files_btn.click(fn=update_file_list_display, inputs=[file_filter], outputs=[file_list])
    file_filter.change(fn=update_file_list_display, inputs=[file_filter], outputs=[file_list])
    delete_files_btn.click(fn=delete_selected_files, inputs=[file_list, file_filter], outputs=[file_list])

    # Populate the file list when the page first loads.
    demo.load(fn=update_file_list_display, inputs=[file_filter], outputs=[file_list])
|
|
|
|
|
|
|
|
|
# Launch the Gradio app only when this file is run as a script, not on import.
if __name__ == "__main__":
    demo.launch(debug=True)