|
import gradio as gr |
|
import asyncio |
|
import json |
|
import os |
|
import base64 |
|
from agent import ( |
|
create_doc_agent, create_image_agent, create_json_analyzer_agent, |
|
create_speech_agent, process_document_workflow, process_image_workflow, |
|
complete_analysis_workflow, tts_with_mcp, simulate_process_climate_document, |
|
simulate_analyze_image, simulate_analyze_json_data, simulate_text_to_speech, play_wav |
|
) |
|
from mistralai import Mistral |
|
|
|
# Lazily-initialized Mistral client and agent-id registry. Shared module
# globals so every Gradio callback reuses the same client/agents once
# initialize_client_and_agents() has succeeded.
client = None

agents = None
|
|
|
def initialize_client_and_agents(api_key: str):
    """Lazily create the Mistral client and the four worker agents.

    Results are cached in the module globals ``client`` and ``agents`` so
    repeated UI callbacks reuse them instead of re-creating agents.

    Args:
        api_key: Mistral API key entered by the user.

    Returns:
        ``(client, agents_dict)`` on success, or ``(None, error_message)``
        when initialization fails.
    """
    global client, agents
    if client is None or agents is None:
        try:
            client = Mistral(api_key=api_key)
            doc_agent = create_doc_agent(client)
            image_agent = create_image_agent(client)
            json_analyzer_agent = create_json_analyzer_agent(client)
            speech_agent = create_speech_agent(client)
            agents = {
                "doc_agent_id": doc_agent.id,
                "image_agent_id": image_agent.id,
                "json_analyzer_agent_id": json_analyzer_agent.id,
                "speech_agent_id": speech_agent.id
            }
        except Exception as e:
            # Reset BOTH globals: a partial failure (client created but an
            # agent call raised) would otherwise cache a client built from a
            # bad/old API key and block any retry with a corrected key.
            client = None
            agents = None
            return None, f"Error initializing client: {str(e)}"
    return client, agents
|
|
|
# Dark-theme CSS for the whole app (teal accents on a near-black
# background); passed to gr.Blocks(css=custom_css) when the UI is built.
custom_css = """

body {

background: #121212;

color: #ffffff;

}

.gradio-container {

background-color: #1e1e1e;

border-radius: 12px;

box-shadow: 0 4px 12px rgba(0,0,0,0.4);

}

h1, h2 {

color: #80cbc4;

}

.gr-button {

background-color: #26a69a;

color: white;

}

.gr-button:hover {

background-color: #00897b;

}

input, textarea, select {

background-color: #2c2c2c !important;

color: #ffffff;

border: 1px solid #4db6ac;

}

.gr-file label {

background-color: #26a69a;

color: white;

}

.gr-audio {

border-radius: 12px;

box-shadow: 0 0 8px #4db6ac;

}

"""
|
|
|
|
|
async def run_document_workflow(api_key: str, file, document_type):
    """Process an uploaded document through the document agent.

    Returns either the simulated tool-call result (pretty-printed JSON),
    the agent's plain text reply, or an error string.
    """
    if not api_key:
        return "Error: Please provide a valid API key."
    if file is None:
        return "Error: Please upload a document file."

    path = file.name
    mistral_client, init_result = initialize_client_and_agents(api_key)
    if mistral_client is None:
        # init_result holds the initialization error message.
        return init_result

    try:
        response = await process_document_workflow(mistral_client, path, document_type)
        if not (response and response.choices):
            return "No response received."
        message = response.choices[0].message
        if message.tool_calls:
            for call in message.tool_calls:
                if call.function.name == "process_climate_document":
                    simulated = simulate_process_climate_document(
                        file_path=path, document_type=document_type
                    )
                    return json.dumps(simulated, indent=2)
        return message.content
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
async def run_image_workflow(api_key: str, image_file, analysis_focus):
    """Analyze an uploaded image through the image agent.

    Args:
        api_key: Mistral API key from the UI textbox.
        image_file: Gradio file object (PNG/JPG/PDF per the UI label).
        analysis_focus: one of "text_extraction", "chart_analysis",
            "table_extraction".

    Returns:
        Pretty-printed JSON of the simulated analysis, the agent's text
        reply, or an error string.
    """
    if not api_key:
        return "Error: Please provide a valid API key."
    if image_file is None:
        return "Error: Please upload an image file."
    image_path = image_file.name
    client, agents_or_error = initialize_client_and_agents(api_key)
    if client is None:
        return agents_or_error
    try:
        response = await process_image_workflow(client, image_path, analysis_focus)
        if response and response.choices and response.choices[0].message.tool_calls:
            for tool_call in response.choices[0].message.tool_calls:
                if tool_call.function.name == "analyze_image":
                    with open(image_path, "rb") as f:
                        image_data = base64.b64encode(f.read()).decode("utf-8")
                    # Derive the format from the file extension instead of
                    # hard-coding "jpg" — the UI accepts PNG/JPG/PDF uploads.
                    # Fall back to "jpg" for extension-less files.
                    image_format = os.path.splitext(image_path)[1].lstrip(".").lower() or "jpg"
                    result = simulate_analyze_image(image_data, image_format=image_format, analysis_focus=analysis_focus)
                    return json.dumps(result, indent=2)
        return response.choices[0].message.content if response and response.choices else "No response received."
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
async def run_analysis_and_speech_workflow(api_key: str, json_input, analysis_type):
    """Run the combined JSON-analysis + text-to-speech workflow.

    Args:
        api_key: Mistral API key from the UI textbox.
        json_input: raw JSON text typed into the UI.
        analysis_type: one of "statistical", "content", "structural".

    Returns:
        ``(log_text, audio_path_or_None)`` — the second element is a local
        WAV path when speech was generated and the file exists, else None.
    """
    if not api_key:
        return "Error: Please provide a valid API key.", None
    # Validate the user-typed JSON up front with a clear message instead of
    # letting the raw JSONDecodeError fall through the generic handler below.
    try:
        json_data = json.loads(json_input)
    except json.JSONDecodeError as e:
        return f"Error: Invalid JSON input - {str(e)}", None
    try:
        client, agents_or_error = initialize_client_and_agents(api_key)
        if client is None:
            return agents_or_error, None
        json_response, speech_response = await complete_analysis_workflow(client, json_data, max_retries=3)

        output = []
        if json_response and json_response.choices:
            output.append("JSON Analysis Response:")
            output.append(json_response.choices[0].message.content)
            for tool_call in json_response.choices[0].message.tool_calls or []:
                if tool_call.function.name == "analyze_json_data":
                    analysis_result = simulate_analyze_json_data(json_data, analysis_type)
                    output.append("Analysis Result:")
                    output.append(json.dumps(analysis_result, indent=2))

        if speech_response and speech_response.choices:
            output.append("\nSpeech Response:")
            output.append(speech_response.choices[0].message.content)
            for tool_call in speech_response.choices[0].message.tool_calls or []:
                if tool_call.function.name == "text_to_speech":
                    # NOTE(review): the spoken text is a canned summary, not
                    # derived from the actual analysis result — confirm this
                    # is intentional for the simulated workflow.
                    analysis_text = "Climate analysis reveals significant warming trends with regional variations requiring immediate attention."
                    audio_url = simulate_text_to_speech(analysis_text)
                    output.append(f"Generated Audio URL: {audio_url}")
                    play_result = play_wav(audio_url)
                    output.append(f"Audio Play Result: {play_result}")
                    if "file://" in audio_url:
                        audio_path = audio_url.replace("file://", "")
                        if os.path.exists(audio_path):
                            # Hand the local WAV to the gr.Audio component.
                            return "\n".join(output), audio_path
                        else:
                            output.append("Error: Audio file not found.")

        return "\n".join(output), None
    except Exception as e:
        return f"Error: {str(e)}", None
|
|
|
|
|
async def run_tts_workflow(api_key: str, text_input):
    """Generate speech for *text_input* via the TTS agent.

    Returns ``(log_text, audio_path_or_None)``; the audio path is set only
    when a local WAV file was produced and exists on disk.
    """
    if not api_key:
        return "Error: Please provide a valid API key.", None

    mistral_client, init_result = initialize_client_and_agents(api_key)
    if mistral_client is None:
        return init_result, None

    try:
        response = await tts_with_mcp(mistral_client, text_input)
        lines = []
        if response and response.choices:
            message = response.choices[0].message
            lines.append("TTS Agent Response:")
            lines.append(message.content)
            for call in message.tool_calls or []:
                if call.function.name != "text_to_speech":
                    continue
                audio_url = simulate_text_to_speech(text=text_input)
                lines.append(f"Generated Audio URL: {audio_url}")
                lines.append(f"Audio Play Result: {play_wav(audio_url)}")
                if "file://" in audio_url:
                    local_path = audio_url.replace("file://", "")
                    if os.path.exists(local_path):
                        # Hand the local WAV back to the gr.Audio component.
                        return "\n".join(lines), local_path
                    lines.append("Error: Audio file not found.")
        return "\n".join(lines), None
    except Exception as e:
        return f"Error: {str(e)}", None
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: one shared API-key field plus four tabs, each wired to one of
# the async workflow functions defined above.
# ---------------------------------------------------------------------------
with gr.Blocks(css=custom_css) as demo:

    gr.Markdown("# MistyClimate Multi-Agent System")
    gr.Markdown("## Mistral Multi-Agent Processing System")
    gr.Markdown("Enter your Mistral API key and interact with document processing, image analysis, JSON analysis, and text-to-speech functionalities.")

    # Shared by every tab's click handler (first element of each `inputs`).
    api_key_input = gr.Textbox(label="Mistral API Key", type="password", placeholder="Enter your Mistral API key here")

    # Tab 1: document processing via run_document_workflow.
    with gr.Tab("Document Processing"):
        doc_file = gr.File(label="Upload Document (PDF)")
        doc_type = gr.Dropdown(choices=["climate_report", "analysis", "data"], label="Document Type", value="climate_report")
        doc_button = gr.Button("Process Document")
        doc_output = gr.Textbox(label="Document Processing Output", lines=10)
        doc_button.click(
            fn=run_document_workflow,
            inputs=[api_key_input, doc_file, doc_type],
            outputs=doc_output
        )

    # Tab 2: image analysis via run_image_workflow.
    with gr.Tab("Image Analysis"):
        img_file = gr.File(label="Upload Image (PNG/JPG/PDF)")
        analysis_focus = gr.Dropdown(choices=["text_extraction", "chart_analysis", "table_extraction"],
                                     label="Analysis Focus", value="text_extraction")
        img_button = gr.Button("Analyze Image")
        img_output = gr.Textbox(label="Image Analysis Output", lines=10)
        img_button.click(
            fn=run_image_workflow,
            inputs=[api_key_input, img_file, analysis_focus],
            outputs=img_output
        )

    # Tab 3: combined JSON analysis + speech; emits both a text log and audio.
    with gr.Tab("JSON Analysis & Speech"):
        json_input = gr.Textbox(label="JSON Data Input", lines=5,
                                placeholder='{"temperature_data": [20.1, 20.5, 21.2, 21.8], "emissions": [400, 410, 415, 420], "regions": ["Global", "Arctic", "Tropical"]}')
        analysis_type = gr.Dropdown(choices=["statistical", "content", "structural"],
                                    label="Analysis Type", value="content")
        analysis_button = gr.Button("Run Analysis & Speech")
        analysis_output = gr.Textbox(label="Analysis and Speech Output", lines=10)
        audio_output = gr.Audio(label="Generated Audio")
        analysis_button.click(
            fn=run_analysis_and_speech_workflow,
            inputs=[api_key_input, json_input, analysis_type],
            outputs=[analysis_output, audio_output]
        )

    # Tab 4: plain text-to-speech via run_tts_workflow.
    with gr.Tab("Text-to-Speech"):
        tts_input = gr.Textbox(label="Text Input", value="hello, and good luck for the hackathon")
        tts_button = gr.Button("Generate Speech")
        tts_output = gr.Textbox(label="TTS Output", lines=5)
        tts_audio = gr.Audio(label="Generated Audio")
        tts_button.click(
            fn=run_tts_workflow,
            inputs=[api_key_input, tts_input],
            outputs=[tts_output, tts_audio]
        )
|
|
|
# Launch the Gradio server only when run as a script (not on import).
if __name__ == "__main__":

    demo.launch()