import gradio as gr import asyncio import json import os import base64 from agent import ( create_doc_agent, create_image_agent, create_json_analyzer_agent, create_speech_agent, process_document_workflow, process_image_workflow, complete_analysis_workflow, tts_with_mcp, simulate_process_climate_document, simulate_analyze_image, simulate_analyze_json_data, simulate_text_to_speech, play_wav ) from mistralai import Mistral client = None agents = None def initialize_client_and_agents(api_key: str): global client, agents if client is None or agents is None: try: client = Mistral(api_key=api_key) doc_agent = create_doc_agent(client) image_agent = create_image_agent(client) json_analyzer_agent = create_json_analyzer_agent(client) speech_agent = create_speech_agent(client) agents = { "doc_agent_id": doc_agent.id, "image_agent_id": image_agent.id, "json_analyzer_agent_id": json_analyzer_agent.id, "speech_agent_id": speech_agent.id } except Exception as e: return None, f"Error initializing client: {str(e)}" return client, agents custom_css = """ body { background: #121212; color: #ffffff; } .gradio-container { background-color: #1e1e1e; border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,0.4); } h1, h2 { color: #80cbc4; } .gr-button { background-color: #26a69a; color: white; } .gr-button:hover { background-color: #00897b; } input, textarea, select { background-color: #2c2c2c !important; color: #ffffff; border: 1px solid #4db6ac; } .gr-file label { background-color: #26a69a; color: white; } .gr-audio { border-radius: 12px; box-shadow: 0 0 8px #4db6ac; } """ # Function to handle document processing workflow async def run_document_workflow(api_key: str, file, document_type): if not api_key: return "Error: Please provide a valid API key." if file is None: return "Error: Please upload a document file." file_path = file.name client, agents_or_error = initialize_client_and_agents(api_key) if client is None: return agents_or_error try: response = await process_document_workflow(client, file_path, document_type) if response and response.choices and response.choices[0].message.tool_calls: for tool_call in response.choices[0].message.tool_calls: if tool_call.function.name == "process_climate_document": result = simulate_process_climate_document(file_path=file_path, document_type=document_type) return json.dumps(result, indent=2) return response.choices[0].message.content if response and response.choices else "No response received." except Exception as e: return f"Error: {str(e)}" # Function to handle image processing workflow async def run_image_workflow(api_key: str, image_file, analysis_focus): if not api_key: return "Error: Please provide a valid API key." if image_file is None: return "Error: Please upload an image file." image_path = image_file.name client, agents_or_error = initialize_client_and_agents(api_key) if client is None: return agents_or_error try: response = await process_image_workflow(client, image_path, analysis_focus) if response and response.choices and response.choices[0].message.tool_calls: for tool_call in response.choices[0].message.tool_calls: if tool_call.function.name == "analyze_image": with open(image_path, "rb") as f: image_data = base64.b64encode(f.read()).decode("utf-8") result = simulate_analyze_image(image_data, image_format="jpg", analysis_focus=analysis_focus) return json.dumps(result, indent=2) return response.choices[0].message.content if response and response.choices else "No response received." except Exception as e: return f"Error: {str(e)}" # Function to handle JSON analysis and speech workflow async def run_analysis_and_speech_workflow(api_key: str, json_input, analysis_type): if not api_key: return "Error: Please provide a valid API key.", None try: json_data = json.loads(json_input) client, agents_or_error = initialize_client_and_agents(api_key) if client is None: return agents_or_error, None json_response, speech_response = await complete_analysis_workflow(client, json_data, max_retries=3) output = [] if json_response and json_response.choices: output.append("JSON Analysis Response:") output.append(json_response.choices[0].message.content) for tool_call in json_response.choices[0].message.tool_calls or []: if tool_call.function.name == "analyze_json_data": analysis_result = simulate_analyze_json_data(json_data, analysis_type) output.append("Analysis Result:") output.append(json.dumps(analysis_result, indent=2)) if speech_response and speech_response.choices: output.append("\nSpeech Response:") output.append(speech_response.choices[0].message.content) for tool_call in speech_response.choices[0].message.tool_calls or []: if tool_call.function.name == "text_to_speech": analysis_text = "Climate analysis reveals significant warming trends with regional variations requiring immediate attention." audio_url = simulate_text_to_speech(analysis_text) output.append(f"Generated Audio URL: {audio_url}") play_result = play_wav(audio_url) output.append(f"Audio Play Result: {play_result}") if "file://" in audio_url: audio_path = audio_url.replace("file://", "") if os.path.exists(audio_path): return "\n".join(output), audio_path else: output.append("Error: Audio file not found.") return "\n".join(output), None except Exception as e: return f"Error: {str(e)}", None # Function to handle TTS workflow async def run_tts_workflow(api_key: str, text_input): if not api_key: return "Error: Please provide a valid API key.", None client, agents_or_error = initialize_client_and_agents(api_key) if client is None: return agents_or_error, None try: response = await tts_with_mcp(client, text_input) output = [] if response and response.choices: output.append("TTS Agent Response:") output.append(response.choices[0].message.content) for tool_call in response.choices[0].message.tool_calls or []: if tool_call.function.name == "text_to_speech": audio_url = simulate_text_to_speech(text=text_input) output.append(f"Generated Audio URL: {audio_url}") play_result = play_wav(audio_url) output.append(f"Audio Play Result: {play_result}") if "file://" in audio_url: audio_path = audio_url.replace("file://", "") if os.path.exists(audio_path): return "\n".join(output), audio_path else: output.append("Error: Audio file not found.") return "\n".join(output), None except Exception as e: return f"Error: {str(e)}", None # Gradio interface with gr.Blocks(css=custom_css) as demo: gr.Markdown("# MistyClimate Multi-Agent System") gr.Markdown("## Mistral Multi-Agent Processing System") gr.Markdown("Enter your Mistral API key and interact with document processing, image analysis, JSON analysis, and text-to-speech functionalities.") api_key_input = gr.Textbox(label="Mistral API Key", type="password", placeholder="Enter your Mistral API key here") with gr.Tab("Document Processing"): doc_file = gr.File(label="Upload Document (PDF)") doc_type = gr.Dropdown(choices=["climate_report", "analysis", "data"], label="Document Type", value="climate_report") doc_button = gr.Button("Process Document") doc_output = gr.Textbox(label="Document Processing Output", lines=10) doc_button.click( fn=run_document_workflow, inputs=[api_key_input, doc_file, doc_type], outputs=doc_output ) with gr.Tab("Image Analysis"): img_file = gr.File(label="Upload Image (PNG/JPG/PDF)") analysis_focus = gr.Dropdown(choices=["text_extraction", "chart_analysis", "table_extraction"], label="Analysis Focus", value="text_extraction") img_button = gr.Button("Analyze Image") img_output = gr.Textbox(label="Image Analysis Output", lines=10) img_button.click( fn=run_image_workflow, inputs=[api_key_input, img_file, analysis_focus], outputs=img_output ) with gr.Tab("JSON Analysis & Speech"): json_input = gr.Textbox(label="JSON Data Input", lines=5, placeholder='{"temperature_data": [20.1, 20.5, 21.2, 21.8], "emissions": [400, 410, 415, 420], "regions": ["Global", "Arctic", "Tropical"]}') analysis_type = gr.Dropdown(choices=["statistical", "content", "structural"], label="Analysis Type", value="content") analysis_button = gr.Button("Run Analysis & Speech") analysis_output = gr.Textbox(label="Analysis and Speech Output", lines=10) audio_output = gr.Audio(label="Generated Audio") analysis_button.click( fn=run_analysis_and_speech_workflow, inputs=[api_key_input, json_input, analysis_type], outputs=[analysis_output, audio_output] ) with gr.Tab("Text-to-Speech"): tts_input = gr.Textbox(label="Text Input", value="hello, and good luck for the hackathon") tts_button = gr.Button("Generate Speech") tts_output = gr.Textbox(label="TTS Output", lines=5) tts_audio = gr.Audio(label="Generated Audio") tts_button.click( fn=run_tts_workflow, inputs=[api_key_input, tts_input], outputs=[tts_output, tts_audio] ) if __name__ == "__main__": demo.launch()