nakas's picture
Update app.py
a51c4e8 verified
raw
history blame
19.4 kB
import gradio as gr
import requests
import folium
import pandas as pd
import time
import os
import zipfile
import io
from typing import Dict, List, Tuple
from datetime import datetime, timedelta
import pytz
class AccurateAirQualityMapper:
"""Air Quality Mapper with precise EPA coordinates"""
def __init__(self):
self.airnow_base_url = "https://files.airnowtech.org"
self.epa_base_url = "https://aqs.epa.gov/aqsweb/airdata"
self.aqi_colors = {
"Good": "#00E400",
"Moderate": "#FFFF00",
"Unhealthy for Sensitive Groups": "#FF7E00",
"Unhealthy": "#FF0000",
"Very Unhealthy": "#8F3F97",
"Hazardous": "#7E0023"
}
self.aqi_ranges = {
(0, 50): "Good",
(51, 100): "Moderate",
(101, 150): "Unhealthy for Sensitive Groups",
(151, 200): "Unhealthy",
(201, 300): "Very Unhealthy",
(301, 500): "Hazardous"
}
# Cache for coordinate lookups
self.coordinate_cache = {}
def download_epa_coordinates(self) -> Dict[str, Tuple[float, float]]:
"""Download EPA monitor coordinates and create lookup dictionary"""
print("πŸ—ΊοΈ Downloading EPA monitor coordinates...")
coordinates = {}
try:
# Download monitor listing (most comprehensive)
monitors_url = f"{self.epa_base_url}/aqs_monitors.zip"
print(f"Downloading: {monitors_url}")
response = requests.get(monitors_url, timeout=60)
if response.status_code == 200:
# Extract CSV from ZIP
with zipfile.ZipFile(io.BytesIO(response.content)) as z:
csv_filename = z.namelist()[0] # Should be monitors.csv
with z.open(csv_filename) as f:
# Read CSV with pandas
df = pd.read_csv(f)
print(f"πŸ“Š Loaded {len(df)} monitor records")
print(f"Columns: {list(df.columns)}")
# Create lookup by AQS ID (State+County+Site+Parameter+POC)
for _, row in df.iterrows():
try:
# Build AQS ID from components
state_code = str(row.get('State Code', '')).zfill(2)
county_code = str(row.get('County Code', '')).zfill(3)
site_number = str(row.get('Site Number', '')).zfill(4)
aqs_id = f"{state_code}{county_code}{site_number}"
# Get coordinates
lat = float(row.get('Latitude', 0))
lon = float(row.get('Longitude', 0))
if lat != 0 and lon != 0 and aqs_id != "0000000":
coordinates[aqs_id] = (lat, lon)
except (ValueError, TypeError):
continue
print(f"βœ… Created coordinate lookup for {len(coordinates)} stations")
else:
print(f"❌ Failed to download monitors: HTTP {response.status_code}")
except Exception as e:
print(f"❌ Error downloading EPA coordinates: {str(e)}")
# Fallback: try sites file
if len(coordinates) < 1000: # If we didn't get enough coordinates
try:
print("πŸ”„ Trying sites file as backup...")
sites_url = f"{self.epa_base_url}/aqs_sites.zip"
response = requests.get(sites_url, timeout=60)
if response.status_code == 200:
with zipfile.ZipFile(io.BytesIO(response.content)) as z:
csv_filename = z.namelist()[0]
with z.open(csv_filename) as f:
df = pd.read_csv(f)
for _, row in df.iterrows():
try:
state_code = str(row.get('State Code', '')).zfill(2)
county_code = str(row.get('County Code', '')).zfill(3)
site_number = str(row.get('Site Number', '')).zfill(4)
aqs_id = f"{state_code}{county_code}{site_number}"
lat = float(row.get('Latitude', 0))
lon = float(row.get('Longitude', 0))
if lat != 0 and lon != 0 and aqs_id not in coordinates:
coordinates[aqs_id] = (lat, lon)
except (ValueError, TypeError):
continue
print(f"βœ… Added {len(coordinates)} total coordinates")
except Exception as e:
print(f"❌ Error with sites backup: {str(e)}")
self.coordinate_cache = coordinates
return coordinates
def get_aqi_category(self, aqi_value: int) -> str:
"""Get AQI category based on value"""
for (min_val, max_val), category in self.aqi_ranges.items():
if min_val <= aqi_value <= max_val:
return category
return "Unknown"
def calculate_aqi(self, parameter: str, value: float) -> int:
"""Calculate AQI for common parameters"""
if parameter == 'OZONE' and value > 0:
if value <= 54: return int((50/54) * value)
elif value <= 70: return int(51 + (49/16) * (value - 54))
elif value <= 85: return int(101 + (49/15) * (value - 70))
elif value <= 105: return int(151 + (49/20) * (value - 85))
else: return int(201 + (199/95) * min(value - 105, 95))
elif parameter == 'PM2.5' and value >= 0:
if value <= 12.0: return int((50/12) * value)
elif value <= 35.4: return int(51 + (49/23.4) * (value - 12))
elif value <= 55.4: return int(101 + (49/20) * (value - 35.4))
elif value <= 150.4: return int(151 + (49/95) * (value - 55.4))
else: return int(201 + (199/149.6) * min(value - 150.4, 149.6))
elif parameter == 'PM10' and value >= 0:
if value <= 54: return int((50/54) * value)
elif value <= 154: return int(51 + (49/100) * (value - 54))
elif value <= 254: return int(101 + (49/100) * (value - 154))
elif value <= 354: return int(151 + (49/100) * (value - 254))
else: return int(201 + (199/146) * min(value - 354, 146))
return 0
def fetch_airnow_bulk_data(self) -> Tuple[List[Dict], str]:
"""Fetch current AirNow bulk data"""
print("🎯 Fetching AirNow bulk data...")
try:
# Get current GMT time
gmt_now = datetime.now(pytz.UTC)
# Try current hour and previous few hours
for hour_offset in range(0, 6):
try:
target_time = gmt_now - timedelta(hours=hour_offset)
filename = f"HourlyData_{target_time.strftime('%Y%m%d%H')}.dat"
url = f"{self.airnow_base_url}/airnow/today/{filename}"
print(f"πŸ” Trying: {url}")
response = requests.get(url, timeout=30)
if response.status_code == 200 and response.text.strip():
print(f"βœ… Found data file with {len(response.text.splitlines())} lines")
# Parse the data
data = self.parse_hourly_data_file(response.text)
if data:
print(f"πŸ“Š Parsed {len(data)} station records")
return data, f"βœ… SUCCESS: {len(data)} monitoring stations from {filename}"
except Exception as e:
print(f"❌ Error trying hour {hour_offset}: {str(e)}")
continue
time.sleep(0.1)
return [], "❌ No recent data files found"
except Exception as e:
return [], f"❌ Error fetching bulk data: {str(e)}"
def parse_hourly_data_file(self, text: str) -> List[Dict]:
"""Parse AirNow hourly data format"""
lines = text.strip().split('\n')
data = []
# Download coordinates if not cached
if not self.coordinate_cache:
self.download_epa_coordinates()
for line in lines:
if not line.strip():
continue
try:
fields = line.split('|')
if len(fields) >= 9:
aqs_id = fields[2] # AQS ID from file
# Look up coordinates
lat, lon = self.coordinate_cache.get(aqs_id[:9], (0, 0)) # Use first 9 chars (site ID)
# Skip if no coordinates found
if lat == 0 and lon == 0:
continue
value = float(fields[7]) if fields[7].replace('.','').replace('-','').isdigit() else 0
parameter = fields[5]
# Only include air quality parameters
if parameter not in ['OZONE', 'PM2.5', 'PM10', 'NO2', 'SO2', 'CO']:
continue
aqi = self.calculate_aqi(parameter, value)
record = {
'DateObserved': fields[0],
'HourObserved': fields[1],
'AQSID': aqs_id,
'SiteName': fields[3],
'ParameterName': parameter,
'ReportingUnits': fields[6],
'Value': value,
'DataSource': fields[8] if len(fields) > 8 else '',
'Latitude': lat,
'Longitude': lon,
'AQI': aqi,
'Category': {'Name': self.get_aqi_category(aqi)},
'ReportingArea': fields[3],
'StateCode': aqs_id[:2] if len(aqs_id) >= 2 else 'US'
}
data.append(record)
except Exception as e:
continue
print(f"βœ… Found coordinates for {len(data)} stations")
return data
def create_map(self, data: List[Dict]) -> str:
"""Create interactive map with accurate coordinates"""
if not data:
m = folium.Map(location=[39.8283, -98.5795], zoom_start=4)
folium.Marker(
[39.8283, -98.5795],
popup="No air quality data available.",
icon=folium.Icon(color='red', icon='info-sign')
).add_to(m)
return m._repr_html_()
# Calculate center
lats = [item['Latitude'] for item in data]
lons = [item['Longitude'] for item in data]
center_lat = sum(lats) / len(lats)
center_lon = sum(lons) / len(lons)
# Create map
m = folium.Map(location=[center_lat, center_lon], zoom_start=4)
# Add markers
for item in data:
try:
lat = item['Latitude']
lon = item['Longitude']
aqi = item['AQI']
parameter = item['ParameterName']
site_name = item['SiteName']
value = item['Value']
units = item['ReportingUnits']
category = item['Category']['Name']
# Create popup
popup_content = f"""
<div style="width: 250px;">
<h4>{site_name}</h4>
<p><b>Parameter:</b> {parameter}</p>
<p><b>Value:</b> {value} {units}</p>
<p><b>AQI:</b> {aqi} ({category})</p>
<p><b>Coordinates:</b> {lat:.4f}, {lon:.4f}</p>
<p><b>Time:</b> {item['DateObserved']} {item['HourObserved']}:00 GMT</p>
<p><b>Station ID:</b> {item['AQSID']}</p>
</div>
"""
# Color based on AQI
if aqi <= 50:
marker_color = 'green'
elif aqi <= 100:
marker_color = 'orange'
elif aqi <= 150:
marker_color = 'orange'
elif aqi <= 200:
marker_color = 'red'
elif aqi <= 300:
marker_color = 'purple'
else:
marker_color = 'darkred'
# Add marker
folium.Marker(
[lat, lon],
popup=folium.Popup(popup_content, max_width=300),
tooltip=f"{site_name}: {parameter} = {value} {units} (AQI: {aqi})",
icon=folium.Icon(color=marker_color, icon='cloud')
).add_to(m)
except Exception as e:
continue
# Add legend
legend_html = """
<div style="position: fixed;
bottom: 50px; left: 50px; width: 180px; height: 200px;
background-color: white; border:2px solid grey; z-index:9999;
font-size:14px; padding: 10px">
<h4>AQI Legend</h4>
<p><i class="fa fa-circle" style="color:green"></i> Good (0-50)</p>
<p><i class="fa fa-circle" style="color:orange"></i> Moderate (51-100)</p>
<p><i class="fa fa-circle" style="color:orange"></i> Unhealthy for Sensitive (101-150)</p>
<p><i class="fa fa-circle" style="color:red"></i> Unhealthy (151-200)</p>
<p><i class="fa fa-circle" style="color:purple"></i> Very Unhealthy (201-300)</p>
<p><i class="fa fa-circle" style="color:darkred"></i> Hazardous (301+)</p>
</div>
"""
m.get_root().html.add_child(folium.Element(legend_html))
return m._repr_html_()
def create_data_table(self, data: List[Dict]) -> pd.DataFrame:
"""Create data table"""
if not data:
return pd.DataFrame()
table_data = []
for item in data:
table_data.append({
'Site Name': item['SiteName'],
'State': item['StateCode'],
'Parameter': item['ParameterName'],
'Value': item['Value'],
'Units': item['ReportingUnits'],
'AQI': item['AQI'],
'Category': item['Category']['Name'],
'Latitude': round(item['Latitude'], 4),
'Longitude': round(item['Longitude'], 4),
'Date': item['DateObserved'],
'Hour (GMT)': item['HourObserved'],
'Station ID': item['AQSID']
})
df = pd.DataFrame(table_data)
return df.sort_values('AQI', ascending=False)
# Initialize mapper
mapper = AccurateAirQualityMapper()
def update_map():
"""Update map with accurate coordinates"""
print("πŸš€ Starting accurate air quality mapping...")
# Fetch data
data, status = mapper.fetch_airnow_bulk_data()
# Create map
map_html = mapper.create_map(data)
# Create table
df = mapper.create_data_table(data)
return map_html, df, status
# Create Gradio interface
with gr.Blocks(title="Accurate AirNow Sensor Map", theme=gr.themes.Soft()) as demo:
gr.Markdown(
"""
# 🎯 Accurate AirNow Air Quality Map
**βœ… PRECISE COORDINATES** - Uses EPA's official monitor coordinate database!
This map displays real-time air quality data with **accurate station locations** by:
1. **Downloading EPA coordinates**: Gets precise lat/lon for every monitoring station
2. **Fetching AirNow bulk data**: Current hourly readings from 2,000+ stations
3. **Accurate mapping**: Stations plotted at their exact geographic locations
## Key Features:
- 🎯 **Precise Locations**: EPA's official coordinate database
- 🌍 **Complete Coverage**: All active AirNow monitoring stations
- ⚑ **Real-time Data**: Latest hourly observations
- πŸ“Š **Air Quality Focus**: OZONE, PM2.5, PM10, NO2, SO2, CO
- πŸ”„ **Auto-updated**: Fresh data every hour
**⚠️ Data Note**: This displays preliminary, real-time data for public information.
For regulatory purposes, use EPA's official AQS data.
"""
)
with gr.Row():
load_button = gr.Button("🎯 Load Accurate Air Quality Map", variant="primary", size="lg")
status_text = gr.Markdown("Click the button above to load current air quality data with precise coordinates.")
with gr.Tabs():
with gr.TabItem("πŸ—ΊοΈ Accurate Map"):
map_output = gr.HTML(label="Air Quality Map with Precise Coordinates")
with gr.TabItem("πŸ“Š Station Data"):
data_table = gr.Dataframe(
label="Air Quality Monitoring Stations",
interactive=False
)
gr.Markdown(
"""
## Data Sources:
**Coordinates**: EPA Air Quality System (AQS) - Official monitor locations
**Air Quality Data**: AirNow hourly bulk files - Real-time observations
**Coverage**: 2,000+ monitoring stations across US, Canada, and parts of Mexico
## Files Used:
- `aqs_monitors.zip` - EPA monitor coordinates (364,377+ records)
- `HourlyData_YYYYMMDDHH.dat` - AirNow real-time observations
## Links:
- [EPA AQS Data](https://aqs.epa.gov/aqsweb/airdata/download_files.html)
- [AirNow Bulk Files](https://files.airnowtech.org/airnow/today/)
- [EPA Monitor Map](https://www.epa.gov/outdoor-air-quality-data/interactive-map-air-quality-monitors)
"""
)
# Set up event handler
load_button.click(
fn=update_map,
inputs=[],
outputs=[map_output, data_table, status_text]
)
if __name__ == "__main__":
demo.launch()