Spaces:
Running
Running
import json | |
import os | |
from datetime import date, datetime | |
from pathlib import Path | |
import plotly.graph_objects as go | |
from dotenv import load_dotenv | |
from fastapi import Depends, FastAPI, Request | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import HTMLResponse, Response | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from mistralai.client import ChatMessage, MistralClient | |
from pydantic import BaseModel | |
from weather import get_weather | |
# code that gives the date of today | |
today = date.today() | |
today = today.strftime("%Y-%m-%d") | |
# Hugging face space secret retrieval: FIXME | |
def create_env_file(): | |
import os | |
secrets = ['MISTRAL_API_KEY', 'AGRO_API_KEY', 'OPENCAGE_API_KEY'] | |
for secret in secrets: | |
secret_value = os.environ[secret] | |
if secret_value is None: | |
print(f"Please set the environment variable {secret}") | |
else: | |
with open('.env', 'a') as f: | |
f.write(f"{secret}={secret_value}\n") | |
# Hugging face space secret retrieval: FIXME | |
production = False | |
if production: | |
create_env_file() | |
# Load environment variables | |
load_dotenv() | |
#api_key='Yb2kAF0DR4Mva5AEmoYFV3kYRAKdXB7i' | |
#client = MistralClient(api_key=api_key) | |
model = 'mistral-small' | |
title = "Gaia Mistral Chat Demo" | |
description = "Example of simple chatbot with Gradio and Mistral AI via its API" | |
placeholder = "Posez moi une question sur l'agriculture" | |
examples = ["Comment fait on pour produire du maïs ?", | |
"Rédige moi une lettre pour faire un stage dans une exploitation agricole", "Comment reprendre une exploitation agricole ?"] | |
def create_prompt_system(): | |
prompt = "Ton rôle: Assistant agricole\n\n" | |
prompt += "Ton objectif: Aider les agriculteurs dans leur recherche d'information. Tu peux répondre à des questions sur l'agriculture, donner des conseils, ou aider à rédiger des documents administratifs.\n\n" | |
prompt += "Ton public: Agriculteurs, étudiants en agriculture, personnes en reconversion professionnelle, ou toute personne ayant des questions sur l'agriculture.\n\n" | |
prompt += "Ton style: Tu es professionnel, bienveillant, et tu as une connaissance approfondie de l'agriculture. Tu réponds uniquement en français et de manière semi-concise. Tu es capable de répondre à des questions techniques, mais tu sais aussi t'adapter à des personnes qui ne connaissent pas bien le domaine.\n\n" | |
prompt += "Tu disposes du contexte suivant pour répondre aux questions:\n\n" | |
# load all the json files in the root | |
for file in Path('.').glob('*.json'): | |
with open(file, 'r') as f: | |
prompt += f"Contexte: {file.stem}\n\n" | |
# convert the json to a string using the json module | |
file_content = json.load(f) | |
prompt += json.dumps(file_content, indent=4) | |
prompt += "\n\n" | |
prompt += "Tu es prêt à répondre aux questions ?\n\n" | |
return prompt | |
def chat_with_mistral(user_input): | |
#messages = [ChatMessage(role="user", content=user_input)] | |
#chat_response = client.chat(model=model, messages=messages) | |
return 'Donne moi une clé API valide et je te réponds volontiers ;-)' #chat_response.choices[0].message.content | |
# create a FastAPI app | |
app = FastAPI() | |
# Add CORS middleware | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], # Allows all origins | |
allow_credentials=True, | |
allow_methods=["*"], # Allows all methods | |
allow_headers=["*"], # Allows all headers | |
) | |
# create a static directory to store the static files | |
static_dir = Path('./static') | |
static_dir.mkdir(parents=True, exist_ok=True) | |
# mount FastAPI StaticFiles server | |
app.mount("/static", StaticFiles(directory=static_dir), name="static") | |
# templating | |
templates = Jinja2Templates(directory="static") | |
with open('data/departements.geojson', 'r') as f: | |
departements = json.load(f) | |
with open('data/regions.geojson', 'r') as f: | |
regions = json.load(f) | |
with open('videos.json', 'r') as f: | |
videos = json.load(f) | |
def create_world_map(lat, lon, dpmts=False, rgns= False): | |
fig = go.Figure() | |
for video_id, video_info in videos.items(): | |
name = video_info['name'] | |
url = video_info['url'] | |
location = video_info['location'] | |
video_lat = video_info['lat'] | |
video_lon = video_info['lon'] | |
fig.add_trace(go.Scattermapbox( | |
mode='markers', | |
lon=[video_lon], | |
lat=[video_lat], | |
marker=go.scattermapbox.Marker( | |
size=14, | |
color='blue' | |
), | |
text=name, | |
hoverinfo='text', | |
showlegend=False, | |
visible=True, | |
)) | |
if dpmts : | |
for feature_departement in departements['features']: | |
if feature_departement['geometry']['type'] == 'Polygon': | |
coords = feature_departement['geometry']['coordinates'][0] | |
lons, lats = zip(*coords) | |
lons = list(lons) | |
lats = list(lats) | |
fig.add_trace(go.Scattermapbox( | |
mode='lines', | |
lon=lons + [lons[0]], | |
lat=lats + [lats[0]], | |
marker=go.scattermapbox.Marker( | |
size=14 | |
), | |
text=feature_departement['properties']['nom'], | |
hoverinfo='text', | |
line=dict( | |
color='blue', | |
width=1 | |
), | |
showlegend=False, | |
visible=True, | |
)) | |
if rgns: | |
for feature_region in regions['features']: | |
if feature_region['geometry']['type'] == 'Polygon': | |
coords = feature_region['geometry']['coordinates'][0] | |
lons, lats = zip(*coords) | |
lons = list(lons) | |
lats = list(lats) | |
fig.add_trace(go.Scattermapbox( | |
mode='lines', | |
lon=lons + [lons[0]], | |
lat=lats + [lats[0]], | |
hoverinfo='text', | |
line=dict( | |
color='red', # Set the line color to red | |
width=1, # Set the width of the line | |
), | |
showlegend=False, | |
visible=True, | |
)) | |
fig.add_trace(go.Scattermapbox( | |
lat=[lat], | |
lon=[lon], | |
mode='markers', | |
marker=go.scattermapbox.Marker(size=14, color='red'), | |
text=['Location'], | |
showlegend=False, | |
hoverinfo='none' | |
)) | |
fig.update_layout( | |
autosize=True, | |
plot_bgcolor='rgba(0,0,0,0)', | |
paper_bgcolor='rgba(0,0,0,0)', | |
mapbox_style="open-street-map", | |
hovermode='closest', | |
mapbox=dict( | |
bearing=0, | |
center=go.layout.mapbox.Center( | |
lat=lat, | |
lon=lon, | |
), | |
pitch=0, | |
zoom=5 | |
), | |
) | |
return fig | |
# Profile stuff | |
class UserProfile(BaseModel): | |
name: str | |
age: int | |
location: str | |
lat: float | |
lon: float | |
class UserLocation(BaseModel): | |
city: str | |
class Weather(BaseModel): | |
temperature: float | |
weather: str | |
def save_user_profile(user_profile: UserProfile): | |
with open('user_profile.json', 'w') as f: | |
json.dump(user_profile.dict(), f) | |
return user_profile.dict() | |
def load_user_profile(): | |
with open('user_profile.json', 'r') as f: | |
user_profile = json.load(f) | |
return UserProfile(**user_profile) | |
def update_user_profile(user_profile: UserProfile): | |
with open('user_profile.json', 'w') as f: | |
json.dump(user_profile.dict(), f) | |
return user_profile | |
def load_weather(): | |
with open('Weather.json', 'r') as f: | |
w = json.load(f) | |
return Weather(**w) | |
# Load user location | |
def load_user_location(): | |
with open('user_location.json', 'r') as f: | |
user_location = json.load(f) | |
return UserLocation(**user_location) | |
# Save weather information | |
def save_weather(weather: Weather): | |
with open('Weather.json', 'w') as f: | |
json.dump(weather.dict(), f) | |
async def set_user_location(user_location: UserLocation): | |
# Save the user location as a JSON file | |
with open('user_location.json', 'w') as f: | |
json.dump(user_location.dict(), f) | |
response = Response(headers={"Location": "/home"}) | |
response.status_code = 303 | |
return response | |
# load user profile on startup | |
user_profile = load_user_profile() | |
weather = load_weather() | |
async def enter_location(): | |
return """ | |
<html> | |
<head> | |
<style> | |
body { | |
background-image: url('https://lh3.googleusercontent.com/JKE8WODi6oggtccvEyMnYswDLqSVjDv4FqIGec2qF1doGXf3HTJ5MnMHqG-thNklmxKO6aGf23XiZAFwbaSxt4sTyWc-IT-zyH6aQA=rj-w0-h1600-l80'); | |
background-size: cover; | |
color: white; /* make all the text white */ | |
font-size: 20px; /* increase the font size */ | |
font-family: sans-serif; /* change the font to Arial */ | |
} | |
img { | |
display: block; | |
margin-left: auto; | |
margin-right: auto; | |
width: 50%; | |
} | |
input[type="text"], input[type="submit"] { | |
width: 100%; | |
padding: 10px 20px; | |
margin: 10px 0; | |
box-sizing: border-box; | |
border: none; | |
border-radius: 4px; | |
background-color: #f8f8f8; | |
font-size: 16px; | |
} | |
</style> | |
</head> | |
<body> | |
<h2>Bienvenue sur votre AI-dashboard connecté</h2> | |
<p>Connectez-vous et accédez à des informations personnalisées !</p> | |
<h2> Fonctionnalités</h2> | |
<ul> | |
<li> Parlez avec votre assistant Mistral</li> | |
<li> Obtenez des renseignements météo</li> | |
<li> Partagez des informations avec les agriculteurs de votre région </li> | |
<li> Générer une newsletter personnalisée grâce à Mistral</li> | |
</ul> | |
<form id="locationForm"> | |
<label for="city">Votre ville :</label><br> | |
<input type="text" id="city" name="city"><br> | |
<input type="submit" value="Submit"> | |
</form> | |
<img src="https://next.ink/wp-content/uploads/2024/02/announcing-mistral.png" alt="Mistral Logo" style="display: block; margin-left: auto; margin-right: auto; width: 50%;"> | |
<div> | |
<p>© 2024 AgriHackteurs</p> | |
</div> | |
</body> | |
<script> | |
document.getElementById('locationForm').addEventListener('submit', function(event) { | |
event.preventDefault(); | |
var city = document.getElementById('city').value; | |
fetch('/user_location', { | |
method: 'POST', | |
headers: { | |
'Content-Type': 'application/json', | |
}, | |
body: JSON.stringify({city: city}), | |
}) | |
.then(response => { | |
if (response.ok) { | |
window.location.href = "/home"; | |
} | |
}); | |
}); | |
</script> | |
""" | |
# Home page : using the user profile, display the weather and chat with Mistral AI | |
async def home( | |
request: Request, | |
user_profile: UserProfile = Depends(load_user_profile), | |
weather: Weather = Depends(load_weather), | |
): | |
with open('user_location.json', 'r') as f: | |
user_location = json.load(f) | |
# Get weather data for the user location | |
weather_data, lat, lon = get_weather(user_location['city'], today) | |
# Convert the keys to datetime objects | |
weather_times = {datetime.strptime( | |
time, '%Y-%m-%d %H:%M:%S'): info for time, info in weather_data.items()} | |
# Find the time closest to the current time | |
current_time = datetime.now() | |
#closest_time = min(weather_times.keys(), | |
# key=lambda time: abs(time - current_time)) | |
# Extract weather information for the closest time | |
#weather_info = weather_times[closest_time] | |
# Extract temperature and weather from the weather information | |
#temperature = float(weather_info.split( | |
# ', ')[1].split('°C')[0].split(': ')[1]) | |
#weather = weather_info.split(', ')[2].split(': ')[1] | |
# Create a Weather object from the weather data | |
temperature = float(20) | |
weather = 'Sunny' | |
weather = Weather(temperature=temperature, weather=weather) | |
temperature = weather.temperature | |
weather = weather.weather | |
# create the map | |
fig = create_world_map(lat, lon, dpmts=True, rgns=True) | |
# save the map as a file | |
map_file = static_dir / "map.html" | |
fig.write_html(str(map_file), config={'displayModeBar': False}) | |
# display the map | |
map_html = f'<iframe src="/static/map.html" width="100%" height="100%" ></iframe>' | |
# initialize the chatbot with the system prompt | |
system_prompt = create_prompt_system() | |
chat_with_mistral(system_prompt) | |
return templates.TemplateResponse("layout.html", {"request": request, "user_profile": user_profile, "weather": weather, "map_html": map_html}) | |
class ChatInput(BaseModel): | |
user_input: str | |
async def chat(chat_input: ChatInput): | |
print(chat_input.user_input) | |
return chat_with_mistral(chat_input.user_input) | |
# summarize all the information from the json files | |
async def report(): | |
# load all the json files in the root | |
report = "" | |
for file in Path('.').glob('*.json'): | |
with open(file, 'r') as f: | |
report += f"Contexte: {file.stem}\n\n" | |
# convert the json to a string using the json module | |
file_content = json.load(f) | |
report += json.dumps(file_content, indent=4) | |
report += "\n\n" | |
report += "Synthétise les informations pour l'utilisateur : \n\n" | |
# ask mistral to summarize the report | |
chat_response = client.chat(model=model, messages=[ | |
ChatMessage(role="user", content=report)]) | |
return chat_response.choices[0].message.content | |